/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.scenario.migration.api;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.DuplicateStorageUnitException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.single.constant.SingleTableConstants;
import org.apache.shardingsphere.single.yaml.config.YamlSingleRuleConfiguration;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;

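/**
 * Migration job API.
 */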
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {
    
    private final PipelineJobManager jobManager;
    
    private final PipelineJobConfigurationManager jobConfigManager;
    
    private final PipelineDataSourcePersistService dataSourcePersistService;
    
    public MigrationJobAPI() {
        PipelineJobType jobType = new MigrationJobType();
        jobManager = new PipelineJobManager(jobType);
        jobConfigManager = new PipelineJobConfigurationManager(jobType);
        dataSourcePersistService = new PipelineDataSourcePersistService();
    }
    
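    /**
     * Schedule migration job.
     *
     * @param contextKey pipeline context key
     * @param param migrate table statement
     * @return job id
     */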
    public String schedule(final PipelineContextKey contextKey, final MigrateTableStatement param) {
        MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
        jobManager.start(jobConfig);
        return jobConfig.getJobId();
    }
    
    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
        result.setTargetDatabaseName(param.getTargetDatabaseName());
        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
        Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
        Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
        List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
                .thenComparing(each -> each.getSource().format())).collect(Collectors.toList());
        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        for (SourceTargetEntry each : sourceTargetEntries) {
            sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
            ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
                    () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName()));
            String dataSourceName = each.getSource().getDataSourceName();
            if (configSources.containsKey(dataSourceName)) {
                continue;
            }
            ShardingSpherePreconditions.checkContainsKey(metaDataDataSource, dataSourceName,
                    () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
            Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
            StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
            configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
            if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()) {
                each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
            }
            DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
            if (null == result.getSourceDatabaseType()) {
                result.setSourceDatabaseType(sourceDatabaseType.getType());
            } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
                throw new PipelineInvalidParameterException("Source storage units have different database types");
            }
        }
        result.setSources(configSources);
        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
        PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
        result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
        result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
        List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
                .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
        result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
        result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
        result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
        result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
        result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, result.getJobShardingDataNodes())));
        return result;
    }
    
    private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) {
        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
        result.setType(type);
        result.setParameter(param);
        return result;
    }
    
    private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
        Map<String, Map<String, Object>> targetPoolProps = new HashMap<>(targetDatabase.getResourceMetaData().getStorageUnits().size(), 1F);
        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
            targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
        }
        YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations());
        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
    }
    
    private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
        ShardingSpherePreconditions.checkNotEmpty(rules, () -> new EmptyRuleException(databaseName));
        YamlRootConfiguration result = new YamlRootConfiguration();
        result.setDatabaseName(databaseName);
        result.setDataSources(yamlDataSources);
        result.setRules(getYamlRuleConfigurations(rules));
        return result;
    }
    
    private Collection<YamlRuleConfiguration> getYamlRuleConfigurations(final Collection<RuleConfiguration> rules) {
        Collection<YamlRuleConfiguration> result = new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules);
        Optional<YamlSingleRuleConfiguration> originalSingleRuleConfig = result.stream()
                .filter(YamlSingleRuleConfiguration.class::isInstance).map(YamlSingleRuleConfiguration.class::cast).findFirst();
        result.removeIf(YamlSingleRuleConfiguration.class::isInstance);
        YamlSingleRuleConfiguration singleRuleConfig = new YamlSingleRuleConfiguration();
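        // Let the single rule on the target load all tables, keeping the original default data source if one was configured.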
        singleRuleConfig.setTables(Collections.singletonList(SingleTableConstants.ALL_TABLES));
        originalSingleRuleConfig.ifPresent(optional -> singleRuleConfig.setDefaultDataSource(optional.getDefaultDataSource()));
        result.add(singleRuleConfig);
        return result;
    }
    
    private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) {
        Map<String, String> result = new LinkedHashMap<>(sourceDataNodes.size(), 1F);
        sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName()));
        return result;
    }
    
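    /**
     * Register migration source storage units.
     *
     * @param contextKey pipeline context key
     * @param propsMap data source pool properties map
     */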
    public void registerMigrationSourceStorageUnits(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
        Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
        Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
            if (existDataSources.containsKey(entry.getKey())) {
                duplicateDataSourceNames.add(entry.getKey());
            }
        }
        ShardingSpherePreconditions.checkMustEmpty(duplicateDataSourceNames, () -> new DuplicateStorageUnitException(contextKey.getDatabaseName(), duplicateDataSourceNames));
        Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
        result.putAll(propsMap);
        dataSourcePersistService.persist(contextKey, getType(), result);
    }
    
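    /**
     * Drop migration source resources.
     *
     * @param contextKey pipeline context key
     * @param resourceNames names of the resources to be dropped
     */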
    public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
        Collection<String> notExistedResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
        ShardingSpherePreconditions.checkMustEmpty(notExistedResources, () -> new MissingRequiredStorageUnitsException(contextKey.getDatabaseName(), notExistedResources));
        for (String each : resourceNames) {
            metaDataDataSource.remove(each);
        }
        dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
    }
    
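    /**
     * List migration source resources.
     *
     * @param contextKey pipeline context key
     * @return one row per registered source storage unit
     */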
    public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
        Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
        Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
            String dataSourceName = entry.getKey();
            DataSourcePoolProperties value = entry.getValue();
            Collection<Object> props = new LinkedList<>();
            props.add(dataSourceName);
            String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
            DatabaseType databaseType = DatabaseTypeFactory.get(url);
            props.add(databaseType.getType());
            ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
            props.add(connectionProps.getHostname());
            props.add(connectionProps.getPort());
            props.add(connectionProps.getCatalog());
            Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
            props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
            props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
            props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
            props.add(getStandardProperty(standardProps, "maxPoolSize"));
            props.add(getStandardProperty(standardProps, "minPoolSize"));
            props.add(getStandardProperty(standardProps, "readOnly"));
            Map<String, Object> otherProps = value.getCustomProperties().getProperties();
            props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
            result.add(props);
        }
        return result;
    }
    
    private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
        return standardProps.containsKey(key) && null != standardProps.get(key) ? standardProps.get(key).toString() : "";
    }
    
    @Override
    public void commit(final String jobId) {
        log.info("Commit job {}", jobId);
        final long startTimeMillis = System.currentTimeMillis();
        jobManager.stop(jobId);
        dropCheckJobs(jobId);
        MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
        refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
        jobManager.drop(jobId);
        log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
    }
    
    private void refreshTableMetadata(final String jobId, final String databaseName) {
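        // Reload the target database metadata so that the migrated tables become visible after commit.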
        ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager();
        ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
        contextManager.reloadDatabase(database);
    }
    
    @Override
    public void rollback(final String jobId) throws SQLException {
        final long startTimeMillis = System.currentTimeMillis();
        dropCheckJobs(jobId);
        cleanTempTableOnRollback(jobId);
        jobManager.drop(jobId);
        log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
    }
    
    private void dropCheckJobs(final String jobId) {
        Collection<String> checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId);
        if (checkJobIds.isEmpty()) {
            return;
        }
        for (String each : checkJobIds) {
            try {
                jobManager.drop(each);
            } catch (final RuntimeException ex) {
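                // Log and continue, so that one failed check job does not block dropping the remaining ones.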
                log.info("drop check job failed, check job id: {}, error: {}", each, ex.getMessage());
            }
        }
    }
    
    private void cleanTempTableOnRollback(final String jobId) throws SQLException {
        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class, getType())).getJobConfiguration(jobId);
        PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
        TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
        try (
                PipelineDataSource dataSource = new PipelineDataSource(jobConfig.getTarget());
                Connection connection = dataSource.getConnection()) {
            for (String each : jobConfig.getTargetTableNames()) {
                String targetSchemaName = mapping.getSchemaName(each);
                String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, each);
                log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, each, sql);
                try (Statement statement = connection.createStatement()) {
                    statement.execute(sql);
                }
            }
        }
    }
    
    @Override
    public String getType() {
        return "MIGRATION";
    }
}