/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.scenario.migration.context;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;

import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Migration job item context.
 */
@Getter
@Setter
public final class MigrationJobItemContext implements TransmissionJobItemContext {
    
    private final String jobId;
    
    private final int shardingItem;
    
    private final String dataSourceName;
    
    // Mutable runtime state; volatile so that a stop request or status change made on one thread is visible to the others.
    private volatile boolean stopping;
    
    private volatile JobStatus status = JobStatus.RUNNING;
    
    private final TransmissionJobItemProgress initProgress;
    
    private final MigrationTaskConfiguration taskConfig;
    
    // Tasks of the two migration phases: inventory (existing data) and incremental (change capture).
    private final Collection<PipelineTask> inventoryTasks = new LinkedList<>();
    
    private final Collection<PipelineTask> incrementalTasks = new LinkedList<>();
    
    // Record counters are atomic because they may be updated concurrently by multiple task threads.
    private final AtomicLong processedRecordsCount = new AtomicLong(0L);
    
    private final AtomicLong inventoryRecordsCount = new AtomicLong(0L);
    
    private final MigrationJobConfiguration jobConfig;
    
    private final TransmissionProcessContext jobProcessContext;
    
    private final PipelineDataSourceManager dataSourceManager;
    
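    // The source data source and its table meta data loader are created lazily: nothing is opened until a task
    // first asks for them, and the initialized instances are then reused for the whole job item.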
    private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
        
        @Override
        protected PipelineDataSourceWrapper initialize() {
            return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
        }
    };
    
    private final LazyInitializer<PipelineTableMetaDataLoader> sourceMetaDataLoaderLazyInitializer = new LazyInitializer<PipelineTableMetaDataLoader>() {
        
        @Override
        protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
            return new StandardPipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
        }
    };
    
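    /**
     * Constructor.
     *
     * @param jobConfig migration job configuration
     * @param shardingItem sharding item
     * @param initProgress initial job item progress, may be null on a fresh start
     * @param jobProcessContext transmission process context
     * @param taskConfig migration task configuration
     * @param dataSourceManager pipeline data source manager
     */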
    public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress initProgress,
                                   final TransmissionProcessContext jobProcessContext, final MigrationTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager) {
        this.jobConfig = jobConfig;
        jobId = jobConfig.getJobId();
        this.shardingItem = shardingItem;
        dataSourceName = taskConfig.getDataSourceName();
        this.initProgress = initProgress;
        if (null != initProgress) {
            processedRecordsCount.set(initProgress.getProcessedRecordsCount());
            inventoryRecordsCount.set(initProgress.getInventoryRecordsCount());
        }
        this.jobProcessContext = jobProcessContext;
        this.taskConfig = taskConfig;
        this.dataSourceManager = dataSourceManager;
    }
    
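    // Both accessors below resolve their LazyInitializer on first call; LazyInitializer.get() declares the checked
    // ConcurrentException, which @SneakyThrows rethrows unchecked to keep the accessor signatures simple.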
    /**
     * Get source data source.
     *
     * @return source data source
     */
    @SneakyThrows(ConcurrentException.class)
    public PipelineDataSourceWrapper getSourceDataSource() {
        return sourceDataSourceLazyInitializer.get();
    }
    
    @Override
    @SneakyThrows(ConcurrentException.class)
    public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
        return sourceMetaDataLoaderLazyInitializer.get();
    }
    
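    // Builds a new data source sink on each call; it writes records through the shared data source manager
    // using the importer configuration of this task.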
    @Override
    public PipelineSink getSink() {
        return new PipelineDataSourceSink(taskConfig.getImporterConfig(), dataSourceManager);
    }
    
    /**
     * Judge whether source and target database are the same.
     *
     * @return true if source and target database are the same, otherwise false
     */
    public boolean isSourceTargetDatabaseTheSame() {
        return jobConfig.getSourceDatabaseType() == jobConfig.getTargetDatabaseType();
    }
    
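    // Called when job progress is updated (for example after a batch of records has been written): accumulate the
    // count and notify the persist service so that the latest progress of this job item can be stored.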
    @Override
    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
        processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
    }
    
    @Override
    public long getProcessedRecordsCount() {
        return processedRecordsCount.get();
    }
    
    @Override
    public void updateInventoryRecordsCount(final long recordsCount) {
        inventoryRecordsCount.addAndGet(recordsCount);
    }
    
    @Override
    public long getInventoryRecordsCount() {
        return inventoryRecordsCount.get();
    }
}
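
// Illustrative usage sketch (assumption, not taken from this source file): the migration job typically builds one
// context per sharding item and hands it to its tasks, roughly as follows.
//
//     MigrationJobItemContext context = new MigrationJobItemContext(
//             jobConfig, shardingItem, initProgress, processContext, taskConfig, dataSourceManager);
//     PipelineTableMetaDataLoader metaDataLoader = context.getSourceMetaDataLoader(); // opens the source data source on first use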
162 }