1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.listener;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
22 import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
23
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26
27
28
29
30 @Slf4j
31 public final class PipelineElasticJobListener implements ElasticJobListener {
32
33
34 private static final Map<String, Long> RUNNING_JOBS = new ConcurrentHashMap<>();
35
36 @Override
37 public void beforeJobExecuted(final ShardingContexts shardingContexts) {
38 if (RUNNING_JOBS.containsKey(shardingContexts.getJobName())) {
39 log.warn("{} already exists", shardingContexts.getJobName());
40 }
41 RUNNING_JOBS.put(shardingContexts.getJobName(), System.currentTimeMillis());
42 }
43
44 @Override
45 public void afterJobExecuted(final ShardingContexts shardingContexts) {
46 log.info("After {} job execute ", shardingContexts.getJobName());
47 RUNNING_JOBS.remove(shardingContexts.getJobName());
48 }
49
50
51
52
53
54
55
56 public boolean isJobRunning(final String jobId) {
57 return RUNNING_JOBS.containsKey(jobId);
58 }
59
60 @Override
61 public String getType() {
62 return PipelineElasticJobListener.class.getName();
63 }
64 }