1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.preparer.incremental;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
22 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
23 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
24 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
25 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
26 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
27 import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
28 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
29 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
30 import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
31 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
32 import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
33 import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
34 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
35
36 import java.sql.SQLException;
37 import java.util.Optional;
38
39
40
41
42 @Slf4j
43 public final class IncrementalTaskPositionManager {
44
45 private final DatabaseType databaseType;
46
47 private final DialectIncrementalPositionManager dialectPositionManager;
48
49 public IncrementalTaskPositionManager(final DatabaseType databaseType) {
50 this.databaseType = databaseType;
51 dialectPositionManager = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, databaseType);
52 }
53
54
55
56
57
58
59
60
61
62
63 public IngestPosition getPosition(final JobItemIncrementalTasksProgress initialProgress,
64 final IncrementalDumperContext dumperContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
65 if (null != initialProgress) {
66 Optional<IngestPosition> position = initialProgress.getIncrementalPosition();
67 if (position.isPresent()) {
68 return position.get();
69 }
70 }
71 return dialectPositionManager.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
72 }
73
74
75
76
77
78
79
80
81 public void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
82 final long startTimeMillis = System.currentTimeMillis();
83 log.info("Cleanup position, database type: {}, pipeline data source type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
84 if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
85 destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig);
86 } else if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
87 destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig);
88 }
89 log.info("Destroy position cost {} ms.", System.currentTimeMillis() - startTimeMillis);
90 }
91
92 private void destroyPosition(final String jobId, final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
93 for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values()) {
94 try (PipelineDataSource dataSource = new PipelineDataSource(DataSourcePoolCreator.create(each), databaseType)) {
95 dialectPositionManager.destroy(dataSource, jobId);
96 }
97 }
98 }
99
100 private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
101 try (PipelineDataSource dataSource = new PipelineDataSource(DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
102 dialectPositionManager.destroy(dataSource, jobId);
103 }
104 }
105 }