View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.cdc;
19  
20  import lombok.Getter;
21  import lombok.RequiredArgsConstructor;
22  import lombok.extern.slf4j.Slf4j;
23  import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
24  import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
25  import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
26  import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
27  import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
28  import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
29  import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
30  import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
31  import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
32  import org.apache.shardingsphere.data.pipeline.cdc.engine.CDCJobRunnerCleaner;
33  import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
34  import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
35  import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
36  import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
37  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
38  import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
39  import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
40  import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
41  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
42  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
43  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
44  import org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipelineJob;
45  import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
46  import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
47  import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
48  import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
49  import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
50  import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
51  import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
52  import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
53  import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
54  import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
55  import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
56  import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
57  import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
58  import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
59  import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
60  import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
61  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
62  
63  import java.util.Collection;
64  import java.util.Map;
65  import java.util.Optional;
66  import java.util.Set;
67  import java.util.stream.Collectors;
68  
69  /**
70   * CDC job.
71   */
72  @Slf4j
73  public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemContext> {
74      
75      private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
76      
77      private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new CDCJobType().getYamlJobItemProgressSwapper());
78      
79      private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
80      
81      private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
82      
83      @Getter
84      private final PipelineSink sink;
85      
86      public CDCJob(final PipelineSink sink) {
87          super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
88          this.sink = sink;
89      }
90      
91      @Override
92      protected CDCJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem) {
93          Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
94          PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
95                  processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING"));
96          TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
97          CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
98          return new CDCJobItemContext((CDCJobConfiguration) jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
99      }
100     
101     private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
102         TableAndSchemaNameMapper mapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
103         IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, mapper);
104         ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), mapper);
105         return new CDCTaskConfiguration(dumperContext, importerConfig);
106     }
107     
108     private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper mapper) {
109         JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
110         String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
111         StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
112         DumperCommonContext dumperCommonContext = new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), mapper);
113         return new IncrementalDumperContext(dumperCommonContext, jobConfig.getJobId(), jobConfig.isDecodeWithTX());
114     }
115     
116     private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
117                                                              final TableAndSchemaNameMapper mapper) {
118         PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
119                 jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
120         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
121                 .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
122         PipelineWriteConfiguration write = pipelineProcessConfig.getWrite();
123         JobRateLimitAlgorithm writeRateLimitAlgorithm = null == write.getRateLimiter() ? null
124                 : TypedSPILoader.getService(JobRateLimitAlgorithm.class, write.getRateLimiter().getType(), write.getRateLimiter().getProps());
125         return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, mapper, write.getBatchSize(), writeRateLimitAlgorithm, 0, 1);
126     }
127     
128     @Override
129     protected PipelineTasksRunner buildTasksRunner(final CDCJobItemContext jobItemContext) {
130         return new CDCTasksRunner(jobItemContext);
131     }
132     
133     @Override
134     protected void doPrepare(final Collection<CDCJobItemContext> jobItemContexts) {
135         jobPreparer.initTasks(jobItemContexts);
136     }
137     
138     @Override
139     protected void processFailed(final String jobId) {
140         jobAPI.disable(jobId);
141     }
142     
143     @Override
144     protected ExecuteCallback buildExecuteCallback(final String identifier, final CDCJobItemContext jobItemContext) {
145         return new CDCExecuteCallback(identifier, jobItemContext);
146     }
147     
148     @RequiredArgsConstructor
149     private final class CDCExecuteCallback implements ExecuteCallback {
150         
151         private final String identifier;
152         
153         private final CDCJobItemContext jobItemContext;
154         
155         @Override
156         public void onSuccess() {
157             if (jobItemContext.isStopping()) {
158                 log.info("onSuccess, stopping true, ignore");
159                 return;
160             }
161             log.info("onSuccess, all {} tasks finished.", identifier);
162         }
163         
164         @Override
165         public void onFailure(final Throwable throwable) {
166             log.error("onFailure, {} task execute failed.", identifier, throwable);
167             String jobId = jobItemContext.getJobId();
168             PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
169             if (jobItemContext.getSink() instanceof PipelineCDCSocketSink) {
170                 PipelineCDCSocketSink cdcSink = (PipelineCDCSocketSink) jobItemContext.getSink();
171                 cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", Optional.ofNullable(throwable.getMessage()).orElse("")));
172             }
173             PipelineJobRegistry.stop(jobId);
174             jobAPI.disable(jobId);
175         }
176     }
177 }