1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
19
20 import io.netty.channel.Channel;
21 import lombok.Getter;
22 import lombok.SneakyThrows;
23 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
24 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
25 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
26 import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
27 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
28 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
29 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
30 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
31 import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
32 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
33
34 import java.io.IOException;
35 import java.util.Collection;
36 import java.util.HashMap;
37 import java.util.LinkedList;
38 import java.util.Map;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.locks.Condition;
41 import java.util.concurrent.locks.Lock;
42 import java.util.concurrent.locks.ReentrantLock;
43
44
45
46
47 public final class PipelineCDCSocketSink implements PipelineSink {
48
49 private static final long DEFAULT_TIMEOUT_MILLISECONDS = 100L;
50
51 private final Lock lock = new ReentrantLock();
52
53 private final Condition condition = lock.newCondition();
54
55 @Getter
56 private final Channel channel;
57
58 private final ShardingSphereDatabase database;
59
60 private final Map<String, String> tableSchemaNameMap;
61
62 public PipelineCDCSocketSink(final Channel channel, final ShardingSphereDatabase database, final Collection<String> schemaTableNames) {
63 this.channel = channel;
64 this.database = database;
65 tableSchemaNameMap = new HashMap<>(schemaTableNames.size(), 1F);
66 schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
67 String[] split = each.split("\\.");
68 tableSchemaNameMap.put(split[1], split[0]);
69 });
70 }
71
72 @Override
73 public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
74 if (records.isEmpty()) {
75 return new PipelineJobProgressUpdatedParameter(0);
76 }
77 while (!channel.isWritable() && channel.isActive()) {
78 doAwait();
79 }
80 if (!channel.isActive()) {
81 return new PipelineJobProgressUpdatedParameter(0);
82 }
83 Collection<DataRecordResult.Record> resultRecords = getResultRecords(records);
84 DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
85 channel.writeAndFlush(CDCResponseUtils.succeed("", ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
86 return new PipelineJobProgressUpdatedParameter(resultRecords.size());
87 }
88
89 @SneakyThrows(InterruptedException.class)
90 private void doAwait() {
91 lock.lock();
92 long startMillis = System.currentTimeMillis();
93 long endMillis = startMillis;
94 boolean awaitResult;
95 try {
96 do {
97 awaitResult = condition.await(DEFAULT_TIMEOUT_MILLISECONDS - (endMillis - startMillis), TimeUnit.MILLISECONDS);
98 endMillis = System.currentTimeMillis();
99 } while (!awaitResult && DEFAULT_TIMEOUT_MILLISECONDS > endMillis - startMillis);
100 } finally {
101 lock.unlock();
102 }
103 }
104
105 private Collection<DataRecordResult.Record> getResultRecords(final Collection<Record> records) {
106 Collection<DataRecordResult.Record> result = new LinkedList<>();
107 for (Record each : records) {
108 if (each instanceof DataRecord) {
109 DataRecord dataRecord = (DataRecord) each;
110 result.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableSchemaNameMap.get(dataRecord.getTableName()), dataRecord));
111 }
112 }
113 return result;
114 }
115
116 @Override
117 public void close() throws IOException {
118 channel.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), "The socket channel is closed."));
119 }
120 }