/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.cdc.core.importer;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * CDC importer.
 */
@RequiredArgsConstructor
@Slf4j
public final class CDCImporter extends AbstractPipelineLifecycleRunnable implements Importer {

    @Getter
    private final String importerId = RandomStringUtils.randomAlphanumeric(8);

    private final List<CDCChannelProgressPair> channelProgressPairs;

    private final int batchSize;

    private final long timeoutMillis;

    private final PipelineSink sink;

    private final boolean needSorting;

    private final JobRateLimitAlgorithm rateLimitAlgorithm;

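    // Min-heap of per-channel transaction batches ordered by CSN, so the oldest transaction is always at the head.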
    private final PriorityQueue<CSNRecords> csnRecordsQueue = new PriorityQueue<>(new CSNRecordsComparator());

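    // Ack positions waiting for client acknowledgement, keyed by ack id; entries expire if they are never acked.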
    private final Cache<String, List<Pair<CDCChannelProgressPair, CDCAckPosition>>> ackCache = Caffeine.newBuilder().maximumSize(10000).expireAfterAccess(5, TimeUnit.MINUTES).build();

    @Override
    protected void runBlocking() {
        CDCImporterManager.putImporter(this);
        for (CDCChannelProgressPair each : channelProgressPairs) {
            each.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
        }
        while (isRunning()) {
            if (needSorting) {
                doWithSorting();
            } else {
                doWithoutSorting();
            }
            if (channelProgressPairs.isEmpty()) {
                break;
            }
        }
    }

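    // Sorting path: merge records from all channels and emit them transaction by transaction in ascending CSN order.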
    @SneakyThrows(InterruptedException.class)
    private void doWithSorting() {
        if (null != rateLimitAlgorithm) {
            rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
        }
        List<CSNRecords> csnRecordsList = getCsnRecordsList();
        if (csnRecordsList.isEmpty()) {
            TimeUnit.MILLISECONDS.sleep(timeoutMillis);
            return;
        }
        String ackId = CDCAckId.build(importerId).marshal();
        if (1 == csnRecordsList.size()) {
            processCSNRecords(csnRecordsList.get(0), ackId);
        } else {
            processCSNRecordsList(csnRecordsList, ackId);
        }
    }

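    // Collects, from the CSN-ordered queue, all transaction batches sharing the lowest available CSN, pulling new batches from the channels as needed.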
    private List<CSNRecords> getCsnRecordsList() {
        List<CSNRecords> result = new LinkedList<>();
        CSNRecords firstRecords = null;
        for (int i = 0, count = channelProgressPairs.size(); i < count; i++) {
            prepareTransactionRecords();
            CSNRecords csnRecords = csnRecordsQueue.peek();
            if (null == csnRecords) {
                continue;
            }
            if (null == firstRecords) {
                csnRecords = csnRecordsQueue.poll();
                firstRecords = csnRecords;
                result.add(csnRecords);
            } else if (csnRecords.getCsn() == firstRecords.getCsn()) {
                csnRecords = csnRecordsQueue.poll();
                result.add(csnRecords);
            }
        }
        return result;
    }
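    // Refills the CSN queue: when it is empty take whatever each channel has ready, otherwise only accept batches whose CSN does not exceed the current head.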
    private void prepareTransactionRecords() {
        if (csnRecordsQueue.isEmpty()) {
            prepareWhenQueueIsEmpty();
        } else {
            prepareWhenQueueIsNotEmpty(csnRecordsQueue.peek().getCsn());
        }
    }

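    // Queue is empty: poll every channel and enqueue any batch containing data records; batches without data records are acked immediately.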
    private void prepareWhenQueueIsEmpty() {
        for (CDCChannelProgressPair each : channelProgressPairs) {
            PipelineChannel channel = each.getChannel();
            List<Record> records = channel.poll();
            if (records.isEmpty()) {
                continue;
            }
            if (0 == getDataRecordsCount(records)) {
                channel.ack(records);
                continue;
            }
            csnRecordsQueue.add(new CSNRecords(findFirstDataRecord(records).getCsn(), each, records));
        }
    }

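    // Queue already has entries: peek first, and only consume a channel's batch if its CSN is not newer than the oldest queued CSN, so ordering is preserved.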
    private void prepareWhenQueueIsNotEmpty(final long oldestCSN) {
        for (CDCChannelProgressPair each : channelProgressPairs) {
            PipelineChannel channel = each.getChannel();
            List<Record> records = channel.peek();
            if (records.isEmpty()) {
                continue;
            }
            if (0 == getDataRecordsCount(records)) {
                records = channel.poll();
                channel.ack(records);
                continue;
            }
            long csn = findFirstDataRecord(records).getCsn();
            if (csn <= oldestCSN) {
                records = channel.poll();
                csnRecordsQueue.add(new CSNRecords(csn, each, records));
            }
        }
    }

    private int getDataRecordsCount(final List<Record> records) {
        return (int) records.stream().filter(DataRecord.class::isInstance).count();
    }

    private DataRecord findFirstDataRecord(final List<Record> records) {
        for (Record each : records) {
            if (each instanceof DataRecord) {
                return (DataRecord) each;
            }
        }
        throw new IllegalStateException("No data record found");
    }

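    // Writes a single transaction batch to the sink and caches its ack position under the generated ack id.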
    private void processCSNRecords(final CSNRecords csnRecords, final String ackId) {
        List<Record> records = csnRecords.getRecords();
        ackCache.put(ackId, Collections.singletonList(Pair.of(csnRecords.getChannelProgressPair(), new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
        sink.write(ackId, filterDataRecords(records));
    }

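    // Writes several transaction batches (from different channels, sharing one CSN) to the sink in one call, caching an ack position per source channel.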
    private void processCSNRecordsList(final List<CSNRecords> csnRecordsList, final String ackId) {
        List<Pair<CDCChannelProgressPair, CDCAckPosition>> ackValue = csnRecordsList.stream().map(each -> Pair.of(each.getChannelProgressPair(),
                new CDCAckPosition(each.getRecords().get(each.getRecords().size() - 1), getDataRecordsCount(each.getRecords())))).collect(Collectors.toList());
        ackCache.put(ackId, ackValue);
        Collection<Record> records = new ArrayList<>(ackValue.stream().mapToInt(each -> each.getRight().getDataRecordCount()).sum());
        csnRecordsList.forEach(each -> records.addAll(filterDataRecords(each.getRecords())));
        sink.write(ackId, filterDataRecords(records));
    }

    private List<Record> filterDataRecords(final Collection<Record> records) {
        return records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
    }

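    // Non-sorting path: drain each channel independently and forward batches as they arrive.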
    private void doWithoutSorting() {
        for (CDCChannelProgressPair each : channelProgressPairs) {
            doWithoutSorting(each);
        }
    }

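    // Fetches one batch from the channel; batches without data records are acked directly, and a trailing FinishedRecord retires the channel.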
    private void doWithoutSorting(final CDCChannelProgressPair channelProgressPair) {
        PipelineChannel channel = channelProgressPair.getChannel();
        List<Record> records = channel.fetch(batchSize, timeoutMillis);
        if (records.isEmpty()) {
            return;
        }
        Record lastRecord = records.get(records.size() - 1);
        if (records.stream().noneMatch(DataRecord.class::isInstance)) {
            channel.ack(records);
            channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
            if (lastRecord instanceof FinishedRecord) {
                channelProgressPairs.remove(channelProgressPair);
            }
            return;
        }
        if (null != rateLimitAlgorithm) {
            rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
        }
        String ackId = CDCAckId.build(importerId).marshal();
        ackCache.put(ackId, Collections.singletonList(Pair.of(channelProgressPair, new CDCAckPosition(lastRecord, getDataRecordsCount(records)))));
        sink.write(ackId, records);
    }

    /**
     * Ack the records that were delivered under the given ack id.
     *
     * @param ackId ack id
     */
    public void ack(final String ackId) {
        List<Pair<CDCChannelProgressPair, CDCAckPosition>> channelPositionPairList = ackCache.getIfPresent(ackId);
        if (null == channelPositionPairList) {
            log.warn("Could not find cached ack info, ack id: {}", ackId);
            return;
        }
        for (Pair<CDCChannelProgressPair, CDCAckPosition> each : channelPositionPairList) {
            CDCAckPosition ackPosition = each.getRight();
            Record lastRecord = ackPosition.getLastRecord();
            each.getLeft().getChannel().ack(Collections.singletonList(lastRecord));
            if (lastRecord instanceof FinishedRecord) {
                channelProgressPairs.remove(each.getLeft());
            }
            each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
        }
        ackCache.invalidate(ackId);
    }

    @Override
    protected void doStop() {
        CDCImporterManager.removeImporter(importerId);
    }
}