View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
19  
20  import com.github.benmanes.caffeine.cache.Cache;
21  import com.github.benmanes.caffeine.cache.Caffeine;
22  import lombok.Getter;
23  import lombok.RequiredArgsConstructor;
24  import lombok.SneakyThrows;
25  import lombok.extern.slf4j.Slf4j;
26  import org.apache.commons.lang3.RandomStringUtils;
27  import org.apache.commons.lang3.tuple.Pair;
28  import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
29  import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
30  import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
31  import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
32  import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
33  import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
34  import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
35  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
36  import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
37  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
38  import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
39  import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
40  
41  import java.util.ArrayList;
42  import java.util.Collection;
43  import java.util.Collections;
44  import java.util.LinkedList;
45  import java.util.List;
46  import java.util.PriorityQueue;
47  import java.util.concurrent.TimeUnit;
48  import java.util.stream.Collectors;
49  
50  /**
51   * CDC importer.
52   */
53  @RequiredArgsConstructor
54  @Slf4j
55  public final class CDCImporter extends AbstractPipelineLifecycleRunnable implements Importer {
56      
57      @Getter
58      private final String importerId = RandomStringUtils.randomAlphanumeric(8);
59      
60      private final List<CDCChannelProgressPair> channelProgressPairs;
61      
62      private final int batchSize;
63      
64      private final long timeoutMillis;
65      
66      private final PipelineSink sink;
67      
68      private final boolean needSorting;
69      
70      private final JobRateLimitAlgorithm rateLimitAlgorithm;
71      
72      private final PriorityQueue<CSNRecords> csnRecordsQueue = new PriorityQueue<>(new CSNRecordsComparator());
73      
74      private final Cache<String, List<Pair<CDCChannelProgressPair, CDCAckPosition>>> ackCache = Caffeine.newBuilder().maximumSize(10000).expireAfterAccess(5, TimeUnit.MINUTES).build();
75      
76      @Override
77      protected void runBlocking() {
78          CDCImporterManager.putImporter(this);
79          for (CDCChannelProgressPair each : channelProgressPairs) {
80              each.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
81          }
82          while (isRunning()) {
83              if (needSorting) {
84                  doWithSorting();
85              } else {
86                  doWithoutSorting();
87              }
88              if (channelProgressPairs.isEmpty()) {
89                  break;
90              }
91          }
92      }
93      
94      @SneakyThrows(InterruptedException.class)
95      private void doWithSorting() {
96          if (null != rateLimitAlgorithm) {
97              rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
98          }
99          List<CSNRecords> csnRecordsList = getCsnRecordsList();
100         if (csnRecordsList.isEmpty()) {
101             TimeUnit.MILLISECONDS.sleep(timeoutMillis);
102             return;
103         }
104         // TODO Combine small transactions into a large transaction, to improve transformation performance.
105         String ackId = CDCAckId.build(importerId).marshal();
106         if (1 == csnRecordsList.size()) {
107             processCSNRecords(csnRecordsList.get(0), ackId);
108         } else {
109             processCSNRecordsList(csnRecordsList, ackId);
110         }
111     }
112     
113     private List<CSNRecords> getCsnRecordsList() {
114         List<CSNRecords> result = new LinkedList<>();
115         CSNRecords firstRecords = null;
116         for (int i = 0, count = channelProgressPairs.size(); i < count; i++) {
117             prepareTransactionRecords();
118             CSNRecords csnRecords = csnRecordsQueue.peek();
119             if (null == csnRecords) {
120                 continue;
121             }
122             if (null == firstRecords) {
123                 csnRecords = csnRecordsQueue.poll();
124                 firstRecords = csnRecords;
125                 result.add(csnRecords);
126             } else if (csnRecords.getCsn() == firstRecords.getCsn()) {
127                 csnRecords = csnRecordsQueue.poll();
128                 result.add(csnRecords);
129             }
130         }
131         return result;
132     }
133     
134     // TODO openGauss CSN should be incremented for every transaction. Currently, CSN might be duplicated in transactions.
135     // TODO Use channels watermark depth to improve performance.
136     private void prepareTransactionRecords() {
137         if (csnRecordsQueue.isEmpty()) {
138             prepareWhenQueueIsEmpty();
139         } else {
140             prepareWhenQueueIsNotEmpty(csnRecordsQueue.peek().getCsn());
141         }
142     }
143     
144     private void prepareWhenQueueIsEmpty() {
145         for (CDCChannelProgressPair each : channelProgressPairs) {
146             PipelineChannel channel = each.getChannel();
147             List<Record> records = channel.poll();
148             if (records.isEmpty()) {
149                 continue;
150             }
151             if (0 == getDataRecordsCount(records)) {
152                 channel.ack(records);
153                 continue;
154             }
155             csnRecordsQueue.add(new CSNRecords(findFirstDataRecord(records).getCsn(), each, records));
156         }
157     }
158     
159     private void prepareWhenQueueIsNotEmpty(final long oldestCSN) {
160         for (CDCChannelProgressPair each : channelProgressPairs) {
161             PipelineChannel channel = each.getChannel();
162             List<Record> records = channel.peek();
163             if (records.isEmpty()) {
164                 continue;
165             }
166             if (0 == getDataRecordsCount(records)) {
167                 records = channel.poll();
168                 channel.ack(records);
169                 continue;
170             }
171             long csn = findFirstDataRecord(records).getCsn();
172             if (csn <= oldestCSN) {
173                 records = channel.poll();
174                 csnRecordsQueue.add(new CSNRecords(csn, each, records));
175             }
176         }
177     }
178     
179     private int getDataRecordsCount(final List<Record> records) {
180         return (int) records.stream().filter(DataRecord.class::isInstance).count();
181     }
182     
183     private DataRecord findFirstDataRecord(final List<Record> records) {
184         for (Record each : records) {
185             if (each instanceof DataRecord) {
186                 return (DataRecord) each;
187             }
188         }
189         throw new IllegalStateException("No data record found");
190     }
191     
192     private void processCSNRecords(final CSNRecords csnRecords, final String ackId) {
193         List<Record> records = csnRecords.getRecords();
194         ackCache.put(ackId, Collections.singletonList(Pair.of(csnRecords.getChannelProgressPair(), new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
195         sink.write(ackId, filterDataRecords(records));
196     }
197     
198     private void processCSNRecordsList(final List<CSNRecords> csnRecordsList, final String ackId) {
199         List<Pair<CDCChannelProgressPair, CDCAckPosition>> ackValue = csnRecordsList.stream().map(each -> Pair.of(each.getChannelProgressPair(),
200                 new CDCAckPosition(each.getRecords().get(each.getRecords().size() - 1), getDataRecordsCount(each.getRecords())))).collect(Collectors.toList());
201         ackCache.put(ackId, ackValue);
202         Collection<Record> records = new ArrayList<>(ackValue.stream().mapToInt(each -> each.getRight().getDataRecordCount()).sum());
203         csnRecordsList.forEach(each -> records.addAll(filterDataRecords(each.getRecords())));
204         sink.write(ackId, filterDataRecords(records));
205     }
206     
207     private List<Record> filterDataRecords(final Collection<Record> records) {
208         return records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
209     }
210     
211     private void doWithoutSorting() {
212         for (CDCChannelProgressPair each : channelProgressPairs) {
213             doWithoutSorting(each);
214         }
215     }
216     
217     private void doWithoutSorting(final CDCChannelProgressPair channelProgressPair) {
218         PipelineChannel channel = channelProgressPair.getChannel();
219         List<Record> records = channel.fetch(batchSize, timeoutMillis);
220         if (records.isEmpty()) {
221             return;
222         }
223         Record lastRecord = records.get(records.size() - 1);
224         if (records.stream().noneMatch(DataRecord.class::isInstance)) {
225             channel.ack(records);
226             channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
227             if (lastRecord instanceof FinishedRecord) {
228                 channelProgressPairs.remove(channelProgressPair);
229             }
230             return;
231         }
232         if (null != rateLimitAlgorithm) {
233             rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
234         }
235         String ackId = CDCAckId.build(importerId).marshal();
236         ackCache.put(ackId, Collections.singletonList(Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records)))));
237         sink.write(ackId, records);
238     }
239     
240     /**
241      * Ack.
242      *
243      * @param ackId ack id
244      */
245     public void ack(final String ackId) {
246         List<Pair<CDCChannelProgressPair, CDCAckPosition>> channelPositionPairList = ackCache.getIfPresent(ackId);
247         if (null == channelPositionPairList) {
248             log.warn("Could not find cached ack info, ack id: {}", ackId);
249             return;
250         }
251         for (Pair<CDCChannelProgressPair, CDCAckPosition> each : channelPositionPairList) {
252             CDCAckPosition ackPosition = each.getRight();
253             Record lastRecord = ackPosition.getLastRecord();
254             each.getLeft().getChannel().ack(Collections.singletonList(lastRecord));
255             if (lastRecord instanceof FinishedRecord) {
256                 channelProgressPairs.remove(each.getKey());
257             }
258             each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
259         }
260         ackCache.invalidate(ackId);
261     }
262     
263     @Override
264     protected void doStop() {
265         CDCImporterManager.removeImporter(importerId);
266     }
267 }