View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.mode.repository.cluster.etcd;
19  
20  import com.google.common.base.Preconditions;
21  import com.google.common.base.Splitter;
22  import com.google.common.util.concurrent.ThreadFactoryBuilder;
23  import io.etcd.jetcd.ByteSequence;
24  import io.etcd.jetcd.Client;
25  import io.etcd.jetcd.KeyValue;
26  import io.etcd.jetcd.Watch;
27  import io.etcd.jetcd.options.DeleteOption;
28  import io.etcd.jetcd.options.GetOption;
29  import io.etcd.jetcd.options.OptionsUtil;
30  import io.etcd.jetcd.options.PutOption;
31  import io.etcd.jetcd.options.WatchOption;
32  import io.etcd.jetcd.support.Observers;
33  import io.etcd.jetcd.support.Util;
34  import io.etcd.jetcd.watch.WatchEvent;
35  import lombok.SneakyThrows;
36  import lombok.extern.slf4j.Slf4j;
37  import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
38  import org.apache.shardingsphere.mode.event.DataChangedEvent;
39  import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
40  import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
41  import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
42  import org.apache.shardingsphere.mode.repository.cluster.etcd.lock.EtcdDistributedLock;
43  import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
44  import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey;
45  import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
46  import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
47  
48  import java.nio.charset.StandardCharsets;
49  import java.util.List;
50  import java.util.Optional;
51  import java.util.concurrent.CompletableFuture;
52  import java.util.concurrent.ExecutionException;
53  import java.util.concurrent.ExecutorService;
54  import java.util.concurrent.Executors;
55  import java.util.stream.Collectors;
56  
57  /**
58   * Registry repository of ETCD.
59   */
60  @Slf4j
61  public final class EtcdRepository implements ClusterPersistRepository {
62      
63      private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Etcd-EventListener-%d").build());
64      
65      private Client client;
66      
67      private EtcdProperties etcdProps;
68      
69      @Override
70      public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
71          etcdProps = new EtcdProperties(config.getProps());
72          client = Client.builder().endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(config.getServerLists())))
73                  .namespace(ByteSequence.from(config.getNamespace(), StandardCharsets.UTF_8))
74                  .maxInboundMessageSize((int) 32e9)
75                  .build();
76      }
77      
78      @SneakyThrows({InterruptedException.class, ExecutionException.class})
79      @Override
80      public String query(final String key) {
81          List<KeyValue> keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
82          return keyValues.isEmpty() ? null : keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
83      }
84      
85      @SneakyThrows({InterruptedException.class, ExecutionException.class})
86      @Override
87      public List<String> getChildrenKeys(final String key) {
88          String prefix = key + PATH_SEPARATOR;
89          ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8);
90          GetOption getOption = GetOption.newBuilder().isPrefix(true).withSortField(GetOption.SortTarget.KEY).withSortOrder(GetOption.SortOrder.ASCEND).build();
91          List<KeyValue> keyValues = client.getKVClient().get(prefixByteSequence, getOption).get().getKvs();
92          return keyValues.stream().map(each -> getSubNodeKeyName(prefix, each.getKey().toString(StandardCharsets.UTF_8))).distinct().collect(Collectors.toList());
93      }
94      
95      @Override
96      public boolean isExisted(final String key) {
97          return false;
98      }
99      
100     private String getSubNodeKeyName(final String prefix, final String fullPath) {
101         String pathWithoutPrefix = fullPath.substring(prefix.length());
102         return pathWithoutPrefix.contains(PATH_SEPARATOR) ? pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(PATH_SEPARATOR)) : pathWithoutPrefix;
103     }
104     
105     @SneakyThrows({InterruptedException.class, ExecutionException.class})
106     @Override
107     public void persist(final String key, final String value) {
108         buildParentPath(key);
109         client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8), ByteSequence.from(value, StandardCharsets.UTF_8)).get();
110     }
111     
112     @Override
113     public void update(final String key, final String value) {
114         // TODO
115     }
116     
117     @SneakyThrows({InterruptedException.class, ExecutionException.class})
118     @Override
119     public void persistEphemeral(final String key, final String value) {
120         buildParentPath(key);
121         long leaseId = client.getLeaseClient().grant(etcdProps.getValue(EtcdPropertyKey.TIME_TO_LIVE_SECONDS)).get().getID();
122         client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
123         }));
124         client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8), ByteSequence.from(value, StandardCharsets.UTF_8), PutOption.newBuilder().withLeaseId(leaseId).build()).get();
125     }
126     
127     @Override
128     public boolean persistExclusiveEphemeral(final String key, final String value) {
129         persistEphemeral(key, value);
130         return true;
131     }
132     
133     @Override
134     public Optional<DistributedLock> getDistributedLock(final String lockKey) {
135         return Optional.of(new EtcdDistributedLock(lockKey, client, etcdProps));
136     }
137     
138     private void buildParentPath(final String key) throws ExecutionException, InterruptedException {
139         StringBuilder parentPath = new StringBuilder();
140         String[] partPath = key.split(PATH_SEPARATOR);
141         for (int index = 1; index < partPath.length - 1; index++) {
142             parentPath.append(PATH_SEPARATOR);
143             parentPath.append(partPath[index]);
144             String path = parentPath.toString();
145             List<KeyValue> keyValues = client.getKVClient().get(ByteSequence.from(path, StandardCharsets.UTF_8)).get().getKvs();
146             if (keyValues.isEmpty()) {
147                 client.getKVClient().put(ByteSequence.from(path, StandardCharsets.UTF_8), ByteSequence.from("", StandardCharsets.UTF_8)).get();
148             }
149         }
150     }
151     
152     @Override
153     public void delete(final String key) {
154         client.getKVClient().delete(ByteSequence.from(key, StandardCharsets.UTF_8), DeleteOption.newBuilder().isPrefix(true).build());
155     }
156     
157     @Override
158     public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
159         Watch.Listener listener = Watch.listener(response -> {
160             for (WatchEvent each : response.getEvents()) {
161                 Type type = getEventChangedType(each);
162                 if (Type.IGNORED != type) {
163                     dispatchEvent(dataChangedEventListener, each, type);
164                 }
165             }
166         });
167         ByteSequence prefix = ByteSequence.from(key, StandardCharsets.UTF_8);
168         Preconditions.checkNotNull(prefix, "prefix should not be null");
169         client.getWatchClient().watch(prefix,
170                 WatchOption.newBuilder().withRange(OptionsUtil.prefixEndOf(prefix)).build(), listener);
171     }
172     
173     @Override
174     public void removeDataListener(final String key) {
175         // TODO
176     }
177     
178     private Type getEventChangedType(final WatchEvent event) {
179         if (1 == event.getKeyValue().getVersion()) {
180             return Type.ADDED;
181         }
182         switch (event.getEventType()) {
183             case PUT:
184                 return Type.UPDATED;
185             case DELETE:
186                 return Type.DELETED;
187             default:
188                 return Type.IGNORED;
189         }
190     }
191     
192     private void dispatchEvent(final DataChangedEventListener dataChangedEventListener, final WatchEvent event, final Type type) {
193         CompletableFuture.runAsync(() -> dataChangedEventListener.onChange(new DataChangedEvent(event.getKeyValue().getKey().toString(StandardCharsets.UTF_8),
194                 event.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> {
195                     if (null != throwable) {
196                         log.error("Dispatch event failed", throwable);
197                     }
198                 });
199     }
200     
201     @Override
202     public void close() {
203         client.close();
204         EVENT_LISTENER_EXECUTOR.shutdown();
205     }
206     
207     @Override
208     public String getType() {
209         return "etcd";
210     }
211 }