1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper;
19
20 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
21 import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.swapper.YamlPipelineDataSourceConfigurationSwapper;
22 import org.apache.shardingsphere.data.pipeline.core.job.config.yaml.swapper.YamlPipelineJobConfigurationSwapper;
23 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
24 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
25 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
26 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
27 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
28
29 import java.util.LinkedHashMap;
30 import java.util.Map.Entry;
31 import java.util.stream.Collectors;
32
33
34
35
36 public final class YamlMigrationJobConfigurationSwapper implements YamlPipelineJobConfigurationSwapper<YamlMigrationJobConfiguration, MigrationJobConfiguration> {
37
38 private final YamlPipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
39
40 @Override
41 public YamlMigrationJobConfiguration swapToYamlConfiguration(final MigrationJobConfiguration data) {
42 YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
43 result.setJobId(data.getJobId());
44 result.setTargetDatabaseName(data.getTargetDatabaseName());
45 result.setSourceDatabaseType(data.getSourceDatabaseType().getType());
46 result.setTargetDatabaseType(data.getTargetDatabaseType().getType());
47 result.setSources(data.getSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
48 entry -> dataSourceConfigSwapper.swapToYamlConfiguration(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)));
49 result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
50 result.setTargetTableNames(data.getTargetTableNames());
51 result.setTargetTableSchemaMap(data.getTargetTableSchemaMap());
52 result.setTablesFirstDataNodes(data.getTablesFirstDataNodes().marshal());
53 result.setJobShardingDataNodes(data.getJobShardingDataNodes().stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
54 result.setConcurrency(data.getConcurrency());
55 result.setRetryTimes(data.getRetryTimes());
56 return result;
57 }
58
59 @Override
60 public MigrationJobConfiguration swapToObject(final YamlMigrationJobConfiguration yamlConfig) {
61 return new MigrationJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(),
62 TypedSPILoader.getService(DatabaseType.class, yamlConfig.getSourceDatabaseType()), TypedSPILoader.getService(DatabaseType.class, yamlConfig.getTargetDatabaseType()),
63 yamlConfig.getSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
64 entry -> dataSourceConfigSwapper.swapToObject(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
65 dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
66 yamlConfig.getTargetTableNames(), yamlConfig.getTargetTableSchemaMap(),
67 JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes()), yamlConfig.getJobShardingDataNodes().stream().map(JobDataNodeLine::unmarshal).collect(Collectors.toList()),
68 yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
69 }
70
71 @Override
72 public MigrationJobConfiguration swapToObject(final String jobParam) {
73 return swapToObject(YamlEngine.unmarshal(jobParam, YamlMigrationJobConfiguration.class, true));
74 }
75 }