1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.execute;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
22
23 import java.sql.SQLException;
24 import java.time.Instant;
25 import java.time.LocalDateTime;
26 import java.time.ZoneId;
27 import java.util.concurrent.atomic.AtomicReference;
28
29
30
31
32 @Slf4j
33 public abstract class AbstractPipelineLifecycleRunnable implements PipelineLifecycleRunnable {
34
35 private final AtomicReference<Boolean> running = new AtomicReference<>(null);
36
37 private volatile long startTimeMillis;
38
39 protected boolean isRunning() {
40 Boolean running = this.running.get();
41 return null != running && running;
42 }
43
44 @Override
45 public final void start() {
46 if (null != running.get() || !running.compareAndSet(null, true)) {
47 return;
48 }
49 startTimeMillis = System.currentTimeMillis();
50 runBlocking();
51 }
52
53 protected abstract void runBlocking();
54
55 @Override
56 public final void stop() {
57 Boolean running = this.running.get();
58 if (null == running) {
59 this.running.set(false);
60 return;
61 }
62 if (!running) {
63 return;
64 }
65 LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(startTimeMillis), ZoneId.systemDefault());
66 log.info("stop lifecycle executor {}, startTime={}, cost {} ms", this, startTime.format(DateTimeFormatterFactory.getStandardFormatter()), System.currentTimeMillis() - startTimeMillis);
67 try {
68 doStop();
69
70 } catch (final SQLException | RuntimeException ex) {
71
72 log.warn("doStop failed", ex);
73 }
74 this.running.set(false);
75 }
76
77 protected abstract void doStop() throws SQLException;
78
79 @Override
80 public final void run() {
81 start();
82 }
83 }