1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;
19
20 import com.google.common.base.Strings;
21 import lombok.AccessLevel;
22 import lombok.Getter;
23 import lombok.extern.slf4j.Slf4j;
24 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
25 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
26 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
27 import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
28 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
29 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
30 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.ColumnValueReaderEngine;
31 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
32 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
33 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
34 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
35 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
36 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
37 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
38 import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
39 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
40 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
41 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
42 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
43 import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
44 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
45 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
46 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
47 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
48 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
49 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
50 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
51
52 import javax.sql.DataSource;
53 import java.sql.Connection;
54 import java.sql.PreparedStatement;
55 import java.sql.ResultSet;
56 import java.sql.ResultSetMetaData;
57 import java.sql.SQLException;
58 import java.sql.Statement;
59 import java.util.Collection;
60 import java.util.Collections;
61 import java.util.LinkedList;
62 import java.util.List;
63 import java.util.Optional;
64 import java.util.concurrent.atomic.AtomicReference;
65
66
67
68
69 @HighFrequencyInvocation
70 @Slf4j
71 public final class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {
72
73 @Getter(AccessLevel.PROTECTED)
74 private final InventoryDumperContext dumperContext;
75
76 private final PipelineChannel channel;
77
78 private final DataSource dataSource;
79
80 private final PipelineTableMetaDataLoader metaDataLoader;
81
82 private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder;
83
84 private final ColumnValueReaderEngine columnValueReaderEngine;
85
86 private final AtomicReference<Statement> runningStatement = new AtomicReference<>();
87
88 public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
89 this.dumperContext = dumperContext;
90 this.channel = channel;
91 this.dataSource = dataSource;
92 this.metaDataLoader = metaDataLoader;
93 DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
94 inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
95 columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
96 }
97
98 @Override
99 protected void runBlocking() {
100 IngestPosition position = dumperContext.getCommonContext().getPosition();
101 if (position instanceof IngestFinishedPosition) {
102 log.info("Ignored because of already finished.");
103 return;
104 }
105 PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(
106 dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName());
107 try (Connection connection = dataSource.getConnection()) {
108 dump(tableMetaData, connection);
109 } catch (final SQLException ex) {
110 log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
111 throw new IngestException("Inventory dump failed on " + dumperContext.getActualTableName(), ex);
112 }
113 }
114
115 @SuppressWarnings("MagicConstant")
116 private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
117 int batchSize = dumperContext.getBatchSize();
118 DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
119 if (null != dumperContext.getTransactionIsolation()) {
120 connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
121 }
122 try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildInventoryDumpSQL())) {
123 runningStatement.set(preparedStatement);
124 if (!(databaseType instanceof MySQLDatabaseType)) {
125 preparedStatement.setFetchSize(batchSize);
126 }
127 setParameters(preparedStatement);
128 try (ResultSet resultSet = preparedStatement.executeQuery()) {
129 int rowCount = 0;
130 JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
131 ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
132 List<Record> dataRecords = new LinkedList<>();
133 while (resultSet.next()) {
134 if (dataRecords.size() >= batchSize) {
135 channel.push(dataRecords);
136 dataRecords = new LinkedList<>();
137 }
138 dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
139 ++rowCount;
140 if (!isRunning()) {
141 log.info("Broke because of inventory dump is not running.");
142 break;
143 }
144 if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
145 rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
146 }
147 }
148 dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
149 channel.push(dataRecords);
150 log.info("Inventory dump done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
151 } finally {
152 runningStatement.set(null);
153 }
154 }
155 }
156
157 private String buildInventoryDumpSQL() {
158 if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
159 return dumperContext.getQuerySQL();
160 }
161 String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
162 if (!dumperContext.hasUniqueKey()) {
163 return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName());
164 }
165 PrimaryKeyIngestPosition<?> primaryKeyPosition = (PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition();
166 PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
167 Collection<String> columnNames = Collections.singleton("*");
168 if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
169 if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
170 return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
171 }
172 if (null != primaryKeyPosition.getBeginValue() && null == primaryKeyPosition.getEndValue()) {
173 return inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
174 }
175 }
176 return inventoryDumpSQLBuilder.buildIndivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
177 }
178
179 private void setParameters(final PreparedStatement preparedStatement) throws SQLException {
180 if (!dumperContext.hasUniqueKey()) {
181 return;
182 }
183 PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
184 PrimaryKeyIngestPosition<?> position = (PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition();
185 if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
186 preparedStatement.setObject(1, position.getBeginValue());
187 preparedStatement.setObject(2, position.getEndValue());
188 return;
189 }
190 if (PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
191 if (null != position.getBeginValue()) {
192 preparedStatement.setObject(1, position.getBeginValue());
193 }
194 if (null != position.getEndValue()) {
195 preparedStatement.setObject(2, position.getEndValue());
196 }
197 }
198 }
199
200 private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
201 int columnCount = resultSetMetaData.getColumnCount();
202 DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, dumperContext.getLogicTableName(), newPosition(resultSet), columnCount);
203 List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
204 ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
205 () -> new PipelineInvalidParameterException("Insert colum names count not equals ResultSet column count"));
206 for (int i = 1; i <= columnCount; i++) {
207 String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
208 ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName), () -> new PipelineInvalidParameterException(String.format("Column name is %s", columnName)));
209 boolean isUniqueKey = tableMetaData.getColumnMetaData(columnName).isUniqueKey();
210 result.addColumn(new Column(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, isUniqueKey));
211 }
212 return result;
213 }
214
215 private IngestPosition newPosition(final ResultSet resultSet) throws SQLException {
216 return dumperContext.hasUniqueKey()
217 ? PrimaryKeyIngestPositionFactory.newInstance(
218 resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition()).getEndValue())
219 : new IngestPlaceholderPosition();
220 }
221
222 @Override
223 protected void doStop() {
224 Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
225 }
226 }