1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.listener;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
22 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
23 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
24 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
25 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
26 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
27 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
28 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
29 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNodeWatcher;
30 import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
31 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
32 import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
33 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
34 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
35 import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
36 import org.apache.shardingsphere.mode.manager.ContextManager;
37 import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
38 import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListenerModeRequired;
39
40 import java.util.List;
41 import java.util.stream.Collectors;
42
43
44
45
46 @ContextManagerLifecycleListenerModeRequired("Cluster")
47 @Slf4j
48 public final class PipelineContextManagerLifecycleListener implements ContextManagerLifecycleListener {
49
50 @Override
51 public void onInitialized(final ContextManager contextManager) {
52 String preSelectedDatabaseName = contextManager.getPreSelectedDatabaseName();
53 if (DefaultDatabase.LOGIC_NAME.equals(preSelectedDatabaseName)) {
54 return;
55 }
56 PipelineContextKey contextKey = new PipelineContextKey(preSelectedDatabaseName, contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType());
57 PipelineContextManager.putContext(contextKey, new PipelineContext(contextManager.getComputeNodeInstanceContext().getModeConfiguration(), contextManager));
58 PipelineMetaDataNodeWatcher.getInstance(contextKey);
59 ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
60 try {
61 dispatchEnablePipelineJobStartEvent(contextKey);
62
63 } catch (final RuntimeException ex) {
64
65 log.error("Dispatch enable pipeline job start event failed", ex);
66 }
67 }
68
69 private void dispatchEnablePipelineJobStartEvent(final PipelineContextKey contextKey) {
70 JobConfigurationAPI jobConfigAPI = PipelineAPIFactory.getJobConfigurationAPI(contextKey);
71 List<JobBriefInfo> allJobsBriefInfo = PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo()
72 .stream().filter(each -> !each.getJobName().startsWith("_")).collect(Collectors.toList());
73 log.info("All job names: {}", allJobsBriefInfo.stream().map(JobBriefInfo::getJobName).collect(Collectors.joining(",")));
74 for (JobBriefInfo each : allJobsBriefInfo) {
75 PipelineJobType jobType;
76 try {
77 jobType = PipelineJobIdUtils.parseJobType(each.getJobName());
78 } catch (final IllegalArgumentException ex) {
79 log.warn("Parse job type failed, job name: {}, error: {}", each.getJobName(), ex.getMessage());
80 continue;
81 }
82 if ("CONSISTENCY_CHECK".equals(jobType.getCode())) {
83 continue;
84 }
85 JobConfigurationPOJO jobConfig;
86 try {
87 jobConfig = jobConfigAPI.getJobConfiguration(each.getJobName());
88 } catch (final PipelineJobNotFoundException ex) {
89 log.error("Get job configuration failed, job name: {}, error: {}", each.getJobName(), ex.getMessage());
90 continue;
91 }
92 if (jobConfig.isDisabled()) {
93 continue;
94 }
95 new PipelineJobManager(jobType).resume(each.getJobName());
96 log.info("Dispatch enable pipeline job start event, job name: {}", each.getJobName());
97 }
98 }
99
100 @Override
101 public void onDestroyed(final ContextManager contextManager) {
102 PipelineContextManager.removeContext(
103 new PipelineContextKey(contextManager.getPreSelectedDatabaseName(), contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType()));
104 }
105 }