1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
19
20 import lombok.AccessLevel;
21 import lombok.Getter;
22 import lombok.RequiredArgsConstructor;
23 import lombok.extern.slf4j.Slf4j;
24 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
25 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
26 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
27 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
28 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
29 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
30 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
31 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
32 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
33 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
34 import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
35 import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
36 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
37 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
38
39 import java.sql.SQLException;
40 import java.util.Iterator;
41 import java.util.Objects;
42 import java.util.Optional;
43 import java.util.concurrent.ArrayBlockingQueue;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.ThreadFactory;
47 import java.util.concurrent.ThreadPoolExecutor;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.atomic.AtomicBoolean;
50
51
52
53
54 @RequiredArgsConstructor
55 @Slf4j
56 public abstract class MatchingTableInventoryChecker implements TableInventoryChecker {
57
58 @Getter(AccessLevel.PROTECTED)
59 private final TableInventoryCheckParameter param;
60
61 private final AtomicBoolean canceling = new AtomicBoolean(false);
62
63 private volatile SingleTableInventoryCalculator sourceCalculator;
64
65 private volatile SingleTableInventoryCalculator targetCalculator;
66
67 @Override
68 public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
69 ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build(param.getJobId() + "-matching-check-%d");
70 ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
71 try {
72 return checkSingleTableInventoryData(param, executor);
73 } finally {
74 executor.shutdown();
75 executor.shutdownNow();
76 }
77 }
78
79 private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
80 SingleTableInventoryCalculateParameter sourceParam = new SingleTableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(),
81 param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY);
82 sourceParam.setQueryRange(new QueryRange(param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName()), true, null));
83 SingleTableInventoryCalculateParameter targetParam = new SingleTableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(),
84 param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY);
85 targetParam.setQueryRange(new QueryRange(param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName()), true, null));
86 SingleTableInventoryCalculator sourceCalculator = buildSingleTableInventoryCalculator();
87 this.sourceCalculator = sourceCalculator;
88 SingleTableInventoryCalculator targetCalculator = buildSingleTableInventoryCalculator();
89 this.targetCalculator = targetCalculator;
90 try {
91 Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults = waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator();
92 Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults = waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator();
93 return checkSingleTableInventoryData(sourceCalculatedResults, targetCalculatedResults, param, executor);
94 } finally {
95 QuietlyCloser.close(sourceParam.getCalculationContext());
96 QuietlyCloser.close(targetParam.getCalculationContext());
97 this.sourceCalculator = null;
98 this.targetCalculator = null;
99 }
100 }
101
102 private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
103 final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
104 final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
105 YamlTableDataConsistencyCheckResult checkResult = new YamlTableDataConsistencyCheckResult(true);
106 while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
107 if (null != param.getReadRateLimitAlgorithm()) {
108 param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1);
109 }
110 SingleTableInventoryCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
111 SingleTableInventoryCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
112 if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) {
113 checkResult.setMatched(false);
114 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), param.getTargetTable(), param.getUniqueKeys());
115 break;
116 }
117 if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
118 param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
119 }
120 if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
121 param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName(), targetCalculatedResult.getMaxUniqueKeyValue().get());
122 }
123 param.getProgressContext().onProgressUpdated(new PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
124 }
125 if (sourceCalculatedResults.hasNext()) {
126 checkResult.setMatched(false);
127 return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
128 }
129 if (targetCalculatedResults.hasNext()) {
130 checkResult.setMatched(false);
131 return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
132 }
133 return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
134 }
135
136 private <T> T waitFuture(final Future<T> future) {
137 try {
138 return future.get();
139 } catch (final InterruptedException ex) {
140 Thread.currentThread().interrupt();
141 throw new SQLWrapperException(new SQLException(ex));
142 } catch (final ExecutionException ex) {
143 if (ex.getCause() instanceof PipelineSQLException) {
144 throw (PipelineSQLException) ex.getCause();
145 }
146 throw new SQLWrapperException(new SQLException(ex));
147 }
148 }
149
150 protected abstract SingleTableInventoryCalculator buildSingleTableInventoryCalculator();
151
152 @Override
153 public void cancel() {
154 canceling.set(true);
155 Optional.ofNullable(sourceCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
156 Optional.ofNullable(targetCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
157 }
158
159 @Override
160 public boolean isCanceling() {
161 return canceling.get();
162 }
163 }