1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.job.service;
19
20 import lombok.RequiredArgsConstructor;
21 import lombok.extern.slf4j.Slf4j;
22 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
23 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
24 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
25 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
26 import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
27 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
28 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
29 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
30 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
31 import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
32 import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
33 import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
34 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
35 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
36 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
37 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.ShardingInfo;
38 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
39 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
40 import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
41
42 import java.time.LocalDateTime;
43 import java.util.Collection;
44 import java.util.Collections;
45 import java.util.List;
46 import java.util.Optional;
47 import java.util.concurrent.TimeUnit;
48 import java.util.stream.Collectors;
49
50
51
52
53 @RequiredArgsConstructor
54 @Slf4j
55 public final class PipelineJobManager {
56
57 @SuppressWarnings("rawtypes")
58 private final PipelineJobType jobType;
59
60
61
62
63
64
65 public void start(final PipelineJobConfiguration jobConfig) {
66 String jobId = jobConfig.getJobId();
67 ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
68 PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
69 if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) {
70 log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
71 return;
72 }
73 governanceFacade.getJobFacade().getJob().create(jobId, jobType.getOption().getJobClass());
74 governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobType.getOption()).convertToJobConfigurationPOJO(jobConfig));
75 }
76
77
78
79
80
81
82 public void resume(final String jobId) {
83 if (jobType.getOption().isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
84 Optional<? extends PipelineJobItemProgress> jobItemProgress = new PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper()).getProgress(jobId, 0);
85 if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
86 log.info("job status is FINISHED, ignore, jobId={}", jobId);
87 return;
88 }
89 }
90 startCurrentDisabledJob(jobId);
91 String toBeStartDisabledNextJobType = jobType.getOption().getToBeStartDisabledNextJobType();
92 if (null != toBeStartDisabledNextJobType) {
93 startNextDisabledJob(jobId, toBeStartDisabledNextJobType);
94 }
95 }
96
97 private void startCurrentDisabledJob(final String jobId) {
98 PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
99 pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
100 JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
101 jobConfigPOJO.setDisabled(false);
102 jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
103 jobConfigPOJO.getProps().remove("stop_time");
104 jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1));
105 String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
106 pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
107 PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
108 pipelineDistributedBarrier.await(barrierEnablePath, 5L, TimeUnit.SECONDS);
109 }
110
111 private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
112 PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional -> {
113 try {
114 new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStartDisabledNextJobType)).resume(optional);
115
116 } catch (final RuntimeException ex) {
117
118 log.warn("start related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
119 }
120 });
121 }
122
123
124
125
126
127
128 public void stop(final String jobId) {
129 String toBeStoppedPreviousJobType = jobType.getOption().getToBeStoppedPreviousJobType();
130 if (null != toBeStoppedPreviousJobType) {
131 stopPreviousJob(jobId, toBeStoppedPreviousJobType);
132 }
133 stopCurrentJob(jobId);
134 }
135
136 private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) {
137 PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional -> {
138 try {
139 new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStoppedPreviousJobType)).stop(optional);
140
141 } catch (final RuntimeException ex) {
142
143 log.warn("stop related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
144 }
145 });
146 }
147
148 private void stopCurrentJob(final String jobId) {
149 PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
150 pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
151 JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
152 jobConfigPOJO.setDisabled(true);
153 jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getDatetimeFormatter()));
154 String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
155 pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
156 PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
157 pipelineDistributedBarrier.await(barrierPath, 5L, TimeUnit.SECONDS);
158 }
159
160
161
162
163
164
165 public void drop(final String jobId) {
166 PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
167 PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), null);
168 PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getJob().delete(jobId);
169 }
170
171
172
173
174
175
176
177 @SuppressWarnings("unchecked")
178 public List<PipelineJobInfo> getJobInfos(final PipelineContextKey contextKey) {
179 try {
180 return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(this::isValidJob)
181 .map(each -> new PipelineJobInfo(new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(each.getJobName())),
182 jobType.getJobTarget(new PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(each.getJobName()))))
183 .collect(Collectors.toList());
184 } catch (final UnsupportedOperationException ex) {
185 return Collections.emptyList();
186 }
187 }
188
189 private boolean isValidJob(final JobBriefInfo jobInfo) {
190 return !jobInfo.getJobName().startsWith("_") && jobType.getOption().isTransmissionJob() && jobType.getType().equals(PipelineJobIdUtils.parseJobType(jobInfo.getJobName()).getType());
191 }
192
193
194
195
196
197
198
199
200 public Collection<ShardingInfo> getJobShardingInfos(final PipelineContextKey contextKey, final String jobId) {
201 try {
202 return PipelineAPIFactory.getShardingStatisticsAPI(contextKey).getShardingInfo(jobId);
203 } catch (final UnsupportedOperationException ex) {
204 return Collections.emptyList();
205 }
206 }
207 }