1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
19
20 import lombok.RequiredArgsConstructor;
21 import lombok.extern.slf4j.Slf4j;
22 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
23 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
24 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
25 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
26 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
27 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
28 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
29 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
30 import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
31 import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
32 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
33 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
34
35 import java.sql.SQLException;
36 import java.util.Iterator;
37 import java.util.Objects;
38 import java.util.Optional;
39 import java.util.concurrent.ArrayBlockingQueue;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.ThreadFactory;
43 import java.util.concurrent.ThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.atomic.AtomicBoolean;
46
47
48
49
50 @RequiredArgsConstructor
51 @Slf4j
52 public abstract class MatchingTableInventoryChecker implements TableInventoryChecker {
53
54 private final TableInventoryCheckParameter param;
55
56 private final AtomicBoolean canceling = new AtomicBoolean(false);
57
58 private volatile SingleTableInventoryCalculator sourceCalculator;
59
60 private volatile SingleTableInventoryCalculator targetCalculator;
61
62 @Override
63 public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
64 ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) + "-matching-check-%d");
65 ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
66 try {
67 return checkSingleTableInventoryData(param, executor);
68 } finally {
69 executor.shutdown();
70 executor.shutdownNow();
71 }
72 }
73
74 private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
75 SingleTableInventoryCalculateParameter sourceParam = new SingleTableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(),
76 param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().toString()));
77 SingleTableInventoryCalculateParameter targetParam = new SingleTableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(),
78 param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().toString()));
79 SingleTableInventoryCalculator sourceCalculator = buildSingleTableInventoryCalculator();
80 this.sourceCalculator = sourceCalculator;
81 SingleTableInventoryCalculator targetCalculator = buildSingleTableInventoryCalculator();
82 this.targetCalculator = targetCalculator;
83 try {
84 Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults = waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator();
85 Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults = waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator();
86 return checkSingleTableInventoryData(sourceCalculatedResults, targetCalculatedResults, param, executor);
87 } finally {
88 QuietlyCloser.close(sourceParam.getCalculationContext());
89 QuietlyCloser.close(targetParam.getCalculationContext());
90 this.sourceCalculator = null;
91 this.targetCalculator = null;
92 }
93 }
94
95 private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
96 final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
97 final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
98 YamlTableDataConsistencyCheckResult checkResult = new YamlTableDataConsistencyCheckResult(true);
99 while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
100 if (null != param.getReadRateLimitAlgorithm()) {
101 param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1);
102 }
103 SingleTableInventoryCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
104 SingleTableInventoryCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
105 if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) {
106 checkResult.setMatched(false);
107 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), param.getTargetTable(), param.getUniqueKeys());
108 break;
109 }
110 if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
111 param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().toString(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
112 }
113 if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
114 param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().toString(), targetCalculatedResult.getMaxUniqueKeyValue().get());
115 }
116 param.getProgressContext().onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
117 }
118 if (sourceCalculatedResults.hasNext()) {
119 checkResult.setMatched(false);
120 return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
121 }
122 if (targetCalculatedResults.hasNext()) {
123 checkResult.setMatched(false);
124 return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
125 }
126 return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
127 }
128
129
130 private String getJobIdDigest(final String jobId) {
131 return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
132 }
133
134 private <T> T waitFuture(final Future<T> future) {
135 try {
136 return future.get();
137 } catch (final InterruptedException ex) {
138 Thread.currentThread().interrupt();
139 throw new SQLWrapperException(new SQLException(ex));
140 } catch (final ExecutionException ex) {
141 if (ex.getCause() instanceof PipelineSQLException) {
142 throw (PipelineSQLException) ex.getCause();
143 }
144 throw new SQLWrapperException(new SQLException(ex));
145 }
146 }
147
148 protected abstract SingleTableInventoryCalculator buildSingleTableInventoryCalculator();
149
150 @Override
151 public void cancel() {
152 canceling.set(true);
153 Optional.ofNullable(sourceCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
154 Optional.ofNullable(targetCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
155 }
156
157 @Override
158 public boolean isCanceling() {
159 return canceling.get();
160 }
161 }