1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
19
20 import lombok.AccessLevel;
21 import lombok.Getter;
22 import lombok.RequiredArgsConstructor;
23 import lombok.extern.slf4j.Slf4j;
24 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
25 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
26 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
27 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
28 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
29 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
30 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
31 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
32 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
33 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculator;
34 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
35 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
36 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
37 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
38
39 import java.util.Iterator;
40 import java.util.Objects;
41 import java.util.Optional;
42 import java.util.concurrent.ArrayBlockingQueue;
43 import java.util.concurrent.ThreadFactory;
44 import java.util.concurrent.ThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47
48
49
50
51 @RequiredArgsConstructor
52 @Slf4j
53 public abstract class MatchingTableInventoryChecker implements TableInventoryChecker {
54
55 @Getter(AccessLevel.PROTECTED)
56 private final TableInventoryCheckParameter param;
57
58 private final AtomicBoolean canceling = new AtomicBoolean(false);
59
60 private volatile TableInventoryCalculator<TableInventoryCheckCalculatedResult> sourceCalculator;
61
62 private volatile TableInventoryCalculator<TableInventoryCheckCalculatedResult> targetCalculator;
63
64 @Override
65 public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
66 ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build(param.getJobId() + "-matching-check-%d");
67 ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
68 try {
69 return checkSingleTableInventoryData(param, executor);
70 } finally {
71 executor.shutdown();
72 executor.shutdownNow();
73 }
74 }
75
76 private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
77 TableInventoryCalculateParameter sourceParam = new TableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(),
78 param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY, param.getQueryCondition());
79 TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
80 sourceParam.setRange(Range.closed(null != checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition() : checkRangePosition.getSourceRange().getLowerBound(),
81 checkRangePosition.getSourceRange().getUpperBound()));
82 TableInventoryCalculateParameter targetParam = getTableInventoryCalculateParameter(param, checkRangePosition);
83 TableInventoryCalculator<TableInventoryCheckCalculatedResult> sourceCalculator = buildSingleTableInventoryCalculator();
84 this.sourceCalculator = sourceCalculator;
85 TableInventoryCalculator<TableInventoryCheckCalculatedResult> targetCalculator = buildSingleTableInventoryCalculator();
86 this.targetCalculator = targetCalculator;
87 try {
88 Iterator<TableInventoryCheckCalculatedResult> sourceCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator();
89 Iterator<TableInventoryCheckCalculatedResult> targetCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator();
90 return checkSingleTableInventoryData(sourceCalculatedResults, targetCalculatedResults, param, executor);
91 } finally {
92 QuietlyCloser.close(sourceParam.getCalculationContext());
93 QuietlyCloser.close(targetParam.getCalculationContext());
94 this.sourceCalculator = null;
95 this.targetCalculator = null;
96 }
97 }
98
99 private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iterator<TableInventoryCheckCalculatedResult> sourceCalculatedResults,
100 final Iterator<TableInventoryCheckCalculatedResult> targetCalculatedResults,
101 final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
102 YamlTableDataConsistencyCheckResult checkResult = new YamlTableDataConsistencyCheckResult(true);
103 while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
104 if (null != param.getReadRateLimitAlgorithm()) {
105 param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1);
106 }
107 TableInventoryCheckCalculatedResult sourceCalculatedResult = PipelineTaskUtils.waitFuture(executor.submit(sourceCalculatedResults::next));
108 TableInventoryCheckCalculatedResult targetCalculatedResult = PipelineTaskUtils.waitFuture(executor.submit(targetCalculatedResults::next));
109 if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) {
110 checkResult.setMatched(false);
111 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), param.getTargetTable(), param.getUniqueKeys());
112 break;
113 }
114 TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
115 if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
116 checkRangePosition.setSourcePosition(sourceCalculatedResult.getMaxUniqueKeyValue().get());
117 }
118 if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
119 checkRangePosition.setTargetPosition(targetCalculatedResult.getMaxUniqueKeyValue().get());
120 }
121 param.getProgressContext().onProgressUpdated(new PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
122 }
123 TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
124 checkRangePosition.setFinished(true);
125 if (sourceCalculatedResults.hasNext() || targetCalculatedResults.hasNext()) {
126 checkResult.setMatched(false);
127 }
128 checkRangePosition.setMatched(checkResult.isMatched());
129 return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
130 }
131
132 private TableInventoryCalculateParameter getTableInventoryCalculateParameter(final TableInventoryCheckParameter param, final TableCheckRangePosition checkRangePosition) {
133 TableInventoryCalculateParameter result = new TableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(),
134 param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY, param.getQueryCondition());
135 result.setRange(Range.closed(null != checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition() : checkRangePosition.getTargetRange().getLowerBound(),
136 checkRangePosition.getTargetRange().getUpperBound()));
137 return result;
138 }
139
140 protected abstract TableInventoryCalculator<TableInventoryCheckCalculatedResult> buildSingleTableInventoryCalculator();
141
142 @Override
143 public void cancel() {
144 canceling.set(true);
145 Optional.ofNullable(sourceCalculator).ifPresent(TableInventoryCalculator::cancel);
146 Optional.ofNullable(targetCalculator).ifPresent(TableInventoryCalculator::cancel);
147 }
148
149 @Override
150 public boolean isCanceling() {
151 return canceling.get();
152 }
153 }