/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.scenario.migration.api;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.database.connector.core.jdbcurl.parser.ConnectionProperties;
import org.apache.shardingsphere.database.connector.core.jdbcurl.parser.ConnectionPropertiesParser;
import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.database.connector.core.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.DuplicateStorageUnitException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.single.constant.SingleTableConstants;
import org.apache.shardingsphere.single.yaml.config.YamlSingleRuleConfiguration;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Migration job API.
 */
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {
    
    private final PipelineJobManager jobManager;
    
    private final PipelineJobConfigurationManager jobConfigManager;
    
    private final PipelineDataSourcePersistService dataSourcePersistService;
    
    public MigrationJobAPI() {
        MigrationJobType jobType = new MigrationJobType();
        jobManager = new PipelineJobManager(jobType);
        jobConfigManager = new PipelineJobConfigurationManager(jobType.getOption());
        dataSourcePersistService = new PipelineDataSourcePersistService();
    }
    
    /**
     * Schedule migration job.
     *
     * @param contextKey context key
     * @param sourceTargetEntries migration source target entries
     * @param targetDatabaseName migration target database name
     * @return job ID
     */
    public String schedule(final PipelineContextKey contextKey, final Collection<MigrationSourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) {
        MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, sourceTargetEntries, targetDatabaseName));
        jobManager.start(jobConfig);
        return jobConfig.getJobId();
    }
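    
    // Usage sketch (illustrative only; the context key and source/target entries are assumed to be prepared by the caller, e.g. by DistSQL handlers):
    //   MigrationJobAPI api = new MigrationJobAPI();
    //   String jobId = api.schedule(contextKey, sourceTargetEntries, "target_db");
    //   ... later: api.commit(jobId) once data is verified, or api.rollback(jobId) to drop the target tables and the job.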
    
    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey,
                                                                    final Collection<MigrationSourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) {
        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
        result.setTargetDatabaseName(targetDatabaseName);
        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
        Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>(sourceTargetEntries.size(), 1F);
        Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>(sourceTargetEntries.size(), 1F);
        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
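        // De-duplicate the entries and sort them by target table name, then by source data node, so that the
        // generated sharding data nodes and the derived job ID are deterministic regardless of input order.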
        for (MigrationSourceTargetEntry each : new HashSet<>(sourceTargetEntries).stream()
                .sorted(Comparator.comparing(MigrationSourceTargetEntry::getTargetTableName).thenComparing(each -> each.getSource().format())).collect(Collectors.toList())) {
            sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
            ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
                    () -> new PipelineInvalidParameterException("More than one source table for " + each.getTargetTableName()));
            String dataSourceName = each.getSource().getDataSourceName();
            if (configSources.containsKey(dataSourceName)) {
                continue;
            }
            ShardingSpherePreconditions.checkContainsKey(metaDataDataSource, dataSourceName,
                    () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
            Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
            StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
            configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
            DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
            if (null == result.getSourceDatabaseType()) {
                result.setSourceDatabaseType(sourceDatabaseType.getType());
            } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
                throw new PipelineInvalidParameterException("Source storage units have different database types");
            }
        }
        result.setSources(configSources);
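        // The target side is the logical database registered in the proxy: all of its storage units and rule
        // configurations are wrapped into a single ShardingSphere pipeline data source configuration.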
        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(targetDatabaseName);
        PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
        result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
        result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
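        // Only the first data node of each target table goes into the tables' first-data-node line; the complete
        // node-to-shard mapping is marshalled into the job sharding data nodes, from which the job ID is derived.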
        List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
                .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
        result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
        result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
        result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
        result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
        result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, result.getJobShardingDataNodes())));
        return result;
    }
    
    private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) {
        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
        result.setType(type);
        result.setParameter(param);
        return result;
    }
    
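    // Builds the target data source configuration from the logical target database: every storage unit's pool
    // properties plus the database's rule configurations are expressed as one YAML root configuration.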
    private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
        Map<String, Map<String, Object>> targetPoolProps = new HashMap<>(targetDatabase.getResourceMetaData().getStorageUnits().size(), 1F);
        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
            targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
        }
        YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations());
        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
    }
    
    private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
        ShardingSpherePreconditions.checkNotEmpty(rules, () -> new EmptyRuleException(databaseName));
        YamlRootConfiguration result = new YamlRootConfiguration();
        result.setDatabaseName(databaseName);
        result.setDataSources(yamlDataSources);
        result.setRules(getYamlRuleConfigurations(rules));
        return result;
    }
    
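    // Reuses the target database's rules, but replaces any single table rule with one that loads all tables
    // (keeping the original default data source) so that tables created during migration remain visible.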
    private Collection<YamlRuleConfiguration> getYamlRuleConfigurations(final Collection<RuleConfiguration> rules) {
        Collection<YamlRuleConfiguration> result = new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules);
        Optional<YamlSingleRuleConfiguration> originalSingleRuleConfig = result.stream()
                .filter(YamlSingleRuleConfiguration.class::isInstance).map(YamlSingleRuleConfiguration.class::cast).findFirst();
        result.removeIf(YamlSingleRuleConfiguration.class::isInstance);
        YamlSingleRuleConfiguration singleRuleConfig = new YamlSingleRuleConfiguration();
        // TODO Provide only the necessary tables.
        singleRuleConfig.setTables(Collections.singletonList(SingleTableConstants.ALL_TABLES));
        originalSingleRuleConfig.ifPresent(optional -> singleRuleConfig.setDefaultDataSource(optional.getDefaultDataSource()));
        result.add(singleRuleConfig);
        return result;
    }
    
    private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) {
        Map<String, String> result = new LinkedHashMap<>(sourceDataNodes.size(), 1F);
        sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName()));
        return result;
    }
    
    /**
     * Register migration source storage units.
     *
     * @param contextKey context key
     * @param propsMap data source pool properties map
     */
    public void registerMigrationSourceStorageUnits(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
        Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
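        // A batch containing any already-registered storage unit name is rejected as a whole; otherwise the new
        // properties are merged with the existing ones and persisted.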
        Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
            if (existDataSources.containsKey(entry.getKey())) {
                duplicateDataSourceNames.add(entry.getKey());
            }
        }
        ShardingSpherePreconditions.checkMustEmpty(duplicateDataSourceNames, () -> new DuplicateStorageUnitException(contextKey.getDatabaseName(), duplicateDataSourceNames));
        Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
        result.putAll(propsMap);
        dataSourcePersistService.persist(contextKey, getType(), result);
    }
    
    /**
     * Drop migration source resources.
     *
     * @param contextKey context key
     * @param resourceNames resource names
     */
    public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
        Collection<String> notExistedResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
        ShardingSpherePreconditions.checkMustEmpty(notExistedResources, () -> new MissingRequiredStorageUnitsException(contextKey.getDatabaseName(), notExistedResources));
        for (String each : resourceNames) {
            metaDataDataSource.remove(each);
        }
        dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
    }
    
    /**
     * List migration source resources.
     *
     * @param contextKey context key
     * @return migration source resources
     */
    public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
        Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
        Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
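        // Each row contains: name, database type, host, port, database, pool properties (connection timeout,
        // idle timeout, max lifetime, max/min pool size, read only) and remaining custom properties as JSON.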
        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
            String dataSourceName = entry.getKey();
            DataSourcePoolProperties value = entry.getValue();
            Collection<Object> props = new LinkedList<>();
            props.add(dataSourceName);
            String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
            DatabaseType databaseType = DatabaseTypeFactory.get(url);
            props.add(databaseType.getType());
            ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
            props.add(connectionProps.getHostname());
            props.add(connectionProps.getPort());
            props.add(connectionProps.getCatalog());
            Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
            props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
            props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
            props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
            props.add(getStandardProperty(standardProps, "maxPoolSize"));
            props.add(getStandardProperty(standardProps, "minPoolSize"));
            props.add(getStandardProperty(standardProps, "readOnly"));
            Map<String, Object> otherProps = value.getCustomProperties().getProperties();
            props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
            result.add(props);
        }
        return result;
    }
    
    private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
        return standardProps.containsKey(key) && null != standardProps.get(key) ? standardProps.get(key).toString() : "";
    }
    
    @Override
    public void commit(final String jobId) {
        log.info("Commit job {}", jobId);
        final long startTimeMillis = System.currentTimeMillis();
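        // Commit sequence: stop the running job, drop its consistency check jobs, refresh the target database's
        // metadata so that the migrated tables are visible, and finally drop the migration job itself.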
        jobManager.stop(jobId);
        dropCheckJobs(jobId);
        MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
        refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
        jobManager.drop(jobId);
        log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
    }
    
    private void refreshTableMetadata(final String jobId, final String databaseName) {
        // TODO Use the original database name for now. The metadata refresh scope can be reduced once the reloadDatabaseMetaData case-sensitivity problem is fixed.
        ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId));
        ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
        contextManager.reloadDatabase(database);
    }
    
    @Override
    public void rollback(final String jobId) throws SQLException {
        final long startTimeMillis = System.currentTimeMillis();
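        // Rollback sequence: drop the consistency check jobs, drop the tables that were created on the target side,
        // then drop the migration job.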
        dropCheckJobs(jobId);
        cleanTempTableOnRollback(jobId);
        jobManager.drop(jobId);
        log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
    }
    
    private void dropCheckJobs(final String jobId) {
        Collection<String> checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId);
        if (checkJobIds.isEmpty()) {
            return;
        }
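        // Dropping check jobs is best-effort: a failure on one check job is logged and does not block the others.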
        for (String each : checkJobIds) {
            try {
                jobManager.drop(each);
                // CHECKSTYLE:OFF
            } catch (final RuntimeException ex) {
                // CHECKSTYLE:ON
                log.info("Drop check job failed, check job id: {}, error: {}", each, ex.getMessage());
            }
        }
    }
    
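    // Drops every target table that was created for the job, resolving each table's schema from the job
    // configuration's target table schema mapping.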
    private void cleanTempTableOnRollback(final String jobId) throws SQLException {
        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class, getType()).getOption()).getJobConfiguration(jobId);
        PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
        TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
        try (
                PipelineDataSource dataSource = new PipelineDataSource(jobConfig.getTarget());
                Connection connection = dataSource.getConnection()) {
            for (String each : jobConfig.getTargetTableNames()) {
                String targetSchemaName = mapping.getSchemaName(each);
                String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, each);
                log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, each, sql);
                try (Statement statement = connection.createStatement()) {
                    statement.execute(sql);
                }
            }
        }
    }
    
    @Override
    public String getType() {
        return "MIGRATION";
    }
}