1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.mode.repository.cluster.etcd;
19
20 import com.google.common.base.Preconditions;
21 import com.google.common.base.Splitter;
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;
23 import io.etcd.jetcd.ByteSequence;
24 import io.etcd.jetcd.Client;
25 import io.etcd.jetcd.KeyValue;
26 import io.etcd.jetcd.Watch;
27 import io.etcd.jetcd.options.DeleteOption;
28 import io.etcd.jetcd.options.GetOption;
29 import io.etcd.jetcd.options.OptionsUtil;
30 import io.etcd.jetcd.options.PutOption;
31 import io.etcd.jetcd.options.WatchOption;
32 import io.etcd.jetcd.support.Observers;
33 import io.etcd.jetcd.support.Util;
34 import io.etcd.jetcd.watch.WatchEvent;
35 import lombok.SneakyThrows;
36 import lombok.extern.slf4j.Slf4j;
37 import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
38 import org.apache.shardingsphere.mode.event.DataChangedEvent;
39 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
40 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
41 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
42 import org.apache.shardingsphere.mode.repository.cluster.etcd.lock.EtcdDistributedLock;
43 import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
44 import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey;
45 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
46 import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
47
48 import java.nio.charset.StandardCharsets;
49 import java.util.List;
50 import java.util.Optional;
51 import java.util.concurrent.CompletableFuture;
52 import java.util.concurrent.ExecutionException;
53 import java.util.concurrent.ExecutorService;
54 import java.util.concurrent.Executors;
55 import java.util.stream.Collectors;
56
57
58
59
60 @Slf4j
61 public final class EtcdRepository implements ClusterPersistRepository {
62
63 private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Etcd-EventListener-%d").build());
64
65 private Client client;
66
67 private EtcdProperties etcdProps;
68
69 @Override
70 public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
71 etcdProps = new EtcdProperties(config.getProps());
72 client = Client.builder().endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(config.getServerLists())))
73 .namespace(ByteSequence.from(config.getNamespace(), StandardCharsets.UTF_8))
74 .maxInboundMessageSize((int) 32e9)
75 .build();
76 }
77
78 @SneakyThrows({InterruptedException.class, ExecutionException.class})
79 @Override
80 public String query(final String key) {
81 List<KeyValue> keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
82 return keyValues.isEmpty() ? null : keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
83 }
84
85 @SneakyThrows({InterruptedException.class, ExecutionException.class})
86 @Override
87 public List<String> getChildrenKeys(final String key) {
88 String prefix = key + PATH_SEPARATOR;
89 ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8);
90 GetOption getOption = GetOption.newBuilder().isPrefix(true).withSortField(GetOption.SortTarget.KEY).withSortOrder(GetOption.SortOrder.ASCEND).build();
91 List<KeyValue> keyValues = client.getKVClient().get(prefixByteSequence, getOption).get().getKvs();
92 return keyValues.stream().map(each -> getSubNodeKeyName(prefix, each.getKey().toString(StandardCharsets.UTF_8))).distinct().collect(Collectors.toList());
93 }
94
95 @Override
96 public boolean isExisted(final String key) {
97 return false;
98 }
99
100 private String getSubNodeKeyName(final String prefix, final String fullPath) {
101 String pathWithoutPrefix = fullPath.substring(prefix.length());
102 return pathWithoutPrefix.contains(PATH_SEPARATOR) ? pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(PATH_SEPARATOR)) : pathWithoutPrefix;
103 }
104
105 @SneakyThrows({InterruptedException.class, ExecutionException.class})
106 @Override
107 public void persist(final String key, final String value) {
108 buildParentPath(key);
109 client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8), ByteSequence.from(value, StandardCharsets.UTF_8)).get();
110 }
111
112 @Override
113 public void update(final String key, final String value) {
114
115 }
116
117 @SneakyThrows({InterruptedException.class, ExecutionException.class})
118 @Override
119 public void persistEphemeral(final String key, final String value) {
120 buildParentPath(key);
121 long leaseId = client.getLeaseClient().grant(etcdProps.getValue(EtcdPropertyKey.TIME_TO_LIVE_SECONDS)).get().getID();
122 client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
123 }));
124 client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8), ByteSequence.from(value, StandardCharsets.UTF_8), PutOption.newBuilder().withLeaseId(leaseId).build()).get();
125 }
126
127 @Override
128 public boolean persistExclusiveEphemeral(final String key, final String value) {
129 persistEphemeral(key, value);
130 return true;
131 }
132
133 @Override
134 public Optional<DistributedLock> getDistributedLock(final String lockKey) {
135 return Optional.of(new EtcdDistributedLock(lockKey, client, etcdProps));
136 }
137
138 private void buildParentPath(final String key) throws ExecutionException, InterruptedException {
139 StringBuilder parentPath = new StringBuilder();
140 String[] partPath = key.split(PATH_SEPARATOR);
141 for (int index = 1; index < partPath.length - 1; index++) {
142 parentPath.append(PATH_SEPARATOR);
143 parentPath.append(partPath[index]);
144 String path = parentPath.toString();
145 List<KeyValue> keyValues = client.getKVClient().get(ByteSequence.from(path, StandardCharsets.UTF_8)).get().getKvs();
146 if (keyValues.isEmpty()) {
147 client.getKVClient().put(ByteSequence.from(path, StandardCharsets.UTF_8), ByteSequence.from("", StandardCharsets.UTF_8)).get();
148 }
149 }
150 }
151
152 @Override
153 public void delete(final String key) {
154 client.getKVClient().delete(ByteSequence.from(key, StandardCharsets.UTF_8), DeleteOption.newBuilder().isPrefix(true).build());
155 }
156
157 @Override
158 public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
159 Watch.Listener listener = Watch.listener(response -> {
160 for (WatchEvent each : response.getEvents()) {
161 Type type = getEventChangedType(each);
162 if (Type.IGNORED != type) {
163 dispatchEvent(dataChangedEventListener, each, type);
164 }
165 }
166 });
167 ByteSequence prefix = ByteSequence.from(key, StandardCharsets.UTF_8);
168 Preconditions.checkNotNull(prefix, "prefix should not be null");
169 client.getWatchClient().watch(prefix,
170 WatchOption.newBuilder().withRange(OptionsUtil.prefixEndOf(prefix)).build(), listener);
171 }
172
173 @Override
174 public void removeDataListener(final String key) {
175
176 }
177
178 private Type getEventChangedType(final WatchEvent event) {
179 if (1 == event.getKeyValue().getVersion()) {
180 return Type.ADDED;
181 }
182 switch (event.getEventType()) {
183 case PUT:
184 return Type.UPDATED;
185 case DELETE:
186 return Type.DELETED;
187 default:
188 return Type.IGNORED;
189 }
190 }
191
192 private void dispatchEvent(final DataChangedEventListener dataChangedEventListener, final WatchEvent event, final Type type) {
193 CompletableFuture.runAsync(() -> dataChangedEventListener.onChange(new DataChangedEvent(event.getKeyValue().getKey().toString(StandardCharsets.UTF_8),
194 event.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> {
195 if (null != throwable) {
196 log.error("Dispatch event failed", throwable);
197 }
198 });
199 }
200
201 @Override
202 public void close() {
203 client.close();
204 EVENT_LISTENER_EXECUTOR.shutdown();
205 }
206
207 @Override
208 public String getType() {
209 return "etcd";
210 }
211 }