1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc;
19
20 import lombok.Getter;
21 import lombok.RequiredArgsConstructor;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
24 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
25 import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
26 import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
27 import org.apache.shardingsphere.data.pipeline.cdc.config.CDCTaskConfiguration;
28 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
29 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
30 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
31 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
32 import org.apache.shardingsphere.data.pipeline.cdc.engine.CDCJobRunnerCleaner;
33 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
34 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
35 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
36 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
37 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
38 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
39 import org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
40 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
41 import org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
42 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
43 import org.apache.shardingsphere.data.pipeline.core.importer.PipelineRequiredColumnsExtractor;
44 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
45 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
46 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
47 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
48 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
49 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
50 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
51 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
52 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
53 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
54 import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
55 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
56 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
57 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
58 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
59 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
60 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
61 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
62 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
63 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
64 import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
65 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
66 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
67 import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
68 import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
69 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
70 import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
71
72 import java.util.Collection;
73 import java.util.HashMap;
74 import java.util.LinkedList;
75 import java.util.Map;
76 import java.util.Map.Entry;
77 import java.util.Optional;
78 import java.util.concurrent.CompletableFuture;
79 import java.util.stream.Collectors;
80
81
82
83
84 @Slf4j
85 public final class CDCJob implements PipelineJob {
86
87 @Getter
88 private final PipelineJobRunnerManager jobRunnerManager;
89
90 private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
91
92 @Getter
93 private final PipelineSink sink;
94
95 public CDCJob(final PipelineSink sink) {
96 jobRunnerManager = new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink));
97 this.sink = sink;
98 }
99
100 @Override
101 public void execute(final ShardingContext shardingContext) {
102 String jobId = shardingContext.getJobName();
103 log.info("Execute job {}", jobId);
104 PipelineJobType<?> jobType = PipelineJobIdUtils.parseJobType(jobId);
105 PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
106 CDCJobConfiguration jobConfig = (CDCJobConfiguration) jobType.getOption().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
107 PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
108 TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(
109 jobId, PipelineProcessConfigurationUtils.fillInDefaultValue(new PipelineProcessConfigurationPersistService().load(contextKey, jobType.getType())));
110 PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
111 Collection<CDCJobItemContext> jobItemContexts = new LinkedList<>();
112 for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
113 if (jobRunnerManager.isStopping()) {
114 log.info("Job is stopping, ignore.");
115 return;
116 }
117 TransmissionJobItemProgress jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
118 CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
119 CDCJobItemContext jobItemContext = new CDCJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
120 if (!jobRunnerManager.addTasksRunner(shardingItem, new CDCTasksRunner(jobItemContext))) {
121 continue;
122 }
123 jobItemContexts.add(jobItemContext);
124 governanceFacade.getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
125 log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, shardingItem);
126 }
127 if (jobItemContexts.isEmpty()) {
128 log.warn("Job item contexts are empty, ignore.");
129 return;
130 }
131 initTasks(jobItemContexts, governanceFacade, jobItemManager);
132 executeInventoryTasks(jobItemContexts, jobItemManager);
133 executeIncrementalTasks(jobItemContexts, jobItemManager);
134 }
135
136 private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
137 TableAndSchemaNameMapper mapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
138 IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, mapper);
139 ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, mapper);
140 return new CDCTaskConfiguration(dumperContext, importerConfig);
141 }
142
143 private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper mapper) {
144 JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
145 String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
146 StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
147 DumperCommonContext dumperCommonContext = new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), mapper);
148 return new IncrementalDumperContext(dumperCommonContext, jobConfig.getJobId(), jobConfig.isDecodeWithTX());
149 }
150
151 private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
152 final TableAndSchemaNameMapper mapper) {
153 PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
154 jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
155 Map<ShardingSphereIdentifier, Collection<String>> tableAndRequiredColumnsMap = getTableAndRequiredColumnsMap(jobConfig);
156 PipelineWriteConfiguration write = pipelineProcessConfig.getWrite();
157 JobRateLimitAlgorithm writeRateLimitAlgorithm = null == write.getRateLimiter()
158 ? null
159 : TypedSPILoader.getService(JobRateLimitAlgorithm.class, write.getRateLimiter().getType(), write.getRateLimiter().getProps());
160 return new ImporterConfiguration(dataSourceConfig, tableAndRequiredColumnsMap, mapper, write.getBatchSize(), writeRateLimitAlgorithm, 0, 1);
161 }
162
163 @SuppressWarnings({"rawtypes", "unchecked"})
164 private Map<ShardingSphereIdentifier, Collection<String>> getTableAndRequiredColumnsMap(final CDCJobConfiguration jobConfig) {
165 Map<ShardingSphereIdentifier, Collection<String>> result = new HashMap<>();
166 Collection<YamlRuleConfiguration> yamlRuleConfigs = jobConfig.getDataSourceConfig().getRootConfig().getRules();
167 Collection<ShardingSphereIdentifier> targetTableNames = jobConfig.getSchemaTableNames().stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet());
168 for (Entry<YamlRuleConfiguration, PipelineRequiredColumnsExtractor> entry : OrderedSPILoader.getServices(PipelineRequiredColumnsExtractor.class, yamlRuleConfigs).entrySet()) {
169 result.putAll(entry.getValue().getTableAndRequiredColumnsMap(entry.getKey(), targetTableNames));
170 }
171 return result;
172 }
173
174 private void initTasks(final Collection<CDCJobItemContext> jobItemContexts,
175 final PipelineGovernanceFacade governanceFacade, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
176 try {
177 new CDCJobPreparer(jobItemManager).initTasks(jobItemContexts);
178
179 } catch (final RuntimeException ex) {
180
181 for (PipelineJobItemContext each : jobItemContexts) {
182 initTasksFailed(each.getJobId(), each.getShardingItem(), ex, governanceFacade);
183 }
184 throw ex;
185 }
186 }
187
188 private void initTasksFailed(final String jobId, final int shardingItem, final Exception ex, final PipelineGovernanceFacade governanceFacade) {
189 log.error("Job {}-{} execution failed.", jobId, shardingItem, ex);
190 governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
191 PipelineJobRegistry.stop(jobId);
192 jobAPI.disable(jobId);
193 }
194
195 private void executeInventoryTasks(final Collection<CDCJobItemContext> jobItemContexts, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
196 Collection<CompletableFuture<?>> futures = new LinkedList<>();
197 for (CDCJobItemContext each : jobItemContexts) {
198 updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK, jobItemManager);
199 for (PipelineTask task : each.getInventoryTasks()) {
200 if (task.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
201 continue;
202 }
203 futures.addAll(task.start());
204 }
205 }
206 if (futures.isEmpty()) {
207 return;
208 }
209 PipelineExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", jobItemContexts.iterator().next()));
210 }
211
212 private void executeIncrementalTasks(final Collection<CDCJobItemContext> jobItemContexts, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
213 Collection<CompletableFuture<?>> futures = new LinkedList<>();
214 for (CDCJobItemContext each : jobItemContexts) {
215 updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK, jobItemManager);
216 for (PipelineTask task : each.getIncrementalTasks()) {
217 if (task.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
218 continue;
219 }
220 futures.addAll(task.start());
221 }
222 }
223 PipelineExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", jobItemContexts.iterator().next()));
224 }
225
226 private void updateJobItemStatus(final CDCJobItemContext jobItemContext, final JobStatus jobStatus, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
227 jobItemContext.setStatus(jobStatus);
228 jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
229 }
230
231 @RequiredArgsConstructor
232 private class CDCExecuteCallback implements ExecuteCallback {
233
234 private final String identifier;
235
236 private final CDCJobItemContext jobItemContext;
237
238 @Override
239 public void onSuccess() {
240 if (jobItemContext.isStopping()) {
241 log.info("Job is stopping, ignore.");
242 return;
243 }
244 log.info("All {} tasks finished successful.", identifier);
245 }
246
247 @Override
248 public void onFailure(final Throwable throwable) {
249 log.error("Task {} execute failed.", identifier, throwable);
250 String jobId = jobItemContext.getJobId();
251 PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
252 if (jobItemContext.getSink() instanceof PipelineCDCSocketSink) {
253 PipelineCDCSocketSink cdcSink = (PipelineCDCSocketSink) jobItemContext.getSink();
254 cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", Optional.ofNullable(throwable.getMessage()).orElse("")));
255 }
256 PipelineJobRegistry.stop(jobId);
257 jobAPI.disable(jobId);
258 }
259 }
260 }