View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
19  
20  import lombok.RequiredArgsConstructor;
21  import lombok.extern.slf4j.Slf4j;
22  import org.apache.shardingsphere.data.pipeline.cdc.config.CDCTaskConfiguration;
23  import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
24  import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
25  import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
26  import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
27  import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
28  import org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
29  import org.apache.shardingsphere.data.pipeline.core.channel.InventoryChannelCreator;
30  import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
31  import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
32  import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
33  import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
34  import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
35  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
36  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
37  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
38  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
39  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
40  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
41  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
42  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.PlaceholderInventoryDataRecordPositionCreator;
43  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.UniqueKeyInventoryDataRecordPositionCreator;
44  import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
45  import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
46  import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
47  import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
48  import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
49  import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
50  import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
51  import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter.InventoryDumperContextSplitter;
52  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
53  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
54  import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
55  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
56  
57  import java.sql.SQLException;
58  import java.util.Collection;
59  import java.util.List;
60  import java.util.Optional;
61  import java.util.concurrent.CopyOnWriteArrayList;
62  import java.util.concurrent.atomic.AtomicBoolean;
63  import java.util.concurrent.atomic.AtomicReference;
64  
65  /**
66   * CDC job preparer.
67   */
68  @RequiredArgsConstructor
69  @Slf4j
70  public final class CDCJobPreparer {
71      
72      private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager;
73      
74      /**
75       * Init tasks.
76       *
77       * @param jobItemContexts job item contexts
78       */
79      public void initTasks(final Collection<CDCJobItemContext> jobItemContexts) {
80          // TODO Use DAG to build it
81          AtomicBoolean inventoryImporterUsed = new AtomicBoolean();
82          List<CDCChannelProgressPair> inventoryChannelProgressPairs = new CopyOnWriteArrayList<>();
83          AtomicBoolean incrementalImporterUsed = new AtomicBoolean();
84          List<CDCChannelProgressPair> incrementalChannelProgressPairs = new CopyOnWriteArrayList<>();
85          for (CDCJobItemContext each : jobItemContexts) {
86              initTasks(each, inventoryImporterUsed, inventoryChannelProgressPairs, incrementalImporterUsed, incrementalChannelProgressPairs);
87          }
88      }
89      
90      private void initTasks(final CDCJobItemContext jobItemContext, final AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair> inventoryChannelProgressPairs,
91                             final AtomicBoolean incrementalImporterUsed, final List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
92          Optional<TransmissionJobItemProgress> jobItemProgress = jobItemManager.getProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
93          if (!jobItemProgress.isPresent()) {
94              jobItemManager.persistProgress(jobItemContext);
95          }
96          if (jobItemContext.isStopping()) {
97              PipelineJobRegistry.stop(jobItemContext.getJobId());
98              return;
99          }
100         initIncrementalPosition(jobItemContext);
101         if (jobItemContext.getJobConfig().isFull()) {
102             initInventoryTasks(jobItemContext, inventoryImporterUsed, inventoryChannelProgressPairs);
103         }
104         initIncrementalTask(jobItemContext, incrementalImporterUsed, incrementalChannelProgressPairs);
105     }
106     
107     private void initIncrementalPosition(final CDCJobItemContext jobItemContext) {
108         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
109         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
110         try {
111             DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
112             IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
113             taskConfig.getDumperContext().getCommonContext().setPosition(position);
114         } catch (final SQLException ex) {
115             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
116         }
117     }
118     
119     private void initInventoryTasks(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
120         long startTimeMillis = System.currentTimeMillis();
121         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
122         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
123         TransmissionProcessContext processContext = jobItemContext.getJobProcessContext();
124         InventoryDumperContextSplitter dumperContextSplitter = new InventoryDumperContextSplitter(
125                 jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()));
126         for (InventoryDumperContext each : dumperContextSplitter.split(jobItemContext)) {
127             AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
128             PipelineChannel channel = InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(), importerConfig.getBatchSize(), position);
129             if (!(position.get() instanceof IngestFinishedPosition)) {
130                 channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
131             }
132             InventoryDataRecordPositionCreator positionCreator = each.hasUniqueKey() ? new UniqueKeyInventoryDataRecordPositionCreator() : new PlaceholderInventoryDataRecordPositionCreator();
133             Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader(), positionCreator);
134             Importer importer = importerUsed.get() ? null
135                     : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100L, jobItemContext.getSink(), false, importerConfig.getRateLimitAlgorithm());
136             jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
137                     processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
138             if (!(position.get() instanceof IngestFinishedPosition)) {
139                 importerUsed.set(true);
140             }
141         }
142         log.info("Init inventory tasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
143     }
144     
145     private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
146         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
147         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
148         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
149         PipelineChannel channel = IncrementalChannelCreator.create(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
150         channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
151         CreateIncrementalDumperParameter param = new CreateIncrementalDumperParameter(
152                 dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager());
153         Dumper dumper = IncrementalDumperCreator.create(param);
154         boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
155         Importer importer = importerUsed.get() ? null
156                 : new CDCImporter(channelProgressPairs, 1, 100L, jobItemContext.getSink(), needSorting, taskConfig.getImporterConfig().getRateLimitAlgorithm());
157         PipelineTask incrementalTask = new CDCIncrementalTask(
158                 dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
159         jobItemContext.getIncrementalTasks().add(incrementalTask);
160         importerUsed.set(true);
161     }
162 }