1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.job.service;
19
20 import lombok.RequiredArgsConstructor;
21 import lombok.extern.slf4j.Slf4j;
22 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
23 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
24 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
25 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
26 import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
27 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
28 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
29 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
30 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
31 import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
32 import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
33 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
34 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
35 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
36 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
37 import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
38
39 import java.time.LocalDateTime;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.Optional;
43 import java.util.concurrent.TimeUnit;
44 import java.util.stream.Collectors;
45
46
47
48
49 @RequiredArgsConstructor
50 @Slf4j
51 public final class PipelineJobManager {
52
53 private final PipelineJobType jobType;
54
55
56
57
58
59
60 public void start(final PipelineJobConfiguration jobConfig) {
61 String jobId = jobConfig.getJobId();
62 ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
63 PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
64 if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) {
65 log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
66 return;
67 }
68 governanceFacade.getJobFacade().getJob().create(jobId, jobType.getJobClass());
69 governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobType).convertToJobConfigurationPOJO(jobConfig));
70 }
71
72
73
74
75
76
77 public void resume(final String jobId) {
78 if (jobType.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
79 Optional<? extends PipelineJobItemProgress> jobItemProgress = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).getProgress(jobId, 0);
80 if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
81 log.info("job status is FINISHED, ignore, jobId={}", jobId);
82 return;
83 }
84 }
85 startCurrentDisabledJob(jobId);
86 jobType.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional));
87
88 }
89
90 private void startCurrentDisabledJob(final String jobId) {
91 PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
92 pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
93 JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
94 jobConfigPOJO.setDisabled(false);
95 jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
96 jobConfigPOJO.getProps().remove("stop_time");
97 jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1));
98 String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
99 pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
100 PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
101 pipelineDistributedBarrier.await(barrierEnablePath, 5L, TimeUnit.SECONDS);
102 }
103
104 private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
105 PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional -> {
106 try {
107 new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStartDisabledNextJobType)).resume(optional);
108
109 } catch (final RuntimeException ex) {
110
111 log.warn("start related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
112 }
113 });
114 }
115
116
117
118
119
120
121 public void stop(final String jobId) {
122 jobType.getToBeStoppedPreviousJobType().ifPresent(optional -> stopPreviousJob(jobId, optional));
123 stopCurrentJob(jobId);
124 }
125
126 private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) {
127 PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional -> {
128 try {
129 new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStoppedPreviousJobType)).stop(optional);
130
131 } catch (final RuntimeException ex) {
132
133 log.warn("stop related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
134 }
135 });
136 }
137
138 private void stopCurrentJob(final String jobId) {
139 PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
140 pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
141 JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
142 jobConfigPOJO.setDisabled(true);
143 jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
144 String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
145 pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
146 PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
147 pipelineDistributedBarrier.await(barrierPath, 5L, TimeUnit.SECONDS);
148 }
149
150
151
152
153
154
155 public void drop(final String jobId) {
156 PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
157 PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), null);
158 PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getJob().delete(jobId);
159 }
160
161
162
163
164
165
166
167 public List<PipelineJobInfo> getJobInfos(final PipelineContextKey contextKey) {
168 try {
169 return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
170 .filter(each -> !each.getJobName().startsWith("_") && jobType.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
171 .map(each -> jobType.getJobInfo(each.getJobName())).collect(Collectors.toList());
172 } catch (final UnsupportedOperationException ex) {
173 return Collections.emptyList();
174 }
175 }
176 }