/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.importer.sink.type;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.util.json.JsonUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Pipeline data source sink.
 */
@HighFrequencyInvocation
@Slf4j
public final class PipelineDataSourceSink implements PipelineSink {

    private final ImporterConfiguration importerConfig;

    private final DataSource dataSource;

    private final PipelineImportSQLBuilder importSQLBuilder;

    private final DataRecordGroupEngine groupEngine;

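    // Holds the statement currently being executed so that close() can cancel it.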
    private final AtomicReference<PreparedStatement> runningStatement;

    public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
        this.importerConfig = importerConfig;
        dataSource = dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
        importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
        groupEngine = new DataRecordGroupEngine();
        runningStatement = new AtomicReference<>();
    }

    @Override
    public PipelineJobUpdateProgress write(final String ackId, final Collection<Record> records) {
        List<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
        if (dataRecords.isEmpty()) {
            return new PipelineJobUpdateProgress(0);
        }
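        // Records without a unique key are written one by one instead of being grouped into batches.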
        if (dataRecords.iterator().next().getUniqueKeyValue().isEmpty()) {
            sequentialWrite(dataRecords);
            return new PipelineJobUpdateProgress(dataRecords.size());
        }
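        // Grouped records are flushed by operation type: deletes first, then inserts, then updates.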
        for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
            batchWrite(each.getDeleteDataRecords());
            batchWrite(each.getInsertDataRecords());
            batchWrite(each.getUpdateDataRecords());
        }
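        // Only insert records are counted as processed for progress reporting.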
        return new PipelineJobUpdateProgress((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
    }

    private void sequentialWrite(final List<DataRecord> buffer) {
        // Each record is written in its own statement and committed individually; no transaction spans the whole buffer.
        try {
            for (DataRecord each : buffer) {
                doWrite(Collections.singletonList(each), true);
            }
        } catch (final SQLException ex) {
            throw new PipelineImporterJobWriteException(ex);
        }
    }

    @SuppressWarnings("BusyWait")
    @SneakyThrows(InterruptedException.class)
    private void batchWrite(final Collection<DataRecord> records) {
        if (records.isEmpty()) {
            return;
        }
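        // Retry on SQLException with exponential backoff (1s, 2s, 4s, ..., capped at 5 minutes) until the configured retry limit is reached.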
        for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
            try {
                doWrite(records, 0 == i);
                break;
            } catch (final SQLException ex) {
                log.error("Flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
                if (i == importerConfig.getRetryTimes()) {
                    throw new PipelineImporterJobWriteException(ex);
                }
                Thread.sleep(Math.min(5L * 60L * 1000L, 1000L << i));
            }
        }
    }

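    // Dispatches on the operation type of the first record; callers pass records that share one operation type.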
    private void doWrite(final Collection<DataRecord> records, final boolean firstTimeRun) throws SQLException {
        switch (records.iterator().next().getType()) {
            case INSERT:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.INSERT, 1));
                executeBatchInsert(records, firstTimeRun);
                break;
            case UPDATE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.UPDATE, 1));
                executeUpdate(records, firstTimeRun);
                break;
            case DELETE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.DELETE, 1));
                executeBatchDelete(records);
                break;
            default:
                break;
        }
    }

    private void executeBatchInsert(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String sql = importSQLBuilder.buildInsertSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
        try (
                Connection connection = dataSource.getConnection();
                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            if (firstTimeRun) {
                executeBatchInsertFirstTime(connection, preparedStatement, dataRecords);
            } else {
                retryBatchInsert(preparedStatement, dataRecords);
            }
        } finally {
            runningStatement.set(null);
        }
    }

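    // First attempt: add every record to a single JDBC batch and commit it in one transaction when there is more than one record.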
    private void executeBatchInsertFirstTime(final Connection connection, final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
        boolean transactionEnabled = dataRecords.size() > 1;
        if (transactionEnabled) {
            connection.setAutoCommit(false);
        }
        preparedStatement.setQueryTimeout(30);
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();
        if (transactionEnabled) {
            connection.commit();
        }
    }

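    // Retry path: execute the insert statement record by record instead of as one batch.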
    private void retryBatchInsert(final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.executeUpdate();
        }
    }

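    // Updates are executed one statement per record; a transaction is used only on the first attempt and only when there is more than one record.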
    private void executeUpdate(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1 && firstTimeRun;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            for (DataRecord each : dataRecords) {
                executeUpdate(connection, each);
            }
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }

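    // Binds the updated columns to the SET placeholders and the condition columns to the WHERE placeholders of a single UPDATE.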
    private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
        Collection<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
        List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
        List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
        String sql = importSQLBuilder.buildUpdateSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            for (int i = 0; i < setColumns.size(); i++) {
                preparedStatement.setObject(i + 1, setColumns.get(i).getValue());
            }
            for (int i = 0; i < conditionColumns.size(); i++) {
                Column keyColumn = conditionColumns.get(i);
                // A sharding column might carry no old value; fall back to its current value in that case.
                if (shardingColumns.contains(keyColumn.getName()) && null == keyColumn.getOldValue()) {
                    preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getValue());
                    continue;
                }
                preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getOldValue());
            }
            // Exactly one affected row is expected; anything else is logged for troubleshooting.
            int updateCount = preparedStatement.executeUpdate();
            if (1 != updateCount) {
                log.warn("Update failed, update count: {}, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}",
                        updateCount, sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns));
            }
        } catch (final SQLException ex) {
            log.error("execute update failed, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}, error message: {}, data record: {}",
                    sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns), ex.getMessage(), dataRecord);
            throw ex;
        } finally {
            runningStatement.set(null);
        }
    }

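    // Deletes for a group of records are flushed as a single JDBC batch, wrapped in a transaction when there is more than one record.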
    private void executeBatchDelete(final Collection<DataRecord> dataRecords) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            executeBatchDelete(connection, dataRecords, importerConfig.getShardingColumns(dataRecords.iterator().next().getTableName()));
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }

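    // Builds one DELETE statement from the first record and binds each record's condition column (old) values as a batch.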
    private void executeBatchDelete(final Connection connection, final Collection<DataRecord> dataRecords, final Collection<String> shardingColumns) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String deleteSQL = importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                RecordUtils.extractConditionColumns(dataRecord, shardingColumns));
        try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
            runningStatement.set(preparedStatement);
            preparedStatement.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                List<Column> conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
                for (int i = 0; i < conditionColumns.size(); i++) {
                    Object oldValue = conditionColumns.get(i).getOldValue();
                    if (null == oldValue) {
                        log.warn("Record old value is null, record: {}", each);
                    }
                    preparedStatement.setObject(i + 1, oldValue);
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } finally {
            runningStatement.set(null);
        }
    }

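    // Cancels the statement that is currently running, if any, so that an in-flight write can be interrupted.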
    @Override
    public void close() {
        Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
}