1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.listener;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
22 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
23 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
24 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
25 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
26 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
27 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
28 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
29 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNodeWatcher;
30 import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
31 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
32 import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
33 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
34 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
35 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
36 import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
37 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
38 import org.apache.shardingsphere.mode.manager.ContextManager;
39 import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
40
41 import java.util.List;
42 import java.util.stream.Collectors;
43
44
45
46
47 @Slf4j
48 public final class PipelineContextManagerLifecycleListener implements ContextManagerLifecycleListener {
49
50 @Override
51 public void onInitialized(final String databaseName, final ContextManager contextManager) {
52 ModeConfiguration modeConfig = contextManager.getInstanceContext().getModeConfiguration();
53 if (!contextManager.getInstanceContext().isCluster()) {
54 log.info("mode type is not Cluster, mode type='{}', ignore", modeConfig.getType());
55 return;
56 }
57
58 if (DefaultDatabase.LOGIC_NAME.equals(databaseName)) {
59 return;
60 }
61 PipelineContextKey contextKey = new PipelineContextKey(databaseName, contextManager.getInstanceContext().getInstance().getMetaData().getType());
62 PipelineContextManager.putContext(contextKey, new PipelineContext(modeConfig, contextManager));
63 PipelineMetaDataNodeWatcher.getInstance(contextKey);
64 ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
65 try {
66 dispatchEnablePipelineJobStartEvent(contextKey);
67
68 } catch (final RuntimeException ex) {
69
70 log.error("Dispatch enable pipeline job start event failed", ex);
71 }
72 }
73
74 private void dispatchEnablePipelineJobStartEvent(final PipelineContextKey contextKey) {
75 JobConfigurationAPI jobConfigAPI = PipelineAPIFactory.getJobConfigurationAPI(contextKey);
76 List<JobBriefInfo> allJobsBriefInfo = PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo()
77 .stream().filter(each -> !each.getJobName().startsWith("_")).collect(Collectors.toList());
78 log.info("All job names: {}", allJobsBriefInfo.stream().map(JobBriefInfo::getJobName).collect(Collectors.joining(",")));
79 for (JobBriefInfo each : allJobsBriefInfo) {
80 PipelineJobType jobType;
81 try {
82 jobType = PipelineJobIdUtils.parseJobType(each.getJobName());
83 } catch (final IllegalArgumentException ex) {
84 log.warn("Parse job type failed, job name: {}, error: {}", each.getJobName(), ex.getMessage());
85 continue;
86 }
87 if ("CONSISTENCY_CHECK".equals(jobType.getCode())) {
88 continue;
89 }
90 JobConfigurationPOJO jobConfig;
91 try {
92 jobConfig = jobConfigAPI.getJobConfiguration(each.getJobName());
93 } catch (final PipelineJobNotFoundException ex) {
94 log.error("Get job configuration failed, job name: {}, error: {}", each.getJobName(), ex.getMessage());
95 continue;
96 }
97 if (jobConfig.isDisabled()) {
98 continue;
99 }
100 new PipelineJobManager(jobType).resume(each.getJobName());
101 log.info("Dispatch enable pipeline job start event, job name: {}", each.getJobName());
102 }
103 }
104
105 @Override
106 public void onDestroyed(final String databaseName, final InstanceType instanceType) {
107 PipelineContextManager.removeContext(new PipelineContextKey(databaseName, instanceType));
108 }
109 }