View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;
19  
20  import lombok.extern.slf4j.Slf4j;
21  import org.apache.commons.lang3.StringUtils;
22  import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
23  import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
24  import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
25  import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
26  import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
27  import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
28  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
29  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
30  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
31  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.InventoryQueryParameter;
32  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.point.InventoryPointQueryParameter;
33  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.InventoryRangeQueryParameter;
34  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
35  import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
36  import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
37  import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
38  import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
39  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
40  import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
41  import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
42  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
43  import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
44  import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
45  import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
46  import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
47  import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
48  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.BuildDivisibleSQLParameter;
49  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
50  import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
51  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
52  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
53  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
54  
55  import javax.sql.DataSource;
56  import java.sql.Connection;
57  import java.sql.PreparedStatement;
58  import java.sql.ResultSet;
59  import java.sql.ResultSetMetaData;
60  import java.sql.SQLException;
61  import java.sql.Statement;
62  import java.util.Collections;
63  import java.util.LinkedList;
64  import java.util.List;
65  import java.util.Objects;
66  import java.util.Optional;
67  import java.util.concurrent.atomic.AtomicLong;
68  import java.util.concurrent.atomic.AtomicReference;
69  
/**
 * Inventory dumper.
 *
 * <p>Dumps the existing rows of one actual table into the pipeline channel, either page by page
 * (when a unique key and a usable position range are available) or with a single streaming query.</p>
 */
@HighFrequencyInvocation
@Slf4j
public final class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {
    
    private final InventoryDumperContext dumperContext;
    
    // Channel that consumes the dumped records downstream.
    private final PipelineChannel channel;
    
    private final DataSource dataSource;
    
    private final PipelineTableMetaDataLoader metaDataLoader;
    
    // Creates the ingest position attached to each dumped data record.
    private final InventoryDataRecordPositionCreator positionCreator;
    
    private final PipelineInventoryDumpSQLBuilder sqlBuilder;
    
    private final InventoryColumnValueReaderEngine columnValueReaderEngine;
    
    // Holds the in-flight statement so doStop() can cancel it from another thread.
    private final AtomicReference<Statement> runningStatement = new AtomicReference<>();
    
    public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final DataSource dataSource,
                           final PipelineTableMetaDataLoader metaDataLoader, final InventoryDataRecordPositionCreator positionCreator) {
        this.dumperContext = dumperContext;
        this.channel = channel;
        this.dataSource = dataSource;
        this.metaDataLoader = metaDataLoader;
        this.positionCreator = positionCreator;
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        // SQL builder and column value reader are both database-type specific.
        sqlBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
        columnValueReaderEngine = new InventoryColumnValueReaderEngine(databaseType);
    }
