1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.mode.repository.cluster.zookeeper;
19
20 import com.google.common.base.Strings;
21 import org.apache.curator.framework.CuratorFramework;
22 import org.apache.curator.framework.CuratorFrameworkFactory;
23 import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
24 import org.apache.curator.framework.api.ACLProvider;
25 import org.apache.curator.framework.recipes.cache.CuratorCache;
26 import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
27 import org.apache.curator.retry.ExponentialBackoffRetry;
28 import org.apache.curator.utils.CloseableUtils;
29 import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
30 import org.apache.shardingsphere.mode.event.DataChangedEvent;
31 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
32 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
33 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
34 import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterRepositoryPersistException;
35 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
36 import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
37 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.exception.ZookeeperExceptionHandler;
38 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionReconnectListener;
39 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock.ZookeeperDistributedLock;
40 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
41 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperPropertyKey;
42 import org.apache.zookeeper.CreateMode;
43 import org.apache.zookeeper.KeeperException;
44 import org.apache.zookeeper.KeeperException.NodeExistsException;
45 import org.apache.zookeeper.KeeperException.OperationTimeoutException;
46 import org.apache.zookeeper.ZooDefs;
47 import org.apache.zookeeper.data.ACL;
48
49 import java.nio.charset.StandardCharsets;
50 import java.util.Collections;
51 import java.util.Comparator;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Objects;
55 import java.util.Optional;
56 import java.util.concurrent.ConcurrentHashMap;
57 import java.util.concurrent.TimeUnit;
58
59
60
61
62 public final class ZookeeperRepository implements ClusterPersistRepository {
63
64 private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
65
66 private final Map<String, CuratorCacheListener> dataListeners = new ConcurrentHashMap<>();
67
68 private final Builder builder = CuratorFrameworkFactory.builder();
69
70 private CuratorFramework client;
71
72 @Override
73 public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
74 ZookeeperProperties zookeeperProps = new ZookeeperProperties(config.getProps());
75 client = buildCuratorClient(config, zookeeperProps);
76 client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this));
77 initCuratorClient(zookeeperProps);
78 }
79
80 private CuratorFramework buildCuratorClient(final ClusterPersistRepositoryConfiguration config, final ZookeeperProperties zookeeperProps) {
81 int retryIntervalMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS);
82 int maxRetries = zookeeperProps.getValue(ZookeeperPropertyKey.MAX_RETRIES);
83 int timeToLiveSeconds = zookeeperProps.getValue(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS);
84 int operationTimeoutMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS);
85 builder.connectString(config.getServerLists())
86 .ensembleTracker(false)
87 .retryPolicy(new ExponentialBackoffRetry(retryIntervalMilliseconds, maxRetries, retryIntervalMilliseconds * maxRetries))
88 .namespace(config.getNamespace());
89 if (0 != timeToLiveSeconds) {
90 builder.sessionTimeoutMs(timeToLiveSeconds * 1000);
91 }
92 if (0 != operationTimeoutMilliseconds) {
93 builder.connectionTimeoutMs(operationTimeoutMilliseconds);
94 }
95 String digest = zookeeperProps.getValue(ZookeeperPropertyKey.DIGEST);
96 if (!Strings.isNullOrEmpty(digest)) {
97 builder.authorization(ZookeeperPropertyKey.DIGEST.getKey(), digest.getBytes(StandardCharsets.UTF_8))
98 .aclProvider(new ACLProvider() {
99
100 @Override
101 public List<ACL> getDefaultAcl() {
102 return ZooDefs.Ids.CREATOR_ALL_ACL;
103 }
104
105 @Override
106 public List<ACL> getAclForPath(final String path) {
107 return ZooDefs.Ids.CREATOR_ALL_ACL;
108 }
109 });
110 }
111 return builder.build();
112 }
113
114 private void initCuratorClient(final ZookeeperProperties zookeeperProps) {
115 client.start();
116 try {
117 int retryIntervalMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS);
118 int maxRetries = zookeeperProps.getValue(ZookeeperPropertyKey.MAX_RETRIES);
119 if (!client.blockUntilConnected(retryIntervalMilliseconds * maxRetries, TimeUnit.MILLISECONDS)) {
120 client.close();
121 throw new OperationTimeoutException();
122 }
123 } catch (final InterruptedException ex) {
124 Thread.currentThread().interrupt();
125 } catch (final OperationTimeoutException ex) {
126 ZookeeperExceptionHandler.handleException(ex);
127 }
128 }
129
130 @Override
131 public List<String> getChildrenKeys(final String key) {
132 try {
133 List<String> result = client.getChildren().forPath(key);
134 result.sort(Comparator.reverseOrder());
135 return result;
136
137 } catch (final Exception ex) {
138
139 ZookeeperExceptionHandler.handleException(ex);
140 return Collections.emptyList();
141 }
142 }
143
144 @Override
145 public void persist(final String key, final String value) {
146 try {
147 if (isExisted(key)) {
148 update(key, value);
149 } else {
150 client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
151 }
152
153 } catch (final Exception ex) {
154
155 ZookeeperExceptionHandler.handleException(ex);
156 }
157 }
158
159 @Override
160 public void update(final String key, final String value) {
161 try {
162 client.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8));
163
164 } catch (final Exception ex) {
165
166 ZookeeperExceptionHandler.handleException(ex);
167 }
168 }
169
170 @Override
171 public String query(final String key) {
172 try {
173 return new String(client.getData().forPath(key), StandardCharsets.UTF_8);
174 } catch (final KeeperException.NoNodeException ex) {
175 return null;
176
177 } catch (final Exception ex) {
178
179 throw new ClusterRepositoryPersistException(ex);
180 }
181 }
182
183 @Override
184 public boolean isExisted(final String key) {
185 try {
186 return null != client.checkExists().forPath(key);
187
188 } catch (final Exception ex) {
189
190 ZookeeperExceptionHandler.handleException(ex);
191 return false;
192 }
193 }
194
195 @Override
196 public void persistEphemeral(final String key, final String value) {
197 try {
198 if (isExisted(key)) {
199 client.delete().deletingChildrenIfNeeded().forPath(key);
200 }
201 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
202
203 } catch (final Exception ex) {
204
205 ZookeeperExceptionHandler.handleException(ex);
206 }
207 }
208
209 @Override
210 public boolean persistExclusiveEphemeral(final String key, final String value) {
211 try {
212 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
213 } catch (final NodeExistsException ex) {
214 return false;
215
216 } catch (final Exception ex) {
217 ZookeeperExceptionHandler.handleException(ex);
218
219 }
220 return true;
221 }
222
223 @Override
224 public Optional<DistributedLock> getDistributedLock(final String lockKey) {
225 return Optional.of(new ZookeeperDistributedLock(lockKey, client));
226 }
227
228 @Override
229 public void delete(final String key) {
230 try {
231 if (isExisted(key)) {
232 client.delete().deletingChildrenIfNeeded().forPath(key);
233 }
234
235 } catch (final Exception ex) {
236
237 ZookeeperExceptionHandler.handleException(ex);
238 }
239 }
240
241 @Override
242 public void watch(final String key, final DataChangedEventListener listener) {
243 if (null != dataListeners.get(key)) {
244 return;
245 }
246 CuratorCache cache = caches.get(key);
247 if (null == cache) {
248 cache = CuratorCache.build(client, key);
249 caches.put(key, cache);
250 }
251 CuratorCacheListener curatorCacheListener = CuratorCacheListener.builder()
252 .forCreates(childData -> listener.onChange(new DataChangedEvent(childData.getPath(), new String(childData.getData(), StandardCharsets.UTF_8), Type.ADDED)))
253 .forChanges((oldData, newData) -> {
254 if (!Objects.equals(oldData, newData)) {
255 listener.onChange(new DataChangedEvent(newData.getPath(), new String(newData.getData(), StandardCharsets.UTF_8), Type.UPDATED));
256 }
257 })
258 .forDeletes(oldData -> listener.onChange(new DataChangedEvent(oldData.getPath(), new String(oldData.getData(), StandardCharsets.UTF_8), Type.DELETED)))
259 .afterInitialized()
260 .build();
261 cache.listenable().addListener(curatorCacheListener);
262 cache.start();
263 dataListeners.computeIfAbsent(key, curator -> curatorCacheListener);
264 }
265
266 @Override
267 public void removeDataListener(final String key) {
268 CuratorCacheListener cacheListener = dataListeners.remove(key);
269 if (null == cacheListener) {
270 return;
271 }
272 Optional.ofNullable(caches.remove(key)).ifPresent(optional -> optional.listenable().removeListener(cacheListener));
273 }
274
275 @Override
276 public void close() {
277 caches.values().forEach(CuratorCache::close);
278 waitForCacheClose();
279 CloseableUtils.closeQuietly(client);
280 }
281
282
283
284
285
286 private void waitForCacheClose() {
287 try {
288 Thread.sleep(500L);
289 } catch (final InterruptedException ex) {
290 Thread.currentThread().interrupt();
291 }
292 }
293
294 @Override
295 public String getType() {
296 return "ZooKeeper";
297 }
298 }