/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.preparer.PipelineJobPreparer;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.job.PipelineJobOffsetGovernanceRepository;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.mode.lock.LockContext;
import org.apache.shardingsphere.mode.lock.LockDefinition;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.parser.rule.SQLParserRule;

import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;

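/**
 * Migration job preparer.
 */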
@Slf4j
public final class MigrationJobPreparer implements PipelineJobPreparer<MigrationJobItemContext> {
    
    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());
    
    @Override
    public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
                jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                () -> new UnsupportedSQLOperationException("Migration inventory dumper only supports StandardPipelineDataSourceConfiguration."));
        DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
        new PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
        prepareAndCheckTargetWithLock(jobItemContext);
        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
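        // Incremental ingestion is dialect-specific: incremental preparation and tasks are created only when a
        // DialectIncrementalDumperCreator SPI implementation exists for the source database type; inventory tasks are created regardless.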
        boolean isIncrementalSupported = DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class, sourceDatabaseType).isPresent();
        if (isIncrementalSupported) {
            prepareIncremental(jobItemContext);
        }
        initInventoryTasks(jobItemContext);
        if (isIncrementalSupported) {
            initIncrementalTasks(jobItemContext);
            ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
        }
        log.info("Prepare job, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
                jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
    }
    
    private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) throws SQLException {
        MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
        String jobId = jobConfig.getJobId();
        PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
        ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
        LockContext lockContext = contextManager.getLockContext();
        if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
            jobItemManager.persistProgress(jobItemContext);
        }
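        // Serialize target preparation across job sharding items with a cluster-wide lock, waiting up to 10 minutes (600 * 1000 ms) for acquisition.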
        LockDefinition lockDefinition = new GlobalLockDefinition(new MigrationPrepareLock(jobConfig.getJobId()));
        long startTimeMillis = System.currentTimeMillis();
        if (lockContext.tryLock(lockDefinition, 600 * 1000L)) {
            log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
            try {
                PipelineJobOffsetGovernanceRepository offsetRepository = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
                JobOffsetInfo offsetInfo = offsetRepository.load(jobId);
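                // Create target schemas and tables only once per job: the offset repository records whether they
                // have already been created, so later sharding items skip this step.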
                if (!offsetInfo.isTargetSchemaTableCreated()) {
                    jobItemContext.setStatus(JobStatus.PREPARING);
                    jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
                    prepareAndCheckTarget(jobItemContext, contextManager);
                    offsetRepository.persist(jobId, new JobOffsetInfo(true));
                }
            } finally {
                log.info("Unlock, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
                lockContext.unlock(lockDefinition);
            }
        } else {
            log.warn("Lock failed, jobId={}, shardingItem={}.", jobId, jobItemContext.getShardingItem());
        }
    }
    
    private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext, final ContextManager contextManager) throws SQLException {
        if (jobItemContext.isSourceTargetDatabaseTheSame()) {
            prepareTarget(jobItemContext, contextManager);
        }
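        // On a first run (no persisted progress), check the target data sources against the importer configuration
        // before importing; an existing progress record means this check already passed on a previous run.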
        if (null == jobItemContext.getInitProgress()) {
            PipelineDataSource targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
            new PipelineDataSourceCheckEngine(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSources(Collections.singleton(targetDataSource),
                    jobItemContext.getTaskConfig().getImporterConfig());
        }
    }
    
    private void prepareTarget(final MigrationJobItemContext jobItemContext, final ContextManager contextManager) throws SQLException {
        MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
        DatabaseType targetDatabaseType = jobItemContext.getJobConfig().getTargetDatabaseType();
        Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
        PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
        PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(targetDatabaseType);
        preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
        ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
        SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(targetDatabaseType);
        preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, jobConfig.getTargetDatabaseName()));
    }
    
    private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
        MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
        JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
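        // Resolve the incremental start position (e.g. a MySQL binlog position), reusing the persisted progress when the job is resuming.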
        try {
            DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
            IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
            taskConfig.getDumperContext().getCommonContext().setPosition(position);
        } catch (final SQLException ex) {
            throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
        }
    }
    
    private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
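        // Split the existing (inventory) data into multiple dump tasks so they can be executed in parallel.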
        InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
        InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig());
        jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.split(jobItemContext));
    }
    
    private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
        MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
        IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
        PipelineExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
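        // The channel buffers records between the incremental dumper (producer) and the importer (consumer),
        // and reports consumed positions back to the task progress.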
        PipelineChannel channel = IncrementalChannelCreator.create(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
        CreateIncrementalDumperParameter param = new CreateIncrementalDumperParameter(
                dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager());
        Dumper dumper = IncrementalDumperCreator.create(param);
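        // A single importer drains the channel and writes to the sink in batches; 1000L is assumed to be the batch poll timeout in milliseconds.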
        Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, taskConfig.getImporterConfig().getBatchSize(), 1000L,
                jobItemContext.getSink(), jobItemContext));
        PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
        jobItemContext.getIncrementalTasks().add(incrementalTask);
    }
}