/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;

import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.ColumnValueReaderEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Inventory dumper, which dumps the existing rows of one actual table (or the result of a custom query) into the pipeline channel as data records.
 */
@HighFrequencyInvocation
@Slf4j
public final class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {
    
    @Getter(AccessLevel.PROTECTED)
    private final InventoryDumperContext dumperContext;
    
    private final PipelineChannel channel;
    
    private final DataSource dataSource;
    
    private final PipelineTableMetaDataLoader metaDataLoader;
    
    private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder;
    
    private final ColumnValueReaderEngine columnValueReaderEngine;
    
    private final AtomicReference<Statement> runningStatement = new AtomicReference<>();
    
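    /**
     * Build an inventory dumper for one actual table; the dump SQL builder and column value reader are selected by the source database type.
     *
     * @param dumperContext inventory dumper context
     * @param channel channel to push dumped records into
     * @param dataSource source data source
     * @param metaDataLoader pipeline table meta data loader
     */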
    public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
        this.dumperContext = dumperContext;
        this.channel = channel;
        this.dataSource = dataSource;
        this.metaDataLoader = metaDataLoader;
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
        columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
    }
    
    @Override
    protected void runBlocking() {
        IngestPosition position = dumperContext.getCommonContext().getPosition();
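        // A finished position means a previous run already dumped this table completely.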
        if (position instanceof IngestFinishedPosition) {
            log.info("Ignored because inventory dump has already finished.");
            return;
        }
        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(
                dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName());
        try (Connection connection = dataSource.getConnection()) {
            dump(tableMetaData, connection);
        } catch (final SQLException ex) {
            log.error("Inventory dump failed, msg={}.", ex.getMessage());
            throw new IngestException("Inventory dump failed on " + dumperContext.getActualTableName(), ex);
        }
    }
    
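    // "MagicConstant" is suppressed because the transaction isolation level arrives from the dumper context as a plain int.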
    @SuppressWarnings("MagicConstant")
    private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
        int batchSize = dumperContext.getBatchSize();
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        if (null != dumperContext.getTransactionIsolation()) {
            connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
        }
        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildInventoryDumpSQL())) {
            runningStatement.set(preparedStatement);
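            // MySQL result streaming is handled inside JDBCStreamQueryBuilder with its own fetch size, so the batch-size hint applies to other databases only.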
            if (!(databaseType instanceof MySQLDatabaseType)) {
                preparedStatement.setFetchSize(batchSize);
            }
            setParameters(preparedStatement);
            try (ResultSet resultSet = preparedStatement.executeQuery()) {
                int rowCount = 0;
                JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
                List<Record> dataRecords = new LinkedList<>();
                while (resultSet.next()) {
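                    // Flush the current batch to the channel once it reaches the configured batch size.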
                    if (dataRecords.size() >= batchSize) {
                        channel.push(dataRecords);
                        dataRecords = new LinkedList<>();
                    }
                    dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
                    ++rowCount;
                    if (!isRunning()) {
                        log.info("Breaking loop because inventory dump is not running.");
                        break;
                    }
                    if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
                        rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
                    }
                }
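                // Append a finished record so downstream consumers know this table's inventory dump is complete.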
                dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
                channel.push(dataRecords);
                log.info("Inventory dump done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
            } finally {
                runningStatement.set(null);
            }
        }
    }
    
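    // A user-defined query SQL takes precedence; otherwise build a full-table fetch or a unique-key-ranged query depending on the table's key columns and current position.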
    private String buildInventoryDumpSQL() {
        if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
            return dumperContext.getQuerySQL();
        }
        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
        if (!dumperContext.hasUniqueKey()) {
            return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName());
        }
        PrimaryKeyIngestPosition<?> primaryKeyPosition = (PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition();
        PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
        Collection<String> columnNames = Collections.singleton("*");
        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
            if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
                return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
            }
            if (null != primaryKeyPosition.getBeginValue() && null == primaryKeyPosition.getEndValue()) {
                return inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
            }
        }
        return inventoryDumpSQLBuilder.buildIndivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
    }
    
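    // Bind the begin/end values of the current position to the range query built above; a fetch-all query takes no parameters.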
    private void setParameters(final PreparedStatement preparedStatement) throws SQLException {
        if (!dumperContext.hasUniqueKey()) {
            return;
        }
        PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
        PrimaryKeyIngestPosition<?> position = (PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition();
        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
            preparedStatement.setObject(1, position.getBeginValue());
            preparedStatement.setObject(2, position.getEndValue());
            return;
        }
        if (PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
            if (null != position.getBeginValue()) {
                preparedStatement.setObject(1, position.getBeginValue());
            }
            if (null != position.getEndValue()) {
                preparedStatement.setObject(2, position.getEndValue());
            }
        }
    }
    
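    // Map the current result set row to a DataRecord; explicitly configured insert column names override the result set's own column names.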
    private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
        int columnCount = resultSetMetaData.getColumnCount();
        DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, dumperContext.getLogicTableName(), newPosition(resultSet), columnCount);
        List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
        ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == columnCount,
                () -> new PipelineInvalidParameterException("Insert column names count does not equal ResultSet column count"));
        for (int i = 1; i <= columnCount; i++) {
            String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
            ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName), () -> new PipelineInvalidParameterException(String.format("Can not find column meta data for column '%s'", columnName)));
            boolean isUniqueKey = tableMetaData.getColumnMetaData(columnName).isUniqueKey();
            result.addColumn(new Column(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, isUniqueKey));
        }
        return result;
    }
    
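    // Advance the position: the just-read unique key value becomes the new begin value, so a restart resumes after the last dumped row.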
    private IngestPosition newPosition(final ResultSet resultSet) throws SQLException {
        return dumperContext.hasUniqueKey()
                ? PrimaryKeyIngestPositionFactory.newInstance(
                        resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition()).getEndValue())
                : new IngestPlaceholderPosition();
    }
    
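    // Cancel the in-flight statement (if any) so a long-running inventory query stops promptly.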
    @Override
    protected void doStop() {
        Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
}