View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
19  
20  import lombok.extern.slf4j.Slf4j;
21  import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
22  import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
23  import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
24  import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
25  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
26  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
27  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
28  import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
29  import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
30  import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
31  import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
32  import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
33  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
34  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.DialectIncrementalDumperCreator;
35  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
36  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
37  import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
38  import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
39  import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
40  import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
41  import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
42  import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
43  import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
44  import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
45  import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
46  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
47  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
48  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
49  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
50  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
51  import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
52  import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
53  import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
54  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
55  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
56  import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
57  import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
58  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
59  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
60  import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
61  import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
62  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
63  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
64  import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
65  import org.apache.shardingsphere.infra.lock.GlobalLockNames;
66  import org.apache.shardingsphere.infra.lock.LockContext;
67  import org.apache.shardingsphere.infra.lock.LockDefinition;
68  import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
69  import org.apache.shardingsphere.infra.parser.SQLParserEngine;
70  import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
71  import org.apache.shardingsphere.parser.rule.SQLParserRule;
72  
73  import java.sql.SQLException;
74  import java.util.Collection;
75  import java.util.Collections;
76  import java.util.Map.Entry;
77  
78  /**
79   * Migration job preparer.
80   */
81  @Slf4j
82  public final class MigrationJobPreparer {
83      
84      private final MigrationJobType jobType = new MigrationJobType();
85      
86      private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
87      
88      /**
89       * Do prepare work.
90       *
91       * @param jobItemContext job item context
92       * @throws SQLException SQL exception
93       * @throws PipelineJobCancelingException pipeline job canceled exception
94       */
95      public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException, PipelineJobCancelingException {
96          ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
97                  jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
98                  () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
99          DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
100         new DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
101         if (jobItemContext.isStopping()) {
102             throw new PipelineJobCancelingException();
103         }
104         prepareAndCheckTargetWithLock(jobItemContext);
105         if (jobItemContext.isStopping()) {
106             throw new PipelineJobCancelingException();
107         }
108         boolean isIncrementalSupported = DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class, sourceDatabaseType).isPresent();
109         if (isIncrementalSupported) {
110             prepareIncremental(jobItemContext);
111         }
112         initInventoryTasks(jobItemContext);
113         if (isIncrementalSupported) {
114             initIncrementalTasks(jobItemContext);
115             if (jobItemContext.isStopping()) {
116                 throw new PipelineJobCancelingException();
117             }
118         }
119         log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
120                 jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
121     }
122     
123     @SuppressWarnings({"unchecked", "rawtypes"})
124     private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) throws SQLException {
125         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
126         String jobId = jobConfig.getJobId();
127         LockContext lockContext = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getInstanceContext().getLockContext();
128         if (!jobItemManager.getProgress(jobId, jobItemContext.getShardingItem()).isPresent()) {
129             jobItemManager.persistProgress(jobItemContext);
130         }
131         LockDefinition lockDefinition = new GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(), jobConfig.getJobId()));
132         long startTimeMillis = System.currentTimeMillis();
133         if (lockContext.tryLock(lockDefinition, 600000)) {
134             log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
135             try {
136                 JobOffsetInfo offsetInfo = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().load(jobId);
137                 if (!offsetInfo.isTargetSchemaTableCreated()) {
138                     jobItemContext.setStatus(JobStatus.PREPARING);
139                     jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
140                     prepareAndCheckTarget(jobItemContext);
141                     PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().persist(jobId, new JobOffsetInfo(true));
142                 }
143             } finally {
144                 log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
145                 lockContext.unlock(lockDefinition);
146             }
147         } else {
148             log.warn("try lock failed, jobId={}, shardingItem={}", jobId, jobItemContext.getShardingItem());
149         }
150     }
151     
152     private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
153         DatabaseType targetDatabaseType = jobItemContext.getJobConfig().getTargetDatabaseType();
154         if (jobItemContext.isSourceTargetDatabaseTheSame()) {
155             prepareTarget(jobItemContext, targetDatabaseType);
156         }
157         if (null == jobItemContext.getInitProgress()) {
158             PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
159             new DataSourceCheckEngine(targetDatabaseType).checkTargetDataSources(Collections.singleton(targetDataSource), jobItemContext.getTaskConfig().getImporterConfig());
160         }
161     }
162     
163     private void prepareTarget(final MigrationJobItemContext jobItemContext, final DatabaseType targetDatabaseType) throws SQLException {
164         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
165         Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
166         PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
167         PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class, targetDatabaseType));
168         preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
169         ShardingSphereMetaData metaData = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
170         SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
171                 .getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
172         preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine));
173     }
174     
175     private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
176         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
177         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
178         try {
179             DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
180             IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
181             taskConfig.getDumperContext().getCommonContext().setPosition(position);
182         } catch (final SQLException ex) {
183             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
184         }
185     }
186     
187     private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
188         InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
189         InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig());
190         jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
191     }
192     
193     private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
194         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
195         PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
196         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
197         ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
198         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
199         PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(jobItemContext.getJobProcessContext().getProcessConfiguration().getStreamChannel(), taskProgress);
200         Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
201                 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
202         Collection<Importer> importers = Collections.singletonList(new SingleChannelConsumerImporter(channel, 1, 5L, jobItemContext.getSink(), jobItemContext));
203         PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
204         jobItemContext.getIncrementalTasks().add(incrementalTask);
205     }
206     
207     /**
208      * Do cleanup work.
209      *
210      * @param jobConfig job configuration
211      */
212     public void cleanup(final MigrationJobConfiguration jobConfig) {
213         for (Entry<String, PipelineDataSourceConfiguration> entry : jobConfig.getSources().entrySet()) {
214             try {
215                 new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue());
216             } catch (final SQLException ex) {
217                 log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
218             }
219         }
220     }
221 }