1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.execute;
19
20 import lombok.AccessLevel;
21 import lombok.RequiredArgsConstructor;
22 import lombok.SneakyThrows;
23 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
24 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
25
26 import java.util.Collection;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.function.BiConsumer;
34
35
36
37
38 @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
39 public final class ExecuteEngine {
40
41 private static final String THREAD_PREFIX = "pipeline-";
42
43 private static final String THREAD_SUFFIX = "-%d";
44
45 private static final ExecutorService CALLBACK_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build(THREAD_PREFIX + "callback" + THREAD_SUFFIX));
46
47 private final ExecutorService executorService;
48
49
50
51
52
53
54
55 public static ExecuteEngine newCachedThreadInstance(final String threadName) {
56 String threadNameFormat = THREAD_PREFIX + threadName + THREAD_SUFFIX;
57 return new ExecuteEngine(Executors.newCachedThreadPool(ExecutorThreadFactoryBuilder.build(threadNameFormat)));
58 }
59
60
61
62
63
64
65
66
67 public static ExecuteEngine newFixedThreadInstance(final int threadNumber, final String threadName) {
68 String threadNameFormat = THREAD_PREFIX + threadName + THREAD_SUFFIX;
69 return new ExecuteEngine(Executors.newFixedThreadPool(threadNumber, ExecutorThreadFactoryBuilder.build(threadNameFormat)));
70 }
71
72
73
74
75
76
77
78
79 public CompletableFuture<?> submit(final PipelineLifecycleRunnable pipelineLifecycleRunnable, final ExecuteCallback executeCallback) {
80 return CompletableFuture.runAsync(pipelineLifecycleRunnable, executorService).whenCompleteAsync((unused, throwable) -> {
81 if (null == throwable) {
82 executeCallback.onSuccess();
83 } else {
84 Throwable cause = throwable.getCause();
85 executeCallback.onFailure(null != cause ? cause : throwable);
86 }
87 }, CALLBACK_EXECUTOR);
88 }
89
90
91
92
93
94
95
96 public CompletableFuture<?> submit(final PipelineLifecycleRunnable pipelineLifecycleRunnable) {
97 return CompletableFuture.runAsync(pipelineLifecycleRunnable, executorService);
98 }
99
100
101
102
103 public void shutdown() {
104 if (executorService.isShutdown()) {
105 return;
106 }
107 executorService.shutdown();
108 executorService.shutdownNow();
109 }
110
111
112
113
114
115
116
117
118 @SneakyThrows(InterruptedException.class)
119 public static void trigger(final Collection<CompletableFuture<?>> futures, final ExecuteCallback executeCallback) {
120 BlockingQueue<CompletableFuture<?>> futureQueue = new LinkedBlockingQueue<>();
121 for (CompletableFuture<?> each : futures) {
122 each.whenCompleteAsync(new BiConsumer<Object, Throwable>() {
123
124 @SneakyThrows(InterruptedException.class)
125 @Override
126 public void accept(final Object unused, final Throwable throwable) {
127 futureQueue.put(each);
128 }
129 }, CALLBACK_EXECUTOR);
130 }
131 for (int i = 1, count = futures.size(); i <= count; i++) {
132 CompletableFuture<?> future = futureQueue.take();
133 try {
134 future.get();
135 } catch (final ExecutionException ex) {
136 Throwable cause = ex.getCause();
137 executeCallback.onFailure(null != cause ? cause : ex);
138 throw new PipelineInternalException(ex);
139 }
140 }
141 executeCallback.onSuccess();
142 }
143 }