/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.importer.sink.type;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.util.json.JsonUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Pipeline data source sink.
 */
@HighFrequencyInvocation
@Slf4j
public final class PipelineDataSourceSink implements PipelineSink {

    private final ImporterConfiguration importerConfig;

    private final DataSource dataSource;

    private final PipelineImportSQLBuilder importSQLBuilder;

    private final DataRecordGroupEngine groupEngine;

    private final AtomicReference<PreparedStatement> runningStatement;

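    /**
     * Build a sink that writes pipeline records into the target data source resolved from the importer configuration.
     *
     * <p>A minimal usage sketch (the {@code importerConfig}, {@code dataSourceManager}, {@code ackId} and {@code records}
     * below are hypothetical values supplied by the caller, not part of this class):</p>
     * <pre>{@code
     * PipelineDataSourceSink sink = new PipelineDataSourceSink(importerConfig, dataSourceManager);
     * PipelineJobUpdateProgress progress = sink.write(ackId, records);
     * }</pre>
     *
     * @param importerConfig importer configuration
     * @param dataSourceManager data source manager used to resolve the target data source
     */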
    public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
        this.importerConfig = importerConfig;
        dataSource = dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
        importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
        groupEngine = new DataRecordGroupEngine();
        runningStatement = new AtomicReference<>();
    }

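    // Routes a batch of acknowledged records: non-data records are ignored; if the first data record carries no unique
    // key value, the whole batch is written sequentially; otherwise records are grouped and flushed as DELETE, INSERT
    // and UPDATE batches, and the returned progress counts the INSERT records.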
    @Override
    public PipelineJobUpdateProgress write(final String ackId, final Collection<Record> records) {
        List<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
        if (dataRecords.isEmpty()) {
            return new PipelineJobUpdateProgress(0);
        }
        if (dataRecords.iterator().next().getUniqueKeyValue().isEmpty()) {
            sequentialWrite(dataRecords);
            return new PipelineJobUpdateProgress(dataRecords.size());
        }
        for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
            batchWrite(each.getDeleteDataRecords());
            batchWrite(each.getInsertDataRecords());
            batchWrite(each.getUpdateDataRecords());
        }
        return new PipelineJobUpdateProgress((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
    }

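    // Writes each record in its own call to doWrite, preserving arrival order.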
    private void sequentialWrite(final List<DataRecord> buffer) {
        try {
            for (DataRecord each : buffer) {
                doWrite(Collections.singletonList(each), true);
            }
        } catch (final SQLException ex) {
            throw new PipelineImporterJobWriteException(ex);
        }
    }

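    // Retries a failed flush up to the configured retry times with exponential backoff (1s, 2s, 4s, ... capped at 5 minutes).
    // Only the first attempt runs in "first time" mode; later attempts take the retry code path.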
    @SuppressWarnings("BusyWait")
    @SneakyThrows(InterruptedException.class)
    private void batchWrite(final Collection<DataRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
            try {
                doWrite(records, 0 == i);
                break;
            } catch (final SQLException ex) {
                log.error("Flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
                if (i == importerConfig.getRetryTimes()) {
                    throw new PipelineImporterJobWriteException(ex);
                }
                Thread.sleep(Math.min(5L * 60L * 1000L, 1000L << i));
            }
        }
    }

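    // Dispatches records by the operation type of the first record, applying the optional rate limit algorithm before each flush.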
    private void doWrite(final Collection<DataRecord> records, final boolean firstTimeRun) throws SQLException {
        switch (records.iterator().next().getType()) {
            case INSERT:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.INSERT, 1));
                executeBatchInsert(records, firstTimeRun);
                break;
            case UPDATE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.UPDATE, 1));
                executeUpdate(records, firstTimeRun);
                break;
            case DELETE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.DELETE, 1));
                executeBatchDelete(records);
                break;
            default:
                break;
        }
    }

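    // Builds one INSERT statement from the first record and reuses it for the whole group; the running statement is
    // tracked so close() can cancel it.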
    private void executeBatchInsert(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String sql = importSQLBuilder.buildInsertSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
        try (
                Connection connection = dataSource.getConnection();
                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            if (firstTimeRun) {
                executeBatchInsertFirstTime(connection, preparedStatement, dataRecords);
            } else {
                retryBatchInsert(preparedStatement, dataRecords);
            }
        } finally {
            runningStatement.set(null);
        }
    }

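    // First attempt: add all rows to a JDBC batch and execute it once, wrapping the batch in a single transaction when
    // there is more than one record.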
    private void executeBatchInsertFirstTime(final Connection connection, final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
        boolean transactionEnabled = dataRecords.size() > 1;
        if (transactionEnabled) {
            connection.setAutoCommit(false);
        }
        preparedStatement.setQueryTimeout(30);
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();
        if (transactionEnabled) {
            connection.commit();
        }
    }

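    // Retry path: each row is written with its own executeUpdate call instead of a JDBC batch.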
    private void retryBatchInsert(final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.executeUpdate();
        }
    }

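    // Applies UPDATE records one by one; a single transaction is used only on the first attempt with more than one record.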
    private void executeUpdate(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1 && firstTimeRun;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            for (DataRecord each : dataRecords) {
                executeUpdate(connection, each);
            }
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }

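    // Builds an UPDATE from the record's updated columns (SET clause) and condition columns (WHERE clause); condition
    // columns normally bind their old values, but a sharding column without an old value falls back to its current value.
    // An update count other than 1 is logged as a warning.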
    private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
        Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
        List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
        List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
        String sql = importSQLBuilder.buildUpdateSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            for (int i = 0; i < setColumns.size(); i++) {
                preparedStatement.setObject(i + 1, setColumns.get(i).getValue());
            }
            for (int i = 0; i < conditionColumns.size(); i++) {
                Column keyColumn = conditionColumns.get(i);
                // A sharding column may arrive without an old value; bind its current value in that case.
                if (shardingColumns.contains(keyColumn.getName()) && null == keyColumn.getOldValue()) {
                    preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getValue());
                    continue;
                }
                preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getOldValue());
            }
            int updateCount = preparedStatement.executeUpdate();
            if (1 != updateCount) {
                log.warn("Update failed, update count: {}, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}",
                        updateCount, sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns));
            }
        } catch (final SQLException ex) {
            log.error("Execute update failed, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}, error message: {}, data record: {}",
                    sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns), ex.getMessage(), dataRecord);
            throw ex;
        } finally {
            runningStatement.set(null);
        }
    }

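    // Wraps the batched delete in a single transaction when more than one record is flushed.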
    private void executeBatchDelete(final Collection<DataRecord> dataRecords) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            executeBatchDelete(connection, dataRecords, importerConfig.getShardingColumns(dataRecords.iterator().next().getTableName()));
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }

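    // Builds one DELETE statement from the first record, then binds the old values of each record's condition columns
    // and executes them as a JDBC batch; a null old value is logged before being bound.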
    private void executeBatchDelete(final Connection connection, final Collection<DataRecord> dataRecords, final Set<String> shardingColumns) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String deleteSQL = importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                RecordUtils.extractConditionColumns(dataRecord, shardingColumns));
        try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
            runningStatement.set(preparedStatement);
            preparedStatement.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                List<Column> conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
                for (int i = 0; i < conditionColumns.size(); i++) {
                    Object oldValue = conditionColumns.get(i).getOldValue();
                    if (null == oldValue) {
                        log.warn("Record old value is null, record: {}", each);
                    }
                    preparedStatement.setObject(i + 1, oldValue);
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } finally {
            runningStatement.set(null);
        }
    }

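    // Cancels the currently executing statement (if any), so an in-flight write can be interrupted when the sink is closed.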
    @Override
    public void close() {
        Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
}