View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
19  
20  import lombok.AccessLevel;
21  import lombok.Getter;
22  import lombok.RequiredArgsConstructor;
23  import lombok.extern.slf4j.Slf4j;
24  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
25  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
26  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
27  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
28  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
29  import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
30  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
31  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
32  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
33  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculator;
34  import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
35  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
36  import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
37  import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
38  
39  import java.util.Iterator;
40  import java.util.Objects;
41  import java.util.Optional;
42  import java.util.concurrent.ArrayBlockingQueue;
43  import java.util.concurrent.ThreadFactory;
44  import java.util.concurrent.ThreadPoolExecutor;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  
48  /**
49   * Matching table inventory checker.
50   */
51  @RequiredArgsConstructor
52  @Slf4j
53  public abstract class MatchingTableInventoryChecker implements TableInventoryChecker {
54      
55      @Getter(AccessLevel.PROTECTED)
56      private final TableInventoryCheckParameter param;
57      
58      private final AtomicBoolean canceling = new AtomicBoolean(false);
59      
60      private volatile TableInventoryCalculator<TableInventoryCheckCalculatedResult> sourceCalculator;
61      
62      private volatile TableInventoryCalculator<TableInventoryCheckCalculatedResult> targetCalculator;
63      
64      @Override
65      public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
66          ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build(param.getJobId() + "-matching-check-%d");
67          ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
68          try {
69              return checkSingleTableInventoryData(param, executor);
70          } finally {
71              executor.shutdown();
72              executor.shutdownNow();
73          }
74      }
75      
76      private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
77          TableInventoryCalculateParameter sourceParam = new TableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(),
78                  param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY, param.getQueryCondition());
79          TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
80          sourceParam.setRange(Range.closed(null != checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition() : checkRangePosition.getSourceRange().getLowerBound(),
81                  checkRangePosition.getSourceRange().getUpperBound()));
82          TableInventoryCalculateParameter targetParam = getTableInventoryCalculateParameter(param, checkRangePosition);
83          TableInventoryCalculator<TableInventoryCheckCalculatedResult> sourceCalculator = buildSingleTableInventoryCalculator();
84          this.sourceCalculator = sourceCalculator;
85          TableInventoryCalculator<TableInventoryCheckCalculatedResult> targetCalculator = buildSingleTableInventoryCalculator();
86          this.targetCalculator = targetCalculator;
87          try {
88              Iterator<TableInventoryCheckCalculatedResult> sourceCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator();
89              Iterator<TableInventoryCheckCalculatedResult> targetCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator();
90              return checkSingleTableInventoryData(sourceCalculatedResults, targetCalculatedResults, param, executor);
91          } finally {
92              QuietlyCloser.close(sourceParam.getCalculationContext());
93              QuietlyCloser.close(targetParam.getCalculationContext());
94              this.sourceCalculator = null;
95              this.targetCalculator = null;
96          }
97      }
98      
99      private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iterator<TableInventoryCheckCalculatedResult> sourceCalculatedResults,
100                                                                           final Iterator<TableInventoryCheckCalculatedResult> targetCalculatedResults,
101                                                                           final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
102         YamlTableDataConsistencyCheckResult checkResult = new YamlTableDataConsistencyCheckResult(true);
103         while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
104             if (null != param.getReadRateLimitAlgorithm()) {
105                 param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1);
106             }
107             TableInventoryCheckCalculatedResult sourceCalculatedResult = PipelineTaskUtils.waitFuture(executor.submit(sourceCalculatedResults::next));
108             TableInventoryCheckCalculatedResult targetCalculatedResult = PipelineTaskUtils.waitFuture(executor.submit(targetCalculatedResults::next));
109             if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) {
110                 checkResult.setMatched(false);
111                 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), param.getTargetTable(), param.getUniqueKeys());
112                 break;
113             }
114             TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
115             if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
116                 checkRangePosition.setSourcePosition(sourceCalculatedResult.getMaxUniqueKeyValue().get());
117             }
118             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
119                 checkRangePosition.setTargetPosition(targetCalculatedResult.getMaxUniqueKeyValue().get());
120             }
121             param.getProgressContext().onProgressUpdated(new PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
122         }
123         TableCheckRangePosition checkRangePosition = param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
124         checkRangePosition.setFinished(true);
125         if (sourceCalculatedResults.hasNext() || targetCalculatedResults.hasNext()) {
126             checkResult.setMatched(false);
127         }
128         checkRangePosition.setMatched(checkResult.isMatched());
129         return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
130     }
131     
132     private TableInventoryCalculateParameter getTableInventoryCalculateParameter(final TableInventoryCheckParameter param, final TableCheckRangePosition checkRangePosition) {
133         TableInventoryCalculateParameter result = new TableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(),
134                 param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY, param.getQueryCondition());
135         result.setRange(Range.closed(null != checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition() : checkRangePosition.getTargetRange().getLowerBound(),
136                 checkRangePosition.getTargetRange().getUpperBound()));
137         return result;
138     }
139     
140     protected abstract TableInventoryCalculator<TableInventoryCheckCalculatedResult> buildSingleTableInventoryCalculator();
141     
142     @Override
143     public void cancel() {
144         canceling.set(true);
145         Optional.ofNullable(sourceCalculator).ifPresent(TableInventoryCalculator::cancel);
146         Optional.ofNullable(targetCalculator).ifPresent(TableInventoryCalculator::cancel);
147     }
148     
149     @Override
150     public boolean isCanceling() {
151         return canceling.get();
152     }
153 }