View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.mode.repository.cluster.etcd;
19  
20  import com.google.common.base.Preconditions;
21  import com.google.common.base.Splitter;
22  import com.google.common.util.concurrent.ThreadFactoryBuilder;
23  import io.etcd.jetcd.ByteSequence;
24  import io.etcd.jetcd.Client;
25  import io.etcd.jetcd.KeyValue;
26  import io.etcd.jetcd.Watch;
27  import io.etcd.jetcd.options.DeleteOption;
28  import io.etcd.jetcd.options.GetOption;
29  import io.etcd.jetcd.options.OptionsUtil;
30  import io.etcd.jetcd.options.PutOption;
31  import io.etcd.jetcd.options.WatchOption;
32  import io.etcd.jetcd.support.Observers;
33  import io.etcd.jetcd.support.Util;
34  import io.etcd.jetcd.watch.WatchEvent;
35  import lombok.Getter;
36  import lombok.SneakyThrows;
37  import lombok.extern.slf4j.Slf4j;
38  import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
39  import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
40  import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
41  import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey;
42  import org.apache.shardingsphere.mode.event.DataChangedEvent;
43  import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
44  import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
45  import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
46  
47  import java.nio.charset.StandardCharsets;
48  import java.util.List;
49  import java.util.concurrent.CompletableFuture;
50  import java.util.concurrent.ExecutionException;
51  import java.util.concurrent.ExecutorService;
52  import java.util.concurrent.Executors;
53  import java.util.stream.Collectors;
54  
55  /**
56   * Registry repository of ETCD.
57   */
58  @Slf4j
59  public final class EtcdRepository implements ClusterPersistRepository {
60      
61      private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Etcd-EventListener-%d").build());
62      
63      private Client client;
64      
65      private EtcdProperties etcdProps;
66      
67      @Getter
68      private DistributedLockHolder distributedLockHolder;
69      
70      @Override
71      public void init(final ClusterPersistRepositoryConfiguration config) {
72          etcdProps = new EtcdProperties(config.getProps());
73          client = Client.builder().endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(config.getServerLists())))
74                  .namespace(ByteSequence.from(config.getNamespace(), StandardCharsets.UTF_8))
75                  .maxInboundMessageSize((int) 32e9)
76                  .build();
77          distributedLockHolder = new DistributedLockHolder(getType(), client, etcdProps);
78      }
79      
80      @SneakyThrows({InterruptedException.class, ExecutionException.class})
81      @Override
82      public String getDirectly(final String key) {
83          List<KeyValue> keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8)).get().getKvs();
84          return keyValues.isEmpty() ? null : keyValues.iterator().next().getValue().toString(StandardCharsets.UTF_8);
85      }
86      
87      @SneakyThrows({InterruptedException.class, ExecutionException.class})
88      @Override
89      public List<String> getChildrenKeys(final String key) {
90          String prefix = key + PATH_SEPARATOR;
91          ByteSequence prefixByteSequence = ByteSequence.from(prefix, StandardCharsets.UTF_8);
92          GetOption getOption = GetOption.newBuilder().isPrefix(true).withSortField(GetOption.SortTarget.KEY).withSortOrder(GetOption.SortOrder.ASCEND).build();
93          List<KeyValue> keyValues = client.getKVClient().get(prefixByteSequence, getOption).get().getKvs();
94          return keyValues.stream().map(each -> getSubNodeKeyName(prefix, each.getKey().toString(StandardCharsets.UTF_8))).distinct().collect(Collectors.toList());
95      }
96      
97      @Override
98      public boolean isExisted(final String key) {
99          return false;
100     }
101     
102     private String getSubNodeKeyName(final String prefix, final String fullPath) {
103         String pathWithoutPrefix = fullPath.substring(prefix.length());
104         return pathWithoutPrefix.contains(PATH_SEPARATOR) ? pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(PATH_SEPARATOR)) : pathWithoutPrefix;
105     }
106     
107     @SneakyThrows({InterruptedException.class, ExecutionException.class})
108     @Override
109     public void persist(final String key, final String value) {
110         buildParentPath(key);
111         client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8), ByteSequence.from(value, StandardCharsets.UTF_8)).get();
112     }
113     
114     @Override
115     public void update(final String key, final String value) {
116         // TODO
117     }
118     
119     @SneakyThrows({InterruptedException.class, ExecutionException.class})
120     @Override
121     public void persistEphemeral(final String key, final String value) {
122         buildParentPath(key);
123         long leaseId = client.getLeaseClient().grant(etcdProps.getValue(EtcdPropertyKey.TIME_TO_LIVE_SECONDS)).get().getID();
124         client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
125         }));
126         client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8), ByteSequence.from(value, StandardCharsets.UTF_8), PutOption.newBuilder().withLeaseId(leaseId).build()).get();
127     }
128     
129     @Override
130     public void persistExclusiveEphemeral(final String key, final String value) {
131         persistEphemeral(key, value);
132     }
133     
134     private void buildParentPath(final String key) throws ExecutionException, InterruptedException {
135         StringBuilder parentPath = new StringBuilder();
136         String[] partPath = key.split(PATH_SEPARATOR);
137         for (int index = 1; index < partPath.length - 1; index++) {
138             parentPath.append(PATH_SEPARATOR);
139             parentPath.append(partPath[index]);
140             String path = parentPath.toString();
141             List<KeyValue> keyValues = client.getKVClient().get(ByteSequence.from(path, StandardCharsets.UTF_8)).get().getKvs();
142             if (keyValues.isEmpty()) {
143                 client.getKVClient().put(ByteSequence.from(path, StandardCharsets.UTF_8), ByteSequence.from("", StandardCharsets.UTF_8)).get();
144             }
145         }
146     }
147     
148     @Override
149     public void delete(final String key) {
150         client.getKVClient().delete(ByteSequence.from(key, StandardCharsets.UTF_8), DeleteOption.newBuilder().isPrefix(true).build());
151     }
152     
153     @Override
154     public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
155         Watch.Listener listener = Watch.listener(response -> {
156             for (WatchEvent each : response.getEvents()) {
157                 Type type = getEventChangedType(each);
158                 if (Type.IGNORED != type) {
159                     dispatchEvent(dataChangedEventListener, each, type);
160                 }
161             }
162         });
163         ByteSequence prefix = ByteSequence.from(key, StandardCharsets.UTF_8);
164         Preconditions.checkNotNull(prefix, "prefix should not be null");
165         client.getWatchClient().watch(prefix,
166                 WatchOption.newBuilder().withRange(OptionsUtil.prefixEndOf(prefix)).build(), listener);
167     }
168     
169     private Type getEventChangedType(final WatchEvent event) {
170         if (1 == event.getKeyValue().getVersion()) {
171             return Type.ADDED;
172         }
173         switch (event.getEventType()) {
174             case PUT:
175                 return Type.UPDATED;
176             case DELETE:
177                 return Type.DELETED;
178             default:
179                 return Type.IGNORED;
180         }
181     }
182     
183     private void dispatchEvent(final DataChangedEventListener dataChangedEventListener, final WatchEvent event, final Type type) {
184         CompletableFuture.runAsync(() -> dataChangedEventListener.onChange(new DataChangedEvent(event.getKeyValue().getKey().toString(StandardCharsets.UTF_8),
185                 event.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> {
186                     if (null != throwable) {
187                         log.error("Dispatch event failed", throwable);
188                     }
189                 });
190     }
191     
192     @Override
193     public void close() {
194         client.close();
195         EVENT_LISTENER_EXECUTOR.shutdown();
196     }
197     
198     @Override
199     public String getType() {
200         return "etcd";
201     }
202 }