104     
105     @Override
106     protected void runBlocking() {
107         IngestPosition position = dumperContext.getCommonContext().getPosition();
108         if (position instanceof IngestFinishedPosition) {
109             log.info("Ignored because of already finished.");
110             return;
111         }
112         PipelineTableMetaData tableMetaData = getPipelineTableMetaData();
113         try (Connection connection = dataSource.getConnection()) {
114             if (StringUtils.isNotBlank(dumperContext.getQuerySQL()) || !dumperContext.hasUniqueKey() || isPrimaryKeyWithoutRange(position)) {
115                 dumpWithStreamingQuery(connection, tableMetaData);
116             } else {
117                 dumpByPage(connection, tableMetaData);
118             }
119             // CHECKSTYLE:OFF
120         } catch (final SQLException | RuntimeException ex) {
121             // CHECKSTYLE:ON
122             log.error("Inventory dump failed on {}", dumperContext.getActualTableName(), ex);
123             throw new IngestException("Inventory dump failed on " + dumperContext.getActualTableName(), ex);
124         }
125     }
126     
127     private PipelineTableMetaData getPipelineTableMetaData() {
128         String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
129         String tableName = dumperContext.getActualTableName();
130         return metaDataLoader.getTableMetaData(schemaName, tableName);
131     }
132     
133     private boolean isPrimaryKeyWithoutRange(final IngestPosition position) {
134         return position instanceof PrimaryKeyIngestPosition && null == ((PrimaryKeyIngestPosition<?>) position).getBeginValue() && null == ((PrimaryKeyIngestPosition<?>) position).getEndValue();
135     }
136     
    // Dumps the table page by page, advancing the primary key ingest position after each page.
    // Loops until a page comes back empty, which marks the table as finished.
    @SuppressWarnings("MagicConstant")
    private void dumpByPage(final Connection connection, final PipelineTableMetaData tableMetaData) throws SQLException {
        log.info("Start to dump inventory data by page, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
        if (null != dumperContext.getTransactionIsolation()) {
            // MagicConstant suppressed above: isolation level comes from configuration, not the Connection constants.
            connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
        }
        boolean firstQuery = true;
        AtomicLong rowCount = new AtomicLong();
        IngestPosition position = dumperContext.getCommonContext().getPosition();
        do {
            // Lower bound is inclusive only on the very first page of the very first dump; afterwards the
            // lower bound is the last key already emitted and must be excluded.
            QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery && dumperContext.isFirstDump(),
                    ((PrimaryKeyIngestPosition<?>) position).getEndValue());
            InventoryQueryParameter<?> queryParam = new InventoryRangeQueryParameter(queryRange);
            List<Record> dataRecords = dumpByPage(connection, queryParam, rowCount, tableMetaData);
            // If the first and last rows of the page share the same unique key value, the key is duplicated
            // across the whole page; re-query by point to fetch every row with that exact key value.
            if (dataRecords.size() > 1 && Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
                queryParam = new InventoryPointQueryParameter(getFirstUniqueKeyValue(dataRecords, 0));
                dataRecords = dumpByPage(connection, queryParam, rowCount, tableMetaData);
            }
            firstQuery = false;
            if (dataRecords.isEmpty()) {
                // Empty page: table exhausted. Emit a finished record so consumers see the terminal position.
                position = new IngestFinishedPosition();
                dataRecords.add(new FinishedRecord(position));
                log.info("Inventory dump done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
            } else {
                // Next page starts from the last unique key value of this page, keeping the original upper bound.
                position = PrimaryKeyIngestPositionFactory.newInstance(getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1), queryRange.getUpper());
            }
            channel.push(dataRecords);
            // Persist progress only after the records were handed to the channel.
            dumperContext.getCommonContext().setPosition(position);
        } while (!(position instanceof IngestFinishedPosition));
        log.info("End to dump inventory data by page, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
    }
168     
    // Executes one page query and materializes its rows as data records.
    // The in-flight statement is published via runningStatement so doStop() can cancel it.
    private List<Record> dumpByPage(final Connection connection,
                                    final InventoryQueryParameter<?> queryParam, final AtomicLong rowCount, final PipelineTableMetaData tableMetaData) throws SQLException {
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        int batchSize = dumperContext.getBatchSize();
        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildDumpByPageSQL(queryParam), batchSize)) {
            runningStatement.set(preparedStatement);
            setParameters(preparedStatement, queryParam);
            try (ResultSet resultSet = preparedStatement.executeQuery()) {
                JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
                List<Record> result = new LinkedList<>();
                while (resultSet.next()) {
                    if (result.size() >= batchSize) {
                        // NOTE(review): without a unique key the full batch is flushed straight to the channel;
                        // with a unique key this branch appears unreachable because the page SQL is presumably
                        // limited to batchSize rows — confirm against PipelineInventoryDumpSQLBuilder.
                        if (!dumperContext.hasUniqueKey()) {
                            channel.push(result);
                        }
                        result = new LinkedList<>();
                    }
                    result.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
                    rowCount.incrementAndGet();
                    if (!isRunning()) {
                        log.info("Broke because of inventory dump is not running.");
                        break;
                    }
                    // Throttle once per batchSize rows when a rate limit algorithm is configured.
                    if (null != rateLimitAlgorithm && 0 == rowCount.get() % batchSize) {
                        rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
                    }
                }
                return result;
            } finally {
                // Clear the cancel hook once the query is done, whether it succeeded or failed.
                runningStatement.set(null);
            }
        }
    }
