/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.job.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;

import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Pipeline job manager.
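 *
 * <p>Minimal usage sketch, not part of the original documentation: the {@code "MIGRATION"} type key and the {@code jobConfig}
 * instance are illustrative assumptions; the concrete {@link PipelineJobConfiguration} is built by the owning scenario.</p>
 * <pre>{@code
 * PipelineJobType jobType = TypedSPILoader.getService(PipelineJobType.class, "MIGRATION");
 * PipelineJobManager jobManager = new PipelineJobManager(jobType);
 * jobManager.start(jobConfig);
 * jobManager.stop(jobConfig.getJobId());
 * jobManager.drop(jobConfig.getJobId());
 * }</pre>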
 */
@RequiredArgsConstructor
@Slf4j
public final class PipelineJobManager {
    
    private final PipelineJobType jobType;
    
    /**
     * Start job.
     *
     * @param jobConfig job configuration
     */
    public void start(final PipelineJobConfiguration jobConfig) {
        String jobId = jobConfig.getJobId();
        ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
        if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) {
            log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
            return;
        }
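        // Register the job (with its job class) in the governance repository, then persist the ElasticJob configuration converted from the pipeline job configuration.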
        governanceFacade.getJobFacade().getJob().create(jobId, jobType.getJobClass());
        governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobType).convertToJobConfigurationPOJO(jobConfig));
    }
    
    /**
     * Resume disabled job.
     *
     * @param jobId job id
     */
    public void resume(final String jobId) {
        if (jobType.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
            Optional<? extends PipelineJobItemProgress> jobItemProgress = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).getProgress(jobId, 0);
            if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
                log.info("job status is FINISHED, ignore, jobId={}", jobId);
                return;
            }
        }
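        // Re-enable this job first, then resume the related check job if this job type declares a follow-up job type.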
        startCurrentDisabledJob(jobId);
        jobType.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional));
    }
    
    private void startCurrentDisabledJob(final String jobId) {
        PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
        pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
        jobConfigPOJO.setDisabled(false);
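        // Book-keeping props on the ElasticJob configuration: record the new start time, clear any previous stop time, and bump the run counter.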
        jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
        jobConfigPOJO.getProps().remove("stop_time");
        jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1));
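        // Register an "enable" barrier sized by the sharding total count, push the updated configuration, then wait up to 5 seconds for all sharding items to pass the barrier.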
        String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
        pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
        PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
        pipelineDistributedBarrier.await(barrierEnablePath, 5L, TimeUnit.SECONDS);
    }
    
    private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
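        // Best effort: resuming the related check job must not break resuming the main job, so failures are only logged.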
        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional -> {
            try {
                new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStartDisabledNextJobType)).resume(optional);
                // CHECKSTYLE:OFF
            } catch (final RuntimeException ex) {
                // CHECKSTYLE:ON
                log.warn("start related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
            }
        });
    }
    
    /**
     * Stop job.
     *
     * @param jobId job id
     */
    public void stop(final String jobId) {
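        // Stop the related check job (if any) first, then disable this job.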
        jobType.getToBeStoppedPreviousJobType().ifPresent(optional -> stopPreviousJob(jobId, optional));
        stopCurrentJob(jobId);
    }
    
    private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) {
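        // Best effort: stopping the related check job must not break stopping the main job, so failures are only logged.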
        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional -> {
            try {
                new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStoppedPreviousJobType)).stop(optional);
                // CHECKSTYLE:OFF
            } catch (final RuntimeException ex) {
                // CHECKSTYLE:ON
                log.warn("stop related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
            }
        });
    }
    
    private void stopCurrentJob(final String jobId) {
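        // Mirror of startCurrentDisabledJob: swap the "enable" barrier for a "disable" barrier, mark the job disabled with a stop time,
        // push the updated configuration, then wait up to 5 seconds for all sharding items to pass the barrier.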
        PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
        pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
        jobConfigPOJO.setDisabled(true);
        jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
        String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
        pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
        PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
        pipelineDistributedBarrier.await(barrierPath, 5L, TimeUnit.SECONDS);
    }
    
    /**
     * Drop job.
     *
     * @param jobId to be dropped job id
     */
    public void drop(final String jobId) {
        PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
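        // Remove the job from ElasticJob first, then delete the pipeline job's metadata node from the governance repository.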
        PipelineAPIFactory.getJobOperateAPI(contextKey).remove(jobId, null);
        PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getJob().delete(jobId);
    }
    
    /**
     * Get pipeline jobs info.
     *
     * @param contextKey context key
     * @return jobs info
     */
    public List<PipelineJobInfo> getJobInfos(final PipelineContextKey contextKey) {
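        // Keep only jobs of this manager's job type; job names starting with "_" are skipped.
        // An UnsupportedOperationException from the statistics API is treated as "no jobs".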
        try {
            return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
                    .filter(each -> !each.getJobName().startsWith("_") && jobType.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
                    .map(each -> jobType.getJobInfo(each.getJobName())).collect(Collectors.toList());
        } catch (final UnsupportedOperationException ex) {
            return Collections.emptyList();
        }
    }
}
176 }