/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.importer.sink.type;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.util.json.JsonUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Pipeline data source sink.
 */
@HighFrequencyInvocation
@Slf4j
public final class PipelineDataSourceSink implements PipelineSink {
    
    private final ImporterConfiguration importerConfig;
    
    private final DataSource dataSource;
    
    private final PipelineImportSQLBuilder importSQLBuilder;
    
    private final DataRecordGroupEngine groupEngine;
    
    private final AtomicReference<PreparedStatement> runningStatement;
    
    public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
        this.importerConfig = importerConfig;
        dataSource = dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
        importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
        groupEngine = new DataRecordGroupEngine();
        runningStatement = new AtomicReference<>();
    }
    
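    /**
     * Write records to the target data source. Only {@link DataRecord}s are considered; when the first record has no
     * unique key value the records are written one by one, otherwise they are grouped by operation type and written
     * batch by batch (deletes, then inserts, then updates).
     */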
    @Override
    public PipelineJobUpdateProgress write(final String ackId, final Collection<Record> records) {
        List<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
        if (dataRecords.isEmpty()) {
            return new PipelineJobUpdateProgress(0);
        }
        if (dataRecords.iterator().next().getUniqueKeyValue().isEmpty()) {
            sequentialWrite(dataRecords);
            return new PipelineJobUpdateProgress(dataRecords.size());
        }
        for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
            batchWrite(each.getDeleteDataRecords());
            batchWrite(each.getInsertDataRecords());
            batchWrite(each.getUpdateDataRecords());
        }
        return new PipelineJobUpdateProgress((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
    }
    
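    /**
     * Write records one by one, each as its own single-record batch. Used when the data records have no unique key value.
     */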
    private void sequentialWrite(final List<DataRecord> buffer) {
        // TODO It would be better to use a transaction, but the delete operation may not take effect on PostgreSQL sometimes
        try {
            for (DataRecord each : buffer) {
                doWrite(Collections.singletonList(each), true);
            }
        } catch (final SQLException ex) {
            throw new PipelineImporterJobWriteException(ex);
        }
    }
    
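    /**
     * Write a group of records of the same operation type, retrying on {@link SQLException} up to the configured retry
     * times with exponential backoff (1s, 2s, 4s, ..., capped at 5 minutes).
     */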
    @SuppressWarnings("BusyWait")
    @SneakyThrows(InterruptedException.class)
    private void batchWrite(final Collection<DataRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
            try {
                doWrite(records, 0 == i);
                break;
            } catch (final SQLException ex) {
                log.error("Flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
                if (i == importerConfig.getRetryTimes()) {
                    throw new PipelineImporterJobWriteException(ex);
                }
                Thread.sleep(Math.min(5L * 60L * 1000L, 1000L << i));
            }
        }
    }
    
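    /**
     * Dispatch the records to the insert, update or delete path based on the operation type of the first record,
     * applying the configured rate limit algorithm (if any) before executing.
     */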
    private void doWrite(final Collection<DataRecord> records, final boolean firstTimeRun) throws SQLException {
        switch (records.iterator().next().getType()) {
            case INSERT:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.INSERT, 1));
                executeBatchInsert(records, firstTimeRun);
                break;
            case UPDATE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.UPDATE, 1));
                executeUpdate(records, firstTimeRun);
                break;
            case DELETE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.DELETE, 1));
                executeBatchDelete(records);
                break;
            default:
                break;
        }
    }
    
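    /**
     * Insert records with a single prepared statement built from the first record. On the first attempt the whole
     * collection is executed as one JDBC batch (inside a transaction when it contains more than one record); on retries
     * each record is executed individually.
     */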
    private void executeBatchInsert(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String sql = importSQLBuilder.buildInsertSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
        try (
                Connection connection = dataSource.getConnection();
                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            if (firstTimeRun) {
                executeBatchInsertFirstTime(connection, preparedStatement, dataRecords);
            } else {
                retryBatchInsert(preparedStatement, dataRecords);
            }
        } finally {
            runningStatement.set(null);
        }
    }
    
    private void executeBatchInsertFirstTime(final Connection connection, final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
        boolean transactionEnabled = dataRecords.size() > 1;
        if (transactionEnabled) {
            connection.setAutoCommit(false);
        }
        preparedStatement.setQueryTimeout(30);
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();
        if (transactionEnabled) {
            connection.commit();
        }
    }
    
    private void retryBatchInsert(final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.executeUpdate();
        }
    }
    
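    /**
     * Update records one by one, wrapped in a single transaction when this is the first attempt and more than one
     * record is present.
     */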
    private void executeUpdate(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1 && firstTimeRun;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            for (DataRecord each : dataRecords) {
                executeUpdate(connection, each);
            }
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }
    
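    /**
     * Update a single record: the updated columns become the SET clause, and the condition columns are bound with their
     * old values (or current values for sharding columns whose old value is null). An update count other than 1 is
     * logged as a warning rather than treated as an error.
     */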
    private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
        Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
        List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
        List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
        String sql = importSQLBuilder.buildUpdateSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            for (int i = 0; i < setColumns.size(); i++) {
                preparedStatement.setObject(i + 1, setColumns.get(i).getValue());
            }
            for (int i = 0; i < conditionColumns.size(); i++) {
                Column keyColumn = conditionColumns.get(i);
                // TODO To be compatible with PostgreSQL, the old value is null for every column except the primary key; updating sharding values is not supported yet
                if (shardingColumns.contains(keyColumn.getName()) && null == keyColumn.getOldValue()) {
                    preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getValue());
                    continue;
                }
                preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getOldValue());
            }
            // TODO If the table has no unique key, the old values of the condition columns are null, so the update will fail on PostgreSQL
            int updateCount = preparedStatement.executeUpdate();
            if (1 != updateCount) {
                log.warn("Update failed, update count: {}, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}",
                        updateCount, sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns));
            }
        } catch (final SQLException ex) {
            log.error("Execute update failed, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}, error message: {}, data record: {}",
                    sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns), ex.getMessage(), dataRecord);
            throw ex;
        } finally {
            runningStatement.set(null);
        }
    }
    
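    /**
     * Delete records as one JDBC batch, wrapped in a transaction when there is more than one record. The delete SQL is
     * built from the first record's condition columns and bound with each record's old values.
     */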
    private void executeBatchDelete(final Collection<DataRecord> dataRecords) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            executeBatchDelete(connection, dataRecords, importerConfig.getShardingColumns(dataRecords.iterator().next().getTableName()));
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }
    
    private void executeBatchDelete(final Connection connection, final Collection<DataRecord> dataRecords, final Set<String> shardingColumns) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String deleteSQL = importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                RecordUtils.extractConditionColumns(dataRecord, shardingColumns));
        try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
            runningStatement.set(preparedStatement);
            preparedStatement.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                List<Column> conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
                for (int i = 0; i < conditionColumns.size(); i++) {
                    Object oldValue = conditionColumns.get(i).getOldValue();
                    if (null == oldValue) {
                        log.warn("Record old value is null, record: {}", each);
                    }
                    preparedStatement.setObject(i + 1, oldValue);
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } finally {
            runningStatement.set(null);
        }
    }
    
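    /**
     * Cancel the currently running statement, if any, so that an in-flight write can be interrupted.
     */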
    @Override
    public void close() {
        Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
}