/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.importer.sink.type;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.util.json.JsonUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

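/**
 * Pipeline data source sink.
 */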
@HighFrequencyInvocation
@Slf4j
public final class PipelineDataSourceSink implements PipelineSink {
    
    private final ImporterConfiguration importerConfig;
    
    private final DataSource dataSource;
    
    private final PipelineImportSQLBuilder importSQLBuilder;
    
    private final DataRecordGroupEngine groupEngine;
    
    private final AtomicReference<PreparedStatement> runningStatement;
    
    public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
        this.importerConfig = importerConfig;
        dataSource = dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
        importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
        groupEngine = new DataRecordGroupEngine();
        runningStatement = new AtomicReference<>();
    }
    
    @Override
    public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
        List<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
        if (dataRecords.isEmpty()) {
            return new PipelineJobProgressUpdatedParameter(0);
        }
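        // The group engine batches records by table and operation type; each group is flushed in DELETE -> INSERT -> UPDATE order.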
        for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
            batchWrite(each.getDeleteDataRecords());
            batchWrite(each.getInsertDataRecords());
            batchWrite(each.getUpdateDataRecords());
        }
        return new PipelineJobProgressUpdatedParameter((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
    }
    
    @SuppressWarnings("BusyWait")
    @SneakyThrows(InterruptedException.class)
    private void batchWrite(final Collection<DataRecord> records) {
        if (records.isEmpty()) {
            return;
        }
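        // Retry failed flushes up to retryTimes on SQLException, with exponential backoff (1000 ms << i, capped at 5 minutes); firstTimeRun is true only on the first attempt.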
        for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
            try {
                doWrite(records, 0 == i);
                break;
            } catch (final SQLException ex) {
                log.error("Flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
                if (i == importerConfig.getRetryTimes()) {
                    throw new PipelineImporterJobWriteException(ex);
                }
                Thread.sleep(Math.min(5 * 60 * 1000L, 1000L << i));
            }
        }
    }
    
    private void doWrite(final Collection<DataRecord> records, final boolean firstTimeRun) throws SQLException {
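        // Records in one batch share a single operation type, so the rate limiter (if configured) is consulted once per batch.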
        switch (records.iterator().next().getType()) {
            case INSERT:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.INSERT, 1));
                executeBatchInsert(records, firstTimeRun);
                break;
            case UPDATE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.UPDATE, 1));
                executeUpdate(records, firstTimeRun);
                break;
            case DELETE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.DELETE, 1));
                executeBatchDelete(records);
                break;
            default:
                break;
        }
    }
    
    private void executeBatchInsert(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String sql = importSQLBuilder.buildInsertSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
        try (
                Connection connection = dataSource.getConnection();
                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            if (firstTimeRun) {
                executeBatchInsertFirstTime(connection, preparedStatement, dataRecords);
            } else {
                retryBatchInsert(preparedStatement, dataRecords);
            }
        } finally {
            runningStatement.set(null);
        }
    }
    
    private void executeBatchInsertFirstTime(final Connection connection, final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
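        // Wrap multi-record batches in an explicit transaction; a single record relies on auto-commit.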
        boolean transactionEnabled = dataRecords.size() > 1;
        if (transactionEnabled) {
            connection.setAutoCommit(false);
        }
        preparedStatement.setQueryTimeout(30);
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();
        if (transactionEnabled) {
            connection.commit();
        }
    }
    
    private void retryBatchInsert(final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
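        // On retry, execute the prepared insert record by record instead of re-running the whole batch.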
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.executeUpdate();
        }
    }
    
    private void executeUpdate(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
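        // Updates run in a transaction only on the first attempt of a multi-record batch; retries execute each statement with auto-commit.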
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1 && firstTimeRun;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            for (DataRecord each : dataRecords) {
                executeUpdate(connection, each);
            }
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }
    
    private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
        Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
        List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
        List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
        String sql = importSQLBuilder.buildUpdateSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            for (int i = 0; i < setColumns.size(); i++) {
                preparedStatement.setObject(i + 1, setColumns.get(i).getValue());
            }
            for (int i = 0; i < conditionColumns.size(); i++) {
                Column keyColumn = conditionColumns.get(i);
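                // If a sharding column's old value was not captured, fall back to its current value for the WHERE condition.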
                if (shardingColumns.contains(keyColumn.getName()) && null == keyColumn.getOldValue()) {
                    preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getValue());
                    continue;
                }
                preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getOldValue());
            }
            int updateCount = preparedStatement.executeUpdate();
            if (1 != updateCount) {
                log.warn("execute update failed, update count: {}, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}",
                        updateCount, sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns));
            }
        } catch (final SQLException ex) {
            log.error("execute update failed, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}, error message: {}, data record: {}",
                    sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns), ex.getMessage(), dataRecord);
            throw ex;
        } finally {
            runningStatement.set(null);
        }
    }
    
    private void executeBatchDelete(final Collection<DataRecord> dataRecords) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            executeBatchDelete(connection, dataRecords, importerConfig.getShardingColumns(dataRecords.iterator().next().getTableName()));
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }
    
    private void executeBatchDelete(final Connection connection, final Collection<DataRecord> dataRecords, final Set<String> shardingColumns) throws SQLException {
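        // All records in the batch target the same table, so the DELETE SQL is built once from the first record; conditions bind the columns' old values.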
        DataRecord dataRecord = dataRecords.iterator().next();
        String deleteSQL = importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                RecordUtils.extractConditionColumns(dataRecord, shardingColumns));
        try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
            runningStatement.set(preparedStatement);
            preparedStatement.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                List<Column> conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
                for (int i = 0; i < conditionColumns.size(); i++) {
                    Object oldValue = conditionColumns.get(i).getOldValue();
                    if (null == oldValue) {
                        log.warn("Record old value is null, record: {}", each);
                    }
                    preparedStatement.setObject(i + 1, oldValue);
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } finally {
            runningStatement.set(null);
        }
    }
    
    @Override
    public void close() {
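        // Cancel any in-flight statement so that a blocked write does not delay shutdown.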
        Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
}