View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.scenario.migration.api;
19  
20  import lombok.extern.slf4j.Slf4j;
21  import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
22  import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
23  import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
24  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
25  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
26  import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
27  import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
28  import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
29  import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
30  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
31  import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
32  import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
33  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
34  import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
35  import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
36  import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
37  import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
38  import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
39  import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
40  import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
41  import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
42  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
43  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
44  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
45  import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
46  import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
47  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
48  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
49  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
50  import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
51  import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
52  import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
53  import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
54  import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
55  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
56  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
57  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
58  import org.apache.shardingsphere.infra.datanode.DataNode;
59  import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
60  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
61  import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.DuplicateStorageUnitException;
62  import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException;
63  import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
64  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
65  import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
66  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
67  import org.apache.shardingsphere.infra.util.json.JsonUtils;
68  import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
69  import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
70  import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
71  import org.apache.shardingsphere.mode.manager.ContextManager;
72  
73  import java.sql.Connection;
74  import java.sql.SQLException;
75  import java.sql.Statement;
76  import java.util.ArrayList;
77  import java.util.Collection;
78  import java.util.Comparator;
79  import java.util.HashMap;
80  import java.util.HashSet;
81  import java.util.LinkedHashMap;
82  import java.util.LinkedList;
83  import java.util.List;
84  import java.util.Map;
85  import java.util.Map.Entry;
86  import java.util.stream.Collectors;
87  
88  /**
89   * Migration job API.
90   */
91  @Slf4j
92  public final class MigrationJobAPI implements TransmissionJobAPI {
93      
94      private final PipelineJobManager jobManager;
95      
96      private final PipelineJobConfigurationManager jobConfigManager;
97      
98      private final PipelineDataSourcePersistService dataSourcePersistService;
99      
100     public MigrationJobAPI() {
101         PipelineJobType jobType = new MigrationJobType();
102         jobManager = new PipelineJobManager(jobType);
103         jobConfigManager = new PipelineJobConfigurationManager(jobType);
104         dataSourcePersistService = new PipelineDataSourcePersistService();
105     }
106     
107     /**
108      * Start migration job.
109      *
110      * @param contextKey context key
111      * @param param create migration job parameter
112      * @return job id
113      */
114     public String start(final PipelineContextKey contextKey, final MigrateTableStatement param) {
115         MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
116         jobManager.start(jobConfig);
117         return jobConfig.getJobId();
118     }
119     
120     private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
121         YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
122         result.setTargetDatabaseName(param.getTargetDatabaseName());
123         Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
124         Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
125         Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
126         List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
127                 .thenComparing(each -> DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList());
128         YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
129         for (SourceTargetEntry each : sourceTargetEntries) {
130             sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
131             ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
132                     () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName()));
133             String dataSourceName = each.getSource().getDataSourceName();
134             if (configSources.containsKey(dataSourceName)) {
135                 continue;
136             }
137             ShardingSpherePreconditions.checkContainsKey(metaDataDataSource, dataSourceName,
138                     () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
139             Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
140             StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
141             configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
142             DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
143             if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.isSchemaAvailable()) {
144                 each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
145             }
146             DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
147             if (null == result.getSourceDatabaseType()) {
148                 result.setSourceDatabaseType(sourceDatabaseType.getType());
149             } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
150                 throw new PipelineInvalidParameterException("Source storage units have different database types");
151             }
152         }
153         result.setSources(configSources);
154         ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
155         PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
156         result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
157         result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
158         List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
159                 .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
160         result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
161         result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
162         result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
163         result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
164         result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, result.getJobShardingDataNodes())));
165         return result;
166     }
167     
168     private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) {
169         YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
170         result.setType(type);
171         result.setParameter(param);
172         return result;
173     }
174     
175     private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
176         Map<String, Map<String, Object>> targetPoolProps = new HashMap<>();
177         YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
178         for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
179             targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
180         }
181         YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations());
182         return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
183     }
184     
185     private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
186         ShardingSpherePreconditions.checkNotEmpty(rules, () -> new EmptyRuleException(databaseName));
187         YamlRootConfiguration result = new YamlRootConfiguration();
188         result.setDatabaseName(databaseName);
189         result.setDataSources(yamlDataSources);
190         result.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules));
191         return result;
192     }
193     
194     private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) {
195         Map<String, String> result = new LinkedHashMap<>();
196         sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName()));
197         return result;
198     }
199     
200     /**
201      * Register migration source storage units.
202      *
203      * @param contextKey context key
204      * @param propsMap data source pool properties map
205      */
206     public void registerMigrationSourceStorageUnits(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
207         Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
208         Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
209         for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
210             if (existDataSources.containsKey(entry.getKey())) {
211                 duplicateDataSourceNames.add(entry.getKey());
212             }
213         }
214         ShardingSpherePreconditions.checkMustEmpty(duplicateDataSourceNames, () -> new DuplicateStorageUnitException(contextKey.getDatabaseName(), duplicateDataSourceNames));
215         Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
216         result.putAll(propsMap);
217         dataSourcePersistService.persist(contextKey, getType(), result);
218     }
219     
220     /**
221      * Drop migration source resources.
222      *
223      * @param contextKey context key
224      * @param resourceNames resource names
225      */
226     public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
227         Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
228         Collection<String> notExistedResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
229         ShardingSpherePreconditions.checkMustEmpty(notExistedResources, () -> new MissingRequiredStorageUnitsException(contextKey.getDatabaseName(), notExistedResources));
230         for (String each : resourceNames) {
231             metaDataDataSource.remove(each);
232         }
233         dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
234     }
235     
236     /**
237      * Query migration source resources list.
238      *
239      * @param contextKey context key
240      * @return migration source resources
241      */
242     public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
243         Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
244         Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
245         for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
246             String dataSourceName = entry.getKey();
247             DataSourcePoolProperties value = entry.getValue();
248             Collection<Object> props = new LinkedList<>();
249             props.add(dataSourceName);
250             String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
251             DatabaseType databaseType = DatabaseTypeFactory.get(url);
252             props.add(databaseType.getType());
253             ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
254             props.add(connectionProps.getHostname());
255             props.add(connectionProps.getPort());
256             props.add(connectionProps.getCatalog());
257             Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
258             props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
259             props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
260             props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
261             props.add(getStandardProperty(standardProps, "maxPoolSize"));
262             props.add(getStandardProperty(standardProps, "minPoolSize"));
263             props.add(getStandardProperty(standardProps, "readOnly"));
264             Map<String, Object> otherProps = value.getCustomProperties().getProperties();
265             props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
266             result.add(props);
267         }
268         return result;
269     }
270     
271     private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
272         return standardProps.containsKey(key) && null != standardProps.get(key) ? standardProps.get(key).toString() : "";
273     }
274     
275     @Override
276     public void commit(final String jobId) {
277         log.info("Commit job {}", jobId);
278         final long startTimeMillis = System.currentTimeMillis();
279         jobManager.stop(jobId);
280         dropCheckJobs(jobId);
281         MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
282         refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
283         jobManager.drop(jobId);
284         log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
285     }
286     
287     private void refreshTableMetadata(final String jobId, final String databaseName) {
288         // TODO use origin database name now, wait reloadDatabaseMetaData fix case-sensitive probelm
289         ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager();
290         ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
291         contextManager.refreshTableMetaData(database);
292     }
293     
294     @Override
295     public void rollback(final String jobId) throws SQLException {
296         final long startTimeMillis = System.currentTimeMillis();
297         dropCheckJobs(jobId);
298         cleanTempTableOnRollback(jobId);
299         jobManager.drop(jobId);
300         log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
301     }
302     
303     private void dropCheckJobs(final String jobId) {
304         Collection<String> checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId);
305         if (checkJobIds.isEmpty()) {
306             return;
307         }
308         for (String each : checkJobIds) {
309             try {
310                 jobManager.drop(each);
311                 // CHECKSTYLE:OFF
312             } catch (final RuntimeException ex) {
313                 // CHECKSTYLE:ON
314                 log.info("drop check job failed, check job id: {}, error: {}", each, ex.getMessage());
315             }
316         }
317     }
318     
319     private void cleanTempTableOnRollback(final String jobId) throws SQLException {
320         MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class, getType())).getJobConfiguration(jobId);
321         PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
322         TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
323         try (
324                 PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(jobConfig.getTarget());
325                 Connection connection = dataSource.getConnection()) {
326             for (String each : jobConfig.getTargetTableNames()) {
327                 String targetSchemaName = mapping.getSchemaName(each);
328                 String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, each);
329                 log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, each, sql);
330                 try (Statement statement = connection.createStatement()) {
331                     statement.execute(sql);
332                 }
333             }
334         }
335     }
336     
337     @Override
338     public String getType() {
339         return "MIGRATION";
340     }
341 }