View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
19  
20  import lombok.Getter;
21  import lombok.RequiredArgsConstructor;
22  import lombok.extern.slf4j.Slf4j;
23  import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
24  import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
25  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
26  import org.apache.shardingsphere.infra.algorithm.core.exception.UnsupportedAlgorithmOnDatabaseTypeException;
27  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
28  
29  import java.sql.Connection;
30  import java.sql.PreparedStatement;
31  import java.sql.ResultSet;
32  import java.sql.SQLException;
33  import java.util.Collection;
34  import java.util.Collections;
35  import java.util.List;
36  import java.util.Optional;
37  import java.util.stream.Collectors;
38  
39  /**
40   * CRC32 single table inventory calculator.
41   */
42  @Slf4j
43  public final class CRC32SingleTableInventoryCalculator extends AbstractSingleTableInventoryCalculator {
44      
45      @Override
46      public Iterable<SingleTableInventoryCalculatedResult> calculate(final SingleTableInventoryCalculateParameter param) {
47          PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new PipelineDataConsistencyCalculateSQLBuilder(param.getDatabaseType());
48          List<CalculatedItem> calculatedItems = param.getColumnNames().stream().map(each -> calculateCRC32(pipelineSQLBuilder, param, each)).collect(Collectors.toList());
49          return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
50      }
51      
52      private CalculatedItem calculateCRC32(final PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder, final SingleTableInventoryCalculateParameter param, final String columnName) {
53          Optional<String> sql = pipelineSQLBuilder.buildCRC32SQL(param.getSchemaName(), param.getLogicTableName(), columnName);
54          ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new UnsupportedAlgorithmOnDatabaseTypeException("DataConsistencyCalculate", "CRC32", param.getDatabaseType()));
55          try (
56                  Connection connection = param.getDataSource().getConnection();
57                  PreparedStatement preparedStatement = connection.prepareStatement(sql.get());
58                  ResultSet resultSet = preparedStatement.executeQuery()) {
59              setCurrentStatement(preparedStatement);
60              resultSet.next();
61              long crc32 = resultSet.getLong(1);
62              int recordsCount = resultSet.getInt(2);
63              return new CalculatedItem(crc32, recordsCount);
64          } catch (final SQLException ex) {
65              throw new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName(), ex);
66          }
67      }
68      
69      @RequiredArgsConstructor
70      @Getter
71      private static final class CalculatedItem {
72          
73          private final long crc32;
74          
75          private final int recordsCount;
76      }
77      
78      @RequiredArgsConstructor
79      @Getter
80      private static final class CalculatedResult implements SingleTableInventoryCalculatedResult {
81          
82          private final int recordsCount;
83          
84          private final Collection<Long> columnsCrc32;
85          
86          @Override
87          public boolean equals(final Object o) {
88              if (null == o) {
89                  return false;
90              }
91              if (this == o) {
92                  return true;
93              }
94              if (getClass() != o.getClass()) {
95                  log.warn("RecordSingleTableInventoryCalculatedResult type not match, o.className={}", o.getClass().getName());
96                  return false;
97              }
98              final CalculatedResult that = (CalculatedResult) o;
99              if (recordsCount != that.recordsCount) {
100                 log.info("recordsCount not match, recordsCount={}, that.recordsCount={}", recordsCount, that.recordsCount);
101                 return false;
102             }
103             if (!columnsCrc32.equals(that.columnsCrc32)) {
104                 log.info("columnsCrc32 not match, columnsCrc32={}, that.columnsCrc32={}", columnsCrc32, that.columnsCrc32);
105                 return false;
106             }
107             return true;
108         }
109         
110         @Override
111         public int hashCode() {
112             int result = recordsCount;
113             result = 31 * result + columnsCrc32.hashCode();
114             return result;
115         }
116         
117         // TODO not support now
118         @Override
119         public Optional<Object> getMaxUniqueKeyValue() {
120             return Optional.empty();
121         }
122     }
123 }