/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.InventoryQueryParameter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.point.InventoryPointQueryParameter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.InventoryRangeQueryParameter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.BuildDivisibleSQLParameter;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

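/**
 * Inventory dumper.
 */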
@HighFrequencyInvocation
@Slf4j
public final class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {

    private final InventoryDumperContext dumperContext;

    private final PipelineChannel channel;

    private final DataSource dataSource;

    private final PipelineTableMetaDataLoader metaDataLoader;

    private final InventoryDataRecordPositionCreator positionCreator;

    private final PipelineInventoryDumpSQLBuilder sqlBuilder;

    private final InventoryColumnValueReaderEngine columnValueReaderEngine;

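    // Holds the statement currently executing, so that doStop() can cancel a long-running query.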
    private final AtomicReference<Statement> runningStatement = new AtomicReference<>();

    public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final DataSource dataSource,
                           final PipelineTableMetaDataLoader metaDataLoader, final InventoryDataRecordPositionCreator positionCreator) {
        this.dumperContext = dumperContext;
        this.channel = channel;
        this.dataSource = dataSource;
        this.metaDataLoader = metaDataLoader;
        this.positionCreator = positionCreator;
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        sqlBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
        columnValueReaderEngine = new InventoryColumnValueReaderEngine(databaseType);
    }

    @Override
    protected void runBlocking() {
        IngestPosition position = dumperContext.getCommonContext().getPosition();
        if (position instanceof IngestFinishedPosition) {
            log.info("Ignored because inventory dump is already finished.");
            return;
        }
        PipelineTableMetaData tableMetaData = getPipelineTableMetaData();
        try (Connection connection = dataSource.getConnection()) {
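            // Use a single streaming query when a custom query SQL is configured, the table has no unique key,
            // or the primary key position carries no range; otherwise dump page by page on the unique key.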
            if (StringUtils.isNotBlank(dumperContext.getQuerySQL()) || !dumperContext.hasUniqueKey() || isPrimaryKeyWithoutRange(position)) {
                dumpWithStreamingQuery(connection, tableMetaData);
            } else {
                dumpByPage(connection, tableMetaData);
            }
        } catch (final SQLException | RuntimeException ex) {
            log.error("Inventory dump failed on {}", dumperContext.getActualTableName(), ex);
            throw new IngestException("Inventory dump failed on " + dumperContext.getActualTableName(), ex);
        }
    }

    private PipelineTableMetaData getPipelineTableMetaData() {
        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
        String tableName = dumperContext.getActualTableName();
        return metaDataLoader.getTableMetaData(schemaName, tableName);
    }

    private boolean isPrimaryKeyWithoutRange(final IngestPosition position) {
        return position instanceof PrimaryKeyIngestPosition && null == ((PrimaryKeyIngestPosition<?>) position).getBeginValue() && null == ((PrimaryKeyIngestPosition<?>) position).getEndValue();
    }

    @SuppressWarnings("MagicConstant")
    private void dumpByPage(final Connection connection, final PipelineTableMetaData tableMetaData) throws SQLException {
        log.info("Start to dump inventory data by page, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
        if (null != dumperContext.getTransactionIsolation()) {
            connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
        }
        boolean firstQuery = true;
        AtomicLong rowCount = new AtomicLong();
        IngestPosition position = dumperContext.getCommonContext().getPosition();
        do {
            QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery && dumperContext.isFirstDump(),
                    ((PrimaryKeyIngestPosition<?>) position).getEndValue());
            InventoryQueryParameter<?> queryParam = new InventoryRangeQueryParameter(queryRange);
            List<Record> dataRecords = dumpByPage(connection, queryParam, rowCount, tableMetaData);
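            // If the first and last records of the page share the same unique key value, the key is duplicated across
            // the whole page; re-query by that exact value so no row with the duplicated key is skipped when the position advances.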
            if (dataRecords.size() > 1 && Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
                queryParam = new InventoryPointQueryParameter(getFirstUniqueKeyValue(dataRecords, 0));
                dataRecords = dumpByPage(connection, queryParam, rowCount, tableMetaData);
            }
            firstQuery = false;
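            // An empty page means the key range is exhausted; otherwise advance the lower bound to the last fetched unique key value.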
            if (dataRecords.isEmpty()) {
                position = new IngestFinishedPosition();
                dataRecords.add(new FinishedRecord(position));
                log.info("Inventory dump done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
            } else {
                position = PrimaryKeyIngestPositionFactory.newInstance(getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1), queryRange.getUpper());
            }
            channel.push(dataRecords);
            dumperContext.getCommonContext().setPosition(position);
        } while (!(position instanceof IngestFinishedPosition));
        log.info("End to dump inventory data by page, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
    }

    private List<Record> dumpByPage(final Connection connection,
                                    final InventoryQueryParameter<?> queryParam, final AtomicLong rowCount, final PipelineTableMetaData tableMetaData) throws SQLException {
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        int batchSize = dumperContext.getBatchSize();
        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildDumpByPageSQL(queryParam), batchSize)) {
            runningStatement.set(preparedStatement);
            setParameters(preparedStatement, queryParam);
            try (ResultSet resultSet = preparedStatement.executeQuery()) {
                JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
                List<Record> result = new LinkedList<>();
                while (resultSet.next()) {
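                    // When the table has no unique key, full batches are pushed to the channel as the result set streams;
                    // otherwise the page query is limited to batchSize rows and the collected page is returned to the caller.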
                    if (result.size() >= batchSize) {
                        if (!dumperContext.hasUniqueKey()) {
                            channel.push(result);
                        }
                        result = new LinkedList<>();
                    }
                    result.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
                    rowCount.incrementAndGet();
                    if (!isRunning()) {
                        log.info("Broke because inventory dump is not running.");
                        break;
                    }
                    if (null != rateLimitAlgorithm && 0 == rowCount.get() % batchSize) {
                        rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
                    }
                }
                return result;
            } finally {
                runningStatement.set(null);
            }
        }
    }

    private void setParameters(final PreparedStatement preparedStatement, final InventoryQueryParameter<?> queryParam) throws SQLException {
        if (queryParam instanceof InventoryRangeQueryParameter) {
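            // Bind only the bounds present in the range (the generated SQL renders placeholders to match), then the LIMIT row count.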
            int parameterIndex = 1;
            Object lower = ((InventoryRangeQueryParameter) queryParam).getValue().getLower();
            if (null != lower) {
                preparedStatement.setObject(parameterIndex++, lower);
            }
            Object upper = ((InventoryRangeQueryParameter) queryParam).getValue().getUpper();
            if (null != upper) {
                preparedStatement.setObject(parameterIndex++, upper);
            }
            preparedStatement.setInt(parameterIndex, dumperContext.getBatchSize());
        } else if (queryParam instanceof InventoryPointQueryParameter) {
            preparedStatement.setObject(1, queryParam.getValue());
        } else {
            throw new UnsupportedOperationException("Unsupported query parameter type: " + queryParam.getClass().getName());
        }
    }

    private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
        int columnCount = resultSetMetaData.getColumnCount();
        String tableName = dumperContext.getLogicTableName();
        DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, positionCreator.create(dumperContext, resultSet), columnCount);
        List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
        ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == columnCount,
                () -> new PipelineInvalidParameterException("Insert column names count does not equal ResultSet column count"));
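        // Configured insert column names, if any, map positionally onto the ResultSet columns.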
        for (int i = 1; i <= columnCount; i++) {
            String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
            ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName), () -> new PipelineInvalidParameterException(String.format("Column name is %s", columnName)));
            result.addColumn(new NormalColumn(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
        }
        result.setActualTableName(dumperContext.getActualTableName());
        return result;
    }

    private String buildDumpByPageSQL(final InventoryQueryParameter<?> queryParam) {
        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
        PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
        List<String> columnNames = dumperContext.getQueryColumnNames();
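        // Point query fetches every row equal to one unique key value; range query uses a divisible SQL
        // with both bounds when the upper bound is known, or the lower bound only when it is open-ended.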
        if (queryParam instanceof InventoryPointQueryParameter) {
            return sqlBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
        }
        QueryRange queryRange = ((InventoryRangeQueryParameter) queryParam).getValue();
        boolean lowerInclusive = queryRange.isLowerInclusive();
        if (null != queryRange.getLower() && null != queryRange.getUpper()) {
            return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true));
        }
        if (null != queryRange.getLower()) {
            return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false));
        }
        throw new PipelineInternalException("Primary key position is invalid.");
    }

    private Object getFirstUniqueKeyValue(final List<Record> dataRecords, final int index) {
        return ((DataRecord) dataRecords.get(index)).getUniqueKeyValue().iterator().next();
    }

    @SuppressWarnings("MagicConstant")
    private void dumpWithStreamingQuery(final Connection connection, final PipelineTableMetaData tableMetaData) throws SQLException {
        int batchSize = dumperContext.getBatchSize();
        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
        if (null != dumperContext.getTransactionIsolation()) {
            connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
        }
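        // A user-configured query SQL takes precedence over the generated fetch-all SQL.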
        if (null == dumperContext.getQuerySQL()) {
            fetchAllQuery(connection, tableMetaData, databaseType, batchSize);
        } else {
            designatedParametersQuery(connection, tableMetaData, databaseType, batchSize);
        }
    }

    private void fetchAllQuery(final Connection connection, final PipelineTableMetaData tableMetaData, final DatabaseType databaseType,
                               final int batchSize) throws SQLException {
        log.info("Start to fetch all inventory data with streaming query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
        try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, buildFetchAllSQLWithStreamingQuery(), batchSize)) {
            runningStatement.set(statement);
            try (ResultSet resultSet = statement.executeQuery()) {
                consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
            } finally {
                runningStatement.set(null);
            }
        }
        log.info("End to fetch all inventory data with streaming query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
    }

    private void designatedParametersQuery(final Connection connection, final PipelineTableMetaData tableMetaData, final DatabaseType databaseType, final int batchSize) throws SQLException {
        log.info("Start to dump inventory data with designated parameters query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(),
                dumperContext.getActualTableName());
        try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, dumperContext.getQuerySQL(), batchSize)) {
            runningStatement.set(statement);
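            // Bind the user-provided query parameters positionally before executing the custom SQL.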
            for (int i = 0; i < dumperContext.getQueryParams().size(); i++) {
                statement.setObject(i + 1, dumperContext.getQueryParams().get(i));
            }
            try (ResultSet resultSet = statement.executeQuery()) {
                consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
            } finally {
                runningStatement.set(null);
            }
        }
        log.info("End to dump inventory data with designated parameters query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(),
                dumperContext.getActualTableName());
    }

    private void consumeResultSetToChannel(final PipelineTableMetaData tableMetaData, final ResultSet resultSet, final int batchSize) throws SQLException {
        int rowCount = 0;
        JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
        List<Record> dataRecords = new LinkedList<>();
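        // Stream rows into fixed-size batches; a FinishedRecord is appended after the final batch so downstream consumers observe completion.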
        while (resultSet.next()) {
            if (dataRecords.size() >= batchSize) {
                channel.push(dataRecords);
                dataRecords = new LinkedList<>();
            }
            dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
            ++rowCount;
            if (!isRunning()) {
                log.info("Broke because inventory dump is not running.");
                break;
            }
            if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
                rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
            }
        }
        dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
        channel.push(dataRecords);
        log.info("Inventory dump with streaming query done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(),
                dumperContext.getActualTableName());
    }

    private String buildFetchAllSQLWithStreamingQuery() {
        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
        List<String> columnNames = dumperContext.getQueryColumnNames();
        if (dumperContext.hasUniqueKey()) {
            return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames, dumperContext.getUniqueKeyColumns().get(0).getName());
        }
        return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
    }

    @Override
    protected void doStop() {
        Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
}