1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
19
20 import io.netty.channel.Channel;
21 import lombok.Getter;
22 import lombok.SneakyThrows;
23 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
24 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
25 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
26 import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
27 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
28 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
29 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
30 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
31 import org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
32 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
33
34 import java.util.Collection;
35 import java.util.HashMap;
36 import java.util.LinkedList;
37 import java.util.Map;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.locks.Condition;
40 import java.util.concurrent.locks.Lock;
41 import java.util.concurrent.locks.ReentrantLock;
42
43
44
45
46 public final class PipelineCDCSocketSink implements PipelineSink {
47
48 private static final long DEFAULT_TIMEOUT_MILLISECONDS = 100L;
49
50 private final Lock lock = new ReentrantLock();
51
52 private final Condition condition = lock.newCondition();
53
54 @Getter
55 private final Channel channel;
56
57 private final ShardingSphereDatabase database;
58
59 private final Map<String, String> tableSchemaNameMap;
60
61 public PipelineCDCSocketSink(final Channel channel, final ShardingSphereDatabase database, final Collection<String> schemaTableNames) {
62 this.channel = channel;
63 this.database = database;
64 tableSchemaNameMap = new HashMap<>(schemaTableNames.size(), 1F);
65 schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
66 String[] split = each.split("\\.");
67 tableSchemaNameMap.put(split[1], split[0]);
68 });
69 }
70
71 @Override
72 public PipelineJobUpdateProgress write(final String ackId, final Collection<Record> records) {
73 if (records.isEmpty()) {
74 return new PipelineJobUpdateProgress(0);
75 }
76 while (!channel.isWritable() && channel.isActive()) {
77 doAwait();
78 }
79 if (!channel.isActive()) {
80 return new PipelineJobUpdateProgress(0);
81 }
82 Collection<DataRecordResult.Record> resultRecords = getResultRecords(records);
83 DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
84 channel.writeAndFlush(CDCResponseUtils.succeed("", ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
85 return new PipelineJobUpdateProgress(resultRecords.size());
86 }
87
88 @SneakyThrows(InterruptedException.class)
89 private void doAwait() {
90 lock.lock();
91 long startMillis = System.currentTimeMillis();
92 long endMillis = startMillis;
93 boolean awaitResult;
94 try {
95 do {
96 awaitResult = condition.await(DEFAULT_TIMEOUT_MILLISECONDS - (endMillis - startMillis), TimeUnit.MILLISECONDS);
97 endMillis = System.currentTimeMillis();
98 } while (!awaitResult && DEFAULT_TIMEOUT_MILLISECONDS > endMillis - startMillis);
99 } finally {
100 lock.unlock();
101 }
102 }
103
104 private Collection<DataRecordResult.Record> getResultRecords(final Collection<Record> records) {
105 Collection<DataRecordResult.Record> result = new LinkedList<>();
106 for (Record each : records) {
107 if (each instanceof DataRecord) {
108 DataRecord dataRecord = (DataRecord) each;
109 result.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableSchemaNameMap.get(dataRecord.getTableName()), dataRecord));
110 }
111 }
112 return result;
113 }
114
115 @Override
116 public void close() {
117 channel.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), "The socket channel is closed."));
118 }
119 }