View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
19  
20  import lombok.extern.slf4j.Slf4j;
21  import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
22  import org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
23  import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
24  import org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
25  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
26  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
27  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
28  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
29  import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
30  import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
31  import org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
32  import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
33  import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
34  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
35  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.CreateIncrementalDumperParameter;
36  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
37  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
38  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
39  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
40  import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
41  import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
42  import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
43  import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
44  import org.apache.shardingsphere.data.pipeline.core.job.preparer.PipelineJobPreparer;
45  import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
46  import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
47  import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
48  import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
49  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
50  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
51  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
52  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
53  import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
54  import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.splitter.InventoryTaskSplitter;
55  import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.job.PipelineJobOffsetGovernanceRepository;
56  import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
57  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
58  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
59  import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
60  import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
61  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
62  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
63  import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
64  import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
65  import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
66  import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
67  import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
68  import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
69  import org.apache.shardingsphere.infra.parser.SQLParserEngine;
70  import org.apache.shardingsphere.mode.manager.ContextManager;
71  import org.apache.shardingsphere.parser.rule.SQLParserRule;
72  
73  import java.sql.SQLException;
74  import java.util.Collection;
75  import java.util.Collections;
76  
77  /**
78   * Migration job preparer.
79   */
80  @Slf4j
81  public final class MigrationJobPreparer implements PipelineJobPreparer<MigrationJobItemContext> {
82      
83      private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getOption().getYamlJobItemProgressSwapper());
84      
85      @Override
86      public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
87          ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
88                  jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
89                  () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration."));
90          DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
91          new PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
92          ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
93          prepareAndCheckTargetWithLock(jobItemContext);
94          ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
95          boolean isIncrementalSupported = DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class, sourceDatabaseType).isPresent();
96          if (isIncrementalSupported) {
97              prepareIncremental(jobItemContext);
98          }
99          initInventoryTasks(jobItemContext);
100         if (isIncrementalSupported) {
101             initIncrementalTasks(jobItemContext);
102             ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
103         }
104         log.info("Prepare job, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
105                 jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
106     }
107     
108     private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) throws SQLException {
109         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
110         String jobId = jobConfig.getJobId();
111         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
112         ContextManager contextManager = PipelineContextManager.getContext(contextKey);
113         if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
114             jobItemManager.persistProgress(jobItemContext);
115         }
116         contextManager.getExclusiveOperatorEngine().operate(new MigrationPrepareOperation(jobId), 600 * 1000L, () -> {
117             PipelineJobOffsetGovernanceRepository offsetRepository = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
118             JobOffsetInfo offsetInfo = offsetRepository.load(jobId);
119             if (!offsetInfo.isTargetSchemaTableCreated()) {
120                 jobItemContext.setStatus(JobStatus.PREPARING);
121                 jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
122                 prepareAndCheckTarget(jobItemContext, contextManager);
123                 offsetRepository.persist(jobId, new JobOffsetInfo(true));
124             }
125         });
126     }
127     
128     private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext, final ContextManager contextManager) throws SQLException {
129         if (jobItemContext.isSourceTargetDatabaseTheSame()) {
130             prepareTarget(jobItemContext, contextManager);
131         }
132         if (null == jobItemContext.getInitProgress()) {
133             PipelineDataSource targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
134             new PipelineDataSourceCheckEngine(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSources(Collections.singleton(targetDataSource),
135                     jobItemContext.getTaskConfig().getImporterConfig());
136         }
137     }
138     
139     private void prepareTarget(final MigrationJobItemContext jobItemContext, final ContextManager contextManager) throws SQLException {
140         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
141         DatabaseType targetDatabaseType = jobItemContext.getJobConfig().getTargetDatabaseType();
142         Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
143         PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
144         PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(targetDatabaseType);
145         preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
146         ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
147         SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(targetDatabaseType);
148         preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, jobConfig.getTargetDatabaseName()));
149     }
150     
151     private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
152         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
153         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
154         try {
155             DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
156             IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
157             taskConfig.getDumperContext().getCommonContext().setPosition(position);
158         } catch (final SQLException ex) {
159             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
160         }
161     }
162     
163     private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
164         InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
165         InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig());
166         jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.split(jobItemContext));
167     }
168     
169     private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
170         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
171         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
172         PipelineExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
173         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
174         PipelineChannel channel = IncrementalChannelCreator.create(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
175         CreateIncrementalDumperParameter param = new CreateIncrementalDumperParameter(
176                 dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager());
177         Dumper dumper = IncrementalDumperCreator.create(param);
178         Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, taskConfig.getImporterConfig().getBatchSize(), 1000L,
179                 jobItemContext.getSink(), jobItemContext));
180         PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
181         jobItemContext.getIncrementalTasks().add(incrementalTask);
182     }
183 }