1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
19
20 import lombok.RequiredArgsConstructor;
21 import lombok.extern.slf4j.Slf4j;
22 import org.apache.shardingsphere.data.pipeline.cdc.config.CDCTaskConfiguration;
23 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
24 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
25 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
26 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
27 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
28 import org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
29 import org.apache.shardingsphere.data.pipeline.core.channel.InventoryChannelCreator;
30 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
31 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
32 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
33 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
34 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
35 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
36 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
37 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
38 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
39 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
40 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
41 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
42 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.PlaceholderInventoryDataRecordPositionCreator;
43 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.UniqueKeyInventoryDataRecordPositionCreator;
44 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
45 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
46 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
47 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
48 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
49 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
50 import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
51 import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter.InventoryDumperContextSplitter;
52 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
53 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
54 import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
55 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
56
57 import java.sql.SQLException;
58 import java.util.Collection;
59 import java.util.List;
60 import java.util.Optional;
61 import java.util.concurrent.CopyOnWriteArrayList;
62 import java.util.concurrent.atomic.AtomicBoolean;
63 import java.util.concurrent.atomic.AtomicReference;
64
65
66
67
68 @RequiredArgsConstructor
69 @Slf4j
70 public final class CDCJobPreparer {
71
72 private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager;
73
74
75
76
77
78
79 public void initTasks(final Collection<CDCJobItemContext> jobItemContexts) {
80
81 AtomicBoolean inventoryImporterUsed = new AtomicBoolean();
82 List<CDCChannelProgressPair> inventoryChannelProgressPairs = new CopyOnWriteArrayList<>();
83 AtomicBoolean incrementalImporterUsed = new AtomicBoolean();
84 List<CDCChannelProgressPair> incrementalChannelProgressPairs = new CopyOnWriteArrayList<>();
85 for (CDCJobItemContext each : jobItemContexts) {
86 initTasks(each, inventoryImporterUsed, inventoryChannelProgressPairs, incrementalImporterUsed, incrementalChannelProgressPairs);
87 }
88 }
89
90 private void initTasks(final CDCJobItemContext jobItemContext, final AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair> inventoryChannelProgressPairs,
91 final AtomicBoolean incrementalImporterUsed, final List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
92 Optional<TransmissionJobItemProgress> jobItemProgress = jobItemManager.getProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
93 if (!jobItemProgress.isPresent()) {
94 jobItemManager.persistProgress(jobItemContext);
95 }
96 if (jobItemContext.isStopping()) {
97 PipelineJobRegistry.stop(jobItemContext.getJobId());
98 return;
99 }
100 initIncrementalPosition(jobItemContext);
101 if (jobItemContext.getJobConfig().isFull()) {
102 initInventoryTasks(jobItemContext, inventoryImporterUsed, inventoryChannelProgressPairs);
103 }
104 initIncrementalTask(jobItemContext, incrementalImporterUsed, incrementalChannelProgressPairs);
105 }
106
107 private void initIncrementalPosition(final CDCJobItemContext jobItemContext) {
108 CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
109 JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
110 try {
111 DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
112 IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
113 taskConfig.getDumperContext().getCommonContext().setPosition(position);
114 } catch (final SQLException ex) {
115 throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
116 }
117 }
118
119 private void initInventoryTasks(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
120 long startTimeMillis = System.currentTimeMillis();
121 CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
122 ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
123 TransmissionProcessContext processContext = jobItemContext.getJobProcessContext();
124 InventoryDumperContextSplitter dumperContextSplitter = new InventoryDumperContextSplitter(
125 jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()));
126 for (InventoryDumperContext each : dumperContextSplitter.split(jobItemContext)) {
127 AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
128 PipelineChannel channel = InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(), importerConfig.getBatchSize(), position);
129 if (!(position.get() instanceof IngestFinishedPosition)) {
130 channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
131 }
132 InventoryDataRecordPositionCreator positionCreator = each.hasUniqueKey() ? new UniqueKeyInventoryDataRecordPositionCreator() : new PlaceholderInventoryDataRecordPositionCreator();
133 Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader(), positionCreator);
134 Importer importer = importerUsed.get() ? null
135 : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100L, jobItemContext.getSink(), false, importerConfig.getRateLimitAlgorithm());
136 jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
137 processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
138 if (!(position.get() instanceof IngestFinishedPosition)) {
139 importerUsed.set(true);
140 }
141 }
142 log.info("Init inventory tasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
143 }
144
145 private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
146 CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
147 IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
148 IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
149 PipelineChannel channel = IncrementalChannelCreator.create(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
150 channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
151 CreateIncrementalDumperParameter param = new CreateIncrementalDumperParameter(
152 dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager());
153 Dumper dumper = IncrementalDumperCreator.create(param);
154 boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
155 Importer importer = importerUsed.get() ? null
156 : new CDCImporter(channelProgressPairs, 1, 100L, jobItemContext.getSink(), needSorting, taskConfig.getImporterConfig().getRateLimitAlgorithm());
157 PipelineTask incrementalTask = new CDCIncrementalTask(
158 dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
159 jobItemContext.getIncrementalTasks().add(incrementalTask);
160 importerUsed.set(true);
161 }
162 }