/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Range;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Inventory task splitter.
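 *
 * <p>Usage sketch (illustrative; assumes the source data source, dumper context, importer configuration and job item context are provided by the pipeline job):</p>
 * <pre>{@code
 * InventoryTaskSplitter splitter = new InventoryTaskSplitter(sourceDataSource, dumperContext, importerConfig);
 * List<InventoryTask> tasks = splitter.splitInventoryData(jobItemContext);
 * }</pre>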
 */
@RequiredArgsConstructor
@Slf4j
public final class InventoryTaskSplitter {
    
    private final PipelineDataSourceWrapper sourceDataSource;
    
    private final InventoryDumperContext dumperContext;
    
    private final ImporterConfiguration importerConfig;
    
    /**
     * Split inventory data into multiple tasks.
     *
     * @param jobItemContext job item context
     * @return split inventory tasks
     */
    public List<InventoryTask> splitInventoryData(final TransmissionJobItemContext jobItemContext) {
        List<InventoryTask> result = new LinkedList<>();
        long startTimeMillis = System.currentTimeMillis();
        TransmissionProcessContext processContext = jobItemContext.getJobProcessContext();
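        // Each split dumper context becomes one inventory task with its own channel, dumper and importer.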
        for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) {
            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getProcessConfiguration().getStreamChannel(), importerConfig.getBatchSize(), position);
            Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
            Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext);
            result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                    processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
        }
        log.info("splitInventoryData cost {} ms", System.currentTimeMillis() - startTimeMillis);
        return result;
    }
    
    /**
     * Split inventory dumper context.
     *
     * @param jobItemContext job item context
     * @return inventory dumper contexts
     */
    public Collection<InventoryDumperContext> splitInventoryDumperContext(final TransmissionJobItemContext jobItemContext) {
        Collection<InventoryDumperContext> result = new LinkedList<>();
        for (InventoryDumperContext each : splitByTable(dumperContext)) {
            result.addAll(splitByPrimaryKey(each, jobItemContext, sourceDataSource));
        }
        return result;
    }
    
    private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
        Collection<InventoryDumperContext> result = new LinkedList<>();
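        // Create one dumper context per mapped table; the position starts as a placeholder and is resolved when splitting by primary key.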
        dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> {
            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
            // Use the original table name for the metadata loader, since table names are case-sensitive in some databases.
            inventoryDumperContext.setActualTableName(key.toString());
            inventoryDumperContext.setLogicTableName(value.toString());
            inventoryDumperContext.getCommonContext().setPosition(new IngestPlaceholderPosition());
            inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
            inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
            result.add(inventoryDumperContext);
        });
        return result;
    }
    
    private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
                                                                 final PipelineDataSourceWrapper dataSource) {
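        // Load unique key columns from table metadata if they are not already present on the dumper context.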
        if (null == dumperContext.getUniqueKeyColumns()) {
            String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
            String actualTableName = dumperContext.getActualTableName();
            List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
            dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
        }
        Collection<InventoryDumperContext> result = new LinkedList<>();
        TransmissionProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
        PipelineReadConfiguration readConfig = jobProcessContext.getProcessConfiguration().getRead();
        int batchSize = readConfig.getBatchSize();
        JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
        Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
        int i = 0;
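        // Each ingest position produces one split dumper context, numbered by sharding item.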
        for (IngestPosition each : inventoryPositions) {
            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
            splitDumperContext.getCommonContext().setPosition(each);
            splitDumperContext.setShardingItem(i++);
            splitDumperContext.setActualTableName(dumperContext.getActualTableName());
            splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
            splitDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
            splitDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
            splitDumperContext.setBatchSize(batchSize);
            splitDumperContext.setRateLimitAlgorithm(rateLimitAlgorithm);
            result.add(splitDumperContext);
        }
        return result;
    }
    
    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
                                                             final PipelineDataSourceWrapper dataSource) {
        TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress();
        if (null != initProgress) {
            // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
            Collection<IngestPosition> result = initProgress.getInventory().getInventoryPosition(dumperContext.getActualTableName()).values();
            if (!result.isEmpty()) {
                return result;
            }
        }
        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
        jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
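        // A table without a unique key cannot be split by key range, so it is dumped as a single task.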
        if (!dumperContext.hasUniqueKey()) {
            return Collections.singleton(new UnsupportedKeyIngestPosition());
        }
        List<PipelineColumnMetaData> uniqueKeyColumns = dumperContext.getUniqueKeyColumns();
        if (1 == uniqueKeyColumns.size()) {
            int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
            if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
                return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, dataSource);
            }
            if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
                // TODO Support string unique key table splitting. ASCII character ordering differs between database versions.
                return Collections.singleton(new StringPrimaryKeyIngestPosition(null, null));
            }
        }
        return Collections.singleton(new UnsupportedKeyIngestPosition());
    }
    
    private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount,
                                                                          final TransmissionJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) {
        if (0 == tableRecordsCount) {
            return Collections.singletonList(new IntegerPrimaryKeyIngestPosition(0, 0));
        }
        Collection<IngestPosition> result = new LinkedList<>();
        Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperContext);
        int shardingSize = jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
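        // Split count is ceil(tableRecordsCount / shardingSize); the unique key value range is divided evenly into that many intervals.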
        long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0);
        long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount;
        IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval);
        while (rangeIterator.hasNext()) {
            Range<Long> range = rangeIterator.next();
            result.add(new IntegerPrimaryKeyIngestPosition(range.getMinimum(), range.getMaximum()));
        }
        return result;
    }
    
    private Range<Long> getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperContext dumperContext) {
        String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
        PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
        String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
                dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey);
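        // Query MIN and MAX of the unique key in one statement to bound the integer key ranges.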
        try (
                Connection connection = dataSource.getConnection();
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(sql)) {
            resultSet.next();
            return Range.between(resultSet.getLong(1), resultSet.getLong(2));
        } catch (final SQLException ex) {
            throw new SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), uniqueKey, ex);
        }
    }
}