1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.registrycenter.elasticjob;
19
20 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
21 import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
22 import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
23 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
24 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
25 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
26 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperPropertyKey;
27
28 import java.util.Properties;
29
30
31
32
33 public final class CoordinatorRegistryCenterInitializer {
34
35
36
37
38
39
40
41
42 public CoordinatorRegistryCenter createZookeeperRegistryCenter(final ModeConfiguration modeConfig, final String namespaceRelativePath) {
43 ClusterPersistRepositoryConfiguration repositoryConfig = (ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
44
45 CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(getZookeeperConfig(repositoryConfig, namespaceRelativePath));
46 result.init();
47 return result;
48 }
49
50 private ZookeeperConfiguration getZookeeperConfig(final ClusterPersistRepositoryConfiguration repositoryConfig, final String namespaceRelativePath) {
51
52 Properties props = repositoryConfig.getProps();
53 ZookeeperProperties zookeeperProps = new ZookeeperProperties(props);
54 String namespace = repositoryConfig.getNamespace() + (null != namespaceRelativePath ? namespaceRelativePath : "");
55 ZookeeperConfiguration result = new ZookeeperConfiguration(repositoryConfig.getServerLists(), namespace);
56 int retryIntervalMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.RETRY_INTERVAL_MILLISECONDS);
57 result.setBaseSleepTimeMilliseconds(retryIntervalMilliseconds);
58 int maxRetries = zookeeperProps.getValue(ZookeeperPropertyKey.MAX_RETRIES);
59 result.setMaxRetries(maxRetries);
60 result.setMaxSleepTimeMilliseconds(retryIntervalMilliseconds * maxRetries);
61 int timeToLiveSeconds = zookeeperProps.getValue(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS);
62 if (0 != timeToLiveSeconds) {
63 result.setSessionTimeoutMilliseconds(timeToLiveSeconds * 1000);
64 }
65 int operationTimeoutMilliseconds = zookeeperProps.getValue(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS);
66 if (0 != operationTimeoutMilliseconds) {
67 result.setConnectionTimeoutMilliseconds(operationTimeoutMilliseconds);
68 }
69 result.setDigest(zookeeperProps.getValue(ZookeeperPropertyKey.DIGEST));
70 return result;
71 }
72 }