View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.cdc.context;
19  
20  import lombok.Getter;
21  import lombok.Setter;
22  import lombok.SneakyThrows;
23  import org.apache.commons.lang3.concurrent.ConcurrentException;
24  import org.apache.commons.lang3.concurrent.LazyInitializer;
25  import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
26  import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
27  import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
28  import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
29  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
30  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
31  import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
32  import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
33  import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
34  import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
35  import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
36  import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
37  import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
38  import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
39  
40  import java.util.Collection;
41  import java.util.LinkedList;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  /**
45   * CDC job item context.
46   */
47  @Getter
48  public final class CDCJobItemContext implements TransmissionJobItemContext {
49      
50      private final CDCJobConfiguration jobConfig;
51      
52      private final int shardingItem;
53      
54      @Setter
55      private volatile boolean stopping;
56      
57      @Setter
58      private volatile JobStatus status = JobStatus.RUNNING;
59      
60      private final TransmissionJobItemProgress initProgress;
61      
62      private final TransmissionProcessContext jobProcessContext;
63      
64      private final CDCTaskConfiguration taskConfig;
65      
66      private final PipelineDataSourceManager dataSourceManager;
67      
68      private final PipelineSink sink;
69      
70      private final Collection<PipelineTask> inventoryTasks = new LinkedList<>();
71      
72      private final Collection<PipelineTask> incrementalTasks = new LinkedList<>();
73      
74      private final AtomicLong processedRecordsCount = new AtomicLong(0L);
75      
76      private final AtomicLong inventoryRecordsCount = new AtomicLong(0L);
77      
78      private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
79          
80          @Override
81          protected PipelineDataSourceWrapper initialize() {
82              return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig());
83          }
84      };
85      
86      private final LazyInitializer<PipelineTableMetaDataLoader> sourceMetaDataLoaderLazyInitializer = new LazyInitializer<PipelineTableMetaDataLoader>() {
87          
88          @Override
89          protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
90              return new StandardPipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
91          }
92      };
93      
94      public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress initProgress, final TransmissionProcessContext jobProcessContext,
95                               final CDCTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager, final PipelineSink sink) {
96          this.jobConfig = jobConfig;
97          this.shardingItem = shardingItem;
98          this.initProgress = initProgress;
99          if (null != initProgress) {
100             processedRecordsCount.set(initProgress.getProcessedRecordsCount());
101             inventoryRecordsCount.set(initProgress.getInventoryRecordsCount());
102         }
103         this.jobProcessContext = jobProcessContext;
104         this.taskConfig = taskConfig;
105         this.dataSourceManager = dataSourceManager;
106         this.sink = sink;
107     }
108     
109     @Override
110     public String getJobId() {
111         return jobConfig.getJobId();
112     }
113     
114     @Override
115     public String getDataSourceName() {
116         return taskConfig.getDumperContext().getCommonContext().getDataSourceName();
117     }
118     
119     @Override
120     public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
121         processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
122         PipelineJobProgressPersistService.notifyPersist(jobConfig.getJobId(), shardingItem);
123     }
124     
125     /**
126      * Get source data source.
127      *
128      * @return source data source
129      */
130     @SneakyThrows(ConcurrentException.class)
131     public PipelineDataSourceWrapper getSourceDataSource() {
132         return sourceDataSourceLazyInitializer.get();
133     }
134     
135     @Override
136     @SneakyThrows(ConcurrentException.class)
137     public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
138         return sourceMetaDataLoaderLazyInitializer.get();
139     }
140     
141     @Override
142     public long getProcessedRecordsCount() {
143         return processedRecordsCount.get();
144     }
145     
146     @Override
147     public void updateInventoryRecordsCount(final long recordsCount) {
148         inventoryRecordsCount.addAndGet(recordsCount);
149     }
150     
151     @Override
152     public long getInventoryRecordsCount() {
153         return inventoryRecordsCount.get();
154     }
155 }