1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.mode.repository.cluster.etcd;
19
20 import com.google.common.base.Preconditions;
21 import com.google.common.base.Splitter;
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;
23 import io.etcd.jetcd.ByteSequence;
24 import io.etcd.jetcd.Client;
25 import io.etcd.jetcd.KeyValue;
26 import io.etcd.jetcd.Watch;
27 import io.etcd.jetcd.options.DeleteOption;
28 import io.etcd.jetcd.options.GetOption;
29 import io.etcd.jetcd.options.OptionsUtil;
30 import io.etcd.jetcd.options.PutOption;
31 import io.etcd.jetcd.options.WatchOption;
32 import io.etcd.jetcd.support.Observers;
33 import io.etcd.jetcd.support.Util;
34 import io.etcd.jetcd.watch.WatchEvent;
35 import lombok.Getter;
36 import lombok.SneakyThrows;
37 import lombok.extern.slf4j.Slf4j;
38 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
39 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
40 import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
41 import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey;
42 import org.apache.shardingsphere.mode.event.DataChangedEvent;
43 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
44 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
45 import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
46
47 import java.nio.charset.StandardCharsets;
48 import java.util.List;
49 import java.util.concurrent.CompletableFuture;
50 import java.util.concurrent.ExecutionException;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Executors;
53 import java.util.stream.Collectors;
54
55
56
57
58 @Slf4j
59 public final class EtcdRepository implements ClusterPersistRepository {
60
61 private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Etcd-EventListener-%d").build());
62
63 private Client client;
64
65 private EtcdProperties etcdProps;
66
67 @Getter
68 private DistributedLockHolder distributedLockHolder;
69
70 @Override
71 public void init(final ClusterPersistRepositoryConfiguration config) {
72 etcdProps = new EtcdProperties(config.getProps());
73 client = Client.builder().endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(config.getServerLists())))
74 .namespace(ByteSequence.from(config.getNamespace(), StandardCharsets.UTF_8))
75 .maxInboundMessageSize((int) 32e9)
76 .build();
77 distributedLockHolder = new DistributedLockHolder(getType(), client, etcdProps);
78 }
79
80 @SneakyThrows({InterruptedException.class, ExecutionException.class})
81 @Override
82 public String getDirectly(final String key) {
83 List<KeyValue> keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
84 return keyValues.isEmpty() ? null : keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
85 }
86
87 @SneakyThrows({InterruptedException.class, ExecutionException.class})
88 @Override
89 public List<String> getChildrenKeys(final String key) {
90 String prefix = key + PATH_SEPARATOR;
91 ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8);
92 GetOption getOption = GetOption.newBuilder().isPrefix(true).withSortField(GetOption.SortTarget.KEY).withSortOrder(GetOption.SortOrder.ASCEND).build();
93 List<KeyValue> keyValues = client.getKVClient().get(prefixByteSequence, getOption).get().getKvs();
94 return keyValues.stream().map(each -> getSubNodeKeyName(prefix, each.getKey().toString(StandardCharsets.UTF_8))).distinct().collect(Collectors.toList());
95 }
96
97 @Override
98 public boolean isExisted(final String key) {
99 return false;
100 }
101
102 private String getSubNodeKeyName(final String prefix, final String fullPath) {
103 String pathWithoutPrefix = fullPath.substring(prefix.length());
104 return pathWithoutPrefix.contains(PATH_SEPARATOR) ? pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(PATH_SEPARATOR)) : pathWithoutPrefix;
105 }
106
107 @SneakyThrows({InterruptedException.class, ExecutionException.class})
108 @Override
109 public void persist(final String key, final String value) {
110 buildParentPath(key);
111 client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8), ByteSequence.from(value, StandardCharsets.UTF_8)).get();
112 }
113
114 @Override
115 public void update(final String key, final String value) {
116
117 }
118
119 @SneakyThrows({InterruptedException.class, ExecutionException.class})
120 @Override
121 public void persistEphemeral(final String key, final String value) {
122 buildParentPath(key);
123 long leaseId = client.getLeaseClient().grant(etcdProps.getValue(EtcdPropertyKey.TIME_TO_LIVE_SECONDS)).get().getID();
124 client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
125 }));
126 client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8), ByteSequence.from(value, StandardCharsets.UTF_8), PutOption.newBuilder().withLeaseId(leaseId).build()).get();
127 }
128
129 @Override
130 public void persistExclusiveEphemeral(final String key, final String value) {
131 persistEphemeral(key, value);
132 }
133
134 private void buildParentPath(final String key) throws ExecutionException, InterruptedException {
135 StringBuilder parentPath = new StringBuilder();
136 String[] partPath = key.split(PATH_SEPARATOR);
137 for (int index = 1; index < partPath.length - 1; index++) {
138 parentPath.append(PATH_SEPARATOR);
139 parentPath.append(partPath[index]);
140 String path = parentPath.toString();
141 List<KeyValue> keyValues = client.getKVClient().get(ByteSequence.from(path, StandardCharsets.UTF_8)).get().getKvs();
142 if (keyValues.isEmpty()) {
143 client.getKVClient().put(ByteSequence.from(path, StandardCharsets.UTF_8), ByteSequence.from("", StandardCharsets.UTF_8)).get();
144 }
145 }
146 }
147
148 @Override
149 public void delete(final String key) {
150 client.getKVClient().delete(ByteSequence.from(key, StandardCharsets.UTF_8), DeleteOption.newBuilder().isPrefix(true).build());
151 }
152
153 @Override
154 public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
155 Watch.Listener listener = Watch.listener(response -> {
156 for (WatchEvent each : response.getEvents()) {
157 Type type = getEventChangedType(each);
158 if (Type.IGNORED != type) {
159 dispatchEvent(dataChangedEventListener, each, type);
160 }
161 }
162 });
163 ByteSequence prefix = ByteSequence.from(key, StandardCharsets.UTF_8);
164 Preconditions.checkNotNull(prefix, "prefix should not be null");
165 client.getWatchClient().watch(prefix,
166 WatchOption.newBuilder().withRange(OptionsUtil.prefixEndOf(prefix)).build(), listener);
167 }
168
169 private Type getEventChangedType(final WatchEvent event) {
170 if (1 == event.getKeyValue().getVersion()) {
171 return Type.ADDED;
172 }
173 switch (event.getEventType()) {
174 case PUT:
175 return Type.UPDATED;
176 case DELETE:
177 return Type.DELETED;
178 default:
179 return Type.IGNORED;
180 }
181 }
182
183 private void dispatchEvent(final DataChangedEventListener dataChangedEventListener, final WatchEvent event, final Type type) {
184 CompletableFuture.runAsync(() -> dataChangedEventListener.onChange(new DataChangedEvent(event.getKeyValue().getKey().toString(StandardCharsets.UTF_8),
185 event.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> {
186 if (null != throwable) {
187 log.error("Dispatch event failed", throwable);
188 }
189 });
190 }
191
192 @Override
193 public void close() {
194 client.close();
195 EVENT_LISTENER_EXECUTOR.shutdown();
196 }
197
198 @Override
199 public String getType() {
200 return "etcd";
201 }
202 }