1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.job.service;
19
20 import lombok.RequiredArgsConstructor;
21 import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
22 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
23 import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
24 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
25 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
26 import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
27 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
28
29 import java.time.LocalDateTime;
30 import java.util.Collections;
31
32
33
34
35 @RequiredArgsConstructor
36 public final class PipelineJobConfigurationManager {
37
38 private final PipelineJobType jobType;
39
40
41
42
43
44
45
46
47 @SuppressWarnings("unchecked")
48 public <T extends PipelineJobConfiguration> T getJobConfiguration(final String jobId) {
49 return (T) jobType.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
50 }
51
52
53
54
55
56
57
58 public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) {
59 JobConfigurationPOJO result = new JobConfigurationPOJO();
60 result.setJobName(jobConfig.getJobId());
61 result.setShardingTotalCount(jobType.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount());
62 result.setJobParameter(YamlEngine.marshal(jobType.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
63 String createTimeFormat = LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter());
64 result.getProps().setProperty("create_time", createTimeFormat);
65 result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
66 result.getProps().setProperty("run_count", "1");
67 result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
68 return result;
69 }
70 }