/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.AbstractRecordTableInventoryCalculator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Inventory dumper, which dumps existing table records as the initial phase of a pipeline job.
 */
@HighFrequencyInvocation
@Slf4j
public final class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {
    
    private final InventoryDumperContext dumperContext;
    
    private final PipelineChannel channel;
    
    private final PipelineDataSource dataSource;
    
    private final InventoryDataRecordPositionCreator positionCreator;
    
    private final PipelineInventoryDumpSQLBuilder sqlBuilder;
    
    private final InventoryColumnValueReaderEngine columnValueReaderEngine;
    
    private final AtomicReference<Statement> runningStatement = new AtomicReference<>();
    
    public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final PipelineDataSource dataSource, final InventoryDataRecordPositionCreator positionCreator) {
        this.dumperContext = dumperContext;
        this.channel = channel;
        this.dataSource = dataSource;
        this.positionCreator = positionCreator;
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        sqlBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
        columnValueReaderEngine = new InventoryColumnValueReaderEngine(databaseType);
    }
    
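    // Entry point of the dump: tables with a unique key are dumped by range calculation, others by one streaming full-table query.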
    @Override
    protected void runBlocking() {
        IngestPosition position = dumperContext.getCommonContext().getPosition();
        if (position instanceof IngestFinishedPosition) {
            log.info("Ignored inventory dump, position is already finished.");
            return;
        }
        try {
            if (dumperContext.hasUniqueKey()) {
                dumpByCalculator();
            } else {
                dumpWithStreamingQuery();
            }
        } catch (final SQLException | RuntimeException ex) {
            log.error("Inventory dump failed on {}", dumperContext.getActualTableName(), ex);
            throw new IngestException("Inventory dump failed on " + dumperContext.getActualTableName(), ex);
        }
    }
    
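    /**
     * Dump a table with unique key: calculate record chunks within the position range, push each chunk to the channel,
     * and advance the ingest position after every push so that an interrupted job can resume from the last pushed chunk.
     */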
    private void dumpByCalculator() {
        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
        QualifiedTable table = new QualifiedTable(schemaName, dumperContext.getActualTableName());
        IngestPosition initialPosition = dumperContext.getCommonContext().getPosition();
        log.info("Dump by calculator start, dataSource={}, table={}, initialPosition={}", dumperContext.getCommonContext().getDataSourceName(), table, initialPosition);
        List<String> columnNames = dumperContext.getQueryColumnNames();
        TableInventoryCalculateParameter calculateParam = new TableInventoryCalculateParameter(dataSource, table, columnNames, dumperContext.getUniqueKeyColumns(), QueryType.RANGE_QUERY, null);
        Range<?> range = Range.closed(((UniqueKeyIngestPosition<?>) initialPosition).getLowerBound(), ((UniqueKeyIngestPosition<?>) initialPosition).getUpperBound());
        calculateParam.setRange(range);
        RecordTableInventoryDumpCalculator dumpCalculator = new RecordTableInventoryDumpCalculator(dumperContext.getBatchSize(), StreamingRangeType.SMALL);
        long rowCount = 0L;
        try {
            String firstUniqueKey = calculateParam.getFirstUniqueKey().getName();
            for (List<DataRecord> each : dumpCalculator.calculate(calculateParam)) {
                channel.push(Collections.unmodifiableList(each));
                IngestPosition position = UniqueKeyIngestPosition.newInstance(Range.closed(dumpCalculator.getFirstUniqueKeyValue(each.get(each.size() - 1), firstUniqueKey), range.getUpperBound()));
                dumperContext.getCommonContext().setPosition(position);
                rowCount += each.size();
            }
        } finally {
            QuietlyCloser.close(calculateParam.getCalculationContext());
        }
        IngestPosition position = new IngestFinishedPosition();
        channel.push(Collections.singletonList(new FinishedRecord(position)));
        dumperContext.getCommonContext().setPosition(position);
        log.info("Dump by calculator done, rowCount={}, dataSource={}, table={}, initialPosition={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), table, initialPosition);
    }
    
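    /**
     * Dump a table without unique key by streaming the whole table in one query.
     */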
    private void dumpWithStreamingQuery() throws SQLException {
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        try (Connection connection = dataSource.getConnection()) {
            fetchAllNoUniqueKeyQuery(connection, databaseType, dumperContext.getBatchSize());
        }
    }
    
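    // Runs the fetch-all SQL as a streaming query, tracking the statement in runningStatement so doStop() can cancel it.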
    private void fetchAllNoUniqueKeyQuery(final Connection connection, final DatabaseType databaseType, final int batchSize) throws SQLException {
        log.info("Fetch all of no-unique-key table started, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
        try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, buildFetchAllNoUniqueKeySQL(), batchSize)) {
            runningStatement.set(statement);
            try (ResultSet resultSet = statement.executeQuery()) {
                consumeResultSetToChannel(resultSet, batchSize);
            } finally {
                runningStatement.set(null);
            }
        }
        log.info("Fetch all of no-unique-key table finished, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
    }
    
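    // Builds the fetch-all SQL from the mapped schema name, actual table name and configured query columns.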
    private String buildFetchAllNoUniqueKeySQL() {
        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
        List<String> columnNames = dumperContext.getQueryColumnNames();
        return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
    }
    
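    // Drains the result set into the channel in batches of batchSize, applying the rate limit and breaking early once the dumper stops running.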
    private void consumeResultSetToChannel(final ResultSet resultSet, final int batchSize) throws SQLException {
        long rowCount = 0L;
        JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
        List<Record> dataRecords = new LinkedList<>();
        while (resultSet.next()) {
            if (dataRecords.size() >= batchSize) {
                channel.push(dataRecords);
                dataRecords = new LinkedList<>();
            }
            dataRecords.add(loadDataRecord(resultSet, resultSetMetaData));
            ++rowCount;
            if (!isRunning()) {
                log.info("Broke out of dump loop, inventory dump is not running.");
                break;
            }
            if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
                rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
            }
        }
        dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
        channel.push(dataRecords);
        log.info("Inventory dump with streaming query done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
    }
    
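    /**
     * Convert the current result set row into an INSERT data record; column names come from the configured insert columns when present, otherwise from result set metadata.
     */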
    private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException {
        int columnCount = resultSetMetaData.getColumnCount();
        String tableName = dumperContext.getLogicTableName();
        DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, positionCreator.create(dumperContext, resultSet), columnCount);
        List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
        ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == columnCount,
                () -> new PipelineInvalidParameterException("Insert column names count does not equal ResultSet column count"));
        for (int i = 1; i <= columnCount; i++) {
            String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
            Column column = getColumn(resultSet, resultSetMetaData, columnName, i, dumperContext.getTargetUniqueKeysNames().contains(new ShardingSphereIdentifier(columnName)));
            result.addColumn(column);
        }
        result.setActualTableName(dumperContext.getActualTableName());
        return result;
    }
    
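    // Reads one column value and wraps it as a NormalColumn, marking whether the column is part of the target unique key.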
    private Column getColumn(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final String columnName, final int columnIndex, final boolean isUniqueKey) throws SQLException {
        return new NormalColumn(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex), true, isUniqueKey);
    }
    
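    // Cancels the in-flight streaming statement, if any, so a stop request does not wait for the full table scan to finish.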
    @Override
    protected void doStop() {
        Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
    
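    /**
     * Inventory calculator which reads each row as a data record and returns every calculated chunk as-is.
     */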
    private final class RecordTableInventoryDumpCalculator extends AbstractRecordTableInventoryCalculator<List<DataRecord>, DataRecord> {
        
        RecordTableInventoryDumpCalculator(final int chunkSize, final StreamingRangeType streamingRangeType) {
            super(chunkSize, streamingRangeType);
        }
        
        @Override
        protected DataRecord readRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
            return loadDataRecord(resultSet, resultSetMetaData);
        }
        
        @Override
        protected Object getFirstUniqueKeyValue(final DataRecord record, final String firstUniqueKey) {
            return record.getColumn(firstUniqueKey).getValue();
        }
        
        @Override
        protected List<DataRecord> convertRecordsToResult(final List<DataRecord> records, final Object maxUniqueKeyValue) {
            return records;
        }
    }
}