/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.scenario.migration.api;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.DuplicateStorageUnitException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.single.constant.SingleTableConstants;
import org.apache.shardingsphere.single.yaml.config.YamlSingleRuleConfiguration;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Migration job API.
 */
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {

    private final PipelineJobManager jobManager;

    private final PipelineJobConfigurationManager jobConfigManager;

    private final PipelineDataSourcePersistService dataSourcePersistService;

    public MigrationJobAPI() {
        PipelineJobType jobType = new MigrationJobType();
        jobManager = new PipelineJobManager(jobType);
        jobConfigManager = new PipelineJobConfigurationManager(jobType);
        dataSourcePersistService = new PipelineDataSourcePersistService();
    }

    /**
     * Schedule migration job.
     *
     * @param contextKey context key
     * @param param create migration job parameter
     * @return job id
     */
    public String schedule(final PipelineContextKey contextKey, final MigrateTableStatement param) {
        MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
        jobManager.start(jobConfig);
        return jobConfig.getJobId();
    }

    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
        result.setTargetDatabaseName(param.getTargetDatabaseName());
        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
        Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
        Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
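        // Deduplicate source-target entries and sort them by target table name, then by source data node, so the generated job configuration is deterministic.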
        List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
                .thenComparing(each -> each.getSource().format())).collect(Collectors.toList());
        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        for (SourceTargetEntry each : sourceTargetEntries) {
            sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
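            // Each target table may be mapped from exactly one source data node.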
            ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
                    () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName()));
            String dataSourceName = each.getSource().getDataSourceName();
            if (configSources.containsKey(dataSourceName)) {
                continue;
            }
            ShardingSpherePreconditions.checkContainsKey(metaDataDataSource, dataSourceName,
                    () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
            Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
            StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
            configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
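            // If the source database supports schemas and no schema was specified, fall back to the source data source's default schema.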
            if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()) {
                each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
            }
            DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
            if (null == result.getSourceDatabaseType()) {
                result.setSourceDatabaseType(sourceDatabaseType.getType());
            } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
                throw new PipelineInvalidParameterException("Source storage units have different database types");
            }
        }
        result.setSources(configSources);
        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
        PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
        result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
        result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
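        // Only the first data node of each target table goes into tablesFirstDataNodes; the complete mapping is marshalled into the job sharding data nodes below.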
        List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
                .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
        result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
        result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
        result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
        result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
        result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, result.getJobShardingDataNodes())));
        return result;
    }

    private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) {
        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
        result.setType(type);
        result.setParameter(param);
        return result;
    }

    private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
        Map<String, Map<String, Object>> targetPoolProps = new HashMap<>(targetDatabase.getResourceMetaData().getStorageUnits().size(), 1F);
        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
            targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
        }
        YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations());
        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
    }

    private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
        ShardingSpherePreconditions.checkNotEmpty(rules, () -> new EmptyRuleException(databaseName));
        YamlRootConfiguration result = new YamlRootConfiguration();
        result.setDatabaseName(databaseName);
        result.setDataSources(yamlDataSources);
        result.setRules(getYamlRuleConfigurations(rules));
        return result;
    }

    private Collection<YamlRuleConfiguration> getYamlRuleConfigurations(final Collection<RuleConfiguration> rules) {
        Collection<YamlRuleConfiguration> result = new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules);
        Optional<YamlSingleRuleConfiguration> originalSingleRuleConfig = result.stream()
                .filter(YamlSingleRuleConfiguration.class::isInstance).map(YamlSingleRuleConfiguration.class::cast).findFirst();
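        // Replace any existing single rule configuration with one that loads all single tables, keeping the original default data source.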
        result.removeIf(YamlSingleRuleConfiguration.class::isInstance);
        YamlSingleRuleConfiguration singleRuleConfig = new YamlSingleRuleConfiguration();
        // TODO Provide only the necessary tables.
        singleRuleConfig.setTables(Collections.singletonList(SingleTableConstants.ALL_TABLES));
        originalSingleRuleConfig.ifPresent(optional -> singleRuleConfig.setDefaultDataSource(optional.getDefaultDataSource()));
        result.add(singleRuleConfig);
        return result;
    }

    private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) {
        Map<String, String> result = new LinkedHashMap<>(sourceDataNodes.size(), 1F);
        sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName()));
        return result;
    }

    /**
     * Register migration source storage units.
     *
     * @param contextKey context key
     * @param propsMap data source pool properties map
     */
    public void registerMigrationSourceStorageUnits(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
        Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
        Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
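        // Reject the request if any of the given storage unit names is already registered as a migration source.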
        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
            if (existDataSources.containsKey(entry.getKey())) {
                duplicateDataSourceNames.add(entry.getKey());
            }
        }
        ShardingSpherePreconditions.checkMustEmpty(duplicateDataSourceNames, () -> new DuplicateStorageUnitException(contextKey.getDatabaseName(), duplicateDataSourceNames));
        Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
        result.putAll(propsMap);
        dataSourcePersistService.persist(contextKey, getType(), result);
    }

    /**
     * Drop migration source resources.
     *
     * @param contextKey context key
     * @param resourceNames resource names
     */
    public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
        Collection<String> notExistedResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
        ShardingSpherePreconditions.checkMustEmpty(notExistedResources, () -> new MissingRequiredStorageUnitsException(contextKey.getDatabaseName(), notExistedResources));
        for (String each : resourceNames) {
            metaDataDataSource.remove(each);
        }
        dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
    }

    /**
     * List migration source resources.
     *
     * @param contextKey context key
     * @return migration source resources
     */
    public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
        Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
        Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
            String dataSourceName = entry.getKey();
            DataSourcePoolProperties value = entry.getValue();
            Collection<Object> props = new LinkedList<>();
            props.add(dataSourceName);
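            // Derive the database type and connection details from the standard JDBC URL property.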
            String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
            DatabaseType databaseType = DatabaseTypeFactory.get(url);
            props.add(databaseType.getType());
            ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
            props.add(connectionProps.getHostname());
            props.add(connectionProps.getPort());
            props.add(connectionProps.getCatalog());
            Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
            props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
            props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
            props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
            props.add(getStandardProperty(standardProps, "maxPoolSize"));
            props.add(getStandardProperty(standardProps, "minPoolSize"));
            props.add(getStandardProperty(standardProps, "readOnly"));
            Map<String, Object> otherProps = value.getCustomProperties().getProperties();
            props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
            result.add(props);
        }
        return result;
    }

    private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
        return standardProps.containsKey(key) && null != standardProps.get(key) ? standardProps.get(key).toString() : "";
    }

    @Override
    public void commit(final String jobId) {
        log.info("Commit job {}", jobId);
        final long startTimeMillis = System.currentTimeMillis();
        jobManager.stop(jobId);
        dropCheckJobs(jobId);
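        // Refresh the target database metadata so that the migrated tables become visible, then remove the job itself.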
        MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
        refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
        jobManager.drop(jobId);
        log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
    }

    private void refreshTableMetadata(final String jobId, final String databaseName) {
        // TODO Use the original database name for now; the metadata refresh scope can be narrowed once the reloadDatabaseMetaData case-sensitivity problem is fixed.
        ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager();
        ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
        contextManager.reloadDatabase(database);
    }

    @Override
    public void rollback(final String jobId) throws SQLException {
        final long startTimeMillis = System.currentTimeMillis();
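        // Remove related check jobs, clean up the target tables, then drop the migration job itself.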
        dropCheckJobs(jobId);
        cleanTempTableOnRollback(jobId);
        jobManager.drop(jobId);
        log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
    }

    private void dropCheckJobs(final String jobId) {
        Collection<String> checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId);
        if (checkJobIds.isEmpty()) {
            return;
        }
        for (String each : checkJobIds) {
            try {
                jobManager.drop(each);
                // CHECKSTYLE:OFF
            } catch (final RuntimeException ex) {
                // CHECKSTYLE:ON
                log.info("drop check job failed, check job id: {}, error: {}", each, ex.getMessage());
            }
        }
    }

    private void cleanTempTableOnRollback(final String jobId) throws SQLException {
        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class, getType())).getJobConfiguration(jobId);
        PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
        TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
        try (
                PipelineDataSource dataSource = new PipelineDataSource(jobConfig.getTarget());
                Connection connection = dataSource.getConnection()) {
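            // Drop each target table used by the migration job.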
            for (String each : jobConfig.getTargetTableNames()) {
                String targetSchemaName = mapping.getSchemaName(each);
                String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, each);
                log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, each, sql);
                try (Statement statement = connection.createStatement()) {
                    statement.execute(sql);
                }
            }
        }
    }

    @Override
    public String getType() {
        return "MIGRATION";
    }
}
358 }