View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.context;
19  
20  import lombok.Getter;
21  import lombok.SneakyThrows;
22  import org.apache.commons.lang3.concurrent.ConcurrentException;
23  import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
24  import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
25  import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
26  import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
27  import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
28  import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
29  import org.apache.shardingsphere.data.pipeline.core.util.PipelineLazyInitializer;
30  import org.apache.shardingsphere.infra.algorithm.core.config.AlgorithmConfiguration;
31  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
32  
33  /**
34   * Transmission process context.
35   */
36  public final class TransmissionProcessContext implements PipelineProcessContext {
37      
38      @Getter
39      private final PipelineProcessConfiguration processConfiguration;
40      
41      @Getter
42      private final JobRateLimitAlgorithm readRateLimitAlgorithm;
43      
44      @Getter
45      private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
46      
47      private final PipelineLazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer;
48      
49      private final PipelineLazyInitializer<ExecuteEngine> inventoryImporterExecuteEngineLazyInitializer;
50      
51      private final PipelineLazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;
52      
53      public TransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
54          processConfiguration = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
55          PipelineReadConfiguration readConfig = processConfiguration.getRead();
56          AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
57          readRateLimitAlgorithm = null == readRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, readRateLimiter.getType(), readRateLimiter.getProps());
58          PipelineWriteConfiguration writeConfig = processConfiguration.getWrite();
59          AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
60          writeRateLimitAlgorithm = null == writeRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, writeRateLimiter.getType(), writeRateLimiter.getProps());
61          inventoryDumperExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
62              
63              @Override
64              protected ExecuteEngine doInitialize() {
65                  return ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-" + jobId);
66              }
67          };
68          inventoryImporterExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
69              
70              @Override
71              protected ExecuteEngine doInitialize() {
72                  return ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-" + jobId);
73              }
74          };
75          incrementalExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
76              
77              @Override
78              protected ExecuteEngine doInitialize() {
79                  return ExecuteEngine.newCachedThreadInstance("Incremental-" + jobId);
80              }
81          };
82      }
83      
84      /**
85       * Get inventory dumper execute engine.
86       *
87       * @return inventory dumper execute engine
88       */
89      @SneakyThrows(ConcurrentException.class)
90      public ExecuteEngine getInventoryDumperExecuteEngine() {
91          return inventoryDumperExecuteEngineLazyInitializer.get();
92      }
93      
94      /**
95       * Get inventory importer execute engine.
96       *
97       * @return inventory importer execute engine
98       */
99      @SneakyThrows(ConcurrentException.class)
100     public ExecuteEngine getInventoryImporterExecuteEngine() {
101         return inventoryImporterExecuteEngineLazyInitializer.get();
102     }
103     
104     /**
105      * Get incremental execute engine.
106      *
107      * @return incremental execute engine
108      */
109     @SneakyThrows(ConcurrentException.class)
110     public ExecuteEngine getIncrementalExecuteEngine() {
111         return incrementalExecuteEngineLazyInitializer.get();
112     }
113     
114     @Override
115     public void close() throws Exception {
116         shutdownExecuteEngine(inventoryDumperExecuteEngineLazyInitializer);
117         shutdownExecuteEngine(inventoryImporterExecuteEngineLazyInitializer);
118         shutdownExecuteEngine(incrementalExecuteEngineLazyInitializer);
119     }
120     
121     private void shutdownExecuteEngine(final PipelineLazyInitializer<ExecuteEngine> lazyInitializer) throws ConcurrentException {
122         if (lazyInitializer.isInitialized()) {
123             lazyInitializer.get().shutdown();
124         }
125     }
126 }