/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.scenario.migration.api;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.database.connector.core.jdbcurl.parser.ConnectionProperties;
import org.apache.shardingsphere.database.connector.core.jdbcurl.parser.ConnectionPropertiesParser;
import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.database.connector.core.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.DuplicateStorageUnitException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.single.constant.SingleTableConstants;
import org.apache.shardingsphere.single.yaml.config.YamlSingleRuleConfiguration;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Migration job API.
 */
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {

    private final PipelineJobManager jobManager;

    private final PipelineJobConfigurationManager jobConfigManager;

    private final PipelineDataSourcePersistService dataSourcePersistService;

    public MigrationJobAPI() {
        MigrationJobType jobType = new MigrationJobType();
        jobManager = new PipelineJobManager(jobType);
        jobConfigManager = new PipelineJobConfigurationManager(jobType.getOption());
        dataSourcePersistService = new PipelineDataSourcePersistService();
    }

    /**
     * Schedule migration job.
     *
     * @param contextKey context key
     * @param sourceTargetEntries migration source and target entries
     * @param targetDatabaseName target database name
     * @return job id
     */
    public String schedule(final PipelineContextKey contextKey, final Collection<MigrationSourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) {
        MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, sourceTargetEntries, targetDatabaseName));
        jobManager.start(jobConfig);
        return jobConfig.getJobId();
    }

    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey,
                                                                    final Collection<MigrationSourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) {
        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
        result.setTargetDatabaseName(targetDatabaseName);
        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
        Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>(sourceTargetEntries.size(), 1F);
        Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>(sourceTargetEntries.size(), 1F);
        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
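        // Deduplicate the entries and sort them by target table name, then by source data node, so the generated configuration is deterministic.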
        for (MigrationSourceTargetEntry each : new HashSet<>(sourceTargetEntries).stream()
                .sorted(Comparator.comparing(MigrationSourceTargetEntry::getTargetTableName).thenComparing(each -> each.getSource().format())).collect(Collectors.toList())) {
            sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
            ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
                    () -> new PipelineInvalidParameterException("More than one source table for " + each.getTargetTableName()));
            String dataSourceName = each.getSource().getDataSourceName();
            if (configSources.containsKey(dataSourceName)) {
                continue;
            }
            ShardingSpherePreconditions.checkContainsKey(metaDataDataSource, dataSourceName,
                    () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
            Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
            StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
            configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
            DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
            if (null == result.getSourceDatabaseType()) {
                result.setSourceDatabaseType(sourceDatabaseType.getType());
            } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
                throw new PipelineInvalidParameterException("Source storage units have different database types");
            }
        }
        result.setSources(configSources);
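        // Use the target database registered in the proxy context as the target pipeline data source.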
        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(targetDatabaseName);
        PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
        result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
        result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
        List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
                .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
        result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
        result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
        result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
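        // Each data node line produced here corresponds to one sharding item of the job.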
        result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
        result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, result.getJobShardingDataNodes())));
        return result;
    }

    private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) {
        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
        result.setType(type);
        result.setParameter(param);
        return result;
    }

    private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
        Map<String, Map<String, Object>> targetPoolProps = new HashMap<>(targetDatabase.getResourceMetaData().getStorageUnits().size(), 1F);
        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
            targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
        }
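        // Expose the target database's data sources and rules as a single ShardingSphere pipeline data source configuration.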
        YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations());
        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
    }

    private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
        ShardingSpherePreconditions.checkNotEmpty(rules, () -> new EmptyRuleException(databaseName));
        YamlRootConfiguration result = new YamlRootConfiguration();
        result.setDatabaseName(databaseName);
        result.setDataSources(yamlDataSources);
        result.setRules(getYamlRuleConfigurations(rules));
        return result;
    }

    private Collection<YamlRuleConfiguration> getYamlRuleConfigurations(final Collection<RuleConfiguration> rules) {
        Collection<YamlRuleConfiguration> result = new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules);
        Optional<YamlSingleRuleConfiguration> originalSingleRuleConfig = result.stream()
                .filter(YamlSingleRuleConfiguration.class::isInstance).map(YamlSingleRuleConfiguration.class::cast).findFirst();
        result.removeIf(YamlSingleRuleConfiguration.class::isInstance);
        YamlSingleRuleConfiguration singleRuleConfig = new YamlSingleRuleConfiguration();
        // Replace the original single rule configuration with one that loads all single tables, keeping its default data source if present.
        singleRuleConfig.setTables(Collections.singletonList(SingleTableConstants.ALL_TABLES));
        originalSingleRuleConfig.ifPresent(optional -> singleRuleConfig.setDefaultDataSource(optional.getDefaultDataSource()));
        result.add(singleRuleConfig);
        return result;
    }

    private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) {
        Map<String, String> result = new LinkedHashMap<>(sourceDataNodes.size(), 1F);
        sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName()));
        return result;
    }

    /**
     * Register migration source storage units.
     *
     * @param contextKey context key
     * @param propsMap data source pool properties map
     */
    public void registerMigrationSourceStorageUnits(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
        Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
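        // Reject the registration when any storage unit name is already registered as a migration source.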
        Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
            if (existDataSources.containsKey(entry.getKey())) {
                duplicateDataSourceNames.add(entry.getKey());
            }
        }
        ShardingSpherePreconditions.checkMustEmpty(duplicateDataSourceNames, () -> new DuplicateStorageUnitException(contextKey.getDatabaseName(), duplicateDataSourceNames));
        Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
        result.putAll(propsMap);
        dataSourcePersistService.persist(contextKey, getType(), result);
    }

    /**
     * Drop migration source resources.
     *
     * @param contextKey context key
     * @param resourceNames resource names
     */
    public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
        Collection<String> notExistedResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
        ShardingSpherePreconditions.checkMustEmpty(notExistedResources, () -> new MissingRequiredStorageUnitsException(contextKey.getDatabaseName(), notExistedResources));
        for (String each : resourceNames) {
            metaDataDataSource.remove(each);
        }
        dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
    }

    /**
     * Query migration source resources list.
     *
     * @param contextKey context key
     * @return migration source resources
     */
    public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
        Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
        Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
            String dataSourceName = entry.getKey();
            DataSourcePoolProperties value = entry.getValue();
            Collection<Object> props = new LinkedList<>();
            props.add(dataSourceName);
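            // Derive the database type and connection metadata (host, port, catalog) from the JDBC URL.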
            String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
            DatabaseType databaseType = DatabaseTypeFactory.get(url);
            props.add(databaseType.getType());
            ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
            props.add(connectionProps.getHostname());
            props.add(connectionProps.getPort());
            props.add(connectionProps.getCatalog());
            Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
            props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
            props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
            props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
            props.add(getStandardProperty(standardProps, "maxPoolSize"));
            props.add(getStandardProperty(standardProps, "minPoolSize"));
            props.add(getStandardProperty(standardProps, "readOnly"));
            Map<String, Object> otherProps = value.getCustomProperties().getProperties();
            props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
            result.add(props);
        }
        return result;
    }

    private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
        return standardProps.containsKey(key) && null != standardProps.get(key) ? standardProps.get(key).toString() : "";
    }

    @Override
    public void commit(final String jobId) {
        log.info("Commit job {}", jobId);
        final long startTimeMillis = System.currentTimeMillis();
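        // Stop the job and its consistency check jobs, refresh target table metadata, then remove the job itself.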
        jobManager.stop(jobId);
        dropCheckJobs(jobId);
        MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
        refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
        jobManager.drop(jobId);
        log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
    }

    private void refreshTableMetadata(final String jobId, final String databaseName) {
        // Reload the target database so that newly migrated tables become visible in its metadata.
        ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId));
        ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
        contextManager.reloadDatabase(database);
    }

    @Override
    public void rollback(final String jobId) throws SQLException {
        final long startTimeMillis = System.currentTimeMillis();
        dropCheckJobs(jobId);
        cleanTempTableOnRollback(jobId);
        jobManager.drop(jobId);
        log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
    }

    private void dropCheckJobs(final String jobId) {
        Collection<String> checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId);
        if (checkJobIds.isEmpty()) {
            return;
        }
        for (String each : checkJobIds) {
            try {
                jobManager.drop(each);
                // CHECKSTYLE:OFF
            } catch (final RuntimeException ex) {
                // CHECKSTYLE:ON
                log.info("drop check job failed, check job id: {}, error: {}", each, ex.getMessage());
            }
        }
    }

    private void cleanTempTableOnRollback(final String jobId) throws SQLException {
        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class, getType()).getOption()).getJobConfiguration(jobId);
        PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
        TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
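        // Drop each target table written by this migration job.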
        try (
                PipelineDataSource dataSource = new PipelineDataSource(jobConfig.getTarget());
                Connection connection = dataSource.getConnection()) {
            for (String each : jobConfig.getTargetTableNames()) {
                String targetSchemaName = mapping.getSchemaName(each);
                String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, each);
                log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, each, sql);
                try (Statement statement = connection.createStatement()) {
                    statement.execute(sql);
                }
            }
        }
    }

    @Override
    public String getType() {
        return "MIGRATION";
    }
}