/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.infra.lock.GlobalLockNames;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import org.apache.shardingsphere.parser.rule.SQLParserRule;

import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map.Entry;

/**
 * Migration job preparer.
 */
@Slf4j
public final class MigrationJobPreparer {
    
    private final MigrationJobType jobType = new MigrationJobType();
    
    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
    
    /**
     * Do prepare work.
     *
     * @param jobItemContext job item context
     * @throws SQLException SQL exception
     * @throws PipelineJobCancelingException pipeline job canceling exception
     */
    public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException, PipelineJobCancelingException {
        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
                jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                () -> new UnsupportedSQLOperationException("Migration inventory dumper only supports StandardPipelineDataSourceConfiguration"));
        DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
        new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
        if (jobItemContext.isStopping()) {
            throw new PipelineJobCancelingException();
        }
        prepareAndCheckTargetWithLock(jobItemContext);
        if (jobItemContext.isStopping()) {
            throw new PipelineJobCancelingException();
        }
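        // Capture the incremental start position before splitting inventory tasks, so that changes made while inventory copy runs are not lost.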
        boolean isIncrementalSupported = DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class, sourceDatabaseType).isPresent();
        if (isIncrementalSupported) {
            prepareIncremental(jobItemContext);
        }
        initInventoryTasks(jobItemContext);
        if (isIncrementalSupported) {
            initIncrementalTasks(jobItemContext);
            if (jobItemContext.isStopping()) {
                throw new PipelineJobCancelingException();
            }
        }
        log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
                jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
    }
    
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) throws SQLException {
        MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
        String jobId = jobConfig.getJobId();
        LockContext lockContext = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getInstanceContext().getLockContext();
        if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
            jobItemManager.persistProgress(jobItemContext);
        }
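        // Take a global lock so concurrent sharding items (possibly on different compute nodes) do not prepare the target at the same time;
        // the persisted JobOffsetInfo flag below ensures target schemas and tables are created only once per job.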
        LockDefinition lockDefinition = new GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(), jobConfig.getJobId()));
        long startTimeMillis = System.currentTimeMillis();
        if (lockContext.tryLock(lockDefinition, 600000)) {
            log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
            try {
                JobOffsetInfo offsetInfo = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().load(jobId);
                if (!offsetInfo.isTargetSchemaTableCreated()) {
                    jobItemContext.setStatus(JobStatus.PREPARING);
                    jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
                    prepareAndCheckTarget(jobItemContext);
                    PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().persist(jobId, new JobOffsetInfo(true));
                }
            } finally {
                log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
                lockContext.unlock(lockDefinition);
            }
        } else {
            log.warn("try lock failed, jobId={}, shardingItem={}", jobId, jobItemContext.getShardingItem());
        }
    }
    
    private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
        DatabaseType targetDatabaseType = jobItemContext.getJobConfig().getTargetDatabaseType();
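        // Target schemas and tables are auto-created only when source and target database types match; for heterogeneous migration the target structure presumably has to be prepared in advance.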
        if (jobItemContext.isSourceTargetDatabaseTheSame()) {
            prepareTarget(jobItemContext, targetDatabaseType);
        }
        if (null == jobItemContext.getInitProgress()) {
            PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
            new DataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource), jobItemContext.getTaskConfig().getImporterConfig());
        }
    }
    
    private void prepareTarget(final MigrationJobItemContext jobItemContext, final DatabaseType targetDatabaseType) throws SQLException {
        MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
        Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
        PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
        PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class, targetDatabaseType));
        preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
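        // The generated CREATE TABLE DDL is parsed with the target database's dialect parser before being executed on the target.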
        ShardingSphereMetaData metaData = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
        SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
                .getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
        preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine));
    }
    
    private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
        MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
        JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
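        // Resolve the incremental start position: resume from persisted progress when available, otherwise obtain a fresh position from the dialect (e.g. a MySQL binlog offset or a PostgreSQL replication slot).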
        try {
            DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
            IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
            taskConfig.getDumperContext().getCommonContext().setPosition(position);
        } catch (final SQLException ex) {
            throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
        }
    }
    
    private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
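        // Split the source tables into parallel inventory tasks, typically by unique-key ranges.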
        InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
        InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig());
        jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
    }
    
    private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
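        // Wire the incremental pipeline: a dialect-specific dumper writes change records into a channel, and a single-channel importer drains them into the sink.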
        MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
        PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
        IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
        ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
        Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
                .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
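        // The constants below appear to be a batch size of 1 and a 5 ms poll timeout, favoring low import latency over throughput.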
        Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), jobItemContext));
        PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
        jobItemContext.getIncrementalTasks().add(incrementalTask);
    }
    
    /**
     * Do cleanup work.
     *
     * @param jobConfig job configuration
     */
    public void cleanup(final MigrationJobConfiguration jobConfig) {
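        // Release database-side incremental resources for every source, e.g. dropping replication slots created for PostgreSQL or openGauss sources; failures are logged but do not abort cleanup.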
        for (Entry<String, PipelineDataSourceConfiguration> entry : jobConfig.getSources().entrySet()) {
            try {
                new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue());
            } catch (final SQLException ex) {
                log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
            }
        }
    }
}