/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.preparer.incremental;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;

import java.sql.SQLException;
import java.util.Optional;

/**
 * Incremental task position manager.
 */
@Slf4j
public final class IncrementalTaskPositionManager {
    
    private final DatabaseType databaseType;
    
    private final DialectIngestPositionManager positionInitializer;
    
    public IncrementalTaskPositionManager(final DatabaseType databaseType) {
        this.databaseType = databaseType;
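        // Load the dialect-specific ingest position manager (for example, binlog-based for MySQL or WAL-based for PostgreSQL) via database-typed SPI.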
        positionInitializer = DatabaseTypedSPILoader.getService(DialectIngestPositionManager.class, databaseType);
    }
    
    /**
     * Get ingest position.
     *
     * @param initialProgress initial job item incremental tasks progress
     * @param dumperContext incremental dumper context
     * @param dataSourceManager pipeline data source manager
     * @return ingest position
     * @throws SQLException SQL exception
     */
    public IngestPosition getPosition(final JobItemIncrementalTasksProgress initialProgress,
                                      final IncrementalDumperContext dumperContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
        if (null != initialProgress) {
            Optional<IngestPosition> position = initialProgress.getIncrementalPosition();
            if (position.isPresent()) {
                return position.get();
            }
        }
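        // No resumable incremental position in the job progress, so initialize a fresh one through the dialect-specific position manager.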
        return positionInitializer.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
    }
    
    /**
     * Destroy ingest position.
     *
     * @param jobId pipeline job id
     * @param pipelineDataSourceConfig pipeline data source configuration
     * @throws SQLException SQL exception
     */
    public void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
        final long startTimeMillis = System.currentTimeMillis();
        log.info("Cleanup position, database type: {}, pipeline data source type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
        if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
            destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
        } else if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
            destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
        }
        log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
    }
    
    private void destroyPosition(final String jobId,
                                 final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final DialectIngestPositionManager positionInitializer) throws SQLException {
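        // A ShardingSphere data source configuration may aggregate multiple physical data sources; clean up the ingest position on each of them.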
        for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values()) {
            try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
                positionInitializer.destroy(dataSource, jobId);
            }
        }
    }
    
    private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig,
                                 final DialectIngestPositionManager positionInitializer) throws SQLException {
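        // A standard configuration wraps a single physical data source; create it just long enough to destroy the position, then release it via try-with-resources.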
        try (
                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
                        DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
            positionInitializer.destroy(dataSource, jobId);
        }
    }
}