/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Range;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.core.util.IntervalToRangeIterator;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Inventory task splitter.
 */
@RequiredArgsConstructor
@Slf4j
public final class InventoryTaskSplitter {
    
    private final PipelineDataSourceWrapper sourceDataSource;
    
    private final InventoryDumperContext dumperContext;
    
    private final ImporterConfiguration importerConfig;
    
    /**
     * Split inventory data to multiple tasks.
     *
     * @param jobItemContext job item context
     * @return inventory tasks
     */
    public List<InventoryTask> splitInventoryData(final TransmissionJobItemContext jobItemContext) {
        List<InventoryTask> result = new LinkedList<>();
        long startTimeMillis = System.currentTimeMillis();
        TransmissionProcessContext processContext = jobItemContext.getJobProcessContext();
        for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) {
            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getProcessConfiguration().getStreamChannel(), importerConfig.getBatchSize(), position);
            Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
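            // The importer drains the same channel the dumper fills; 3000L is assumed to be the channel poll timeout in milliseconds.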
            Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext);
            result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                    processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
        }
        log.info("splitInventoryData cost {} ms", System.currentTimeMillis() - startTimeMillis);
        return result;
    }
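    
    // Usage sketch (hypothetical wiring; in practice the job preparer drives this):
    //   InventoryTaskSplitter splitter = new InventoryTaskSplitter(sourceDataSource, dumperContext, importerConfig);
    //   List<InventoryTask> tasks = splitter.splitInventoryData(jobItemContext);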
    
    /**
     * Split inventory dumper context.
     *
     * @param jobItemContext job item context
     * @return inventory dumper contexts
     */
    public Collection<InventoryDumperContext> splitInventoryDumperContext(final TransmissionJobItemContext jobItemContext) {
        Collection<InventoryDumperContext> result = new LinkedList<>();
        for (InventoryDumperContext each : splitByTable(dumperContext)) {
            result.addAll(splitByPrimaryKey(each, jobItemContext, sourceDataSource));
        }
        return result;
    }
    
    private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
        Collection<InventoryDumperContext> result = new LinkedList<>();
        dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> {
            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
            // Keep the original-cased actual table name for the metadata loader, since table names are case-sensitive in some databases.
            inventoryDumperContext.setActualTableName(key.toString());
            inventoryDumperContext.setLogicTableName(value.toString());
            inventoryDumperContext.getCommonContext().setPosition(new IngestPlaceholderPosition());
            inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
            inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
            result.add(inventoryDumperContext);
        });
        return result;
    }
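    
    // Example (hypothetical names): actual tables t_order_0 and t_order_1 mapped to logic table t_order
    // yield two dumper contexts, each starting from a fresh placeholder position.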
    
    private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
                                                                 final PipelineDataSourceWrapper dataSource) {
        if (null == dumperContext.getUniqueKeyColumns()) {
            String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
            String actualTableName = dumperContext.getActualTableName();
            List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
            dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
        }
        Collection<InventoryDumperContext> result = new LinkedList<>();
        TransmissionProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
        PipelineReadConfiguration readConfig = jobProcessContext.getProcessConfiguration().getRead();
        int batchSize = readConfig.getBatchSize();
        JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
        Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
        int i = 0;
        for (IngestPosition each : inventoryPositions) {
            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
            splitDumperContext.getCommonContext().setPosition(each);
            splitDumperContext.setShardingItem(i++);
            splitDumperContext.setActualTableName(dumperContext.getActualTableName());
            splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
            splitDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
            splitDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
            splitDumperContext.setBatchSize(batchSize);
            splitDumperContext.setRateLimitAlgorithm(rateLimitAlgorithm);
            result.add(splitDumperContext);
        }
        return result;
    }
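    
    // Each split context shares the table's common context but carries its own ingest position and sharding item,
    // so the resulting ranges can be dumped independently.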
    
    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final TransmissionJobItemContext jobItemContext,
                                                             final PipelineDataSourceWrapper dataSource) {
        TransmissionJobItemProgress initProgress = jobItemContext.getInitProgress();
        if (null != initProgress) {
            // Do not filter out finished positions here: the whole inventory task set is required when the job progress is persisted.
            Collection<IngestPosition> result = initProgress.getInventory().getInventoryPosition(dumperContext.getActualTableName()).values();
            if (!result.isEmpty()) {
                return result;
            }
        }
        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
        jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
        if (!dumperContext.hasUniqueKey()) {
            return Collections.singleton(new UnsupportedKeyIngestPosition());
        }
        List<PipelineColumnMetaData> uniqueKeyColumns = dumperContext.getUniqueKeyColumns();
        if (1 == uniqueKeyColumns.size()) {
            int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
            if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
                return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, dataSource);
            }
            if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
                // String unique keys are not range-split: character ordering differs across databases and versions, so a single unbounded position covers the whole table.
                return Collections.singleton(new StringPrimaryKeyIngestPosition(null, null));
            }
        }
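        // Composite unique keys and other column types fall through to a single position that cannot be split by key.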
        return Collections.singleton(new UnsupportedKeyIngestPosition());
    }
    
    private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount,
                                                                          final TransmissionJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) {
        if (0 == tableRecordsCount) {
            return Collections.singletonList(new IntegerPrimaryKeyIngestPosition(0, 0));
        }
        Collection<IngestPosition> result = new LinkedList<>();
        Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperContext);
        int shardingSize = jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
        long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0);
        long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount;
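        // Worked example (hypothetical numbers): tableRecordsCount = 1_000_000 and shardingSize = 400_000 give
        // splitCount = 2 + 1 = 3 (the remainder term rounds the integer division up); with a key range of
        // [1, 900_000], interval = (900_000 - 1) / 3 = 299_999, so each closed sub-range holds roughly that many keys.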
        IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval);
        while (rangeIterator.hasNext()) {
            Range<Long> range = rangeIterator.next();
            result.add(new IntegerPrimaryKeyIngestPosition(range.getMinimum(), range.getMaximum()));
        }
        return result;
    }
    
    private Range<Long> getUniqueKeyValuesRange(final TransmissionJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperContext dumperContext) {
        String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
        PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
        String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
                dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey);
        try (
                Connection connection = dataSource.getConnection();
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(sql)) {
            resultSet.next();
            return Range.between(resultSet.getLong(1), resultSet.getLong(2));
        } catch (final SQLException ex) {
            throw new SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), uniqueKey, ex);
        }
    }
}