View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.mode.repository.cluster.zookeeper;
19  
20  import com.google.common.base.Strings;
21  import org.apache.curator.framework.CuratorFramework;
22  import org.apache.curator.framework.CuratorFrameworkFactory;
23  import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
24  import org.apache.curator.framework.api.ACLProvider;
25  import org.apache.curator.framework.recipes.cache.CuratorCache;
26  import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
27  import org.apache.curator.retry.ExponentialBackoffRetry;
28  import org.apache.curator.utils.CloseableUtils;
29  import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
30  import org.apache.shardingsphere.mode.event.DataChangedEvent;
31  import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
32  import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
33  import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
34  import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterRepositoryPersistException;
35  import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
36  import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
37  import org.apache.shardingsphere.mode.repository.cluster.zookeeper.exception.ZookeeperExceptionHandler;
38  import org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionReconnectListener;
39  import org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock.ZookeeperDistributedLock;
40  import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
41  import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperPropertyKey;
42  import org.apache.zookeeper.CreateMode;
43  import org.apache.zookeeper.KeeperException;
44  import org.apache.zookeeper.KeeperException.NodeExistsException;
45  import org.apache.zookeeper.KeeperException.OperationTimeoutException;
46  import org.apache.zookeeper.ZooDefs;
47  import org.apache.zookeeper.data.ACL;
48  
49  import java.nio.charset.StandardCharsets;
50  import java.util.Collections;
51  import java.util.Comparator;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Objects;
55  import java.util.Optional;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.TimeUnit;
58  
59  /**
60   * Registry repository of ZooKeeper.
61   */
62  public final class ZookeeperRepository implements ClusterPersistRepository {
63      
64      private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
65      
66      private final Map<String, CuratorCacheListener> dataListeners = new ConcurrentHashMap<>();
67      
68      private final Builder builder = CuratorFrameworkFactory.builder();
69      
70      private CuratorFramework client;
71      
72      @Override
73      public void init(final ClusterPersistRepositoryConfiguration config, final ComputeNodeInstanceContext computeNodeInstanceContext) {
74          ZookeeperProperties zookeeperProps = new ZookeeperProperties(config.getProps());
75          client = buildCuratorClient(config, zookeeperProps);
76          client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this));
77          initCuratorClient(zookeeperProps);
78      }
79      
80      private CuratorFramework buildCuratorClient(final ClusterPersistRepositoryConfiguration config, final ZookeeperProperties zookeeperProps) {
81          int retryIntervalMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS);
82          int maxRetries = zookeeperProps.getValue(ZookeeperPropertyKey.MAX_RETRIES);
83          int timeToLiveSeconds = zookeeperProps.getValue(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS);
84          int operationTimeoutMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS);
85          builder.connectString(config.getServerLists())
86                  .ensembleTracker(false)
87                  .retryPolicy(new ExponentialBackoffRetry(retryIntervalMilliseconds, maxRetries, retryIntervalMilliseconds * maxRetries))
88                  .namespace(config.getNamespace());
89          if (0 != timeToLiveSeconds) {
90              builder.sessionTimeoutMs(timeToLiveSeconds * 1000);
91          }
92          if (0 != operationTimeoutMilliseconds) {
93              builder.connectionTimeoutMs(operationTimeoutMilliseconds);
94          }
95          String digest = zookeeperProps.getValue(ZookeeperPropertyKey.DIGEST);
96          if (!Strings.isNullOrEmpty(digest)) {
97              builder.authorization(ZookeeperPropertyKey.DIGEST.getKey(), digest.getBytes(StandardCharsets.UTF_8))
98                      .aclProvider(new ACLProvider() {
99                          
100                         @Override
101                         public List<ACL> getDefaultAcl() {
102                             return ZooDefs.Ids.CREATOR_ALL_ACL;
103                         }
104                         
105                         @Override
106                         public List<ACL> getAclForPath(final String path) {
107                             return ZooDefs.Ids.CREATOR_ALL_ACL;
108                         }
109                     });
110         }
111         return builder.build();
112     }
113     
114     private void initCuratorClient(final ZookeeperProperties zookeeperProps) {
115         client.start();
116         try {
117             int retryIntervalMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS);
118             int maxRetries = zookeeperProps.getValue(ZookeeperPropertyKey.MAX_RETRIES);
119             if (!client.blockUntilConnected(retryIntervalMilliseconds * maxRetries, TimeUnit.MILLISECONDS)) {
120                 client.close();
121                 throw new OperationTimeoutException();
122             }
123         } catch (final InterruptedException ex) {
124             Thread.currentThread().interrupt();
125         } catch (final OperationTimeoutException ex) {
126             ZookeeperExceptionHandler.handleException(ex);
127         }
128     }
129     
130     @Override
131     public List<String> getChildrenKeys(final String key) {
132         try {
133             List<String> result = client.getChildren().forPath(key);
134             result.sort(Comparator.reverseOrder());
135             return result;
136             // CHECKSTYLE:OFF
137         } catch (final Exception ex) {
138             // CHECKSTYLE:ON
139             ZookeeperExceptionHandler.handleException(ex);
140             return Collections.emptyList();
141         }
142     }
143     
144     @Override
145     public void persist(final String key, final String value) {
146         try {
147             if (isExisted(key)) {
148                 update(key, value);
149             } else {
150                 client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
151             }
152             // CHECKSTYLE:OFF
153         } catch (final Exception ex) {
154             // CHECKSTYLE:ON
155             ZookeeperExceptionHandler.handleException(ex);
156         }
157     }
158     
159     @Override
160     public void update(final String key, final String value) {
161         try {
162             client.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8));
163             // CHECKSTYLE:OFF
164         } catch (final Exception ex) {
165             // CHECKSTYLE:ON
166             ZookeeperExceptionHandler.handleException(ex);
167         }
168     }
169     
170     @Override
171     public String query(final String key) {
172         try {
173             return new String(client.getData().forPath(key), StandardCharsets.UTF_8);
174         } catch (final KeeperException.NoNodeException ex) {
175             return null;
176             // CHECKSTYLE:OFF
177         } catch (final Exception ex) {
178             // CHECKSTYLE:ON
179             throw new ClusterRepositoryPersistException(ex);
180         }
181     }
182     
183     @Override
184     public boolean isExisted(final String key) {
185         try {
186             return null != client.checkExists().forPath(key);
187             // CHECKSTYLE:OFF
188         } catch (final Exception ex) {
189             // CHECKSTYLE:ON
190             ZookeeperExceptionHandler.handleException(ex);
191             return false;
192         }
193     }
194     
195     @Override
196     public void persistEphemeral(final String key, final String value) {
197         try {
198             if (isExisted(key)) {
199                 client.delete().deletingChildrenIfNeeded().forPath(key);
200             }
201             client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
202             // CHECKSTYLE:OFF
203         } catch (final Exception ex) {
204             // CHECKSTYLE:ON
205             ZookeeperExceptionHandler.handleException(ex);
206         }
207     }
208     
209     @Override
210     public boolean persistExclusiveEphemeral(final String key, final String value) {
211         try {
212             client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
213         } catch (final NodeExistsException ex) {
214             return false;
215             // CHECKSTYLE:OFF
216         } catch (final Exception ex) {
217             ZookeeperExceptionHandler.handleException(ex);
218             // CHECKSTYLE:ON
219         }
220         return true;
221     }
222     
223     @Override
224     public Optional<DistributedLock> getDistributedLock(final String lockKey) {
225         return Optional.of(new ZookeeperDistributedLock(lockKey, client));
226     }
227     
228     @Override
229     public void delete(final String key) {
230         try {
231             if (isExisted(key)) {
232                 client.delete().deletingChildrenIfNeeded().forPath(key);
233             }
234             // CHECKSTYLE:OFF
235         } catch (final Exception ex) {
236             // CHECKSTYLE:ON
237             ZookeeperExceptionHandler.handleException(ex);
238         }
239     }
240     
241     @Override
242     public void watch(final String key, final DataChangedEventListener listener) {
243         if (null != dataListeners.get(key)) {
244             return;
245         }
246         CuratorCache cache = caches.get(key);
247         if (null == cache) {
248             cache = CuratorCache.build(client, key);
249             caches.put(key, cache);
250         }
251         CuratorCacheListener curatorCacheListener = CuratorCacheListener.builder()
252                 .forCreates(childData -> listener.onChange(new DataChangedEvent(childData.getPath(), new String(childData.getData(), StandardCharsets.UTF_8), Type.ADDED)))
253                 .forChanges((oldData, newData) -> {
254                     if (!Objects.equals(oldData, newData)) {
255                         listener.onChange(new DataChangedEvent(newData.getPath(), new String(newData.getData(), StandardCharsets.UTF_8), Type.UPDATED));
256                     }
257                 })
258                 .forDeletes(oldData -> listener.onChange(new DataChangedEvent(oldData.getPath(), new String(oldData.getData(), StandardCharsets.UTF_8), Type.DELETED)))
259                 .afterInitialized()
260                 .build();
261         cache.listenable().addListener(curatorCacheListener);
262         cache.start();
263         dataListeners.computeIfAbsent(key, curator -> curatorCacheListener);
264     }
265     
266     @Override
267     public void removeDataListener(final String key) {
268         CuratorCacheListener cacheListener = dataListeners.remove(key);
269         if (null == cacheListener) {
270             return;
271         }
272         Optional.ofNullable(caches.remove(key)).ifPresent(optional -> optional.listenable().removeListener(cacheListener));
273     }
274     
275     @Override
276     public void close() {
277         caches.values().forEach(CuratorCache::close);
278         waitForCacheClose();
279         CloseableUtils.closeQuietly(client);
280     }
281     
282     /*
283      * TODO wait 500ms, close cache before close client, or will throw exception Because of asynchronous processing, may cause client to close first and cache has not yet closed the end. Wait for new
284      * version of Curator to fix this. BUG address: https://issues.apache.org/jira/browse/CURATOR-157
285      */
286     private void waitForCacheClose() {
287         try {
288             Thread.sleep(500L);
289         } catch (final InterruptedException ex) {
290             Thread.currentThread().interrupt();
291         }
292     }
293     
294     @Override
295     public String getType() {
296         return "ZooKeeper";
297     }
298 }