/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.cdc.api;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.core.pojo.CDCJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.swapper.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * CDC job API.
 *
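 * <p>Minimal usage sketch, assuming a prepared {@code StreamDataParameter} and a {@code PipelineSink}
 * implementation; the SOCKET sink type and empty properties below are illustrative only:
 * <pre>{@code
 * CDCJobAPI jobAPI = new CDCJobAPI();
 * String jobId = jobAPI.create(param, CDCSinkType.SOCKET, new Properties());
 * jobAPI.start(jobId, sink);
 * }</pre>
 */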
@Slf4j
public final class CDCJobAPI implements TransmissionJobAPI {
    
    private final CDCJobType jobType;
    
    private final PipelineJobManager jobManager;
    
    private final PipelineJobConfigurationManager jobConfigManager;
    
    private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper;
    
    private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine;
    
    private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper;
    
    public CDCJobAPI() {
        jobType = new CDCJobType();
        jobManager = new PipelineJobManager(jobType);
        jobConfigManager = new PipelineJobConfigurationManager(jobType.getOption());
        dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
        ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
        pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
    }
    
    /**
     * Create CDC job config.
     *
     * @param param create CDC job param
     * @param sinkType sink type
     * @param sinkProps sink properties
     * @return job id
     */
    public String create(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
        PipelineContextKey contextKey = new PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
        YamlCDCJobConfiguration yamlJobConfig = getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
        CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
        ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
        if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId())) {
            log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
        } else {
            governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), jobType.getOption().getJobClass());
            JobConfigurationPOJO jobConfigPOJO = jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
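            // Persist the job disabled first; it only runs after start(...) re-enables it and triggers execution.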
            jobConfigPOJO.setDisabled(true);
            governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO);
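            // Incremental-only subscription: capture the current source position now so streaming starts from creation time.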
            if (!param.isFull()) {
                initIncrementalPosition(jobConfig);
            }
        }
        return jobConfig.getJobId();
    }
    
    private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
        YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
        List<String> schemaTableNames = param.getSchemaTableNames();
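        // Sort for a deterministic job id: the id is marshaled from the subscribed table names.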
        Collections.sort(schemaTableNames);
        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, schemaTableNames, param.isFull(), sinkType)));
        result.setDatabaseName(param.getDatabaseName());
        result.setSchemaTableNames(schemaTableNames);
        result.setFull(param.isFull());
        result.setDecodeWithTX(param.isDecodeWithTransaction());
        YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
        sinkConfig.setSinkType(sinkType.name());
        sinkConfig.setProps(sinkProps);
        result.setSinkConfig(sinkConfig);
        ShardingSphereDatabase database = PipelineContextManager.getContext(contextKey).getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
        result.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
        List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getTableAndDataNodesMap());
        result.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
        JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getTableAndDataNodesMap().entrySet().stream()
                .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList()));
        result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
        result.setSourceDatabaseType(PipelineDataSourceConfigurationFactory.newInstance(
                result.getDataSourceConfiguration().getType(), result.getDataSourceConfiguration().getParameter()).getDatabaseType().getType());
        return result;
    }
    
    private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) {
        Map<String, Map<String, Object>> dataSourcePoolProps = new HashMap<>(database.getResourceMetaData().getStorageUnits().size(), 1F);
        for (Entry<String, StorageUnit> entry : database.getResourceMetaData().getStorageUnits().entrySet()) {
            dataSourcePoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
        }
        YamlRootConfiguration targetRootConfig = new YamlRootConfiguration();
        targetRootConfig.setDatabaseName(database.getName());
        targetRootConfig.setDataSources(dataSourcePoolProps);
        targetRootConfig.setRules(ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations()));
        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
    }
    
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
        String jobId = jobConfig.getJobId();
        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper());
        try (PipelineDataSourceManager pipelineDataSourceManager = new PipelineDataSourceManager()) {
            for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
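                // Skip sharding items whose progress is already persisted, e.g. when the same job is created again.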
                if (jobItemManager.getProgress(jobId, i).isPresent()) {
                    continue;
                }
                IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                TransmissionJobItemProgress jobItemProgress = getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
                YamlPipelineJobItemProgressSwapper swapper = jobType.getOption().getYamlJobItemProgressSwapper();
                PipelineAPIFactory.getPipelineGovernanceFacade(
                        PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(jobId, i, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress)));
            }
        } catch (final SQLException ex) {
            throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
        }
    }
    
    private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
        JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
        String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
        StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
        return new IncrementalDumperContext(new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
    }
    
    private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager,
                                                                       final IncrementalDumperContext incrementalDumperContext) throws SQLException {
        TransmissionJobItemProgress result = new TransmissionJobItemProgress();
        result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
        result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
        IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
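        // Passing a null initial progress resolves a fresh position (e.g. MySQL binlog offset) from the source database.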
        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager));
        result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
        return result;
    }
    
    /**
     * Start CDC job.
     *
     * @param jobId job id
     * @param sink sink
     */
    public void start(final String jobId, final PipelineSink sink) {
        CDCJob job = new CDCJob(sink);
        PipelineJobRegistry.add(jobId, job);
        enable(jobId);
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
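        // Trigger a single ElasticJob execution via the one-off bootstrap; the CDC job keeps streaming until disabled or dropped.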
        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
        job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
        oneOffJobBootstrap.execute();
    }
    
    private void enable(final String jobId) {
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
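        // Clear the disabled flag and record a fresh start time; any previously recorded stop time is removed.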
        jobConfigPOJO.setDisabled(false);
        jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
        jobConfigPOJO.getProps().remove("stop_time");
        PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
    }
    
    /**
     * Disable CDC job.
     *
     * @param jobId job id
     */
    public void disable(final String jobId) {
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
        jobConfigPOJO.setDisabled(true);
        jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getDatetimeFormatter()));
        PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
    }
    
    /**
     * Drop CDC job.
     *
     * @param jobId job id
     */
    public void drop(final String jobId) {
        CDCJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
        jobManager.drop(jobId);
        cleanup(jobConfig);
    }
    
    private void cleanup(final CDCJobConfiguration jobConfig) {
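        // Best effort: a failure to destroy one data source's position is logged and must not abort cleanup of the others.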
        for (Entry<String, Map<String, Object>> entry : jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
            try {
                StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(entry.getValue());
                new IncrementalTaskPositionManager(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig);
            } catch (final SQLException ex) {
                log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
            }
        }
    }
    
    /**
     * Get job item infos.
     *
     * @param jobId job id
     * @return job item infos
     */
    public Collection<CDCJobItemInfo> getJobItemInfos(final String jobId) {
        CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(jobId);
        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
        Collection<CDCJobItemInfo> result = new LinkedList<>();
        for (TransmissionJobItemInfo each : new TransmissionJobManager(jobType).getJobItemInfos(jobId)) {
            TransmissionJobItemProgress jobItemProgress = each.getJobItemProgress();
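            // "Confirmed" is the job item's persisted incremental position; "current" is what the source database reports now.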
            String confirmedPosition = null == jobItemProgress ? "" : jobItemProgress.getIncremental().getIncrementalPosition().map(Object::toString).orElse("");
            String currentPosition = null == jobItemProgress ? "" : getCurrentPosition(database, jobItemProgress.getDataSourceName());
            result.add(new CDCJobItemInfo(each, confirmedPosition, currentPosition));
        }
        return result;
    }
    
    private String getCurrentPosition(final ShardingSphereDatabase database, final String dataSourceName) {
        StorageUnit storageUnit = database.getResourceMetaData().getStorageUnits().get(dataSourceName);
        DialectPipelineSQLBuilder sqlBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, storageUnit.getStorageType());
        Optional<String> queryCurrentPositionSQL = sqlBuilder.buildQueryCurrentPositionSQL();
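        // Not every dialect can report a current position; return an empty string instead of failing.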
        if (!queryCurrentPositionSQL.isPresent()) {
            return "";
        }
        try (
                Connection connection = storageUnit.getDataSource().getConnection();
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(queryCurrentPositionSQL.get())) {
            resultSet.next();
            return resultSet.getString(1);
        } catch (final SQLException ex) {
            throw new PipelineInternalException(ex);
        }
    }
    
    @Override
    public void commit(final String jobId) {
    }
    
    @Override
    public void rollback(final String jobId) {
    }
    
    @Override
    public String getType() {
        return "STREAMING";
    }
}