1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc.context;
19
20 import lombok.Getter;
21 import lombok.Setter;
22 import lombok.SneakyThrows;
23 import org.apache.commons.lang3.concurrent.ConcurrentException;
24 import org.apache.commons.lang3.concurrent.LazyInitializer;
25 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
26 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
27 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
28 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
29 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
30 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
31 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
32 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
33 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
34 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
35 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
36 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
37 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
38 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
39
40 import java.util.Collection;
41 import java.util.LinkedList;
42 import java.util.concurrent.atomic.AtomicLong;
43
44
45
46
47 @Getter
48 public final class CDCJobItemContext implements TransmissionJobItemContext {
49
50 private final CDCJobConfiguration jobConfig;
51
52 private final int shardingItem;
53
54 @Setter
55 private volatile boolean stopping;
56
57 @Setter
58 private volatile JobStatus status = JobStatus.RUNNING;
59
60 private final TransmissionJobItemProgress initProgress;
61
62 private final TransmissionProcessContext jobProcessContext;
63
64 private final CDCTaskConfiguration taskConfig;
65
66 private final PipelineDataSourceManager dataSourceManager;
67
68 private final PipelineSink sink;
69
70 private final Collection<PipelineTask> inventoryTasks = new LinkedList<>();
71
72 private final Collection<PipelineTask> incrementalTasks = new LinkedList<>();
73
74 private final AtomicLong processedRecordsCount = new AtomicLong(0L);
75
76 private final AtomicLong inventoryRecordsCount = new AtomicLong(0L);
77
78 private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
79
80 @Override
81 protected PipelineDataSourceWrapper initialize() {
82 return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
83 }
84 };
85
86 private final LazyInitializer<PipelineTableMetaDataLoader> sourceMetaDataLoaderLazyInitializer = new LazyInitializer<PipelineTableMetaDataLoader>() {
87
88 @Override
89 protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
90 return new StandardPipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
91 }
92 };
93
94 public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress initProgress, final TransmissionProcessContext jobProcessContext,
95 final CDCTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager, final PipelineSink sink) {
96 this.jobConfig = jobConfig;
97 this.shardingItem = shardingItem;
98 this.initProgress = initProgress;
99 if (null != initProgress) {
100 processedRecordsCount.set(initProgress.getProcessedRecordsCount());
101 inventoryRecordsCount.set(initProgress.getInventoryRecordsCount());
102 }
103 this.jobProcessContext = jobProcessContext;
104 this.taskConfig = taskConfig;
105 this.dataSourceManager = dataSourceManager;
106 this.sink = sink;
107 }
108
109 @Override
110 public String getJobId() {
111 return jobConfig.getJobId();
112 }
113
114 @Override
115 public String getDataSourceName() {
116 return taskConfig.getDumperContext().getCommonContext().getDataSourceName();
117 }
118
119 @Override
120 public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
121 processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
122 PipelineJobProgressPersistService.notifyPersist(jobConfig.getJobId(), shardingItem);
123 }
124
125
126
127
128
129
130 @SneakyThrows(ConcurrentException.class)
131 public PipelineDataSourceWrapper getSourceDataSource() {
132 return sourceDataSourceLazyInitializer.get();
133 }
134
135 @Override
136 @SneakyThrows(ConcurrentException.class)
137 public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
138 return sourceMetaDataLoaderLazyInitializer.get();
139 }
140
141 @Override
142 public long getProcessedRecordsCount() {
143 return processedRecordsCount.get();
144 }
145
146 @Override
147 public void updateInventoryRecordsCount(final long recordsCount) {
148 inventoryRecordsCount.addAndGet(recordsCount);
149 }
150
151 @Override
152 public long getInventoryRecordsCount() {
153 return inventoryRecordsCount.get();
154 }
155 }