View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
19  
20  import lombok.RequiredArgsConstructor;
21  import lombok.extern.slf4j.Slf4j;
22  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
23  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
24  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
25  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
26  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
27  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
28  import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
29  import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
30  import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
31  import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
32  import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
33  import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
34  
35  import java.sql.SQLException;
36  import java.util.Iterator;
37  import java.util.Objects;
38  import java.util.Optional;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.ExecutionException;
41  import java.util.concurrent.Future;
42  import java.util.concurrent.ThreadFactory;
43  import java.util.concurrent.ThreadPoolExecutor;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  
47  /**
48   * Matching table inventory checker.
49   */
50  @RequiredArgsConstructor
51  @Slf4j
52  public abstract class MatchingTableInventoryChecker implements TableInventoryChecker {
53      
54      private final TableInventoryCheckParameter param;
55      
56      private final AtomicBoolean canceling = new AtomicBoolean(false);
57      
58      private volatile SingleTableInventoryCalculator sourceCalculator;
59      
60      private volatile SingleTableInventoryCalculator targetCalculator;
61      
62      @Override
63      public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
64          ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) + "-matching-check-%d");
65          ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
66          try {
67              return checkSingleTableInventoryData(param, executor);
68          } finally {
69              executor.shutdown();
70              executor.shutdownNow();
71          }
72      }
73      
74      private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
75          SingleTableInventoryCalculateParameter sourceParam = new SingleTableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(),
76                  param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().toString()));
77          SingleTableInventoryCalculateParameter targetParam = new SingleTableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(),
78                  param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().toString()));
79          SingleTableInventoryCalculator sourceCalculator = buildSingleTableInventoryCalculator();
80          this.sourceCalculator = sourceCalculator;
81          SingleTableInventoryCalculator targetCalculator = buildSingleTableInventoryCalculator();
82          this.targetCalculator = targetCalculator;
83          try {
84              Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults = waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator();
85              Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults = waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator();
86              return checkSingleTableInventoryData(sourceCalculatedResults, targetCalculatedResults, param, executor);
87          } finally {
88              QuietlyCloser.close(sourceParam.getCalculationContext());
89              QuietlyCloser.close(targetParam.getCalculationContext());
90              this.sourceCalculator = null;
91              this.targetCalculator = null;
92          }
93      }
94      
95      private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
96                                                                            final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
97                                                                            final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
98          YamlTableDataConsistencyCheckResult checkResult = new YamlTableDataConsistencyCheckResult(true);
99          while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
100             if (null != param.getReadRateLimitAlgorithm()) {
101                 param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1);
102             }
103             SingleTableInventoryCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
104             SingleTableInventoryCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
105             if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) {
106                 checkResult.setMatched(false);
107                 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), param.getTargetTable(), param.getUniqueKeys());
108                 break;
109             }
110             if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
111                 param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().toString(), sourceCalculatedResult.getMaxUniqueKeyValue().get());
112             }
113             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
114                 param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().toString(), targetCalculatedResult.getMaxUniqueKeyValue().get());
115             }
116             param.getProgressContext().onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
117         }
118         if (sourceCalculatedResults.hasNext()) {
119             checkResult.setMatched(false);
120             return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
121         }
122         if (targetCalculatedResults.hasNext()) {
123             checkResult.setMatched(false);
124             return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
125         }
126         return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
127     }
128     
129     // TODO use digest (crc32, murmurhash)
130     private String getJobIdDigest(final String jobId) {
131         return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
132     }
133     
134     private <T> T waitFuture(final Future<T> future) {
135         try {
136             return future.get();
137         } catch (final InterruptedException ex) {
138             Thread.currentThread().interrupt();
139             throw new SQLWrapperException(new SQLException(ex));
140         } catch (final ExecutionException ex) {
141             if (ex.getCause() instanceof PipelineSQLException) {
142                 throw (PipelineSQLException) ex.getCause();
143             }
144             throw new SQLWrapperException(new SQLException(ex));
145         }
146     }
147     
148     protected abstract SingleTableInventoryCalculator buildSingleTableInventoryCalculator();
149     
150     @Override
151     public void cancel() {
152         canceling.set(true);
153         Optional.ofNullable(sourceCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
154         Optional.ofNullable(targetCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
155     }
156     
157     @Override
158     public boolean isCanceling() {
159         return canceling.get();
160     }
161 }