View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.execute;
19  
20  import lombok.AccessLevel;
21  import lombok.RequiredArgsConstructor;
22  import lombok.SneakyThrows;
23  import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
24  import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
25  
26  import java.util.Collection;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.LinkedBlockingQueue;
33  import java.util.function.BiConsumer;
34  
35  /**
36   * Executor engine.
37   */
38  @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
39  public final class ExecuteEngine {
40      
41      private static final String THREAD_PREFIX = "pipeline-";
42      
43      private static final String THREAD_SUFFIX = "-%d";
44      
45      private static final ExecutorService CALLBACK_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build(THREAD_PREFIX + "callback" + THREAD_SUFFIX));
46      
47      private final ExecutorService executorService;
48      
49      /**
50       * Create task execute engine instance with cached thread pool.
51       *
52       * @param threadName thread name
53       * @return task execute engine instance
54       */
55      public static ExecuteEngine newCachedThreadInstance(final String threadName) {
56          String threadNameFormat = THREAD_PREFIX + threadName + THREAD_SUFFIX;
57          return new ExecuteEngine(Executors.newCachedThreadPool(ExecutorThreadFactoryBuilder.build(threadNameFormat)));
58      }
59      
60      /**
61       * Create task execute engine instance with fixed thread pool.
62       *
63       * @param threadNumber thread number
64       * @param threadName thread name
65       * @return task execute engine instance
66       */
67      public static ExecuteEngine newFixedThreadInstance(final int threadNumber, final String threadName) {
68          String threadNameFormat = THREAD_PREFIX + threadName + THREAD_SUFFIX;
69          return new ExecuteEngine(Executors.newFixedThreadPool(threadNumber, ExecutorThreadFactoryBuilder.build(threadNameFormat)));
70      }
71      
72      /**
73       * Submit a {@code LifecycleExecutor} with callback {@code ExecuteCallback} to execute.
74       *
75       * @param pipelineLifecycleRunnable lifecycle executor
76       * @param executeCallback execute callback
77       * @return execute future
78       */
79      public CompletableFuture<?> submit(final PipelineLifecycleRunnable pipelineLifecycleRunnable, final ExecuteCallback executeCallback) {
80          return CompletableFuture.runAsync(pipelineLifecycleRunnable, executorService).whenCompleteAsync((unused, throwable) -> {
81              if (null == throwable) {
82                  executeCallback.onSuccess();
83              } else {
84                  Throwable cause = throwable.getCause();
85                  executeCallback.onFailure(null != cause ? cause : throwable);
86              }
87          }, CALLBACK_EXECUTOR);
88      }
89      
90      /**
91       * Submit a {@code LifecycleExecutor} to execute.
92       *
93       * @param pipelineLifecycleRunnable lifecycle executor
94       * @return execute future
95       */
96      public CompletableFuture<?> submit(final PipelineLifecycleRunnable pipelineLifecycleRunnable) {
97          return CompletableFuture.runAsync(pipelineLifecycleRunnable, executorService);
98      }
99      
100     /**
101      * Shutdown.
102      */
103     public void shutdown() {
104         if (executorService.isShutdown()) {
105             return;
106         }
107         executorService.shutdown();
108         executorService.shutdownNow();
109     }
110     
111     /**
112      * Trigger.
113      *
114      * @param futures futures
115      * @param executeCallback execute callback on all the futures
116      * @throws PipelineInternalException if there's underlying execution exception
117      */
118     @SneakyThrows(InterruptedException.class)
119     public static void trigger(final Collection<CompletableFuture<?>> futures, final ExecuteCallback executeCallback) {
120         BlockingQueue<CompletableFuture<?>> futureQueue = new LinkedBlockingQueue<>();
121         for (CompletableFuture<?> each : futures) {
122             each.whenCompleteAsync(new BiConsumer<Object, Throwable>() {
123                 
124                 @SneakyThrows(InterruptedException.class)
125                 @Override
126                 public void accept(final Object unused, final Throwable throwable) {
127                     futureQueue.put(each);
128                 }
129             }, CALLBACK_EXECUTOR);
130         }
131         for (int i = 1, count = futures.size(); i <= count; i++) {
132             CompletableFuture<?> future = futureQueue.take();
133             try {
134                 future.get();
135             } catch (final ExecutionException ex) {
136                 Throwable cause = ex.getCause();
137                 executeCallback.onFailure(null != cause ? cause : ex);
138                 throw new PipelineInternalException(ex);
139             }
140         }
141         executeCallback.onSuccess();
142     }
143 }