View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
19  
20  import lombok.AccessLevel;
21  import lombok.Getter;
22  import lombok.RequiredArgsConstructor;
23  import lombok.extern.slf4j.Slf4j;
24  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
25  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
26  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
27  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
28  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
29  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
30  import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
31  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
32  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
33  import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
34  import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
35  import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
36  import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
37  import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
38  
39  import java.sql.SQLException;
40  import java.util.Iterator;
41  import java.util.Objects;
42  import java.util.Optional;
43  import java.util.concurrent.ArrayBlockingQueue;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.Future;
46  import java.util.concurrent.ThreadFactory;
47  import java.util.concurrent.ThreadPoolExecutor;
48  import java.util.concurrent.TimeUnit;
49  import java.util.concurrent.atomic.AtomicBoolean;
50  
51  /**
52   * Matching table inventory checker.
53   */
54  @RequiredArgsConstructor
55  @Slf4j
56  public abstract class MatchingTableInventoryChecker implements TableInventoryChecker {
57      
58      @Getter(AccessLevel.PROTECTED)
59      private final TableInventoryCheckParameter param;
60      
61      private final AtomicBoolean canceling = new AtomicBoolean(false);
62      
63      private volatile SingleTableInventoryCalculator sourceCalculator;
64      
65      private volatile SingleTableInventoryCalculator targetCalculator;
66      
67      @Override
68      public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
69          ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build(param.getJobId() + "-matching-check-%d");
70          ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
71          try {
72              return checkSingleTableInventoryData(param, executor);
73          } finally {
74              executor.shutdown();
75              executor.shutdownNow();
76          }
77      }
78      
79      private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
80          SingleTableInventoryCalculateParameter sourceParam = new SingleTableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(),
81                  param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY);
82          sourceParam.setQueryRange(new QueryRange(param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName()), true, null));
83          SingleTableInventoryCalculateParameter targetParam = new SingleTableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(),
84                  param.getColumnNames(), param.getUniqueKeys(), QueryType.RANGE_QUERY);
85          targetParam.setQueryRange(new QueryRange(param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName()), true, null));
86          SingleTableInventoryCalculator sourceCalculator = buildSingleTableInventoryCalculator();
87          this.sourceCalculator = sourceCalculator;
88          SingleTableInventoryCalculator targetCalculator = buildSingleTableInventoryCalculator();
89          this.targetCalculator = targetCalculator;
90          try {
91              Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults = waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator();
92              Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults = waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator();
93              return checkSingleTableInventoryData(sourceCalculatedResults, targetCalculatedResults, param, executor);
94          } finally {
95              QuietlyCloser.close(sourceParam.getCalculationContext());
96              QuietlyCloser.close(targetParam.getCalculationContext());
97              this.sourceCalculator = null;
98              this.targetCalculator = null;
99          }
100     }
101     
102     private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
103                                                                           final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
104                                                                           final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
105         YamlTableDataConsistencyCheckResult checkResult = new YamlTableDataConsistencyCheckResult(true);
106         while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
107             if (null != param.getReadRateLimitAlgorithm()) {
108                 param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1);
109             }
110             SingleTableInventoryCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
111             SingleTableInventoryCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
112             if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) {
113                 checkResult.setMatched(false);
114                 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), param.getTargetTable(), param.getUniqueKeys());
115                 break;
116             }
117             if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
118                 param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
119             }
120             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
121                 param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName(), targetCalculatedResult.getMaxUniqueKeyValue().get());
122             }
123             param.getProgressContext().onProgressUpdated(new PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
124         }
125         if (sourceCalculatedResults.hasNext()) {
126             checkResult.setMatched(false);
127             return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
128         }
129         if (targetCalculatedResults.hasNext()) {
130             checkResult.setMatched(false);
131             return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
132         }
133         return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
134     }
135     
136     private <T> T waitFuture(final Future<T> future) {
137         try {
138             return future.get();
139         } catch (final InterruptedException ex) {
140             Thread.currentThread().interrupt();
141             throw new SQLWrapperException(new SQLException(ex));
142         } catch (final ExecutionException ex) {
143             if (ex.getCause() instanceof PipelineSQLException) {
144                 throw (PipelineSQLException) ex.getCause();
145             }
146             throw new SQLWrapperException(new SQLException(ex));
147         }
148     }
149     
150     protected abstract SingleTableInventoryCalculator buildSingleTableInventoryCalculator();
151     
152     @Override
153     public void cancel() {
154         canceling.set(true);
155         Optional.ofNullable(sourceCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
156         Optional.ofNullable(targetCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
157     }
158     
159     @Override
160     public boolean isCanceling() {
161         return canceling.get();
162     }
163 }