1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc;
19
20 import lombok.Getter;
21 import lombok.RequiredArgsConstructor;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
24 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
25 import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
26 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
27 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
28 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
29 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
30 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
31 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
32 import org.apache.shardingsphere.data.pipeline.cdc.engine.CDCJobRunnerCleaner;
33 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
34 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
35 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
36 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
37 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
38 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
39 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
40 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
41 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
42 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
43 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
44 import org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipelineJob;
45 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
46 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
47 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
48 import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
49 import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
50 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
51 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
52 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
53 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
54 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
55 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
56 import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
57 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
58 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
59 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
60 import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
61 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
62
63 import java.util.Collection;
64 import java.util.Map;
65 import java.util.Optional;
66 import java.util.Set;
67 import java.util.stream.Collectors;
68
69
70
71
72 @Slf4j
73 public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemContext> {
74
75 private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
76
77 private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new CDCJobType().getYamlJobItemProgressSwapper());
78
79 private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
80
81 private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
82
83 @Getter
84 private final PipelineSink sink;
85
86 public CDCJob(final PipelineSink sink) {
87 super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
88 this.sink = sink;
89 }
90
91 @Override
92 protected CDCJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem) {
93 Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
94 PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
95 processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING"));
96 TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
97 CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
98 return new CDCJobItemContext((CDCJobConfiguration) jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
99 }
100
101 private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
102 TableAndSchemaNameMapper mapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
103 IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, mapper);
104 ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), mapper);
105 return new CDCTaskConfiguration(dumperContext, importerConfig);
106 }
107
108 private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper mapper) {
109 JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
110 String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
111 StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
112 DumperCommonContext dumperCommonContext = new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), mapper);
113 return new IncrementalDumperContext(dumperCommonContext, jobConfig.getJobId(), jobConfig.isDecodeWithTX());
114 }
115
116 private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
117 final TableAndSchemaNameMapper mapper) {
118 PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
119 jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
120 Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
121 .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
122 PipelineWriteConfiguration write = pipelineProcessConfig.getWrite();
123 JobRateLimitAlgorithm writeRateLimitAlgorithm = null == write.getRateLimiter() ? null
124 : TypedSPILoader.getService(JobRateLimitAlgorithm.class, write.getRateLimiter().getType(), write.getRateLimiter().getProps());
125 return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, mapper, write.getBatchSize(), writeRateLimitAlgorithm, 0, 1);
126 }
127
128 @Override
129 protected PipelineTasksRunner buildTasksRunner(final CDCJobItemContext jobItemContext) {
130 return new CDCTasksRunner(jobItemContext);
131 }
132
133 @Override
134 protected void doPrepare(final Collection<CDCJobItemContext> jobItemContexts) {
135 jobPreparer.initTasks(jobItemContexts);
136 }
137
138 @Override
139 protected void processFailed(final String jobId) {
140 jobAPI.disable(jobId);
141 }
142
143 @Override
144 protected ExecuteCallback buildExecuteCallback(final String identifier, final CDCJobItemContext jobItemContext) {
145 return new CDCExecuteCallback(identifier, jobItemContext);
146 }
147
148 @RequiredArgsConstructor
149 private final class CDCExecuteCallback implements ExecuteCallback {
150
151 private final String identifier;
152
153 private final CDCJobItemContext jobItemContext;
154
155 @Override
156 public void onSuccess() {
157 if (jobItemContext.isStopping()) {
158 log.info("onSuccess, stopping true, ignore");
159 return;
160 }
161 log.info("onSuccess, all {} tasks finished.", identifier);
162 }
163
164 @Override
165 public void onFailure(final Throwable throwable) {
166 log.error("onFailure, {} task execute failed.", identifier, throwable);
167 String jobId = jobItemContext.getJobId();
168 PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
169 if (jobItemContext.getSink() instanceof PipelineCDCSocketSink) {
170 PipelineCDCSocketSink cdcSink = (PipelineCDCSocketSink) jobItemContext.getSink();
171 cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", Optional.ofNullable(throwable.getMessage()).orElse("")));
172 }
173 PipelineJobRegistry.stop(jobId);
174 jobAPI.disable(jobId);
175 }
176 }
177 }