/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.AbstractRecordTableInventoryCalculator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Inventory dumper. Dumps a table's existing rows for a pipeline job: tables with a unique key are dumped
 * chunk by chunk over unique key ranges, while tables without one are dumped via a single streaming query.
 */
@HighFrequencyInvocation
@Slf4j
public final class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {
    
    private final InventoryDumperContext dumperContext;
    
    private final PipelineChannel channel;
    
    private final PipelineDataSource dataSource;
    
    private final InventoryDataRecordPositionCreator positionCreator;
    
    private final PipelineInventoryDumpSQLBuilder sqlBuilder;
    
    private final InventoryColumnValueReaderEngine columnValueReaderEngine;
    
    private final AtomicReference<Statement> runningStatement = new AtomicReference<>();
    
    public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final PipelineDataSource dataSource, final InventoryDataRecordPositionCreator positionCreator) {
        this.dumperContext = dumperContext;
        this.channel = channel;
        this.dataSource = dataSource;
        this.positionCreator = positionCreator;
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        sqlBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
        columnValueReaderEngine = new InventoryColumnValueReaderEngine(databaseType);
    }
    
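    // Lifecycle entry point: skips the table if its ingest position is already finished,
    // then picks a dump strategy based on whether the table has a unique key.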
    @Override
    protected void runBlocking() {
        IngestPosition position = dumperContext.getCommonContext().getPosition();
        if (position instanceof IngestFinishedPosition) {
            log.info("Ignored because position is already finished.");
            return;
        }
        try {
            if (dumperContext.hasUniqueKey()) {
                dumpByCalculator();
            } else {
                dumpWithStreamingQuery();
            }
            // CHECKSTYLE:OFF
        } catch (final SQLException | RuntimeException ex) {
            // CHECKSTYLE:ON
            log.error("Inventory dump failed on {}", dumperContext.getActualTableName(), ex);
            throw new IngestException("Inventory dump failed on " + dumperContext.getActualTableName(), ex);
        }
    }
    
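    // Unique-key path: the calculator streams the table chunk by chunk over the configured unique key range;
    // after each pushed batch, the ingest position is advanced to the last dumped key so a restarted job can resume.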
    private void dumpByCalculator() {
        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
        QualifiedTable table = new QualifiedTable(schemaName, dumperContext.getActualTableName());
        IngestPosition initialPosition = dumperContext.getCommonContext().getPosition();
        log.info("Dump by calculator start, dataSource={}, table={}, initialPosition={}", dumperContext.getCommonContext().getDataSourceName(), table, initialPosition);
        List<String> columnNames = dumperContext.getQueryColumnNames();
        TableInventoryCalculateParameter calculateParam = new TableInventoryCalculateParameter(dataSource, table,
                columnNames, dumperContext.getUniqueKeyColumns(), QueryType.RANGE_QUERY, null);
        Range<?> range = Range.closed(((UniqueKeyIngestPosition<?>) initialPosition).getLowerBound(), ((UniqueKeyIngestPosition<?>) initialPosition).getUpperBound());
        calculateParam.setRange(range);
        RecordTableInventoryDumpCalculator dumpCalculator = new RecordTableInventoryDumpCalculator(dumperContext.getBatchSize(), StreamingRangeType.SMALL);
        long rowCount = 0L;
        try {
            String firstUniqueKey = calculateParam.getFirstUniqueKey().getName();
            for (List<DataRecord> each : dumpCalculator.calculate(calculateParam)) {
                channel.push(Collections.unmodifiableList(each));
                IngestPosition position = UniqueKeyIngestPosition.newInstance(Range.closed(dumpCalculator.getFirstUniqueKeyValue(each.get(each.size() - 1), firstUniqueKey), range.getUpperBound()));
                dumperContext.getCommonContext().setPosition(position);
                rowCount += each.size();
            }
        } finally {
            QuietlyCloser.close(calculateParam.getCalculationContext());
        }
        IngestPosition position = new IngestFinishedPosition();
        channel.push(Collections.singletonList(new FinishedRecord(position)));
        dumperContext.getCommonContext().setPosition(position);
        log.info("Dump by calculator done, rowCount={}, dataSource={}, table={}, initialPosition={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), table, initialPosition);
    }
    
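    // No-unique-key path: dump the whole table with a single streaming query.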
    private void dumpWithStreamingQuery() throws SQLException {
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        try (Connection connection = dataSource.getConnection()) {
            fetchAllNoUniqueKeyQuery(connection, databaseType, dumperContext.getBatchSize());
        }
    }
    
    private void fetchAllNoUniqueKeyQuery(final Connection connection, final DatabaseType databaseType, final int batchSize) throws SQLException {
        log.info("Start fetching all records without unique key, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
        try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, buildFetchAllNoUniqueKeySQL(), batchSize)) {
            runningStatement.set(statement);
            try (ResultSet resultSet = statement.executeQuery()) {
                consumeResultSetToChannel(resultSet, batchSize);
            } finally {
                runningStatement.set(null);
            }
        }
        log.info("Finished fetching all records without unique key, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
    }
    
    private String buildFetchAllNoUniqueKeySQL() {
        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
        List<String> columnNames = dumperContext.getQueryColumnNames();
        return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
    }
    
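    // Drains the streaming result set into the channel in batches of batchSize, applying the optional
    // rate limit after each full batch and breaking out early once the dumper has been stopped.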
    private void consumeResultSetToChannel(final ResultSet resultSet, final int batchSize) throws SQLException {
        long rowCount = 0;
        JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
        List<Record> dataRecords = new LinkedList<>();
        while (resultSet.next()) {
            if (dataRecords.size() >= batchSize) {
                channel.push(dataRecords);
                dataRecords = new LinkedList<>();
            }
            dataRecords.add(loadDataRecord(resultSet, resultSetMetaData));
            ++rowCount;
            if (!isRunning()) {
                log.info("Broke because inventory dump is not running.");
                break;
            }
            if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
                rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
            }
        }
        dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
        channel.push(dataRecords);
        log.info("Inventory dump with streaming query done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(),
                dumperContext.getActualTableName());
    }
    
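    // Maps the current result set row to an INSERT data record; column names come from the configured
    // insert column names when present, otherwise from the result set metadata.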
    private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException {
        int columnCount = resultSetMetaData.getColumnCount();
        String tableName = dumperContext.getLogicTableName();
        DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, positionCreator.create(dumperContext, resultSet), columnCount);
        List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
        ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == columnCount,
                () -> new PipelineInvalidParameterException("Insert column names count does not equal ResultSet column count"));
        for (int i = 1; i <= columnCount; i++) {
            String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
            Column column = getColumn(resultSet, resultSetMetaData, columnName, i, dumperContext.getTargetUniqueKeysNames().contains(new ShardingSphereIdentifier(columnName)));
            result.addColumn(column);
        }
        result.setActualTableName(dumperContext.getActualTableName());
        return result;
    }
    
    private Column getColumn(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final String columnName, final int columnIndex, final boolean isUniqueKey) throws SQLException {
        return new NormalColumn(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex), true, isUniqueKey);
    }
    
    @Override
    protected void doStop() {
        Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
    
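    // Calculator used by the unique-key dump path: reads each row as a DataRecord and returns chunks as-is.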
    private class RecordTableInventoryDumpCalculator extends AbstractRecordTableInventoryCalculator<List<DataRecord>, DataRecord> {
        
        RecordTableInventoryDumpCalculator(final int chunkSize, final StreamingRangeType streamingRangeType) {
            super(chunkSize, streamingRangeType);
        }
        
        @Override
        protected DataRecord readRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
            return loadDataRecord(resultSet, resultSetMetaData);
        }
        
        @Override
        protected Object getFirstUniqueKeyValue(final DataRecord record, final String firstUniqueKey) {
            return record.getColumn(firstUniqueKey).getValue();
        }
        
        @Override
        protected List<DataRecord> convertRecordsToResult(final List<DataRecord> records, final Object maxUniqueKeyValue) {
            return records;
        }
    }
}