1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.infra.executor.kernel;
19
20 import lombok.Getter;
21 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
22 import org.apache.shardingsphere.infra.exception.generic.UnknownSQLException;
23 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
24 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
25 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
26 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
27
28 import java.sql.SQLException;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.Future;
36
37
38
39
40 @HighFrequencyInvocation
41 @Getter
42 public final class ExecutorEngine implements AutoCloseable {
43
44 private final ExecutorServiceManager executorServiceManager;
45
46 private ExecutorEngine(final int executorSize) {
47 executorServiceManager = new ExecutorServiceManager(executorSize);
48 }
49
50
51
52
53
54
55
56 public static ExecutorEngine createExecutorEngineWithSize(final int executorSize) {
57 return new ExecutorEngine(executorSize);
58 }
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public <I, O> List<O> execute(final ExecutionGroupContext<I> executionGroupContext,
73 final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback, final boolean serial) throws SQLException {
74 if (executionGroupContext.getInputGroups().isEmpty()) {
75 return Collections.emptyList();
76 }
77 return serial ? serialExecute(executionGroupContext.getInputGroups().iterator(), executionGroupContext.getReportContext().getProcessId(), firstCallback, callback)
78 : parallelExecute(executionGroupContext.getInputGroups().iterator(), executionGroupContext.getReportContext().getProcessId(), firstCallback, callback);
79 }
80
81 private <I, O> List<O> serialExecute(final Iterator<ExecutionGroup<I>> executionGroups, final String processId, final ExecutorCallback<I, O> firstCallback,
82 final ExecutorCallback<I, O> callback) throws SQLException {
83 ExecutionGroup<I> firstInputs = executionGroups.next();
84 List<O> result = new LinkedList<>(syncExecute(firstInputs, processId, null == firstCallback ? callback : firstCallback));
85 while (executionGroups.hasNext()) {
86 result.addAll(syncExecute(executionGroups.next(), processId, callback));
87 }
88 return result;
89 }
90
91 private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> executionGroups, final String processId, final ExecutorCallback<I, O> firstCallback,
92 final ExecutorCallback<I, O> callback) throws SQLException {
93 ExecutionGroup<I> firstInputs = executionGroups.next();
94 Collection<Future<Collection<O>>> restResultFutures = asyncExecute(executionGroups, processId, callback);
95 return getGroupResults(syncExecute(firstInputs, processId, null == firstCallback ? callback : firstCallback), restResultFutures);
96 }
97
98 private <I, O> Collection<O> syncExecute(final ExecutionGroup<I> executionGroup, final String processId, final ExecutorCallback<I, O> callback) throws SQLException {
99 return callback.execute(executionGroup.getInputs(), true, processId);
100 }
101
102 private <I, O> Collection<Future<Collection<O>>> asyncExecute(final Iterator<ExecutionGroup<I>> executionGroups, final String processId, final ExecutorCallback<I, O> callback) {
103 Collection<Future<Collection<O>>> result = new LinkedList<>();
104 while (executionGroups.hasNext()) {
105 result.add(asyncExecute(executionGroups.next(), processId, callback));
106 }
107 return result;
108 }
109
110 private <I, O> Future<Collection<O>> asyncExecute(final ExecutionGroup<I> executionGroup, final String processId, final ExecutorCallback<I, O> callback) {
111 return executorServiceManager.getExecutorService().submit(() -> callback.execute(executionGroup.getInputs(), false, processId));
112 }
113
114 private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<Future<Collection<O>>> restFutures) throws SQLException {
115 List<O> result = new LinkedList<>(firstResults);
116 for (Future<Collection<O>> each : restFutures) {
117 try {
118 result.addAll(each.get());
119 } catch (final InterruptedException ex) {
120 Thread.currentThread().interrupt();
121 } catch (final ExecutionException ex) {
122 return throwException(ex);
123 }
124 }
125 return result;
126 }
127
128 private <O> List<O> throwException(final Exception exception) throws SQLException {
129 if (exception.getCause() instanceof SQLException) {
130 throw (SQLException) exception.getCause();
131 }
132 throw new UnknownSQLException(exception);
133 }
134
135 @Override
136 public void close() {
137 executorServiceManager.close();
138 }
139 }