View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.job.service;
19  
20  import lombok.RequiredArgsConstructor;
21  import lombok.extern.slf4j.Slf4j;
22  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
23  import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
24  import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
25  import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
26  import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
27  import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
28  import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
29  import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
30  import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
31  import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
32  import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobMetaData;
33  import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
34  import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
35  import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
36  import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
37  import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.ShardingInfo;
38  import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
39  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
40  import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
41  
42  import java.time.LocalDateTime;
43  import java.util.Collection;
44  import java.util.Collections;
45  import java.util.List;
46  import java.util.Optional;
47  import java.util.concurrent.TimeUnit;
48  import java.util.stream.Collectors;
49  
50  /**
51   * Pipeline job manager.
52   */
53  @RequiredArgsConstructor
54  @Slf4j
55  public final class PipelineJobManager {
56      
57      @SuppressWarnings("rawtypes")
58      private final PipelineJobType jobType;
59      
60      /**
61       * Start job.
62       *
63       * @param jobConfig job configuration
64       */
65      public void start(final PipelineJobConfiguration jobConfig) {
66          String jobId = jobConfig.getJobId();
67          ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
68          PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
69          if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) {
70              log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
71              return;
72          }
73          governanceFacade.getJobFacade().getJob().create(jobId, jobType.getOption().getJobClass());
74          governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobType.getOption()).convertToJobConfigurationPOJO(jobConfig));
75      }
76      
77      /**
78       * Resume disabled job.
79       *
80       * @param jobId job id
81       */
82      public void resume(final String jobId) {
83          if (jobType.getOption().isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
84              Optional<? extends PipelineJobItemProgress> jobItemProgress = new PipelineJobItemManager<>(jobType.getOption().getYamlJobItemProgressSwapper()).getProgress(jobId, 0);
85              if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
86                  log.info("job status is FINISHED, ignore, jobId={}", jobId);
87                  return;
88              }
89          }
90          startCurrentDisabledJob(jobId);
91          String toBeStartDisabledNextJobType = jobType.getOption().getToBeStartDisabledNextJobType();
92          if (null != toBeStartDisabledNextJobType) {
93              startNextDisabledJob(jobId, toBeStartDisabledNextJobType);
94          }
95      }
96      
97      private void startCurrentDisabledJob(final String jobId) {
98          PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
99          pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
100         JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
101         jobConfigPOJO.setDisabled(false);
102         jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
103         jobConfigPOJO.getProps().remove("stop_time");
104         jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1));
105         String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
106         pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
107         PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
108         pipelineDistributedBarrier.await(barrierEnablePath, 5L, TimeUnit.SECONDS);
109     }
110     
111     private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
112         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional -> {
113             try {
114                 new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStartDisabledNextJobType)).resume(optional);
115                 // CHECKSTYLE:OFF
116             } catch (final RuntimeException ex) {
117                 // CHECKSTYLE:ON
118                 log.warn("start related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
119             }
120         });
121     }
122     
123     /**
124      * Stop job.
125      *
126      * @param jobId job id
127      */
128     public void stop(final String jobId) {
129         String toBeStoppedPreviousJobType = jobType.getOption().getToBeStoppedPreviousJobType();
130         if (null != toBeStoppedPreviousJobType) {
131             stopPreviousJob(jobId, toBeStoppedPreviousJobType);
132         }
133         stopCurrentJob(jobId);
134     }
135     
136     private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) {
137         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional -> {
138             try {
139                 new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStoppedPreviousJobType)).stop(optional);
140                 // CHECKSTYLE:OFF
141             } catch (final RuntimeException ex) {
142                 // CHECKSTYLE:ON
143                 log.warn("stop related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
144             }
145         });
146     }
147     
148     private void stopCurrentJob(final String jobId) {
149         PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
150         pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
151         JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
152         jobConfigPOJO.setDisabled(true);
153         jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getDatetimeFormatter()));
154         String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
155         pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
156         PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
157         pipelineDistributedBarrier.await(barrierPath, 5L, TimeUnit.SECONDS);
158     }
159     
160     /**
161      * Drop job.
162      *
163      * @param jobId to be drooped job id
164      */
165     public void drop(final String jobId) {
166         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
167         PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), null);
168         PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getJob().delete(jobId);
169     }
170     
171     /**
172      * Get pipeline jobs info.
173      *
174      * @param contextKey context key
175      * @return jobs info
176      */
177     @SuppressWarnings("unchecked")
178     public List<PipelineJobInfo> getJobInfos(final PipelineContextKey contextKey) {
179         try {
180             return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(this::isValidJob)
181                     .map(each -> new PipelineJobInfo(new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(each.getJobName())),
182                             jobType.getJobTarget(new PipelineJobConfigurationManager(jobType.getOption()).getJobConfiguration(each.getJobName()))))
183                     .collect(Collectors.toList());
184         } catch (final UnsupportedOperationException ex) {
185             return Collections.emptyList();
186         }
187     }
188     
189     private boolean isValidJob(final JobBriefInfo jobInfo) {
190         return !jobInfo.getJobName().startsWith("_") && jobType.getOption().isTransmissionJob() && jobType.getType().equals(PipelineJobIdUtils.parseJobType(jobInfo.getJobName()).getType());
191     }
192     
193     /**
194      * Get pipeline job sharding info.
195      *
196      * @param contextKey context key
197      * @param jobId job id
198      * @return job sharding info
199      */
200     public Collection<ShardingInfo> getJobShardingInfos(final PipelineContextKey contextKey, final String jobId) {
201         try {
202             return PipelineAPIFactory.getShardingStatisticsAPI(contextKey).getShardingInfo(jobId);
203         } catch (final UnsupportedOperationException ex) {
204             return Collections.emptyList();
205         }
206     }
207 }