/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.scenario.migration.api;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.DuplicateStorageUnitException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

/**
 * Migration job API.
 */
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {
    
    private final PipelineJobManager jobManager;
    
    private final PipelineJobConfigurationManager jobConfigManager;
    
    private final PipelineDataSourcePersistService dataSourcePersistService;
    
    public MigrationJobAPI() {
        PipelineJobType jobType = new MigrationJobType();
        jobManager = new PipelineJobManager(jobType);
        jobConfigManager = new PipelineJobConfigurationManager(jobType);
        dataSourcePersistService = new PipelineDataSourcePersistService();
    }
    
    /**
     * Start migration job.
     *
     * @param contextKey context key
     * @param param migrate table statement
     * @return job id
     */
    public String start(final PipelineContextKey contextKey, final MigrateTableStatement param) {
        MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
        jobManager.start(jobConfig);
        return jobConfig.getJobId();
    }
    
    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
        result.setTargetDatabaseName(param.getTargetDatabaseName());
        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
        Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
        Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
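        // Deduplicate the requested source-target entries and sort them, so the generated job configuration is deterministic.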
        List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
                .thenComparing(each -> DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList());
        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        for (SourceTargetEntry each : sourceTargetEntries) {
            sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
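            // Each target table must be fed by exactly one source data node.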
            ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
                    () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName()));
            String dataSourceName = each.getSource().getDataSourceName();
            if (configSources.containsKey(dataSourceName)) {
                continue;
            }
            ShardingSpherePreconditions.checkContainsKey(metaDataDataSource, dataSourceName,
                    () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
            Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
            StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
            configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
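            // Fill in the default schema for schema-aware dialects (e.g. PostgreSQL) when the statement did not specify one.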
            if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.isSchemaAvailable()) {
                each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
            }
            DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
            if (null == result.getSourceDatabaseType()) {
                result.setSourceDatabaseType(sourceDatabaseType.getType());
            } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
                throw new PipelineInvalidParameterException("Source storage units have different database types");
            }
        }
        result.setSources(configSources);
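        // The target side is the whole logical database, exposed as a ShardingSphere-typed pipeline data source.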
        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
        PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
        result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
        result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
        List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
                .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
        result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
        result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
        result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
        result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
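        // The job ID is marshalled from the context key and the sharding data nodes, so an identical request yields the same job ID.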
        result.setJobId(PipelineJobIdUtils.marshal(new MigrationJobId(contextKey, result.getJobShardingDataNodes())));
        return result;
    }
    
    private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) {
        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
        result.setType(type);
        result.setParameter(param);
        return result;
    }
    
    private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
        Map<String, Map<String, Object>> targetPoolProps = new HashMap<>();
        YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
            targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
        }
        YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations());
        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
    }
    
    private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
        ShardingSpherePreconditions.checkNotEmpty(rules, () -> new EmptyRuleException(databaseName));
        YamlRootConfiguration result = new YamlRootConfiguration();
        result.setDatabaseName(databaseName);
        result.setDataSources(yamlDataSources);
        result.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules));
        return result;
    }
    
    private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) {
        Map<String, String> result = new LinkedHashMap<>();
        sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName()));
        return result;
    }
    
    /**
     * Register migration source storage units.
     *
     * @param contextKey context key
     * @param propsMap data source pool properties map
     */
    public void registerMigrationSourceStorageUnits(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
        Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
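        // Reject the whole request if any storage unit name is already registered.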
        Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
            if (existDataSources.containsKey(entry.getKey())) {
                duplicateDataSourceNames.add(entry.getKey());
            }
        }
        ShardingSpherePreconditions.checkMustEmpty(duplicateDataSourceNames, () -> new DuplicateStorageUnitException(contextKey.getDatabaseName(), duplicateDataSourceNames));
        Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
        result.putAll(propsMap);
        dataSourcePersistService.persist(contextKey, getType(), result);
    }
    
    /**
     * Drop migration source resources.
     *
     * @param contextKey context key
     * @param resourceNames resource names
     */
    public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
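        // Every requested resource must exist; fail fast before removing anything.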
        Collection<String> notExistedResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
        ShardingSpherePreconditions.checkMustEmpty(notExistedResources, () -> new MissingRequiredStorageUnitsException(contextKey.getDatabaseName(), notExistedResources));
        for (String each : resourceNames) {
            metaDataDataSource.remove(each);
        }
        dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
    }
    
    /**
     * Query migration source resources list.
     *
     * @param contextKey context key
     * @return migration source resources
     */
    public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
        Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
        Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
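        // Each row: name, database type, host, port, database, connection/idle/max-lifetime timeouts, pool sizes, read-only flag, and remaining custom properties as JSON.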
        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
            String dataSourceName = entry.getKey();
            DataSourcePoolProperties value = entry.getValue();
            Collection<Object> props = new LinkedList<>();
            props.add(dataSourceName);
            String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
            DatabaseType databaseType = DatabaseTypeFactory.get(url);
            props.add(databaseType.getType());
            ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
            props.add(connectionProps.getHostname());
            props.add(connectionProps.getPort());
            props.add(connectionProps.getCatalog());
            Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
            props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
            props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
            props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
            props.add(getStandardProperty(standardProps, "maxPoolSize"));
            props.add(getStandardProperty(standardProps, "minPoolSize"));
            props.add(getStandardProperty(standardProps, "readOnly"));
            Map<String, Object> otherProps = value.getCustomProperties().getProperties();
            props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
            result.add(props);
        }
        return result;
    }
    
    private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
        return standardProps.containsKey(key) && null != standardProps.get(key) ? standardProps.get(key).toString() : "";
    }
    
    @Override
    public void commit(final String jobId) {
        log.info("Commit job {}", jobId);
        final long startTimeMillis = System.currentTimeMillis();
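        // Stop the job and its consistency-check jobs, refresh target table metadata, then remove the job definition.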
        jobManager.stop(jobId);
        dropCheckJobs(jobId);
        MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
        refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
        jobManager.drop(jobId);
        log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
    }
    
    private void refreshTableMetadata(final String jobId, final String databaseName) {
        // Reload table metadata of the whole target database, so migrated tables become visible right after commit.
        ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager();
        ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
        contextManager.refreshTableMetaData(database);
    }
    
    @Override
    public void rollback(final String jobId) throws SQLException {
        final long startTimeMillis = System.currentTimeMillis();
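        // Remove consistency-check jobs and drop the half-migrated target tables before deleting the job.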
        dropCheckJobs(jobId);
        cleanTempTableOnRollback(jobId);
        jobManager.drop(jobId);
        log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis);
    }
    
    private void dropCheckJobs(final String jobId) {
        Collection<String> checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId);
        if (checkJobIds.isEmpty()) {
            return;
        }
        for (String each : checkJobIds) {
            try {
                jobManager.drop(each);
                // CHECKSTYLE:OFF
            } catch (final RuntimeException ex) {
                // CHECKSTYLE:ON
                log.info("Drop check job failed, check job id: {}, error: {}", each, ex.getMessage());
            }
        }
    }
    
    private void cleanTempTableOnRollback(final String jobId) throws SQLException {
        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class, getType())).getJobConfiguration(jobId);
        PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
        TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
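        // Drop every target table the job wrote to, discarding data migrated so far.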
        try (
                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(jobConfig.getTarget());
                Connection connection = dataSource.getConnection()) {
            for (String each : jobConfig.getTargetTableNames()) {
                String targetSchemaName = mapping.getSchemaName(each);
                String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, each);
                log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, each, sql);
                try (Statement statement = connection.createStatement()) {
                    statement.execute(sql);
                }
            }
        }
    }
    
    @Override
    public String getType() {
        return "MIGRATION";
    }
}