1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;
19
20 import lombok.AccessLevel;
21 import lombok.NoArgsConstructor;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
24 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
25 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
26 import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
27 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
28 import org.apache.shardingsphere.infra.database.mariadb.type.MariaDBDatabaseType;
29 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
30 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
31
32 import javax.sql.DataSource;
33 import java.sql.Connection;
34 import java.sql.PreparedStatement;
35 import java.sql.ResultSet;
36 import java.sql.SQLException;
37 import java.util.Optional;
38
39
40
41
42 @NoArgsConstructor(access = AccessLevel.PRIVATE)
43 @Slf4j
44 public final class InventoryRecordsCountCalculator {
45
46
47
48
49
50
51
52
53
54 public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) {
55 String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
56 String actualTableName = dumperContext.getActualTableName();
57 PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
58 Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
59 try {
60 if (sql.isPresent()) {
61 DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, dataSource.getDatabaseType().getType());
62 long result = getEstimatedCount(databaseType, dataSource, sql.get());
63 return result > 0 ? result : getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
64 }
65 return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
66 } catch (final SQLException ex) {
67 String uniqueKey = dumperContext.hasUniqueKey() ? dumperContext.getUniqueKeyColumns().get(0).getName() : "";
68 throw new SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), uniqueKey, ex);
69 }
70 }
71
72 private static long getEstimatedCount(final DatabaseType databaseType, final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
73 try (
74 Connection connection = dataSource.getConnection();
75 PreparedStatement preparedStatement = connection.prepareStatement(estimatedCountSQL)) {
76 if (databaseType instanceof MySQLDatabaseType || databaseType instanceof MariaDBDatabaseType) {
77 preparedStatement.setString(1, connection.getCatalog());
78 }
79 try (ResultSet resultSet = preparedStatement.executeQuery()) {
80 resultSet.next();
81 return resultSet.getLong(1);
82 }
83 }
84 }
85
86 private static long getCount(final DataSource dataSource, final String countSQL) throws SQLException {
87 long startTimeMillis = System.currentTimeMillis();
88 long result;
89 try (
90 Connection connection = dataSource.getConnection();
91 PreparedStatement preparedStatement = connection.prepareStatement(countSQL)) {
92 try (ResultSet resultSet = preparedStatement.executeQuery()) {
93 resultSet.next();
94 result = resultSet.getLong(1);
95 }
96 }
97 log.info("getCount cost {} ms, sql: {}, count: {}", System.currentTimeMillis() - startTimeMillis, countSQL, result);
98 return result;
99 }
100 }