View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper;
19  
20  import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
21  import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.swapper.YamlPipelineDataSourceConfigurationSwapper;
22  import org.apache.shardingsphere.data.pipeline.core.job.config.yaml.swapper.YamlPipelineJobConfigurationSwapper;
23  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
24  import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
25  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
26  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
27  import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
28  
29  import java.util.LinkedHashMap;
30  import java.util.Map.Entry;
31  import java.util.stream.Collectors;
32  
33  /**
34   * YAML migration job configuration swapper.
35   */
36  public final class YamlMigrationJobConfigurationSwapper implements YamlPipelineJobConfigurationSwapper<YamlMigrationJobConfiguration, MigrationJobConfiguration> {
37      
38      private final YamlPipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
39      
40      @Override
41      public YamlMigrationJobConfiguration swapToYamlConfiguration(final MigrationJobConfiguration data) {
42          YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
43          result.setJobId(data.getJobId());
44          result.setTargetDatabaseName(data.getTargetDatabaseName());
45          result.setSourceDatabaseType(data.getSourceDatabaseType().getType());
46          result.setTargetDatabaseType(data.getTargetDatabaseType().getType());
47          result.setSources(data.getSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
48                  entry -> dataSourceConfigSwapper.swapToYamlConfiguration(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)));
49          result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
50          result.setTargetTableNames(data.getTargetTableNames());
51          result.setTargetTableSchemaMap(data.getTargetTableSchemaMap());
52          result.setTablesFirstDataNodes(data.getTablesFirstDataNodes().marshal());
53          result.setJobShardingDataNodes(data.getJobShardingDataNodes().stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
54          result.setConcurrency(data.getConcurrency());
55          result.setRetryTimes(data.getRetryTimes());
56          return result;
57      }
58      
59      @Override
60      public MigrationJobConfiguration swapToObject(final YamlMigrationJobConfiguration yamlConfig) {
61          return new MigrationJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(),
62                  TypedSPILoader.getService(DatabaseType.class, yamlConfig.getSourceDatabaseType()), TypedSPILoader.getService(DatabaseType.class, yamlConfig.getTargetDatabaseType()),
63                  yamlConfig.getSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
64                          entry -> dataSourceConfigSwapper.swapToObject(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
65                  dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
66                  yamlConfig.getTargetTableNames(), yamlConfig.getTargetTableSchemaMap(),
67                  JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes()), yamlConfig.getJobShardingDataNodes().stream().map(JobDataNodeLine::unmarshal).collect(Collectors.toList()),
68                  yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
69      }
70      
71      @Override
72      public MigrationJobConfiguration swapToObject(final String jobParam) {
73          return swapToObject(YamlEngine.unmarshal(jobParam, YamlMigrationJobConfiguration.class, true));
74      }
75  }