/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.cdc.api;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.core.pojo.CDCJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.swapper.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * CDC job API.
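 *
 * <p>Illustrative lifecycle sketch (not part of the original source; {@code param} is an assumed,
 * already-built {@code StreamDataParameter} and {@code sink} an assumed {@code PipelineSink} implementation):</p>
 * <pre>{@code
 * CDCJobAPI jobAPI = new CDCJobAPI();
 * String jobId = jobAPI.create(param, CDCSinkType.SOCKET, new Properties());
 * jobAPI.start(jobId, sink);
 * // ... stream data until the subscription ends ...
 * jobAPI.disable(jobId);
 * jobAPI.drop(jobId);
 * }</pre>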
 */
@Slf4j
public final class CDCJobAPI implements TransmissionJobAPI {
    
    private final CDCJobType jobType;
    
    private final PipelineJobManager jobManager;
    
    private final PipelineJobConfigurationManager jobConfigManager;
    
    private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper;
    
    private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine;
    
    private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper;
    
    public CDCJobAPI() {
        jobType = new CDCJobType();
        jobManager = new PipelineJobManager(jobType);
        jobConfigManager = new PipelineJobConfigurationManager(jobType.getOption());
        dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
        pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
    }
    
    /**
     * Create CDC job.
     *
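     * <p>Example (an illustrative sketch; the {@code param} construction is assumed):</p>
     * <pre>{@code
     * String jobId = new CDCJobAPI().create(param, CDCSinkType.SOCKET, new Properties());
     * }</pre>
     *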
     * @param param CDC job parameter
     * @param sinkType sink type
     * @param sinkProps sink properties
     * @return job id
     */
    public String create(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
        PipelineContextKey contextKey = new PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
        YamlCDCJobConfiguration yamlJobConfig = getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
        CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
        ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
        if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId())) {
            log.warn("CDC job already exists in registry center, ignored, job id is `{}`", jobConfig.getJobId());
        } else {
            governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), jobType.getOption().getJobClass());
            JobConfigurationPOJO jobConfigPOJO = jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
            jobConfigPOJO.setDisabled(true);
            governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO);
            if (!param.isFull()) {
                initIncrementalPosition(jobConfig);
            }
        }
        return jobConfig.getJobId();
    }
    
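    // Builds the YAML job configuration: sorts the schema table names so the marshalled job id is stable,
    // records the sink settings, and snapshots the source database's data source and data node layout.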
    private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
        YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
        List<String> schemaTableNames = param.getSchemaTableNames();
        Collections.sort(schemaTableNames);
        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, schemaTableNames, param.isFull(), sinkType)));
        result.setDatabaseName(param.getDatabaseName());
        result.setSchemaTableNames(schemaTableNames);
        result.setFull(param.isFull());
        result.setDecodeWithTX(param.isDecodeWithTransaction());
        YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
        sinkConfig.setSinkType(sinkType.name());
        sinkConfig.setProps(sinkProps);
        result.setSinkConfig(sinkConfig);
        ShardingSphereDatabase database = PipelineContextManager.getContext(contextKey).getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
        result.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
        List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getTableAndDataNodesMap());
        result.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
        JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getTableAndDataNodesMap().entrySet().stream()
                .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList()));
        result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
        result.setSourceDatabaseType(PipelineDataSourceConfigurationFactory.newInstance(
                result.getDataSourceConfiguration().getType(), result.getDataSourceConfiguration().getParameter()).getDatabaseType().getType());
        return result;
    }
    
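    // Assembles a ShardingSphere pipeline data source configuration from the database's storage units and rule configurations.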
    private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) {
        Map<String, Map<String, Object>> dataSourcePoolProps = new HashMap<>(database.getResourceMetaData().getStorageUnits().size(), 1F);
        for (Entry<String, StorageUnit> entry : database.getResourceMetaData().getStorageUnits().entrySet()) {
            dataSourcePoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
        }
        YamlRootConfiguration targetRootConfig = new YamlRootConfiguration();
        targetRootConfig.setDatabaseName(database.getName());
        targetRootConfig.setDataSources(dataSourcePoolProps);
        targetRootConfig.setRules(ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations()));
        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
    }
    
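    // Persists an initial incremental position (e.g. a binlog position) for each job sharding item that has
    // no progress yet, so an incremental-only (non-full) job can resume from the point of creation.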
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
        String jobId = jobConfig.getJobId();
        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
        try (PipelineDataSourceManager pipelineDataSourceManager = new PipelineDataSourceManager()) {
            for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
                if (jobItemManager.getProgress(jobId, i).isPresent()) {
                    continue;
                }
                IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                TransmissionJobItemProgress jobItemProgress = getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
                YamlPipelineJobItemProgressSwapper swapper = jobType.getOption().getYamlJobItemProgressSwapper();
                PipelineAPIFactory.getPipelineGovernanceFacade(
                        PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(jobId, i, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress)));
            }
        } catch (final SQLException ex) {
            throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
        }
    }
    
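    // Builds the incremental dumper context for one sharding item, resolving the actual data source from the item's first data node.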
    private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
        JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
        String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
        StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
        return new IncrementalDumperContext(new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
    }
    
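    // Creates the initial transmission progress for a job item, capturing the data source's current incremental task position.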
    private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager,
                                                                       final IncrementalDumperContext incrementalDumperContext) throws SQLException {
        TransmissionJobItemProgress result = new TransmissionJobItemProgress();
        result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
        result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
        IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager));
        result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
        return result;
    }
    
    /**
     * Start CDC job.
     *
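     * <p>Example (an illustrative sketch; {@code jobAPI} is a {@code CDCJobAPI} instance and {@code sink}
     * an assumed {@code PipelineSink} implementation):</p>
     * <pre>{@code
     * jobAPI.start(jobId, sink);
     * }</pre>
     *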
     * @param jobId job id
     * @param sink pipeline sink to write captured records to
     */
    public void start(final String jobId, final PipelineSink sink) {
        CDCJob job = new CDCJob(sink);
        PipelineJobRegistry.add(jobId, job);
        enable(jobId);
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
        job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
        oneOffJobBootstrap.execute();
    }
    
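    // Re-enables the ElasticJob configuration and records the start time before the one-off bootstrap executes.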
    private void enable(final String jobId) {
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
        jobConfigPOJO.setDisabled(false);
        jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
        jobConfigPOJO.getProps().remove("stop_time");
        PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
    }
    
    /**
     * Disable CDC job.
     *
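     * <p>Example (an illustrative sketch; {@code jobAPI} is a {@code CDCJobAPI} instance):</p>
     * <pre>{@code
     * jobAPI.disable(jobId);
     * }</pre>
     *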
     * @param jobId job id
     */
    public void disable(final String jobId) {
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
        jobConfigPOJO.setDisabled(true);
        jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getDatetimeFormatter()));
        PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
    }
    
    /**
     * Drop CDC job.
     *
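     * <p>Example (an illustrative sketch; {@code jobAPI} is a {@code CDCJobAPI} instance):</p>
     * <pre>{@code
     * jobAPI.drop(jobId);
     * }</pre>
     *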
     * @param jobId job id
     */
    public void drop(final String jobId) {
        CDCJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
        jobManager.drop(jobId);
        cleanup(jobConfig);
    }
    
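    // Destroys the persisted incremental positions for every data source of the dropped job; failures are logged and skipped.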
    private void cleanup(final CDCJobConfiguration jobConfig) {
        for (Entry<String, Map<String, Object>> entry : jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
            try {
                StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(entry.getValue());
                new IncrementalTaskPositionManager(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig);
            } catch (final SQLException ex) {
                log.warn("Destroy incremental position failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
            }
        }
    }
    
    /**
     * Get job item infos.
     *
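     * <p>Example (an illustrative sketch; {@code jobAPI} is a {@code CDCJobAPI} instance and the getter
     * names on {@code CDCJobItemInfo} are assumed to be Lombok-generated):</p>
     * <pre>{@code
     * for (CDCJobItemInfo each : jobAPI.getJobItemInfos(jobId)) {
     *     log.info("confirmed={}, current={}", each.getConfirmedPosition(), each.getCurrentPosition());
     * }
     * }</pre>
     *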
     * @param jobId job id
     * @return job item infos
     */
    public Collection<CDCJobItemInfo> getJobItemInfos(final String jobId) {
        CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(jobId);
        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
        Collection<CDCJobItemInfo> result = new LinkedList<>();
        for (TransmissionJobItemInfo each : new TransmissionJobManager(jobType).getJobItemInfos(jobId)) {
            TransmissionJobItemProgress jobItemProgress = each.getJobItemProgress();
            String confirmedPosition = null == jobItemProgress ? "" : jobItemProgress.getIncremental().getIncrementalPosition().map(Object::toString).orElse("");
            String currentPosition = null == jobItemProgress ? "" : getCurrentPosition(database, jobItemProgress.getDataSourceName());
            result.add(new CDCJobItemInfo(each, confirmedPosition, currentPosition));
        }
        return result;
    }
    
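    // Queries the data source's current position (e.g. binlog coordinates) via the dialect SQL builder;
    // returns an empty string when the dialect provides no such query.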
    private String getCurrentPosition(final ShardingSphereDatabase database, final String dataSourceName) {
        StorageUnit storageUnit = database.getResourceMetaData().getStorageUnits().get(dataSourceName);
        DialectPipelineSQLBuilder sqlBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, storageUnit.getStorageType());
        Optional<String> queryCurrentPositionSQL = sqlBuilder.buildQueryCurrentPositionSQL();
        if (!queryCurrentPositionSQL.isPresent()) {
            return "";
        }
        try (
                Connection connection = storageUnit.getDataSource().getConnection();
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(queryCurrentPositionSQL.get())) {
            resultSet.next();
            return resultSet.getString(1);
        } catch (final SQLException ex) {
            throw new PipelineInternalException(ex);
        }
    }
    
    @Override
    public void commit(final String jobId) {
        // Intentionally empty: CDC streaming jobs have no commit action.
    }
    
    @Override
    public void rollback(final String jobId) {
        // Intentionally empty: CDC streaming jobs have no rollback action.
    }
    
    @Override
    public String getType() {
        return "STREAMING";
    }
}