View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
19  
20  import lombok.extern.slf4j.Slf4j;
21  import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
22  import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
23  import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
24  import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
25  import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
26  import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
27  import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
28  import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
29  import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
30  import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
31  import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
32  import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
33  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
34  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
35  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
36  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
37  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
38  import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
39  import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
40  import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
41  import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
42  import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
43  import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
44  import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
45  import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
46  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
47  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
48  import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
49  import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
50  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
51  
52  import java.sql.SQLException;
53  import java.util.Collection;
54  import java.util.List;
55  import java.util.Optional;
56  import java.util.concurrent.CopyOnWriteArrayList;
57  import java.util.concurrent.atomic.AtomicBoolean;
58  import java.util.concurrent.atomic.AtomicReference;
59  
60  /**
61   * CDC job preparer.
62   */
63  @Slf4j
64  public final class CDCJobPreparer {
65      
66      private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new CDCJobType().getYamlJobItemProgressSwapper());
67      
68      /**
69       * Do prepare work.
70       *
71       * @param jobItemContexts job item contexts
72       */
73      public void initTasks(final Collection<CDCJobItemContext> jobItemContexts) {
74          // TODO Use pipeline tree to build it
75          AtomicBoolean inventoryImporterUsed = new AtomicBoolean();
76          List<CDCChannelProgressPair> inventoryChannelProgressPairs = new CopyOnWriteArrayList<>();
77          AtomicBoolean incrementalImporterUsed = new AtomicBoolean();
78          List<CDCChannelProgressPair> incrementalChannelProgressPairs = new CopyOnWriteArrayList<>();
79          for (CDCJobItemContext each : jobItemContexts) {
80              initTasks0(each, inventoryImporterUsed, inventoryChannelProgressPairs, incrementalImporterUsed, incrementalChannelProgressPairs);
81          }
82      }
83      
84      private void initTasks0(final CDCJobItemContext jobItemContext, final AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair> inventoryChannelProgressPairs,
85                              final AtomicBoolean incrementalImporterUsed, final List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
86          Optional<TransmissionJobItemProgress> jobItemProgress = jobItemManager.getProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
87          if (!jobItemProgress.isPresent()) {
88              jobItemManager.persistProgress(jobItemContext);
89          }
90          if (jobItemContext.isStopping()) {
91              PipelineJobRegistry.stop(jobItemContext.getJobId());
92              return;
93          }
94          initIncrementalPosition(jobItemContext);
95          if (jobItemContext.getJobConfig().isFull()) {
96              initInventoryTasks(jobItemContext, inventoryImporterUsed, inventoryChannelProgressPairs);
97          }
98          initIncrementalTask(jobItemContext, incrementalImporterUsed, incrementalChannelProgressPairs);
99      }
100     
101     private void initIncrementalPosition(final CDCJobItemContext jobItemContext) {
102         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
103         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
104         try {
105             DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
106             IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
107             taskConfig.getDumperContext().getCommonContext().setPosition(position);
108         } catch (final SQLException ex) {
109             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
110         }
111     }
112     
113     private void initInventoryTasks(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
114         long startTimeMillis = System.currentTimeMillis();
115         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
116         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
117         TransmissionProcessContext processContext = jobItemContext.getJobProcessContext();
118         for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()), importerConfig)
119                 .splitInventoryDumperContext(jobItemContext)) {
120             AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
121             PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getProcessConfiguration().getStreamChannel(), importerConfig.getBatchSize(), position);
122             if (!(position.get() instanceof IngestFinishedPosition)) {
123                 channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
124             }
125             Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
126             Importer importer = importerUsed.get() ? null
127                     : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100L, jobItemContext.getSink(), false, importerConfig.getRateLimitAlgorithm());
128             jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
129                     processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
130             if (!(position.get() instanceof IngestFinishedPosition)) {
131                 importerUsed.set(true);
132             }
133         }
134         log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
135     }
136     
137     private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
138         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
139         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
140         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
141         PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
142         channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
143         Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
144                 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
145         boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
146         Importer importer = importerUsed.get() ? null
147                 : new CDCImporter(channelProgressPairs, 1, 100L, jobItemContext.getSink(), needSorting, taskConfig.getImporterConfig().getRateLimitAlgorithm());
148         PipelineTask incrementalTask = new CDCIncrementalTask(
149                 dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
150         jobItemContext.getIncrementalTasks().add(incrementalTask);
151         importerUsed.set(true);
152     }
153 }