/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.importer.sink.type;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.util.json.JsonUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Pipeline data source sink.
 */
@HighFrequencyInvocation
@Slf4j
public final class PipelineDataSourceSink implements PipelineSink {
    
    private final ImporterConfiguration importerConfig;
    
    private final DataSource dataSource;
    
    private final PipelineImportSQLBuilder importSQLBuilder;
    
    private final DataRecordGroupEngine groupEngine;
    
    private final AtomicReference<PreparedStatement> runningStatement;
    
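    /**
     * Create pipeline data source sink.
     *
     * @param importerConfig importer configuration
     * @param dataSourceManager pipeline data source manager used to obtain the target data source
     */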
    public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
        this.importerConfig = importerConfig;
        dataSource = dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
        importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
        groupEngine = new DataRecordGroupEngine();
        runningStatement = new AtomicReference<>();
    }
    
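    // Extracts data records from the acknowledged batch, groups them, and flushes each group's deletes, inserts and updates in that order;
    // the returned progress parameter counts the processed INSERT records.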
    @Override
    public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
        List<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
        if (dataRecords.isEmpty()) {
            return new PipelineJobProgressUpdatedParameter(0);
        }
        for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
            batchWrite(each.getDeleteDataRecords());
            batchWrite(each.getInsertDataRecords());
            batchWrite(each.getUpdateDataRecords());
        }
        return new PipelineJobProgressUpdatedParameter((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
    }
    
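    // Writes one group of same-type records, retrying on SQLException up to the configured retry times
    // with exponential backoff (1s, 2s, 4s, ... capped at 5 minutes); rethrows as PipelineImporterJobWriteException once retries are exhausted.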
    @SuppressWarnings("BusyWait")
    @SneakyThrows(InterruptedException.class)
    private void batchWrite(final Collection<DataRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
            try {
                doWrite(records, 0 == i);
                break;
            } catch (final SQLException ex) {
                log.error("Flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
                if (i == importerConfig.getRetryTimes()) {
                    throw new PipelineImporterJobWriteException(ex);
                }
                Thread.sleep(Math.min(5 * 60 * 1000L, 1000L << i));
            }
        }
    }
    
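    // Dispatches a group of records (all sharing the same operation type) to the matching executor,
    // applying the configured rate limit algorithm, if any, before each write.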
    private void doWrite(final Collection<DataRecord> records, final boolean firstTimeRun) throws SQLException {
        switch (records.iterator().next().getType()) {
            case INSERT:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.INSERT, 1));
                executeBatchInsert(records, firstTimeRun);
                break;
            case UPDATE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.UPDATE, 1));
                executeUpdate(records, firstTimeRun);
                break;
            case DELETE:
                Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.DELETE, 1));
                executeBatchDelete(records);
                break;
            default:
                break;
        }
    }
    
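    // Builds the INSERT SQL from the first record, then batches all records on the first attempt
    // or replays them one by one on retries.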
    private void executeBatchInsert(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String sql = importSQLBuilder.buildInsertSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
        try (
                Connection connection = dataSource.getConnection();
                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            if (firstTimeRun) {
                executeBatchInsertFirstTime(connection, preparedStatement, dataRecords);
            } else {
                retryBatchInsert(preparedStatement, dataRecords);
            }
        } finally {
            runningStatement.set(null);
        }
    }
    
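    // First attempt: adds every record to a single JDBC batch and commits it in one transaction when more than one record is present.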
    private void executeBatchInsertFirstTime(final Connection connection, final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
        boolean transactionEnabled = dataRecords.size() > 1;
        if (transactionEnabled) {
            connection.setAutoCommit(false);
        }
        preparedStatement.setQueryTimeout(30);
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();
        if (transactionEnabled) {
            connection.commit();
        }
    }
    
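    // Retry path: replays the inserts one record at a time instead of as a single JDBC batch.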
    private void retryBatchInsert(final PreparedStatement preparedStatement, final Collection<DataRecord> dataRecords) throws SQLException {
        for (DataRecord each : dataRecords) {
            for (int i = 0; i < each.getColumnCount(); i++) {
                preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
            }
            preparedStatement.executeUpdate();
        }
    }
    
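    // Updates the records one by one, wrapping them in a single transaction when this is the first attempt and more than one record is present.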
    private void executeUpdate(final Collection<DataRecord> dataRecords, final boolean firstTimeRun) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1 && firstTimeRun;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            for (DataRecord each : dataRecords) {
                executeUpdate(connection, each);
            }
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }
    
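    // Builds an UPDATE whose SET clause holds the updated columns and whose WHERE clause uses the condition columns derived from the sharding columns;
    // binds the new values first, then the condition columns' old values (falling back to the current value for a sharding column whose old value is null).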
    private void executeUpdate(final Connection connection, final DataRecord dataRecord) throws SQLException {
        Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
        List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
        List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
        String sql = importSQLBuilder.buildUpdateSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            runningStatement.set(preparedStatement);
            for (int i = 0; i < setColumns.size(); i++) {
                preparedStatement.setObject(i + 1, setColumns.get(i).getValue());
            }
            for (int i = 0; i < conditionColumns.size(); i++) {
                Column keyColumn = conditionColumns.get(i);
                // TODO For PostgreSQL compatibility: old values are null except for the primary key, and updating a sharding column value is not supported yet.
                if (shardingColumns.contains(keyColumn.getName()) && null == keyColumn.getOldValue()) {
                    preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getValue());
                    continue;
                }
                preparedStatement.setObject(setColumns.size() + i + 1, keyColumn.getOldValue());
            }
            // TODO If the table has no unique key, the condition columns' old values are null, so the update will fail on PostgreSQL.
            int updateCount = preparedStatement.executeUpdate();
            if (1 != updateCount) {
                log.warn("execute update failed, update count: {}, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}",
                        updateCount, sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns));
            }
        } catch (final SQLException ex) {
            log.error("execute update failed, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}, error message: {}, data record: {}",
                    sql, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns), ex.getMessage(), dataRecord);
            throw ex;
        } finally {
            runningStatement.set(null);
        }
    }
    
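    // Deletes the records as a single JDBC batch, wrapped in one transaction when more than one record is present.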
    private void executeBatchDelete(final Collection<DataRecord> dataRecords) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            boolean transactionEnabled = dataRecords.size() > 1;
            if (transactionEnabled) {
                connection.setAutoCommit(false);
            }
            executeBatchDelete(connection, dataRecords, importerConfig.getShardingColumns(dataRecords.iterator().next().getTableName()));
            if (transactionEnabled) {
                connection.commit();
            }
        }
    }
    
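    // Builds the DELETE SQL from the first record's condition columns, then binds each record's old condition values
    // (warning when an old value is null) and executes them as one JDBC batch.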
    private void executeBatchDelete(final Connection connection, final Collection<DataRecord> dataRecords, final Set<String> shardingColumns) throws SQLException {
        DataRecord dataRecord = dataRecords.iterator().next();
        String deleteSQL = importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                RecordUtils.extractConditionColumns(dataRecord, shardingColumns));
        try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
            runningStatement.set(preparedStatement);
            preparedStatement.setQueryTimeout(30);
            for (DataRecord each : dataRecords) {
                List<Column> conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
                for (int i = 0; i < conditionColumns.size(); i++) {
                    Object oldValue = conditionColumns.get(i).getOldValue();
                    if (null == oldValue) {
                        log.warn("Record old value is null, record: {}", each);
                    }
                    preparedStatement.setObject(i + 1, oldValue);
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } finally {
            runningStatement.set(null);
        }
    }
    
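    // Cancels the in-flight statement, if any, so a blocked write can be interrupted promptly; the data source itself is managed elsewhere.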
    @Override
    public void close() {
        Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
}