1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
22 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
23 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
24 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
25 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
26 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
27 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
28 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
29 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
30 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
31 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
32 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
33 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
34 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
35 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
36 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
37 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
38 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
39 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
40 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
41 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
42 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
43 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
44 import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
45 import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
46 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
47 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
48 import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
49 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
50 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
51
52 import java.sql.SQLException;
53 import java.util.Collection;
54 import java.util.List;
55 import java.util.Optional;
56 import java.util.concurrent.CopyOnWriteArrayList;
57 import java.util.concurrent.atomic.AtomicBoolean;
58 import java.util.concurrent.atomic.AtomicReference;
59
60
61
62
63 @Slf4j
64 public final class CDCJobPreparer {
65
66 private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new CDCJobType().getYamlJobItemProgressSwapper());
67
68
69
70
71
72
73 public void initTasks(final Collection<CDCJobItemContext> jobItemContexts) {
74
75 AtomicBoolean inventoryImporterUsed = new AtomicBoolean();
76 List<CDCChannelProgressPair> inventoryChannelProgressPairs = new CopyOnWriteArrayList<>();
77 AtomicBoolean incrementalImporterUsed = new AtomicBoolean();
78 List<CDCChannelProgressPair> incrementalChannelProgressPairs = new CopyOnWriteArrayList<>();
79 for (CDCJobItemContext each : jobItemContexts) {
80 initTasks0(each, inventoryImporterUsed, inventoryChannelProgressPairs, incrementalImporterUsed, incrementalChannelProgressPairs);
81 }
82 }
83
84 private void initTasks0(final CDCJobItemContext jobItemContext, final AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair> inventoryChannelProgressPairs,
85 final AtomicBoolean incrementalImporterUsed, final List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
86 Optional<TransmissionJobItemProgress> jobItemProgress = jobItemManager.getProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
87 if (!jobItemProgress.isPresent()) {
88 jobItemManager.persistProgress(jobItemContext);
89 }
90 if (jobItemContext.isStopping()) {
91 PipelineJobRegistry.stop(jobItemContext.getJobId());
92 return;
93 }
94 initIncrementalPosition(jobItemContext);
95 if (jobItemContext.getJobConfig().isFull()) {
96 initInventoryTasks(jobItemContext, inventoryImporterUsed, inventoryChannelProgressPairs);
97 }
98 initIncrementalTask(jobItemContext, incrementalImporterUsed, incrementalChannelProgressPairs);
99 }
100
101 private void initIncrementalPosition(final CDCJobItemContext jobItemContext) {
102 CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
103 JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
104 try {
105 DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
106 IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
107 taskConfig.getDumperContext().getCommonContext().setPosition(position);
108 } catch (final SQLException ex) {
109 throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
110 }
111 }
112
113 private void initInventoryTasks(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
114 long startTimeMillis = System.currentTimeMillis();
115 CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
116 ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
117 TransmissionProcessContext processContext = jobItemContext.getJobProcessContext();
118 for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()), importerConfig)
119 .splitInventoryDumperContext(jobItemContext)) {
120 AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
121 PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getProcessConfiguration().getStreamChannel(), importerConfig.getBatchSize(), position);
122 if (!(position.get() instanceof IngestFinishedPosition)) {
123 channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
124 }
125 Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
126 Importer importer = importerUsed.get() ? null
127 : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100L, jobItemContext.getSink(), false, importerConfig.getRateLimitAlgorithm());
128 jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
129 processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
130 if (!(position.get() instanceof IngestFinishedPosition)) {
131 importerUsed.set(true);
132 }
133 }
134 log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
135 }
136
137 private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
138 CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
139 IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
140 IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
141 PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
142 channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
143 Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
144 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
145 boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
146 Importer importer = importerUsed.get() ? null
147 : new CDCImporter(channelProgressPairs, 1, 100L, jobItemContext.getSink(), needSorting, taskConfig.getImporterConfig().getRateLimitAlgorithm());
148 PipelineTask incrementalTask = new CDCIncrementalTask(
149 dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
150 jobItemContext.getIncrementalTasks().add(incrementalTask);
151 importerUsed.set(true);
152 }
153 }