View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
19  
20  import io.netty.channel.Channel;
21  import lombok.Getter;
22  import lombok.SneakyThrows;
23  import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
24  import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
25  import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
26  import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
27  import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
28  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
29  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
30  import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
31  import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
32  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
33  
34  import java.io.IOException;
35  import java.util.Collection;
36  import java.util.HashMap;
37  import java.util.LinkedList;
38  import java.util.Map;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.locks.Condition;
41  import java.util.concurrent.locks.Lock;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  /**
45   * Pipeline CDC socket sink.
46   */
47  public final class PipelineCDCSocketSink implements PipelineSink {
48      
49      private static final long DEFAULT_TIMEOUT_MILLISECONDS = 100L;
50      
51      private final Lock lock = new ReentrantLock();
52      
53      private final Condition condition = lock.newCondition();
54      
55      @Getter
56      private final Channel channel;
57      
58      private final ShardingSphereDatabase database;
59      
60      private final Map<String, String> tableSchemaNameMap;
61      
62      public PipelineCDCSocketSink(final Channel channel, final ShardingSphereDatabase database, final Collection<String> schemaTableNames) {
63          this.channel = channel;
64          this.database = database;
65          tableSchemaNameMap = new HashMap<>(schemaTableNames.size(), 1F);
66          schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
67              String[] split = each.split("\\.");
68              tableSchemaNameMap.put(split[1], split[0]);
69          });
70      }
71      
72      @Override
73      public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
74          if (records.isEmpty()) {
75              return new PipelineJobProgressUpdatedParameter(0);
76          }
77          while (!channel.isWritable() && channel.isActive()) {
78              doAwait();
79          }
80          if (!channel.isActive()) {
81              return new PipelineJobProgressUpdatedParameter(0);
82          }
83          Collection<DataRecordResult.Record> resultRecords = getResultRecords(records);
84          DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
85          channel.writeAndFlush(CDCResponseUtils.succeed("", ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
86          return new PipelineJobProgressUpdatedParameter(resultRecords.size());
87      }
88      
89      @SneakyThrows(InterruptedException.class)
90      private void doAwait() {
91          lock.lock();
92          long startMillis = System.currentTimeMillis();
93          long endMillis = startMillis;
94          boolean awaitResult;
95          try {
96              do {
97                  awaitResult = condition.await(DEFAULT_TIMEOUT_MILLISECONDS - (endMillis - startMillis), TimeUnit.MILLISECONDS);
98                  endMillis = System.currentTimeMillis();
99              } while (!awaitResult && DEFAULT_TIMEOUT_MILLISECONDS > endMillis - startMillis);
100         } finally {
101             lock.unlock();
102         }
103     }
104     
105     private Collection<DataRecordResult.Record> getResultRecords(final Collection<Record> records) {
106         Collection<DataRecordResult.Record> result = new LinkedList<>();
107         for (Record each : records) {
108             if (each instanceof DataRecord) {
109                 DataRecord dataRecord = (DataRecord) each;
110                 result.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableSchemaNameMap.get(dataRecord.getTableName()), dataRecord));
111             }
112         }
113         return result;
114     }
115     
116     @Override
117     public void close() throws IOException {
118         channel.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), "The socket channel is closed."));
119     }
120 }