/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.cdc.api;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.core.pojo.CDCJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.swapper.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * CDC job API.
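 *
 * <p>Typical lifecycle, as an illustrative sketch only; construction of {@code param} (a {@link StreamDataParameter}),
 * {@code sinkProps} and {@code sink} is omitted and assumed to be supplied by the caller:</p>
 * <pre>{@code
 * CDCJobAPI jobAPI = new CDCJobAPI();
 * String jobId = jobAPI.create(param, CDCSinkType.SOCKET, sinkProps);
 * jobAPI.start(jobId, sink);
 * // ... stream data to the sink ...
 * jobAPI.disable(jobId);
 * jobAPI.drop(jobId);
 * }</pre>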
 */
@Slf4j
public final class CDCJobAPI implements TransmissionJobAPI {
    
    private final CDCJobType jobType;
    
    private final PipelineJobManager jobManager;
    
    private final PipelineJobConfigurationManager jobConfigManager;
    
    private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper;
    
    private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine;
    
    private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper;
    
    public CDCJobAPI() {
        jobType = new CDCJobType();
        jobManager = new PipelineJobManager(jobType);
        jobConfigManager = new PipelineJobConfigurationManager(jobType);
        dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
        pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
    }
    
    /**
     * Create CDC job.
     *
     * @param param CDC job parameter
     * @param sinkType sink type
     * @param sinkProps sink properties
     * @return job id
     */
    public String create(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
        PipelineContextKey contextKey = new PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
        YamlCDCJobConfiguration yamlJobConfig = getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
        CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
        ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
        if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId())) {
            log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
        } else {
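            // The job is persisted in a disabled state; it is enabled and executed later via start().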
            governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), jobType.getJobClass());
            JobConfigurationPOJO jobConfigPOJO = jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
            jobConfigPOJO.setDisabled(true);
            governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO);
            if (!param.isFull()) {
                initIncrementalPosition(jobConfig);
            }
        }
        return jobConfig.getJobId();
    }
    
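    // Assembles the YAML job configuration: the job id is derived from the sorted schema table names,
    // the source data source configuration is read from the database's metadata, and the data nodes are grouped per sharding item.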
    private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
        YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
        List<String> schemaTableNames = param.getSchemaTableNames();
        Collections.sort(schemaTableNames);
        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, schemaTableNames, param.isFull())));
        result.setDatabaseName(param.getDatabaseName());
        result.setSchemaTableNames(schemaTableNames);
        result.setFull(param.isFull());
        result.setDecodeWithTX(param.isDecodeWithTX());
        YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
        sinkConfig.setSinkType(sinkType.name());
        sinkConfig.setProps(sinkProps);
        result.setSinkConfig(sinkConfig);
        ShardingSphereDatabase database = PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
        result.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
        List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getDataNodesMap());
        result.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
        JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getDataNodesMap().entrySet().stream()
                .map(each -> new JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1))).collect(Collectors.toList()));
        result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
        result.setSourceDatabaseType(PipelineDataSourceConfigurationFactory.newInstance(
                result.getDataSourceConfiguration().getType(), result.getDataSourceConfiguration().getParameter()).getDatabaseType().getType());
        return result;
    }
    
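    // Builds a ShardingSphere pipeline data source configuration from the database's storage units and rule configurations.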
    private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) {
        Map<String, Map<String, Object>> dataSourcePoolProps = new HashMap<>(database.getResourceMetaData().getStorageUnits().size(), 1F);
        for (Entry<String, StorageUnit> entry : database.getResourceMetaData().getStorageUnits().entrySet()) {
            dataSourcePoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
        }
        YamlRootConfiguration targetRootConfig = new YamlRootConfiguration();
        targetRootConfig.setDatabaseName(database.getName());
        targetRootConfig.setDataSources(dataSourcePoolProps);
        targetRootConfig.setRules(ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations()));
        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
    }
    
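    // Fetches and persists the current incremental position (e.g. MySQL binlog position) for each sharding item
    // that has no progress yet, so incremental streaming can later resume from the position captured at creation time.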
    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
        String jobId = jobConfig.getJobId();
        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
        try (PipelineDataSourceManager pipelineDataSourceManager = new PipelineDataSourceManager()) {
            for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
                if (jobItemManager.getProgress(jobId, i).isPresent()) {
                    continue;
                }
                IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                TransmissionJobItemProgress jobItemProgress = getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
                PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
                        jobId, i, YamlEngine.marshal(jobType.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
            }
        } catch (final SQLException ex) {
            throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
        }
    }
    
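    // Builds the incremental dumper context for one sharding item, resolving the actual data source from the shard's first data node.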
    private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
        JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
        String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
        StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
        return new IncrementalDumperContext(
                new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
    }
    
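    // Assembles the initial job item progress, carrying the incremental task position for the shard's data source.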
    private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager,
                                                                       final IncrementalDumperContext incrementalDumperContext) throws SQLException {
        TransmissionJobItemProgress result = new TransmissionJobItemProgress();
        result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
        result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
        IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager));
        result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
        return result;
    }
    
    /**
     * Start CDC job.
     *
     * @param jobId job id
     * @param sink sink
     */
    public void start(final String jobId, final PipelineSink sink) {
        CDCJob job = new CDCJob(sink);
        PipelineJobRegistry.add(jobId, job);
        enable(jobId);
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
        job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
        oneOffJobBootstrap.execute();
    }
    
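    // Marks the job as enabled in the registry center and records the start timestamp in the job properties.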
    private void enable(final String jobId) {
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
        jobConfigPOJO.setDisabled(false);
        jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
        jobConfigPOJO.getProps().remove("stop_time");
        PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
    }
    
    /**
     * Disable CDC job.
     *
     * @param jobId job id
     */
    public void disable(final String jobId) {
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
        jobConfigPOJO.setDisabled(true);
        jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
        PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
    }
    
    /**
     * Drop CDC job.
     *
     * @param jobId job id
     */
    public void drop(final String jobId) {
        CDCJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
        jobManager.drop(jobId);
        cleanup(jobConfig);
    }
    
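    // Destroys the incremental position resources on every source data source (e.g. dropping a PostgreSQL replication slot);
    // failures are logged and skipped so the remaining data sources are still cleaned up.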
    private void cleanup(final CDCJobConfiguration jobConfig) {
        for (Entry<String, Map<String, Object>> entry : jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
            try {
                StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(entry.getValue());
                new IncrementalTaskPositionManager(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig);
            } catch (final SQLException ex) {
                log.warn("Destroy incremental position failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
            }
        }
    }
    
    /**
     * Get job item infos.
     *
     * @param jobId job id
     * @return job item infos
     */
    public List<CDCJobItemInfo> getJobItemInfos(final String jobId) {
        CDCJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
        Collection<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobType).getJobItemInfos(jobId);
        List<CDCJobItemInfo> result = new LinkedList<>();
        for (TransmissionJobItemInfo each : jobItemInfos) {
            TransmissionJobItemProgress jobItemProgress = each.getJobItemProgress();
            if (null == jobItemProgress) {
                result.add(new CDCJobItemInfo(each, "", ""));
                continue;
            }
            result.add(new CDCJobItemInfo(each, jobItemProgress.getIncremental().getIncrementalPosition().map(Object::toString).orElse(""),
                    getCurrentPosition(database, jobItemProgress.getDataSourceName())));
        }
        return result;
    }
    
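    // Queries the source database for its current position (e.g. the current binlog or WAL position) via the dialect SQL builder;
    // returns an empty string when the dialect does not provide such a query.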
    private String getCurrentPosition(final ShardingSphereDatabase database, final String dataSourceName) {
        StorageUnit storageUnit = database.getResourceMetaData().getStorageUnits().get(dataSourceName);
        DialectPipelineSQLBuilder sqlBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, storageUnit.getStorageType());
        Optional<String> queryCurrentPositionSQL = sqlBuilder.buildQueryCurrentPositionSQL();
        if (!queryCurrentPositionSQL.isPresent()) {
            return "";
        }
        try (Connection connection = storageUnit.getDataSource().getConnection()) {
            ResultSet resultSet = connection.createStatement().executeQuery(queryCurrentPositionSQL.get());
            resultSet.next();
            return resultSet.getString(1);
        } catch (final SQLException ex) {
            throw new PipelineInternalException(ex);
        }
    }
    
    @Override
    public void commit(final String jobId) throws SQLException {
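        // No-op: CDC jobs have nothing to commit.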
    }
    
    @Override
    public void rollback(final String jobId) throws SQLException {
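        // No-op: CDC jobs have nothing to roll back.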
    }
    
    @Override
    public String getType() {
        return "STREAMING";
    }
}