1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.preparer.incremental;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
22 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
23 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
24 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
25 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
26 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
27 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
28 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
29 import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
30 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
31 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
32 import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
33 import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
34 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
35
36 import java.sql.SQLException;
37 import java.util.Optional;
38
39
40
41
42 @Slf4j
43 public final class IncrementalTaskPositionManager {
44
45 private final DatabaseType databaseType;
46
47 private final DialectIngestPositionManager positionInitializer;
48
49 public IncrementalTaskPositionManager(final DatabaseType databaseType) {
50 this.databaseType = databaseType;
51 positionInitializer = DatabaseTypedSPILoader.getService(DialectIngestPositionManager.class, databaseType);
52 }
53
54
55
56
57
58
59
60
61
62
63 public IngestPosition getPosition(final JobItemIncrementalTasksProgress initialProgress,
64 final IncrementalDumperContext dumperContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
65 if (null != initialProgress) {
66 Optional<IngestPosition> position = initialProgress.getIncrementalPosition();
67 if (position.isPresent()) {
68 return position.get();
69 }
70 }
71 return positionInitializer.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
72 }
73
74
75
76
77
78
79
80
81 public void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
82 final long startTimeMillis = System.currentTimeMillis();
83 log.info("Cleanup position, database type: {}, pipeline data source type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
84 if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
85 destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
86 } else if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
87 destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
88 }
89 log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
90 }
91
92 private void destroyPosition(final String jobId,
93 final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final DialectIngestPositionManager positionInitializer) throws SQLException {
94 for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values()) {
95 try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
96 positionInitializer.destroy(dataSource, jobId);
97 }
98 }
99 }
100
101 private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig,
102 final DialectIngestPositionManager positionInitializer) throws SQLException {
103 try (
104 PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
105 DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
106 positionInitializer.destroy(dataSource, jobId);
107 }
108 }
109 }