/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.preparer.incremental;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;

import java.sql.SQLException;
import java.util.Optional;

/**
 * Incremental task position manager.
 */
@Slf4j
public final class IncrementalTaskPositionManager {
    
    private final DatabaseType databaseType;
    
    private final DialectIncrementalPositionManager dialectPositionManager;
    
    public IncrementalTaskPositionManager(final DatabaseType databaseType) {
        this.databaseType = databaseType;
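        // Load the dialect-specific position manager for the target database type via SPI (for example, binlog-based for MySQL, WAL-based for PostgreSQL).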
        dialectPositionManager = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, databaseType);
    }
    
    /**
     * Get ingest position.
     *
     * @param initialProgress initial job item incremental tasks progress
     * @param dumperContext incremental dumper context
     * @param dataSourceManager pipeline data source manager
     * @return ingest position
     * @throws SQLException SQL exception
     */
    public IngestPosition getPosition(final JobItemIncrementalTasksProgress initialProgress,
                                      final IncrementalDumperContext dumperContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
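        // Resume from the persisted incremental position if the job item has progress from a previous run.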
        if (null != initialProgress) {
            Optional<IngestPosition> position = initialProgress.getIncrementalPosition();
            if (position.isPresent()) {
                return position.get();
            }
        }
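        // Otherwise ask the dialect position manager to create the initial position from the current state of the source database.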
        return dialectPositionManager.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
    }
    
    /**
     * Destroy ingest position.
     *
     * @param jobId pipeline job id
     * @param pipelineDataSourceConfig pipeline data source configuration
     * @throws SQLException SQL exception
     */
    public void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
        final long startTimeMillis = System.currentTimeMillis();
        log.info("Cleanup position, database type: {}, pipeline data source type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
        if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
            destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig);
        } else if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
            destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig);
        }
        log.info("Destroy position cost {} ms.", System.currentTimeMillis() - startTimeMillis);
    }
    
    private void destroyPosition(final String jobId, final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
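        // A ShardingSphere data source configuration aggregates multiple underlying storage units; clean up the position on each of them.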
        for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values()) {
            try (PipelineDataSource dataSource = new PipelineDataSource(DataSourcePoolCreator.create(each), databaseType)) {
                dialectPositionManager.destroy(dataSource, jobId);
            }
        }
    }
    
    private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
        try (PipelineDataSource dataSource = new PipelineDataSource(DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
            dialectPositionManager.destroy(dataSource, jobId);
        }
    }
}
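
// Usage sketch (illustrative only, not part of the original file): how a pipeline job preparer might
// call this class. The arguments (databaseType, initialProgress, dumperContext, dataSourceManager,
// jobId, dataSourceConfig) are assumed to be supplied by the surrounding pipeline job; the class and
// method names below are hypothetical.
final class IncrementalTaskPositionManagerUsageSketch {
    
    static IngestPosition resolveStartPosition(final DatabaseType databaseType, final JobItemIncrementalTasksProgress initialProgress,
                                               final IncrementalDumperContext dumperContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
        // Reuses the stored position when the job has run before; otherwise creates a fresh one for the source database.
        return new IncrementalTaskPositionManager(databaseType).getPosition(initialProgress, dumperContext, dataSourceManager);
    }
    
    static void cleanUp(final DatabaseType databaseType, final String jobId, final PipelineDataSourceConfiguration dataSourceConfig) throws SQLException {
        // Removes any dialect-specific position artifacts created for the job on the configured data source(s).
        new IncrementalTaskPositionManager(databaseType).destroyPosition(jobId, dataSourceConfig);
    }
}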