/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.shardingsphere.data.pipeline.cdc.api;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
22 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
23 import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
24 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId;
25 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
26 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
27 import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration;
28 import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
29 import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
30 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
31 import org.apache.shardingsphere.data.pipeline.cdc.core.pojo.CDCJobItemInfo;
32 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
33 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
34 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
35 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
36 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
37 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
38 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
39 import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.swapper.YamlPipelineDataSourceConfigurationSwapper;
40 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
41 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
42 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
43 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
44 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
45 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
46 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
47 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
48 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
49 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
50 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
51 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
52 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
53 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
54 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
55 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
56 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
57 import org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
58 import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
59 import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
60 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
61 import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
62 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
63 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
64 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
65 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
66 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
67 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
68 import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
69 import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
70 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
71 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
72 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
73 import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
74
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
89
90
91
92
93 @Slf4j
94 public final class CDCJobAPI implements TransmissionJobAPI {
95
96 private final CDCJobType jobType;
97
98 private final PipelineJobManager jobManager;
99
100 private final PipelineJobConfigurationManager jobConfigManager;
101
102 private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper;
103
104 private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine;
105
106 private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper;
107
108 public CDCJobAPI() {
109 jobType = new CDCJobType();
110 jobManager = new PipelineJobManager(jobType);
111 jobConfigManager = new PipelineJobConfigurationManager(jobType);
112 dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
113 ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
114 pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
115 }
116
117
118
119
120
121
122
123
124
125 public String create(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
126 PipelineContextKey contextKey = new PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
127 YamlCDCJobConfiguration yamlJobConfig = getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
128 CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
129 ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
130 PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
131 if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId())) {
132 log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
133 } else {
134 governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), jobType.getJobClass());
135 JobConfigurationPOJO jobConfigPOJO = jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
136 jobConfigPOJO.setDisabled(true);
137 governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO);
138 if (!param.isFull()) {
139 initIncrementalPosition(jobConfig);
140 }
141 }
142 return jobConfig.getJobId();
143 }
144
145 private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
146 YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
147 List<String> schemaTableNames = param.getSchemaTableNames();
148 Collections.sort(schemaTableNames);
149 result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, schemaTableNames, param.isFull())));
150 result.setDatabaseName(param.getDatabaseName());
151 result.setSchemaTableNames(schemaTableNames);
152 result.setFull(param.isFull());
153 result.setDecodeWithTX(param.isDecodeWithTX());
154 YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
155 sinkConfig.setSinkType(sinkType.name());
156 sinkConfig.setProps(sinkProps);
157 result.setSinkConfig(sinkConfig);
158 ShardingSphereDatabase database = PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getDatabaseName());
159 result.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
160 List<JobDataNodeLine> jobDataNodeLines = JobDataNodeLineConvertUtils.convertDataNodesToLines(param.getDataNodesMap());
161 result.setJobShardingDataNodes(jobDataNodeLines.stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
162 JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getDataNodesMap().entrySet().stream()
163 .map(each -> new JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1))).collect(Collectors.toList()));
164 result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
165 result.setSourceDatabaseType(PipelineDataSourceConfigurationFactory.newInstance(
166 result.getDataSourceConfiguration().getType(), result.getDataSourceConfiguration().getParameter()).getDatabaseType().getType());
167 return result;
168 }
169
170 private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) {
171 Map<String, Map<String, Object>> dataSourcePoolProps = new HashMap<>(database.getResourceMetaData().getStorageUnits().size(), 1F);
172 for (Entry<String, StorageUnit> entry : database.getResourceMetaData().getStorageUnits().entrySet()) {
173 dataSourcePoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
174 }
175 YamlRootConfiguration targetRootConfig = new YamlRootConfiguration();
176 targetRootConfig.setDatabaseName(database.getName());
177 targetRootConfig.setDataSources(dataSourcePoolProps);
178 targetRootConfig.setRules(ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations()));
179 return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
180 }
181
182 private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
183 String jobId = jobConfig.getJobId();
184 PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
185 try (PipelineDataSourceManager pipelineDataSourceManager = new PipelineDataSourceManager()) {
186 for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
187 if (jobItemManager.getProgress(jobId, i).isPresent()) {
188 continue;
189 }
190 IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
191 TransmissionJobItemProgress jobItemProgress = getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
192 PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist(
193 jobId, i, YamlEngine.marshal(jobType.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
194 }
195 } catch (final SQLException ex) {
196 throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
197 }
198 }
199
200 private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
201 JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
202 String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
203 StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
204 return new IncrementalDumperContext(
205 new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
206 jobConfig.getJobId(), jobConfig.isDecodeWithTX());
207 }
208
209 private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager,
210 final IncrementalDumperContext incrementalDumperContext) throws SQLException {
211 TransmissionJobItemProgress result = new TransmissionJobItemProgress();
212 result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
213 result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
214 IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
215 IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager));
216 result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
217 return result;
218 }
219
220
221
222
223
224
225
226 public void start(final String jobId, final PipelineSink sink) {
227 CDCJob job = new CDCJob(sink);
228 PipelineJobRegistry.add(jobId, job);
229 enable(jobId);
230 JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
231 OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
232 job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
233 oneOffJobBootstrap.execute();
234 }
235
236 private void enable(final String jobId) {
237 JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
238 jobConfigPOJO.setDisabled(false);
239 jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
240 jobConfigPOJO.getProps().remove("stop_time");
241 PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
242 }
243
244
245
246
247
248
249 public void disable(final String jobId) {
250 JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
251 jobConfigPOJO.setDisabled(true);
252 jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
253 PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
254 }
255
256
257
258
259
260
261 public void drop(final String jobId) {
262 CDCJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
263 jobManager.drop(jobId);
264 cleanup(jobConfig);
265 }
266
267 private void cleanup(final CDCJobConfiguration jobConfig) {
268 for (Entry<String, Map<String, Object>> entry : jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
269 try {
270 StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(entry.getValue());
271 new IncrementalTaskPositionManager(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig);
272 } catch (final SQLException ex) {
273 log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
274 }
275 }
276 }
277
278
279
280
281
282
283
284 public List<CDCJobItemInfo> getJobItemInfos(final String jobId) {
285 CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
286 ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
287 Collection<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobType).getJobItemInfos(jobId);
288 List<CDCJobItemInfo> result = new LinkedList<>();
289 for (TransmissionJobItemInfo each : jobItemInfos) {
290 TransmissionJobItemProgress jobItemProgress = each.getJobItemProgress();
291 if (null == jobItemProgress) {
292 result.add(new CDCJobItemInfo(each, "", ""));
293 continue;
294 }
295 result.add(new CDCJobItemInfo(each, jobItemProgress.getIncremental().getIncrementalPosition().map(Object::toString).orElse(""),
296 getCurrentPosition(database, jobItemProgress.getDataSourceName())));
297 }
298 return result;
299 }
300
301 private String getCurrentPosition(final ShardingSphereDatabase database, final String dataSourceName) {
302 StorageUnit storageUnit = database.getResourceMetaData().getStorageUnits().get(dataSourceName);
303 DialectPipelineSQLBuilder sqlBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, storageUnit.getStorageType());
304 Optional<String> queryCurrentPositionSQL = sqlBuilder.buildQueryCurrentPositionSQL();
305 if (!queryCurrentPositionSQL.isPresent()) {
306 return "";
307 }
308 try (Connection connection = storageUnit.getDataSource().getConnection()) {
309 ResultSet resultSet = connection.createStatement().executeQuery(queryCurrentPositionSQL.get());
310 resultSet.next();
311 return resultSet.getString(1);
312 } catch (final SQLException ex) {
313 throw new PipelineInternalException(ex);
314 }
315 }
316
317 @Override
318 public void commit(final String jobId) throws SQLException {
319 }
320
321 @Override
322 public void rollback(final String jobId) throws SQLException {
323 }
324
325 @Override
326 public String getType() {
327 return "STREAMING";
328 }
329 }