1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.execute;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
22
23 import java.time.Instant;
24 import java.time.LocalDateTime;
25 import java.time.ZoneId;
26 import java.util.concurrent.atomic.AtomicReference;
27
28
29
30
31 @Slf4j
32 public abstract class AbstractPipelineLifecycleRunnable implements PipelineLifecycleRunnable {
33
34 private final AtomicReference<Boolean> running = new AtomicReference<>(null);
35
36 private volatile long startTimeMillis;
37
38 protected boolean isRunning() {
39 Boolean running = this.running.get();
40 return null != running && running;
41 }
42
43 @Override
44 public final void start() {
45 if (null != running.get() || !running.compareAndSet(null, true)) {
46 return;
47 }
48 startTimeMillis = System.currentTimeMillis();
49 runBlocking();
50 }
51
52 protected abstract void runBlocking();
53
54 @Override
55 public final void stop() {
56 Boolean running = this.running.get();
57 if (null == running) {
58 this.running.set(false);
59 return;
60 }
61 if (!running) {
62 return;
63 }
64 LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(startTimeMillis), ZoneId.systemDefault());
65 log.info("stop lifecycle executor {}, startTime={}, cost {} ms", this, startTime.format(DateTimeFormatterFactory.getDatetimeFormatter()), System.currentTimeMillis() - startTimeMillis);
66 try {
67 doStop();
68
69 } catch (final RuntimeException ex) {
70
71 log.warn("doStop failed", ex);
72 }
73 this.running.set(false);
74 }
75
76 protected abstract void doStop();
77
78 @Override
79 public final void run() {
80 start();
81 }
82 }