View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.infra.executor.kernel;
19  
20  import lombok.Getter;
21  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
22  import org.apache.shardingsphere.infra.exception.generic.UnknownSQLException;
23  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
24  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
25  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
26  import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
27  
28  import java.sql.SQLException;
29  import java.util.Collection;
30  import java.util.Collections;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.List;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.Future;
36  
37  /**
38   * Executor engine.
39   */
40  @HighFrequencyInvocation
41  @Getter
42  public final class ExecutorEngine implements AutoCloseable {
43      
44      private final ExecutorServiceManager executorServiceManager;
45      
46      private ExecutorEngine(final int executorSize) {
47          executorServiceManager = new ExecutorServiceManager(executorSize);
48      }
49      
50      /**
51       * Create executor engine with executor size.
52       *
53       * @param executorSize executor size
54       * @return created executor engine
55       */
56      public static ExecutorEngine createExecutorEngineWithSize(final int executorSize) {
57          return new ExecutorEngine(executorSize);
58      }
59      
60      /**
61       * Execute.
62       *
63       * @param executionGroupContext execution group context
64       * @param firstCallback first executor callback
65       * @param callback other executor callback
66       * @param serial whether using multi thread execute or not
67       * @param <I> type of input value
68       * @param <O> type of return value
69       * @return execute result
70       * @throws SQLException throw if execute failure
71       */
72      public <I, O> List<O> execute(final ExecutionGroupContext<I> executionGroupContext,
73                                    final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback, final boolean serial) throws SQLException {
74          if (executionGroupContext.getInputGroups().isEmpty()) {
75              return Collections.emptyList();
76          }
77          return serial ? serialExecute(executionGroupContext.getInputGroups().iterator(), executionGroupContext.getReportContext().getProcessId(), firstCallback, callback)
78                  : parallelExecute(executionGroupContext.getInputGroups().iterator(), executionGroupContext.getReportContext().getProcessId(), firstCallback, callback);
79      }
80      
81      private <I, O> List<O> serialExecute(final Iterator<ExecutionGroup<I>> executionGroups, final String processId, final ExecutorCallback<I, O> firstCallback,
82                                           final ExecutorCallback<I, O> callback) throws SQLException {
83          ExecutionGroup<I> firstInputs = executionGroups.next();
84          List<O> result = new LinkedList<>(syncExecute(firstInputs, processId, null == firstCallback ? callback : firstCallback));
85          while (executionGroups.hasNext()) {
86              result.addAll(syncExecute(executionGroups.next(), processId, callback));
87          }
88          return result;
89      }
90      
91      private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> executionGroups, final String processId, final ExecutorCallback<I, O> firstCallback,
92                                             final ExecutorCallback<I, O> callback) throws SQLException {
93          ExecutionGroup<I> firstInputs = executionGroups.next();
94          Collection<Future<Collection<O>>> restResultFutures = asyncExecute(executionGroups, processId, callback);
95          return getGroupResults(syncExecute(firstInputs, processId, null == firstCallback ? callback : firstCallback), restResultFutures);
96      }
97      
98      private <I, O> Collection<O> syncExecute(final ExecutionGroup<I> executionGroup, final String processId, final ExecutorCallback<I, O> callback) throws SQLException {
99          return callback.execute(executionGroup.getInputs(), true, processId);
100     }
101     
102     private <I, O> Collection<Future<Collection<O>>> asyncExecute(final Iterator<ExecutionGroup<I>> executionGroups, final String processId, final ExecutorCallback<I, O> callback) {
103         Collection<Future<Collection<O>>> result = new LinkedList<>();
104         while (executionGroups.hasNext()) {
105             result.add(asyncExecute(executionGroups.next(), processId, callback));
106         }
107         return result;
108     }
109     
110     private <I, O> Future<Collection<O>> asyncExecute(final ExecutionGroup<I> executionGroup, final String processId, final ExecutorCallback<I, O> callback) {
111         return executorServiceManager.getExecutorService().submit(() -> callback.execute(executionGroup.getInputs(), false, processId));
112     }
113     
114     private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<Future<Collection<O>>> restFutures) throws SQLException {
115         List<O> result = new LinkedList<>(firstResults);
116         for (Future<Collection<O>> each : restFutures) {
117             try {
118                 result.addAll(each.get());
119             } catch (final InterruptedException ex) {
120                 Thread.currentThread().interrupt();
121             } catch (final ExecutionException ex) {
122                 return throwException(ex);
123             }
124         }
125         return result;
126     }
127     
128     private <O> List<O> throwException(final Exception exception) throws SQLException {
129         if (exception.getCause() instanceof SQLException) {
130             throw (SQLException) exception.getCause();
131         }
132         throw new UnknownSQLException(exception);
133     }
134     
135     @Override
136     public void close() {
137         executorServiceManager.close();
138     }
139 }