/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.preparer.PipelineJobPreparer;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.job.PipelineJobOffsetGovernanceRepository;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.parser.rule.SQLParserRule;

import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;

/**
 * Migration job preparer.
 */
@Slf4j
public final class MigrationJobPreparer implements PipelineJobPreparer<MigrationJobItemContext> {
    
    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getOption().getYamlJobItemProgressSwapper());
    
    @Override
    public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
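        // The source must be a standard single-data-source configuration; other pipeline data source types are rejected up front.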
        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
                jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                () -> new UnsupportedSQLOperationException("Migration inventory dumper only supports StandardPipelineDataSourceConfiguration."));
        DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
        new PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
        prepareAndCheckTargetWithLock(jobItemContext);
        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
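        // Incremental migration is available only when the source dialect ships an incremental dumper SPI implementation (e.g. a binlog-based dumper for MySQL).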
        boolean isIncrementalSupported = DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class, sourceDatabaseType).isPresent();
        if (isIncrementalSupported) {
            prepareIncremental(jobItemContext);
        }
        initInventoryTasks(jobItemContext);
        if (isIncrementalSupported) {
            initIncrementalTasks(jobItemContext);
            ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
        }
        log.info("Prepare job, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
                jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
    }
    
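    // Runs target preparation under an exclusive operation keyed by job ID, so concurrent sharding items of the same job do not prepare the target twice.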
    private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) throws SQLException {
        MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
        String jobId = jobConfig.getJobId();
        PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
        ContextManager contextManager = PipelineContextManager.getContext(contextKey);
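        // Persist an initial progress record on first start so later status updates and resume have a baseline.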
        if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
            jobItemManager.persistProgress(jobItemContext);
        }
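        // Prepare the target at most once: the exclusive operation (600 * 1000L timeout, presumably milliseconds) guards against concurrent preparation,
        // and the persisted offset skips re-creation on restart.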
        contextManager.getExclusiveOperatorEngine().operate(new MigrationPrepareOperation(jobId), 600 * 1000L, () -> {
            PipelineJobOffsetGovernanceRepository offsetRepository = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
            JobOffsetInfo offsetInfo = offsetRepository.load(jobId);
            if (!offsetInfo.isTargetSchemaTableCreated()) {
                jobItemContext.setStatus(JobStatus.PREPARING);
                jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
                prepareAndCheckTarget(jobItemContext, contextManager);
                offsetRepository.persist(jobId, new JobOffsetInfo(true));
            }
        });
    }
    
    private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext, final ContextManager contextManager) throws SQLException {
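        // Auto-create the target only when source and target database types are the same (the generated DDL is dialect-specific);
        // for heterogeneous targets the tables are expected to exist already.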
        if (jobItemContext.isSourceTargetDatabaseTheSame()) {
            prepareTarget(jobItemContext, contextManager);
        }
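        // Only verify the target data sources on a fresh run; a resumed job has already passed this check.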
        if (null == jobItemContext.getInitProgress()) {
            PipelineDataSource targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
            new PipelineDataSourceCheckEngine(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSources(Collections.singleton(targetDataSource),
                    jobItemContext.getTaskConfig().getImporterConfig());
        }
    }
    
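    // Creates target schemas first (for dialects with schema support), then target tables; the table DDL goes through the target dialect's SQL parser engine.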
    private void prepareTarget(final MigrationJobItemContext jobItemContext, final ContextManager contextManager) throws SQLException {
        MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
        DatabaseType targetDatabaseType = jobConfig.getTargetDatabaseType();
        Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
        PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
        PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(targetDatabaseType);
        preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
        ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
        SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(targetDatabaseType);
        preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, jobConfig.getTargetDatabaseName()));
    }
    
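    // Resolves the incremental start position: resumes from persisted progress if any, otherwise reads the current position from the source (e.g. the current binlog position on MySQL).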
    private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
        MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
        JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
        try {
            DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
            IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
            taskConfig.getDumperContext().getCommonContext().setPosition(position);
        } catch (final SQLException ex) {
            throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
        }
    }
    
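    // Splits full-data (inventory) dumping into multiple tasks so large tables can be migrated in parallel chunks.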
    private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
        InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
        InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig());
        jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.split(jobItemContext));
    }
    
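    // Builds the incremental task: a dialect dumper pushes captured changes into a channel, and a single importer drains the channel into the sink.
    // The 1000L argument passed to the importer is presumably a flush timeout in milliseconds.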
    private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
        MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
        IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
        PipelineExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
        PipelineChannel channel = IncrementalChannelCreator.create(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
        CreateIncrementalDumperParameter param = new CreateIncrementalDumperParameter(
                dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager());
        Dumper dumper = IncrementalDumperCreator.create(param);
        Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, taskConfig.getImporterConfig().getBatchSize(), 1000L,
                jobItemContext.getSink(), jobItemContext));
        PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
        jobItemContext.getIncrementalTasks().add(incrementalTask);
    }
}