/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.importer.sink.type;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.util.json.JsonUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Pipeline data source sink.
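 *
 * <p>Usage sketch (illustrative only; {@code importerConfig}, {@code dataSourceManager}, {@code ackId}
 * and {@code records} are assumed to be provided by the surrounding pipeline job):</p>
 * <pre>{@code
 * PipelineDataSourceSink sink = new PipelineDataSourceSink(importerConfig, dataSourceManager);
 * PipelineJobUpdateProgress progress = sink.write(ackId, records);
 * sink.close();
 * }</pre>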
 */
@HighFrequencyInvocation
@Slf4j
public final class PipelineDataSourceSink implements PipelineSink {
    
    private final ImporterConfiguration importerConfig;
    
    private final DataSource dataSource;
    
    private final PipelineImportSQLBuilder importSQLBuilder;
    
    private final DataRecordGroupEngine groupEngine;
    
    private final AtomicReference<PreparedStatement> runningStatement;
    
    public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
        this.importerConfig = importerConfig;
        dataSource = dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
        importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
        groupEngine = new DataRecordGroupEngine();
        runningStatement = new AtomicReference<>();
    }
    
    @Override
    public PipelineJobUpdateProgress write(final String ackId, final Collection<Record> records) {
        List<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
        if (dataRecords.isEmpty()) {
            return new PipelineJobUpdateProgress(0);
        }
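        // No unique key values: fall back to sequential, record-by-record writes instead of grouped batches.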
        if (dataRecords.iterator().next().getUniqueKeyValue().isEmpty()) {
            sequentialWrite(dataRecords);
            return new PipelineJobUpdateProgress(dataRecords.size());
        }
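        // With unique keys present, group the records and flush each group as batches: deletes first, then inserts, then updates.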
        for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
            batchWrite(each.getDeleteDataRecords());
            batchWrite(each.getInsertDataRecords());
            batchWrite(each.getUpdateDataRecords());
        }
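        // Progress is reported as the number of inserted records only.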
        return new PipelineJobUpdateProgress((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
    }
    
    private void sequentialWrite(final List<DataRecord> buffer) {
        // TODO It would be better to use a transaction, but delete operations may sometimes not take effect on PostgreSQL
        try {
            for (DataRecord each : buffer) {
                doWrite(Collections.singletonList(each), true);
            }
        } catch (final SQLException ex) {
            throw new PipelineImporterJobWriteException(ex);
        }
    }
    
    @SuppressWarnings("BusyWait")
    @SneakyThrows(InterruptedException.class)
    private void batchWrite(final Collection<DataRecord> records) {
        if (records.isEmpty()) {
            return;
        }
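        // Retry up to retryTimes on SQLException, backing off exponentially (1s, 2s, 4s, ...) with a 5-minute cap.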
        for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
            try {
                doWrite(records, 0 == i);
                break;
            } catch (final SQLException ex) {
                log.error("Flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
                if (i == importerConfig.getRetryTimes()) {
                    throw new PipelineImporterJobWriteException(ex);
                }
                Thread.sleep(Math.min(5L * 60L * 1000L, 1000L << i));
            }
        }
    }
    
    private void doWrite(final Collection<DataRecord> records, final boolean firstTimeRun) throws SQLException {
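        // Records passed to this method share one operation type (callers group them), so dispatch on the first record's type and apply the rate limiter if configured.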
        switch (records.iterator().next().getType()) {
            case INSERT:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.INSERT, 1));
                executeBatchInsert(records, firstTimeRun);
                break;
            case UPDATE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.UPDATE, 1));
                executeUpdate(records, firstTimeRun);
                break;
            case DELETE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.DELETE, 1));
                executeBatchDelete(records);
                break;
            default:
                break;
        }
    }
    
    private void executeBatchInsert(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String sql = importSQLBuilder.buildInsertSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
        try (
                Connection connection = dataSource.getConnection();
                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            if (firstTimeRun) {
                executeBatchInsertFirstTime(connection, preparedStatement, dataRecords);
            } else {
                retryBatchInsert(preparedStatement, dataRecords);
            }
        } finally {
            runningStatement.set(null);
        }
    }
    
    private void executeBatchInsertFirstTime(final Connection connection, final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
        boolean transactionEnabled = dataRecords.size() > 1;
        if (transactionEnabled) {
            connection.setAutoCommit(false);
        }
        preparedStatement.setQueryTimeout(30);
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();
        if (transactionEnabled) {
            connection.commit();
        }
    }
    
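    // Retry path: execute the inserts one statement at a time instead of adding them to a single batch.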
    private void retryBatchInsert(final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.executeUpdate();
        }
    }
    
    private void executeUpdate(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1 && firstTimeRun;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            for (DataRecord each : dataRecords) {
                executeUpdate(connection, each);
            }
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }
    
    private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
        Collection<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
        List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
        List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
        String sql = importSQLBuilder.buildUpdateSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
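            // Bind the SET column values first, then the WHERE condition values, offset by the number of SET columns.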
            for (int i = 0; i < setColumns.size(); i++) {
                preparedStatement.setObject(i + 1, setColumns.get(i).getValue());
            }
            for (int i = 0; i < conditionColumns.size(); i++) {
                Column keyColumn = conditionColumns.get(i);
                // TODO For PostgreSQL compatibility: the old value is null for columns other than the primary key, and updating sharding values is not supported yet
                if (shardingColumns.contains(keyColumn.getName()) && null == keyColumn.getOldValue()) {
                    preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getValue());
                    continue;
                }
                preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getOldValue());
            }
            // TODO If the table has no unique key, the condition columns' old values are null, so the update will fail on PostgreSQL
            int updateCount = preparedStatement.executeUpdate();
            if (1 != updateCount) {
                log.warn("Update failed, update count: {}, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}",
                        updateCount, sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns));
            }
        } catch (final SQLException ex) {
            log.error("Execute update failed, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}, error message: {}, data record: {}",
                    sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns), ex.getMessage(), dataRecord);
            throw ex;
        } finally {
            runningStatement.set(null);
        }
    }
    
    private void executeBatchDelete(final Collection<DataRecord> dataRecords) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            executeBatchDelete(connection, dataRecords, importerConfig.getShardingColumns(dataRecords.iterator().next().getTableName()));
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }
    
    private void executeBatchDelete(final Connection connection, final Collection<DataRecord> dataRecords, final Collection<String> shardingColumns) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String deleteSQL = importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                RecordUtils.extractConditionColumns(dataRecord, shardingColumns));
        try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
            runningStatement.set(preparedStatement);
            preparedStatement.setQueryTimeout(30);
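            // Delete conditions are bound from each record's old values; a null old value is logged but still bound as the parameter.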
            for (DataRecord each : dataRecords) {
                List<Column> conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
                for (int i = 0; i < conditionColumns.size(); i++) {
                    Object oldValue = conditionColumns.get(i).getOldValue();
                    if (null == oldValue) {
                        log.warn("Record old value is null, record: {}", each);
                    }
                    preparedStatement.setObject(i + 1, oldValue);
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } finally {
            runningStatement.set(null);
        }
    }
    
    @Override
    public void close() {
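        // Cancel the statement that is currently running, if any.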
        Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
}