View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
19  
20  import lombok.extern.slf4j.Slf4j;
21  import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
22  import org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
23  import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
24  import org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
25  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
26  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
27  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
28  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
29  import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
30  import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
31  import org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
32  import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
33  import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
34  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
35  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
36  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
37  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
38  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
39  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
40  import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
41  import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
42  import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
43  import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
44  import org.apache.shardingsphere.data.pipeline.core.job.preparer.PipelineJobPreparer;
45  import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
46  import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
47  import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
48  import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
49  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
50  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
51  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
52  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
53  import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
54  import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter.InventoryTaskSplitter;
55  import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.job.PipelineJobOffsetGovernanceRepository;
56  import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
57  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
58  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
59  import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
60  import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
61  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
62  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
63  import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
64  import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
65  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
66  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
67  import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
68  import org.apache.shardingsphere.mode.lock.LockContext;
69  import org.apache.shardingsphere.mode.lock.LockDefinition;
70  import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
71  import org.apache.shardingsphere.infra.parser.SQLParserEngine;
72  import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
73  import org.apache.shardingsphere.mode.manager.ContextManager;
74  import org.apache.shardingsphere.parser.rule.SQLParserRule;
75  
76  import java.sql.SQLException;
77  import java.util.Collection;
78  import java.util.Collections;
79  
80  /**
81   * Migration job preparer.
82   */
83  @Slf4j
84  public final class MigrationJobPreparer implements PipelineJobPreparer<MigrationJobItemContext> {
85      
86      private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());
87      
88      @Override
89      public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
90          ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
91                  jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
92                  () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration."));
93          DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
94          new PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
95          ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
96          prepareAndCheckTargetWithLock(jobItemContext);
97          ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
98          boolean isIncrementalSupported = DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class, sourceDatabaseType).isPresent();
99          if (isIncrementalSupported) {
100             prepareIncremental(jobItemContext);
101         }
102         initInventoryTasks(jobItemContext);
103         if (isIncrementalSupported) {
104             initIncrementalTasks(jobItemContext);
105             ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
106         }
107         log.info("Prepare job, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
108                 jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
109     }
110     
111     private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) throws SQLException {
112         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
113         String jobId = jobConfig.getJobId();
114         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
115         ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
116         LockContext lockContext = contextManager.getLockContext();
117         if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
118             jobItemManager.persistProgress(jobItemContext);
119         }
120         LockDefinition lockDefinition = new GlobalLockDefinition(new MigrationPrepareLock(jobConfig.getJobId()));
121         long startTimeMillis = System.currentTimeMillis();
122         if (lockContext.tryLock(lockDefinition, 600 * 1000L)) {
123             log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
124             try {
125                 PipelineJobOffsetGovernanceRepository offsetRepository = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
126                 JobOffsetInfo offsetInfo = offsetRepository.load(jobId);
127                 if (!offsetInfo.isTargetSchemaTableCreated()) {
128                     jobItemContext.setStatus(JobStatus.PREPARING);
129                     jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
130                     prepareAndCheckTarget(jobItemContext, contextManager);
131                     offsetRepository.persist(jobId, new JobOffsetInfo(true));
132                 }
133             } finally {
134                 log.info("Unlock, jobId={}, shardingItem={}, cost {} ms.", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
135                 lockContext.unlock(lockDefinition);
136             }
137         } else {
138             log.warn("Lock failed, jobId={}, shardingItem={}.", jobId, jobItemContext.getShardingItem());
139         }
140     }
141     
142     private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext, final ContextManager contextManager) throws SQLException {
143         if (jobItemContext.isSourceTargetDatabaseTheSame()) {
144             prepareTarget(jobItemContext, contextManager);
145         }
146         if (null == jobItemContext.getInitProgress()) {
147             PipelineDataSource targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
148             new PipelineDataSourceCheckEngine(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSources(Collections.singleton(targetDataSource),
149                     jobItemContext.getTaskConfig().getImporterConfig());
150         }
151     }
152     
153     private void prepareTarget(final MigrationJobItemContext jobItemContext, final ContextManager contextManager) throws SQLException {
154         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
155         DatabaseType targetDatabaseType = jobItemContext.getJobConfig().getTargetDatabaseType();
156         Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
157         PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
158         PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(targetDatabaseType);
159         preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
160         ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
161         SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(targetDatabaseType);
162         preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, jobConfig.getTargetDatabaseName()));
163     }
164     
165     private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
166         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
167         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
168         try {
169             DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
170             IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
171             taskConfig.getDumperContext().getCommonContext().setPosition(position);
172         } catch (final SQLException ex) {
173             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
174         }
175     }
176     
177     private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
178         InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
179         InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig());
180         jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.split(jobItemContext));
181     }
182     
183     private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
184         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
185         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
186         PipelineExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
187         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
188         PipelineChannel channel = IncrementalChannelCreator.create(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
189         CreateIncrementalDumperParameter param = new CreateIncrementalDumperParameter(
190                 dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager());
191         Dumper dumper = IncrementalDumperCreator.create(param);
192         Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, taskConfig.getImporterConfig().getBatchSize(), 1000L,
193                 jobItemContext.getSink(), jobItemContext));
194         PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
195         jobItemContext.getIncrementalTasks().add(incrementalTask);
196     }
197 }