/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;

import io.netty.channel.Channel;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Pipeline CDC socket sink.
 *
 * <p>Converts ingested {@link DataRecord}s and pushes them to the CDC client over a Netty channel as
 * {@code DataRecordResult} responses, waiting in bounded intervals while the channel is unwritable
 * to avoid overflowing its write buffer.
 */
public final class PipelineCDCSocketSink implements PipelineSink {
    
    private static final long DEFAULT_TIMEOUT_MILLISECONDS = 100L;
    
    private final Lock lock = new ReentrantLock();
    
    private final Condition condition = lock.newCondition();
    
    @Getter
    private final Channel channel;
    
    private final ShardingSphereDatabase database;
    
    private final Map<String, String> tableSchemaNameMap;
    
    public PipelineCDCSocketSink(final Channel channel, final ShardingSphereDatabase database, final Collection<String> schemaTableNames) {
        this.channel = channel;
        this.database = database;
        tableSchemaNameMap = new HashMap<>(schemaTableNames.size(), 1F);
        // Entries are expected in "schema.table" form; map each table name to its schema name.
        schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
            String[] split = each.split("\\.");
            tableSchemaNameMap.put(split[1], split[0]);
        });
    }
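    // Illustration (hypothetical input): given schemaTableNames = ["public.t_order"], the map becomes
    // {"t_order" -> "public"}, so getResultRecords can resolve a record's schema name by its table name.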
    
    @Override
    public PipelineJobUpdateProgress write(final String ackId, final Collection<Record> records) {
        if (records.isEmpty()) {
            return new PipelineJobUpdateProgress(0);
        }
        // Backpressure: while the channel's write buffer is full but the connection is still open, wait briefly and re-check.
        while (!channel.isWritable() && channel.isActive()) {
            doAwait();
        }
        if (!channel.isActive()) {
            return new PipelineJobUpdateProgress(0);
        }
        Collection<DataRecordResult.Record> resultRecords = getResultRecords(records);
        DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
        channel.writeAndFlush(CDCResponseUtils.succeed("", ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
        return new PipelineJobUpdateProgress(resultRecords.size());
    }
    
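    // Note: no code in this class ever signals "condition", so each call effectively waits the full
    // remaining timeout before write() re-checks channel writability; using a Lock/Condition pair
    // rather than Thread.sleep() keeps the door open for an early wake-up if a signaller is added.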
    @SneakyThrows(InterruptedException.class)
    private void doAwait() {
        lock.lock();
        long startMillis = System.currentTimeMillis();
        long endMillis = startMillis;
        boolean awaitResult;
        try {
            do {
                // On spurious wake-up, await again for the remaining time, up to DEFAULT_TIMEOUT_MILLISECONDS in total.
                awaitResult = condition.await(DEFAULT_TIMEOUT_MILLISECONDS - (endMillis - startMillis), TimeUnit.MILLISECONDS);
                endMillis = System.currentTimeMillis();
            } while (!awaitResult && DEFAULT_TIMEOUT_MILLISECONDS > endMillis - startMillis);
        } finally {
            lock.unlock();
        }
    }
    
    private Collection<DataRecordResult.Record> getResultRecords(final Collection<Record> records) {
        Collection<DataRecordResult.Record> result = new LinkedList<>();
        for (Record each : records) {
            // Only data records are converted; other record types (e.g., placeholders) are skipped.
            if (each instanceof DataRecord) {
                DataRecord dataRecord = (DataRecord) each;
                result.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableSchemaNameMap.get(dataRecord.getTableName()), dataRecord));
            }
        }
        return result;
    }
    
    @Override
    public void close() {
        // Notify the client before shutdown; the underlying channel itself is not closed here.
        channel.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), "The socket channel is closed."));
    }
}
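
// Minimal usage sketch (illustrative, not part of the original source): wiring the sink with a Netty
// "channel" obtained from the CDC handshake, a resolved ShardingSphereDatabase "database", and a batch
// of ingested "records", all assumed to be in scope; "public.t_order" is a hypothetical table name.
//
//     Collection<String> schemaTableNames = Collections.singleton("public.t_order");
//     PipelineSink sink = new PipelineCDCSocketSink(channel, database, schemaTableNames);
//     PipelineJobUpdateProgress progress = sink.write("ack-id-1", records);
//     sink.close();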
119 }