1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.scenario.migration.context;
19
20 import lombok.Getter;
21 import lombok.Setter;
22 import lombok.SneakyThrows;
23 import org.apache.commons.lang3.concurrent.ConcurrentException;
24 import org.apache.commons.lang3.concurrent.LazyInitializer;
25 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
26 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
27 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
28 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
29 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
30 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
31 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
32 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
33 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
34 import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
35 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
36 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
37 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
38 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
39 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
40
41 import java.util.Collection;
42 import java.util.LinkedList;
43 import java.util.concurrent.atomic.AtomicLong;
44
45
46
47
48 @Getter
49 @Setter
50 public final class MigrationJobItemContext implements TransmissionJobItemContext {
51
52 private final String jobId;
53
54 private final int shardingItem;
55
56 private final String dataSourceName;
57
58 private volatile boolean stopping;
59
60 private volatile JobStatus status = JobStatus.RUNNING;
61
62 private final TransmissionJobItemProgress initProgress;
63
64 private final MigrationTaskConfiguration taskConfig;
65
66 private final Collection<PipelineTask> inventoryTasks = new LinkedList<>();
67
68 private final Collection<PipelineTask> incrementalTasks = new LinkedList<>();
69
70 private final AtomicLong processedRecordsCount = new AtomicLong(0L);
71
72 private final AtomicLong inventoryRecordsCount = new AtomicLong(0L);
73
74 private final MigrationJobConfiguration jobConfig;
75
76 private final TransmissionProcessContext jobProcessContext;
77
78 private final PipelineDataSourceManager dataSourceManager;
79
80 private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
81
82 @Override
83 protected PipelineDataSourceWrapper initialize() {
84 return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
85 }
86 };
87
88 private final LazyInitializer<PipelineTableMetaDataLoader> sourceMetaDataLoaderLazyInitializer = new LazyInitializer<PipelineTableMetaDataLoader>() {
89
90 @Override
91 protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
92 return new StandardPipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
93 }
94 };
95
96 public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress initProgress,
97 final TransmissionProcessContext jobProcessContext, final MigrationTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager) {
98 this.jobConfig = jobConfig;
99 jobId = jobConfig.getJobId();
100 this.shardingItem = shardingItem;
101 dataSourceName = taskConfig.getDataSourceName();
102 this.initProgress = initProgress;
103 if (null != initProgress) {
104 processedRecordsCount.set(initProgress.getProcessedRecordsCount());
105 inventoryRecordsCount.set(initProgress.getInventoryRecordsCount());
106 }
107 this.jobProcessContext = jobProcessContext;
108 this.taskConfig = taskConfig;
109 this.dataSourceManager = dataSourceManager;
110 }
111
112
113
114
115
116
117 @SneakyThrows(ConcurrentException.class)
118 public PipelineDataSourceWrapper getSourceDataSource() {
119 return sourceDataSourceLazyInitializer.get();
120 }
121
122 @Override
123 @SneakyThrows(ConcurrentException.class)
124 public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
125 return sourceMetaDataLoaderLazyInitializer.get();
126 }
127
128 @Override
129 public PipelineSink getSink() {
130 return new PipelineDataSourceSink(taskConfig.getImporterConfig(), dataSourceManager);
131 }
132
133
134
135
136
137
138 public boolean isSourceTargetDatabaseTheSame() {
139 return jobConfig.getSourceDatabaseType() == jobConfig.getTargetDatabaseType();
140 }
141
142 @Override
143 public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
144 processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
145 PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
146 }
147
148 @Override
149 public long getProcessedRecordsCount() {
150 return processedRecordsCount.get();
151 }
152
153 @Override
154 public void updateInventoryRecordsCount(final long recordsCount) {
155 inventoryRecordsCount.addAndGet(recordsCount);
156 }
157
158 @Override
159 public long getInventoryRecordsCount() {
160 return inventoryRecordsCount.get();
161 }
162 }