1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.context;
19
20 import lombok.Getter;
21 import lombok.SneakyThrows;
22 import org.apache.commons.lang3.concurrent.ConcurrentException;
23 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
24 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
25 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
26 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
27 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
28 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
29 import org.apache.shardingsphere.data.pipeline.core.util.PipelineLazyInitializer;
30 import org.apache.shardingsphere.infra.algorithm.core.config.AlgorithmConfiguration;
31 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
32
33
34
35
36 public final class TransmissionProcessContext implements PipelineProcessContext {
37
38 @Getter
39 private final PipelineProcessConfiguration processConfiguration;
40
41 @Getter
42 private final JobRateLimitAlgorithm readRateLimitAlgorithm;
43
44 @Getter
45 private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
46
47 private final PipelineLazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer;
48
49 private final PipelineLazyInitializer<ExecuteEngine> inventoryImporterExecuteEngineLazyInitializer;
50
51 private final PipelineLazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;
52
53 public TransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
54 processConfiguration = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
55 PipelineReadConfiguration readConfig = processConfiguration.getRead();
56 AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
57 readRateLimitAlgorithm = null == readRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, readRateLimiter.getType(), readRateLimiter.getProps());
58 PipelineWriteConfiguration writeConfig = processConfiguration.getWrite();
59 AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
60 writeRateLimitAlgorithm = null == writeRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, writeRateLimiter.getType(), writeRateLimiter.getProps());
61 inventoryDumperExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
62
63 @Override
64 protected ExecuteEngine doInitialize() {
65 return ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-" + jobId);
66 }
67 };
68 inventoryImporterExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
69
70 @Override
71 protected ExecuteEngine doInitialize() {
72 return ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-" + jobId);
73 }
74 };
75 incrementalExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
76
77 @Override
78 protected ExecuteEngine doInitialize() {
79 return ExecuteEngine.newCachedThreadInstance("Incremental-" + jobId);
80 }
81 };
82 }
83
84
85
86
87
88
89 @SneakyThrows(ConcurrentException.class)
90 public ExecuteEngine getInventoryDumperExecuteEngine() {
91 return inventoryDumperExecuteEngineLazyInitializer.get();
92 }
93
94
95
96
97
98
99 @SneakyThrows(ConcurrentException.class)
100 public ExecuteEngine getInventoryImporterExecuteEngine() {
101 return inventoryImporterExecuteEngineLazyInitializer.get();
102 }
103
104
105
106
107
108
109 @SneakyThrows(ConcurrentException.class)
110 public ExecuteEngine getIncrementalExecuteEngine() {
111 return incrementalExecuteEngineLazyInitializer.get();
112 }
113
114 @Override
115 public void close() throws Exception {
116 shutdownExecuteEngine(inventoryDumperExecuteEngineLazyInitializer);
117 shutdownExecuteEngine(inventoryImporterExecuteEngineLazyInitializer);
118 shutdownExecuteEngine(incrementalExecuteEngineLazyInitializer);
119 }
120
121 private void shutdownExecuteEngine(final PipelineLazyInitializer<ExecuteEngine> lazyInitializer) throws ConcurrentException {
122 if (lazyInitializer.isInitialized()) {
123 lazyInitializer.get().shutdown();
124 }
125 }
126 }