1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.scenario.migration;
19
20 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
21 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
22 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
23 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
24 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
25 import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobTarget;
26 import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
27 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
28 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
29
30 import java.util.Collection;
31 import java.util.LinkedList;
32
33
34
35
36 public final class MigrationJobType implements PipelineJobType<MigrationJobConfiguration> {
37
38 @Override
39 public PipelineJobOption getOption() {
40 return new PipelineJobOption("01", MigrationJob.class, true, new YamlMigrationJobConfigurationSwapper(), false, "CONSISTENCY_CHECK", "CONSISTENCY_CHECK", false);
41 }
42
43 @Override
44 public PipelineJobTarget getJobTarget(final MigrationJobConfiguration jobConfig) {
45 Collection<String> sourceTables = new LinkedList<>();
46 jobConfig.getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(dataNode.format()))));
47 return new PipelineJobTarget(null, String.join(",", sourceTables));
48 }
49
50 @Override
51 public PipelineDataConsistencyChecker buildDataConsistencyChecker(final MigrationJobConfiguration jobConfig,
52 final TransmissionProcessContext processContext, final ConsistencyCheckJobItemProgressContext progressContext) {
53 return new MigrationDataConsistencyChecker(jobConfig, processContext, progressContext);
54 }
55
56 @Override
57 public String getType() {
58 return "MIGRATION";
59 }
60 }