View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;
19  
20  import lombok.AccessLevel;
21  import lombok.NoArgsConstructor;
22  import lombok.extern.slf4j.Slf4j;
23  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
24  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
25  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
26  import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
27  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
28  import org.apache.shardingsphere.infra.database.mariadb.type.MariaDBDatabaseType;
29  import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
30  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
31  
32  import javax.sql.DataSource;
33  import java.sql.Connection;
34  import java.sql.PreparedStatement;
35  import java.sql.ResultSet;
36  import java.sql.SQLException;
37  import java.util.Optional;
38  
39  /**
40   * Inventory records count calculator.
41   */
42  @NoArgsConstructor(access = AccessLevel.PRIVATE)
43  @Slf4j
44  public final class InventoryRecordsCountCalculator {
45      
46      /**
47       * Get table records count.
48       *
49       * @param dumperContext inventory dumper context
50       * @param dataSource data source
51       * @return table records count
52       * @throws SplitPipelineJobByUniqueKeyException if there's exception from database
53       */
54      public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) {
55          String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
56          String actualTableName = dumperContext.getActualTableName();
57          PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
58          Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
59          try {
60              if (sql.isPresent()) {
61                  DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, dataSource.getDatabaseType().getType());
62                  long result = getEstimatedCount(databaseType, dataSource, sql.get());
63                  return result > 0 ? result : getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
64              }
65              return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
66          } catch (final SQLException ex) {
67              String uniqueKey = dumperContext.hasUniqueKey() ? dumperContext.getUniqueKeyColumns().get(0).getName() : "";
68              throw new SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), uniqueKey, ex);
69          }
70      }
71      
72      private static long getEstimatedCount(final DatabaseType databaseType, final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
73          try (
74                  Connection connection = dataSource.getConnection();
75                  PreparedStatement preparedStatement = connection.prepareStatement(estimatedCountSQL)) {
76              if (databaseType instanceof MySQLDatabaseType || databaseType instanceof MariaDBDatabaseType) {
77                  preparedStatement.setString(1, connection.getCatalog());
78              }
79              try (ResultSet resultSet = preparedStatement.executeQuery()) {
80                  resultSet.next();
81                  return resultSet.getLong(1);
82              }
83          }
84      }
85      
86      private static long getCount(final DataSource dataSource, final String countSQL) throws SQLException {
87          long startTimeMillis = System.currentTimeMillis();
88          long result;
89          try (
90                  Connection connection = dataSource.getConnection();
91                  PreparedStatement preparedStatement = connection.prepareStatement(countSQL)) {
92              try (ResultSet resultSet = preparedStatement.executeQuery()) {
93                  resultSet.next();
94                  result = resultSet.getLong(1);
95              }
96          }
97          log.info("getCount cost {} ms, sql: {}, count: {}", System.currentTimeMillis() - startTimeMillis, countSQL, result);
98          return result;
99      }
100 }