View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.listener;
19  
20  import lombok.extern.slf4j.Slf4j;
21  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
22  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
23  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
24  import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
25  import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
26  import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
27  import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
28  import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
29  import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNodeWatcher;
30  import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
31  import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
32  import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
33  import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
34  import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
35  import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
36  import org.apache.shardingsphere.mode.manager.ContextManager;
37  import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
38  import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListenerModeRequired;
39  
40  import java.util.List;
41  import java.util.stream.Collectors;
42  
43  /**
44   * Pipeline context manager lifecycle listener.
45   */
46  @ContextManagerLifecycleListenerModeRequired("Cluster")
47  @Slf4j
48  public final class PipelineContextManagerLifecycleListener implements ContextManagerLifecycleListener {
49      
50      @Override
51      public void onInitialized(final ContextManager contextManager) {
52          String preSelectedDatabaseName = contextManager.getPreSelectedDatabaseName();
53          if (DefaultDatabase.LOGIC_NAME.equals(preSelectedDatabaseName)) {
54              return;
55          }
56          PipelineContextKey contextKey = new PipelineContextKey(preSelectedDatabaseName, contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType());
57          PipelineContextManager.putContext(contextKey, new PipelineContext(contextManager.getComputeNodeInstanceContext().getModeConfiguration(), contextManager));
58          PipelineMetaDataNodeWatcher.getInstance(contextKey);
59          ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
60          try {
61              dispatchEnablePipelineJobStartEvent(contextKey);
62              // CHECKSTYLE:OFF
63          } catch (final RuntimeException ex) {
64              // CHECKSTYLE:ON
65              log.error("Dispatch enable pipeline job start event failed", ex);
66          }
67      }
68      
69      private void dispatchEnablePipelineJobStartEvent(final PipelineContextKey contextKey) {
70          JobConfigurationAPI jobConfigAPI = PipelineAPIFactory.getJobConfigurationAPI(contextKey);
71          List<JobBriefInfo> allJobsBriefInfo = PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo()
72                  .stream().filter(each -> !each.getJobName().startsWith("_")).collect(Collectors.toList());
73          log.info("All job names: {}", allJobsBriefInfo.stream().map(JobBriefInfo::getJobName).collect(Collectors.joining(",")));
74          for (JobBriefInfo each : allJobsBriefInfo) {
75              PipelineJobType jobType;
76              try {
77                  jobType = PipelineJobIdUtils.parseJobType(each.getJobName());
78              } catch (final IllegalArgumentException ex) {
79                  log.warn("Parse job type failed, job name: {}, error: {}", each.getJobName(), ex.getMessage());
80                  continue;
81              }
82              if ("CONSISTENCY_CHECK".equals(jobType.getCode())) {
83                  continue;
84              }
85              JobConfigurationPOJO jobConfig;
86              try {
87                  jobConfig = jobConfigAPI.getJobConfiguration(each.getJobName());
88              } catch (final PipelineJobNotFoundException ex) {
89                  log.error("Get job configuration failed, job name: {}, error: {}", each.getJobName(), ex.getMessage());
90                  continue;
91              }
92              if (jobConfig.isDisabled()) {
93                  continue;
94              }
95              new PipelineJobManager(jobType).resume(each.getJobName());
96              log.info("Dispatch enable pipeline job start event, job name: {}", each.getJobName());
97          }
98      }
99      
100     @Override
101     public void onDestroyed(final ContextManager contextManager) {
102         PipelineContextManager.removeContext(
103                 new PipelineContextKey(contextManager.getPreSelectedDatabaseName(), contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType()));
104     }
105 }