View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.mode.repository.cluster.zookeeper;
19  
20  import com.google.common.base.Strings;
21  import lombok.Getter;
22  import org.apache.curator.framework.CuratorFramework;
23  import org.apache.curator.framework.CuratorFrameworkFactory;
24  import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
25  import org.apache.curator.framework.api.ACLProvider;
26  import org.apache.curator.framework.recipes.cache.CuratorCache;
27  import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
28  import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
29  import org.apache.curator.retry.ExponentialBackoffRetry;
30  import org.apache.curator.utils.CloseableUtils;
31  import org.apache.shardingsphere.infra.instance.InstanceContext;
32  import org.apache.shardingsphere.infra.instance.InstanceContextAware;
33  import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
34  import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
35  import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
36  import org.apache.shardingsphere.mode.event.DataChangedEvent;
37  import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
38  import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
39  import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
40  import org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler;
41  import org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener;
42  import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
43  import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperPropertyKey;
44  import org.apache.zookeeper.CreateMode;
45  import org.apache.zookeeper.KeeperException.NodeExistsException;
46  import org.apache.zookeeper.KeeperException.OperationTimeoutException;
47  import org.apache.zookeeper.ZooDefs;
48  import org.apache.zookeeper.data.ACL;
49  
50  import java.nio.charset.StandardCharsets;
51  import java.util.Collections;
52  import java.util.Comparator;
53  import java.util.List;
54  import java.util.Map;
55  import java.util.concurrent.ConcurrentHashMap;
56  import java.util.concurrent.TimeUnit;
57  
58  /**
59   * Registry repository of ZooKeeper.
60   */
61  public final class ZookeeperRepository implements ClusterPersistRepository, InstanceContextAware {
62      
63      private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
64      
65      private final Builder builder = CuratorFrameworkFactory.builder();
66      
67      private CuratorFramework client;
68      
69      @Getter
70      private DistributedLockHolder distributedLockHolder;
71      
72      @Override
73      public void init(final ClusterPersistRepositoryConfiguration config) {
74          ZookeeperProperties zookeeperProps = new ZookeeperProperties(config.getProps());
75          client = buildCuratorClient(config, zookeeperProps);
76          distributedLockHolder = new DistributedLockHolder(getType(), client, zookeeperProps);
77          initCuratorClient(zookeeperProps);
78      }
79      
80      private CuratorFramework buildCuratorClient(final ClusterPersistRepositoryConfiguration config, final ZookeeperProperties zookeeperProps) {
81          int retryIntervalMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS);
82          int maxRetries = zookeeperProps.getValue(ZookeeperPropertyKey.MAX_RETRIES);
83          int timeToLiveSeconds = zookeeperProps.getValue(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS);
84          int operationTimeoutMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS);
85          builder.connectString(config.getServerLists())
86                  .ensembleTracker(false)
87                  .retryPolicy(new ExponentialBackoffRetry(retryIntervalMilliseconds, maxRetries, retryIntervalMilliseconds * maxRetries))
88                  .namespace(config.getNamespace());
89          if (0 != timeToLiveSeconds) {
90              builder.sessionTimeoutMs(timeToLiveSeconds * 1000);
91          }
92          if (0 != operationTimeoutMilliseconds) {
93              builder.connectionTimeoutMs(operationTimeoutMilliseconds);
94          }
95          String digest = zookeeperProps.getValue(ZookeeperPropertyKey.DIGEST);
96          if (!Strings.isNullOrEmpty(digest)) {
97              builder.authorization(ZookeeperPropertyKey.DIGEST.getKey(), digest.getBytes(StandardCharsets.UTF_8))
98                      .aclProvider(new ACLProvider() {
99                          
100                         @Override
101                         public List<ACL> getDefaultAcl() {
102                             return ZooDefs.Ids.CREATOR_ALL_ACL;
103                         }
104                         
105                         @Override
106                         public List<ACL> getAclForPath(final String path) {
107                             return ZooDefs.Ids.CREATOR_ALL_ACL;
108                         }
109                     });
110         }
111         return builder.build();
112     }
113     
114     private void initCuratorClient(final ZookeeperProperties zookeeperProps) {
115         client.start();
116         try {
117             int retryIntervalMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS);
118             int maxRetries = zookeeperProps.getValue(ZookeeperPropertyKey.MAX_RETRIES);
119             if (!client.blockUntilConnected(retryIntervalMilliseconds * maxRetries, TimeUnit.MILLISECONDS)) {
120                 client.close();
121                 throw new OperationTimeoutException();
122             }
123         } catch (final InterruptedException ex) {
124             Thread.currentThread().interrupt();
125         } catch (final OperationTimeoutException ex) {
126             ZookeeperExceptionHandler.handleException(ex);
127         }
128     }
129     
130     @Override
131     public List<String> getChildrenKeys(final String key) {
132         try {
133             List<String> result = client.getChildren().forPath(key);
134             result.sort(Comparator.reverseOrder());
135             return result;
136             // CHECKSTYLE:OFF
137         } catch (final Exception ex) {
138             // CHECKSTYLE:ON
139             ZookeeperExceptionHandler.handleException(ex);
140             return Collections.emptyList();
141         }
142     }
143     
144     @Override
145     public void persist(final String key, final String value) {
146         try {
147             if (isExisted(key)) {
148                 update(key, value);
149             } else {
150                 client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
151             }
152             // CHECKSTYLE:OFF
153         } catch (final Exception ex) {
154             // CHECKSTYLE:ON
155             ZookeeperExceptionHandler.handleException(ex);
156         }
157     }
158     
159     @Override
160     public void update(final String key, final String value) {
161         try {
162             client.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8));
163             // CHECKSTYLE:OFF
164         } catch (final Exception ex) {
165             // CHECKSTYLE:ON
166             ZookeeperExceptionHandler.handleException(ex);
167         }
168     }
169     
170     @Override
171     public String getDirectly(final String key) {
172         try {
173             return new String(client.getData().forPath(key), StandardCharsets.UTF_8);
174             // CHECKSTYLE:OFF
175         } catch (final Exception ex) {
176             // CHECKSTYLE:ON
177             ZookeeperExceptionHandler.handleException(ex);
178             return null;
179         }
180     }
181     
182     @Override
183     public boolean isExisted(final String key) {
184         try {
185             return null != client.checkExists().forPath(key);
186             // CHECKSTYLE:OFF
187         } catch (final Exception ex) {
188             // CHECKSTYLE:ON
189             ZookeeperExceptionHandler.handleException(ex);
190             return false;
191         }
192     }
193     
194     @Override
195     public void persistEphemeral(final String key, final String value) {
196         try {
197             if (isExisted(key)) {
198                 client.delete().deletingChildrenIfNeeded().forPath(key);
199             }
200             client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
201             // CHECKSTYLE:OFF
202         } catch (final Exception ex) {
203             // CHECKSTYLE:ON
204             ZookeeperExceptionHandler.handleException(ex);
205         }
206     }
207     
208     @Override
209     public void persistExclusiveEphemeral(final String key, final String value) {
210         try {
211             client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
212         } catch (final NodeExistsException ex) {
213             throw new ClusterPersistRepositoryException(ex);
214             // CHECKSTYLE:OFF
215         } catch (final Exception ex) {
216             ZookeeperExceptionHandler.handleException(ex);
217             // CHECKSTYLE:ON
218         }
219     }
220     
221     @Override
222     public void delete(final String key) {
223         try {
224             if (isExisted(key)) {
225                 client.delete().deletingChildrenIfNeeded().forPath(key);
226             }
227             // CHECKSTYLE:OFF
228         } catch (final Exception ex) {
229             // CHECKSTYLE:ON
230             ZookeeperExceptionHandler.handleException(ex);
231         }
232     }
233     
234     @Override
235     public void watch(final String key, final DataChangedEventListener listener) {
236         CuratorCache cache = caches.get(key);
237         if (null == cache) {
238             cache = CuratorCache.build(client, key);
239             caches.put(key, cache);
240         }
241         CuratorCacheListener curatorCacheListener = CuratorCacheListener.builder()
242                 .afterInitialized()
243                 .forTreeCache(client, (framework, treeCacheListener) -> {
244                     Type changedType = getChangedType(treeCacheListener.getType());
245                     if (Type.IGNORED != changedType) {
246                         listener.onChange(new DataChangedEvent(treeCacheListener.getData().getPath(),
247                                 new String(treeCacheListener.getData().getData(), StandardCharsets.UTF_8), changedType));
248                     }
249                 }).build();
250         cache.listenable().addListener(curatorCacheListener);
251         cache.start();
252     }
253     
254     private Type getChangedType(final TreeCacheEvent.Type type) {
255         switch (type) {
256             case NODE_ADDED:
257                 return Type.ADDED;
258             case NODE_UPDATED:
259                 return Type.UPDATED;
260             case NODE_REMOVED:
261                 return Type.DELETED;
262             default:
263                 return Type.IGNORED;
264         }
265     }
266     
267     @Override
268     public void close() {
269         caches.values().forEach(CuratorCache::close);
270         waitForCacheClose();
271         CloseableUtils.closeQuietly(client);
272     }
273     
274     /*
275      * TODO wait 500ms, close cache before close client, or will throw exception Because of asynchronous processing, may cause client to close first and cache has not yet closed the end. Wait for new
276      * version of Curator to fix this. BUG address: https://issues.apache.org/jira/browse/CURATOR-157
277      */
278     private void waitForCacheClose() {
279         try {
280             Thread.sleep(500L);
281         } catch (final InterruptedException ex) {
282             Thread.currentThread().interrupt();
283         }
284     }
285     
286     @Override
287     public void setInstanceContext(final InstanceContext instanceContext) {
288         client.getConnectionStateListenable().addListener(new SessionConnectionListener(instanceContext, this));
289     }
290     
291     @Override
292     public String getType() {
293         return "ZooKeeper";
294     }
295 }