1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.mode.repository.cluster.zookeeper;
19
20 import com.google.common.base.Strings;
21 import lombok.Getter;
22 import org.apache.curator.framework.CuratorFramework;
23 import org.apache.curator.framework.CuratorFrameworkFactory;
24 import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
25 import org.apache.curator.framework.api.ACLProvider;
26 import org.apache.curator.framework.recipes.cache.CuratorCache;
27 import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
28 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
29 import org.apache.curator.retry.ExponentialBackoffRetry;
30 import org.apache.curator.utils.CloseableUtils;
31 import org.apache.shardingsphere.infra.instance.InstanceContext;
32 import org.apache.shardingsphere.infra.instance.InstanceContextAware;
33 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
34 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
35 import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
36 import org.apache.shardingsphere.mode.event.DataChangedEvent;
37 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
38 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
39 import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
40 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler;
41 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener;
42 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
43 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperPropertyKey;
44 import org.apache.zookeeper.CreateMode;
45 import org.apache.zookeeper.KeeperException.NodeExistsException;
46 import org.apache.zookeeper.KeeperException.OperationTimeoutException;
47 import org.apache.zookeeper.ZooDefs;
48 import org.apache.zookeeper.data.ACL;
49
50 import java.nio.charset.StandardCharsets;
51 import java.util.Collections;
52 import java.util.Comparator;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.concurrent.ConcurrentHashMap;
56 import java.util.concurrent.TimeUnit;
57
58
59
60
61 public final class ZookeeperRepository implements ClusterPersistRepository, InstanceContextAware {
62
63 private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
64
65 private final Builder builder = CuratorFrameworkFactory.builder();
66
67 private CuratorFramework client;
68
69 @Getter
70 private DistributedLockHolder distributedLockHolder;
71
72 @Override
73 public void init(final ClusterPersistRepositoryConfiguration config) {
74 ZookeeperProperties zookeeperProps = new ZookeeperProperties(config.getProps());
75 client = buildCuratorClient(config, zookeeperProps);
76 distributedLockHolder = new DistributedLockHolder(getType(), client, zookeeperProps);
77 initCuratorClient(zookeeperProps);
78 }
79
80 private CuratorFramework buildCuratorClient(final ClusterPersistRepositoryConfiguration config, final ZookeeperProperties zookeeperProps) {
81 int retryIntervalMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS);
82 int maxRetries = zookeeperProps.getValue(ZookeeperPropertyKey.MAX_RETRIES);
83 int timeToLiveSeconds = zookeeperProps.getValue(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS);
84 int operationTimeoutMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS);
85 builder.connectString(config.getServerLists())
86 .ensembleTracker(false)
87 .retryPolicy(new ExponentialBackoffRetry(retryIntervalMilliseconds, maxRetries, retryIntervalMilliseconds * maxRetries))
88 .namespace(config.getNamespace());
89 if (0 != timeToLiveSeconds) {
90 builder.sessionTimeoutMs(timeToLiveSeconds * 1000);
91 }
92 if (0 != operationTimeoutMilliseconds) {
93 builder.connectionTimeoutMs(operationTimeoutMilliseconds);
94 }
95 String digest = zookeeperProps.getValue(ZookeeperPropertyKey.DIGEST);
96 if (!Strings.isNullOrEmpty(digest)) {
97 builder.authorization(ZookeeperPropertyKey.DIGEST.getKey(), digest.getBytes(StandardCharsets.UTF_8))
98 .aclProvider(new ACLProvider() {
99
100 @Override
101 public List<ACL> getDefaultAcl() {
102 return ZooDefs.Ids.CREATOR_ALL_ACL;
103 }
104
105 @Override
106 public List<ACL> getAclForPath(final String path) {
107 return ZooDefs.Ids.CREATOR_ALL_ACL;
108 }
109 });
110 }
111 return builder.build();
112 }
113
114 private void initCuratorClient(final ZookeeperProperties zookeeperProps) {
115 client.start();
116 try {
117 int retryIntervalMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS);
118 int maxRetries = zookeeperProps.getValue(ZookeeperPropertyKey.MAX_RETRIES);
119 if (!client.blockUntilConnected(retryIntervalMilliseconds * maxRetries, TimeUnit.MILLISECONDS)) {
120 client.close();
121 throw new OperationTimeoutException();
122 }
123 } catch (final InterruptedException ex) {
124 Thread.currentThread().interrupt();
125 } catch (final OperationTimeoutException ex) {
126 ZookeeperExceptionHandler.handleException(ex);
127 }
128 }
129
130 @Override
131 public List<String> getChildrenKeys(final String key) {
132 try {
133 List<String> result = client.getChildren().forPath(key);
134 result.sort(Comparator.reverseOrder());
135 return result;
136
137 } catch (final Exception ex) {
138
139 ZookeeperExceptionHandler.handleException(ex);
140 return Collections.emptyList();
141 }
142 }
143
144 @Override
145 public void persist(final String key, final String value) {
146 try {
147 if (isExisted(key)) {
148 update(key, value);
149 } else {
150 client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
151 }
152
153 } catch (final Exception ex) {
154
155 ZookeeperExceptionHandler.handleException(ex);
156 }
157 }
158
159 @Override
160 public void update(final String key, final String value) {
161 try {
162 client.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8));
163
164 } catch (final Exception ex) {
165
166 ZookeeperExceptionHandler.handleException(ex);
167 }
168 }
169
170 @Override
171 public String getDirectly(final String key) {
172 try {
173 return new String(client.getData().forPath(key), StandardCharsets.UTF_8);
174
175 } catch (final Exception ex) {
176
177 ZookeeperExceptionHandler.handleException(ex);
178 return null;
179 }
180 }
181
182 @Override
183 public boolean isExisted(final String key) {
184 try {
185 return null != client.checkExists().forPath(key);
186
187 } catch (final Exception ex) {
188
189 ZookeeperExceptionHandler.handleException(ex);
190 return false;
191 }
192 }
193
194 @Override
195 public void persistEphemeral(final String key, final String value) {
196 try {
197 if (isExisted(key)) {
198 client.delete().deletingChildrenIfNeeded().forPath(key);
199 }
200 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
201
202 } catch (final Exception ex) {
203
204 ZookeeperExceptionHandler.handleException(ex);
205 }
206 }
207
208 @Override
209 public void persistExclusiveEphemeral(final String key, final String value) {
210 try {
211 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
212 } catch (final NodeExistsException ex) {
213 throw new ClusterPersistRepositoryException(ex);
214
215 } catch (final Exception ex) {
216 ZookeeperExceptionHandler.handleException(ex);
217
218 }
219 }
220
221 @Override
222 public void delete(final String key) {
223 try {
224 if (isExisted(key)) {
225 client.delete().deletingChildrenIfNeeded().forPath(key);
226 }
227
228 } catch (final Exception ex) {
229
230 ZookeeperExceptionHandler.handleException(ex);
231 }
232 }
233
234 @Override
235 public void watch(final String key, final DataChangedEventListener listener) {
236 CuratorCache cache = caches.get(key);
237 if (null == cache) {
238 cache = CuratorCache.build(client, key);
239 caches.put(key, cache);
240 }
241 CuratorCacheListener curatorCacheListener = CuratorCacheListener.builder()
242 .afterInitialized()
243 .forTreeCache(client, (framework, treeCacheListener) -> {
244 Type changedType = getChangedType(treeCacheListener.getType());
245 if (Type.IGNORED != changedType) {
246 listener.onChange(new DataChangedEvent(treeCacheListener.getData().getPath(),
247 new String(treeCacheListener.getData().getData(), StandardCharsets.UTF_8), changedType));
248 }
249 }).build();
250 cache.listenable().addListener(curatorCacheListener);
251 cache.start();
252 }
253
254 private Type getChangedType(final TreeCacheEvent.Type type) {
255 switch (type) {
256 case NODE_ADDED:
257 return Type.ADDED;
258 case NODE_UPDATED:
259 return Type.UPDATED;
260 case NODE_REMOVED:
261 return Type.DELETED;
262 default:
263 return Type.IGNORED;
264 }
265 }
266
267 @Override
268 public void close() {
269 caches.values().forEach(CuratorCache::close);
270 waitForCacheClose();
271 CloseableUtils.closeQuietly(client);
272 }
273
274
275
276
277
278 private void waitForCacheClose() {
279 try {
280 Thread.sleep(500L);
281 } catch (final InterruptedException ex) {
282 Thread.currentThread().interrupt();
283 }
284 }
285
286 @Override
287 public void setInstanceContext(final InstanceContext instanceContext) {
288 client.getConnectionStateListenable().addListener(new SessionConnectionListener(instanceContext, this));
289 }
290
291 @Override
292 public String getType() {
293 return "ZooKeeper";
294 }
295 }