1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
19
20 import lombok.Getter;
21 import lombok.RequiredArgsConstructor;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
24 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
25 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
26 import org.apache.shardingsphere.infra.algorithm.core.exception.UnsupportedAlgorithmOnDatabaseTypeException;
27 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
28
29 import java.sql.Connection;
30 import java.sql.PreparedStatement;
31 import java.sql.ResultSet;
32 import java.sql.SQLException;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Optional;
37 import java.util.stream.Collectors;
38
39
40
41
42 @Slf4j
43 public final class CRC32SingleTableInventoryCalculator extends AbstractSingleTableInventoryCalculator {
44
45 @Override
46 public Iterable<SingleTableInventoryCalculatedResult> calculate(final SingleTableInventoryCalculateParameter param) {
47 PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new PipelineDataConsistencyCalculateSQLBuilder(param.getDatabaseType());
48 List<CalculatedItem> calculatedItems = param.getColumnNames().stream().map(each -> calculateCRC32(pipelineSQLBuilder, param, each)).collect(Collectors.toList());
49 return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
50 }
51
52 private CalculatedItem calculateCRC32(final PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder, final SingleTableInventoryCalculateParameter param, final String columnName) {
53 Optional<String> sql = pipelineSQLBuilder.buildCRC32SQL(param.getSchemaName(), param.getLogicTableName(), columnName);
54 ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new UnsupportedAlgorithmOnDatabaseTypeException("DataConsistencyCalculate", "CRC32", param.getDatabaseType()));
55 try (
56 Connection connection = param.getDataSource().getConnection();
57 PreparedStatement preparedStatement = connection.prepareStatement(sql.get());
58 ResultSet resultSet = preparedStatement.executeQuery()) {
59 setCurrentStatement(preparedStatement);
60 resultSet.next();
61 long crc32 = resultSet.getLong(1);
62 int recordsCount = resultSet.getInt(2);
63 return new CalculatedItem(crc32, recordsCount);
64 } catch (final SQLException ex) {
65 throw new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName(), ex);
66 }
67 }
68
69 @RequiredArgsConstructor
70 @Getter
71 private static final class CalculatedItem {
72
73 private final long crc32;
74
75 private final int recordsCount;
76 }
77
78 @RequiredArgsConstructor
79 @Getter
80 private static final class CalculatedResult implements SingleTableInventoryCalculatedResult {
81
82 private final int recordsCount;
83
84 private final Collection<Long> columnsCrc32;
85
86 @Override
87 public boolean equals(final Object o) {
88 if (null == o) {
89 return false;
90 }
91 if (this == o) {
92 return true;
93 }
94 if (getClass() != o.getClass()) {
95 log.warn("RecordSingleTableInventoryCalculatedResult type not match, o.className={}", o.getClass().getName());
96 return false;
97 }
98 final CalculatedResult that = (CalculatedResult) o;
99 if (recordsCount != that.recordsCount) {
100 log.info("recordsCount not match, recordsCount={}, that.recordsCount={}", recordsCount, that.recordsCount);
101 return false;
102 }
103 if (!columnsCrc32.equals(that.columnsCrc32)) {
104 log.info("columnsCrc32 not match, columnsCrc32={}, that.columnsCrc32={}", columnsCrc32, that.columnsCrc32);
105 return false;
106 }
107 return true;
108 }
109
110 @Override
111 public int hashCode() {
112 int result = recordsCount;
113 result = 31 * result + columnsCrc32.hashCode();
114 return result;
115 }
116
117
118 @Override
119 public Optional<Object> getMaxUniqueKeyValue() {
120 return Optional.empty();
121 }
122 }
123 }