1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.job.service;
19
20 import lombok.RequiredArgsConstructor;
21 import org.apache.shardingsphere.data.pipeline.core.execute.ShardingTotalCountUsageJobExecutorServiceHandler;
22 import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
23 import org.apache.shardingsphere.data.pipeline.core.job.config.yaml.swapper.YamlPipelineJobConfigurationSwapper;
24 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
25 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
26 import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
27 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
28 import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
29 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
30
31 import java.time.LocalDateTime;
32 import java.util.Collections;
33
34
35
36
37 @RequiredArgsConstructor
38 public final class PipelineJobConfigurationManager {
39
40 private final PipelineJobOption jobOption;
41
42
43
44
45
46
47
48
49 @SuppressWarnings("unchecked")
50 public <T extends PipelineJobConfiguration> T getJobConfiguration(final String jobId) {
51 return (T) jobOption.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
52 }
53
54
55
56
57
58
59
60 @SuppressWarnings({"unchecked", "rawtypes"})
61 public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) {
62 JobConfigurationPOJO result = new JobConfigurationPOJO();
63 result.setJobName(jobConfig.getJobId());
64 result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount());
65 YamlPipelineJobConfigurationSwapper swapper = jobOption.getYamlJobConfigurationSwapper();
66 result.setJobParameter(YamlEngine.marshal(swapper.swapToYamlConfiguration(jobConfig)));
67 String createTimeFormat = LocalDateTime.now().format(DateTimeFormatterFactory.getDatetimeFormatter());
68 result.getProps().setProperty("create_time", createTimeFormat);
69 result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
70 result.getProps().setProperty("run_count", "1");
71 result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
72 result.setJobExecutorServiceHandlerType(ShardingTotalCountUsageJobExecutorServiceHandler.TYPE);
73 return result;
74 }
75 }