/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.cdc;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.engine.CDCJobRunnerCleaner;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.PipelineRequiredColumnsExtractor;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
 * CDC job.
 */
@Slf4j
public final class CDCJob implements PipelineJob {
    
    @Getter
    private final PipelineJobRunnerManager jobRunnerManager;
    
    private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
    
    @Getter
    private final PipelineSink sink;
    
    public CDCJob(final PipelineSink sink) {
        jobRunnerManager = new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink));
        this.sink = sink;
    }
    
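    /**
     * Entry point for one job execution: build a job item context for each sharding item, register its tasks runner,
     * then prepare all tasks and kick off inventory and incremental execution.
     */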
    @Override
    public void execute(final ShardingContext shardingContext) {
        String jobId = shardingContext.getJobName();
        log.info("Execute job {}", jobId);
        PipelineJobType<?> jobType = PipelineJobIdUtils.parseJobType(jobId);
        PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
        CDCJobConfiguration jobConfig = (CDCJobConfiguration) jobType.getOption().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
        TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(
                jobId, PipelineProcessConfigurationUtils.fillInDefaultValue(new PipelineProcessConfigurationPersistService().load(contextKey, jobType.getType())));
        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
        Collection<CDCJobItemContext> jobItemContexts = new LinkedList<>();
        for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
            if (jobRunnerManager.isStopping()) {
                log.info("Job is stopping, ignore.");
                return;
            }
            TransmissionJobItemProgress jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
            CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
            CDCJobItemContext jobItemContext = new CDCJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
            if (!jobRunnerManager.addTasksRunner(shardingItem, new CDCTasksRunner(jobItemContext))) {
                continue;
            }
            jobItemContexts.add(jobItemContext);
            governanceFacade.getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
            log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, shardingItem);
        }
        if (jobItemContexts.isEmpty()) {
            log.warn("Job item contexts are empty, ignore.");
            return;
        }
        initTasks(jobItemContexts, governanceFacade, jobItemManager);
        executeInventoryTasks(jobItemContexts, jobItemManager);
        executeIncrementalTasks(jobItemContexts, jobItemManager);
    }
    
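    /**
     * Build the CDC task configuration for one sharding item, pairing the incremental dumper context with the importer configuration.
     */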
    private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
        TableAndSchemaNameMapper mapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, mapper);
        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, mapper);
        return new CDCTaskConfiguration(dumperContext, importerConfig);
    }
    
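    /**
     * Build the incremental dumper context; the data source name is resolved from the first data node of the sharding item's data node line.
     */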
    private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper mapper) {
        JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
        String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
        StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
        DumperCommonContext dumperCommonContext = new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), mapper);
        return new IncrementalDumperContext(dumperCommonContext, jobConfig.getJobId(), jobConfig.isDecodeWithTX());
    }
    
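    /**
     * Build the importer configuration; the write rate limit algorithm is loaded via SPI only when a rate limiter is configured.
     */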
    private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
                                                             final TableAndSchemaNameMapper mapper) {
        PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
                jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
        Map<ShardingSphereIdentifier, Collection<String>> tableAndRequiredColumnsMap = getTableAndRequiredColumnsMap(jobConfig);
        PipelineWriteConfiguration write = pipelineProcessConfig.getWrite();
        JobRateLimitAlgorithm writeRateLimitAlgorithm = null == write.getRateLimiter()
                ? null
                : TypedSPILoader.getService(JobRateLimitAlgorithm.class, write.getRateLimiter().getType(), write.getRateLimiter().getProps());
        return new ImporterConfiguration(dataSourceConfig, tableAndRequiredColumnsMap, mapper, write.getBatchSize(), writeRateLimitAlgorithm, 0, 1);
    }
    
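    /**
     * Collect the required columns per target table by delegating to each rule-matched {@link PipelineRequiredColumnsExtractor} SPI service.
     */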
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Map<ShardingSphereIdentifier, Collection<String>> getTableAndRequiredColumnsMap(final CDCJobConfiguration jobConfig) {
        Map<ShardingSphereIdentifier, Collection<String>> result = new HashMap<>();
        Collection<YamlRuleConfiguration> yamlRuleConfigs = jobConfig.getDataSourceConfig().getRootConfig().getRules();
        Collection<ShardingSphereIdentifier> targetTableNames = jobConfig.getSchemaTableNames().stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet());
        for (Entry<YamlRuleConfiguration, PipelineRequiredColumnsExtractor> entry : OrderedSPILoader.getServices(PipelineRequiredColumnsExtractor.class, yamlRuleConfigs).entrySet()) {
            result.putAll(entry.getValue().getTableAndRequiredColumnsMap(entry.getKey(), targetTableNames));
        }
        return result;
    }
    
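    /**
     * Initialize tasks for all job item contexts; on failure, record the error for every sharding item, stop and disable the job, then rethrow.
     */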
    private void initTasks(final Collection<CDCJobItemContext> jobItemContexts,
                           final PipelineGovernanceFacade governanceFacade, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
        try {
            new CDCJobPreparer(jobItemManager).initTasks(jobItemContexts);
            // CHECKSTYLE:OFF
        } catch (final RuntimeException ex) {
            // CHECKSTYLE:ON
            for (PipelineJobItemContext each : jobItemContexts) {
                initTasksFailed(each.getJobId(), each.getShardingItem(), ex, governanceFacade);
            }
            throw ex;
        }
    }
    
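    /**
     * Persist the failure to the governance repository, then stop the running job and disable it.
     */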
    private void initTasksFailed(final String jobId, final int shardingItem, final Exception ex, final PipelineGovernanceFacade governanceFacade) {
        log.error("Job {}-{} execution failed.", jobId, shardingItem, ex);
        governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
        PipelineJobRegistry.stop(jobId);
        jobAPI.disable(jobId);
    }
    
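    /**
     * Start all unfinished inventory tasks and register a completion callback; tasks already at a finished position are skipped.
     */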
    private void executeInventoryTasks(final Collection<CDCJobItemContext> jobItemContexts, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
        Collection<CompletableFuture<?>> futures = new LinkedList<>();
        for (CDCJobItemContext each : jobItemContexts) {
            updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK, jobItemManager);
            for (PipelineTask task : each.getInventoryTasks()) {
                if (task.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
                    continue;
                }
                futures.addAll(task.start());
            }
        }
        if (futures.isEmpty()) {
            return;
        }
        PipelineExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", jobItemContexts.iterator().next()));
    }
    
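    /**
     * Start all unfinished incremental tasks and register a completion callback; tasks already at a finished position are skipped.
     */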
    private void executeIncrementalTasks(final Collection<CDCJobItemContext> jobItemContexts, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
        Collection<CompletableFuture<?>> futures = new LinkedList<>();
        for (CDCJobItemContext each : jobItemContexts) {
            updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK, jobItemManager);
            for (PipelineTask task : each.getIncrementalTasks()) {
                if (task.getTaskProgress().getPosition() instanceof IngestFinishedPosition) {
                    continue;
                }
                futures.addAll(task.start());
            }
        }
        PipelineExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", jobItemContexts.iterator().next()));
    }
    
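    /**
     * Update the job item status in both the in-memory context and the persisted job item progress.
     */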
    private void updateJobItemStatus(final CDCJobItemContext jobItemContext, final JobStatus jobStatus, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
        jobItemContext.setStatus(jobStatus);
        jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
    }
    
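    /**
     * Execution callback for the triggered task futures: on failure it records the error, notifies the CDC socket sink client if present, then stops and disables the job.
     */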
    @RequiredArgsConstructor
    private class CDCExecuteCallback implements ExecuteCallback {
        
        private final String identifier;
        
        private final CDCJobItemContext jobItemContext;
        
        @Override
        public void onSuccess() {
            if (jobItemContext.isStopping()) {
                log.info("Job is stopping, ignore.");
                return;
            }
            log.info("All {} tasks finished successfully.", identifier);
        }
        
        @Override
        public void onFailure(final Throwable throwable) {
            log.error("Task {} execution failed.", identifier, throwable);
            String jobId = jobItemContext.getJobId();
            PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
            if (jobItemContext.getSink() instanceof PipelineCDCSocketSink) {
                PipelineCDCSocketSink cdcSink = (PipelineCDCSocketSink) jobItemContext.getSink();
                cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", Optional.ofNullable(throwable.getMessage()).orElse("")));
            }
            PipelineJobRegistry.stop(jobId);
            jobAPI.disable(jobId);
        }
    }
}