203     
204     private void setParameters(final PreparedStatement preparedStatement, final InventoryQueryParameter<?> queryParam) throws SQLException {
205         if (queryParam instanceof InventoryRangeQueryParameter) {
206             int parameterIndex = 1;
207             Object lower = ((InventoryRangeQueryParameter) queryParam).getValue().getLower();
208             if (null != lower) {
209                 preparedStatement.setObject(parameterIndex++, lower);
210             }
211             Object upper = ((InventoryRangeQueryParameter) queryParam).getValue().getUpper();
212             if (null != upper) {
213                 preparedStatement.setObject(parameterIndex++, upper);
214             }
215             preparedStatement.setInt(parameterIndex, dumperContext.getBatchSize());
216         } else if (queryParam instanceof InventoryPointQueryParameter) {
217             preparedStatement.setObject(1, queryParam.getValue());
218         } else {
219             throw new UnsupportedOperationException("Query type: " + queryParam.getValue());
220         }
221     }
222     
223     private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
224         int columnCount = resultSetMetaData.getColumnCount();
225         String tableName = dumperContext.getLogicTableName();
226         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, positionCreator.create(dumperContext, resultSet), columnCount);
227         List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
228         ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
229                 () -> new PipelineInvalidParameterException("Insert column names count not equals ResultSet column count"));
230         for (int i = 1; i <= columnCount; i++) {
231             String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
232             ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName), () -> new PipelineInvalidParameterException(String.format("Column name is %s", columnName)));
233             result.addColumn(new NormalColumn(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
234         }
235         result.setActualTableName(dumperContext.getActualTableName());
236         return result;
237     }
238     
239     private String buildDumpByPageSQL(final InventoryQueryParameter<?> queryParam) {
240         String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
241         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
242         List<String> columnNames = dumperContext.getQueryColumnNames();
243         if (queryParam instanceof InventoryPointQueryParameter) {
244             return sqlBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
245         }
246         QueryRange queryRange = ((InventoryRangeQueryParameter) queryParam).getValue();
247         boolean lowerInclusive = queryRange.isLowerInclusive();
248         if (null != queryRange.getLower() && null != queryRange.getUpper()) {
249             return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true));
250         }
251         if (null != queryRange.getLower()) {
252             return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false));
253         }
254         throw new PipelineInternalException("Primary key position is invalid.");
255     }
256     
257     private Object getFirstUniqueKeyValue(final List<Record> dataRecords, final int index) {
258         return ((DataRecord) dataRecords.get(index)).getUniqueKeyValue().iterator().next();
259     }
260     
261     @SuppressWarnings("MagicConstant")
262     private void dumpWithStreamingQuery(final Connection connection, final PipelineTableMetaData tableMetaData) throws SQLException {
263         int batchSize = dumperContext.getBatchSize();
264         DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
265         if (null != dumperContext.getTransactionIsolation()) {
266             connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
267         }
268         if (null == dumperContext.getQuerySQL()) {
269             fetchAllQuery(connection, tableMetaData, databaseType, batchSize);
270         } else {
271             designatedParametersQuery(connection, tableMetaData, databaseType, batchSize);
272         }
273     }
274     
    // Streams every row of the actual table into the channel with one generated fetch-all query.
    private void fetchAllQuery(final Connection connection, final PipelineTableMetaData tableMetaData, final DatabaseType databaseType,
                               final int batchSize) throws SQLException {
        log.info("Start to fetch all inventory data with streaming query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
        try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, buildFetchAllSQLWithStreamingQuery(), batchSize)) {
            // Publish the statement so doStop() can cancel a long-running query.
            runningStatement.set(statement);
            try (ResultSet resultSet = statement.executeQuery()) {
                consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
            } finally {
                runningStatement.set(null);
            }
        }
        log.info("End to fetch all inventory data with streaming query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
    }
288     
289     private void designatedParametersQuery(final Connection connection, final PipelineTableMetaData tableMetaData, final DatabaseType databaseType, final int batchSize) throws SQLException {
290         log.info("Start to dump inventory data with designated parameters query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(),
291                 dumperContext.getActualTableName());
292         try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, dumperContext.getQuerySQL(), batchSize)) {
293             runningStatement.set(statement);
294             for (int i = 0; i < dumperContext.getQueryParams().size(); i++) {
295                 statement.setObject(i + 1, dumperContext.getQueryParams().get(i));
296             }
297             try (ResultSet resultSet = statement.executeQuery()) {
298                 consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
299             } finally {
300                 runningStatement.set(null);
301             }
302         }
303         log.info("End to dump inventory data with designated parameters query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(),
304                 dumperContext.getActualTableName());
305     }
306     
    // Drains the result set into the channel in batches of batchSize records, then appends a
    // FinishedRecord with a terminal position so consumers know the dump is complete.
    private void consumeResultSetToChannel(final PipelineTableMetaData tableMetaData, final ResultSet resultSet, final int batchSize) throws SQLException {
        int rowCount = 0;
        JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
        List<Record> dataRecords = new LinkedList<>();
        while (resultSet.next()) {
            if (dataRecords.size() >= batchSize) {
                // Flush a full batch before appending the next record.
                channel.push(dataRecords);
                dataRecords = new LinkedList<>();
            }
            dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
            ++rowCount;
            if (!isRunning()) {
                log.info("Broke because of inventory dump is not running.");
                break;
            }
            // Throttle once per batchSize rows when a rate limit algorithm is configured.
            if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
                rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
            }
        }
        // NOTE(review): the finished record is appended even when the loop broke because the dumper
        // stopped early — presumably acceptable for the stop path; confirm against consumer expectations.
        dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
        channel.push(dataRecords);
        log.info("Inventory dump with streaming query done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(),
                dumperContext.getActualTableName());
    }
332     
333     private String buildFetchAllSQLWithStreamingQuery() {
334         String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
335         List<String> columnNames = dumperContext.getQueryColumnNames();
336         if (dumperContext.hasUniqueKey()) {
337             return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames, dumperContext.getUniqueKeyColumns().get(0).getName());
338         }
339         return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
340     }
341     
342     @Override
343     protected void doStop() {
344         Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
345     }
346 }