1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc;
19
20 import lombok.RequiredArgsConstructor;
21 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
22 import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
23 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
24 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
25 import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
26 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
27 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
28 import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
29 import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
30 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
31 import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
32 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
33
34 import java.sql.SQLException;
35 import java.sql.Statement;
36 import java.util.Collection;
37 import java.util.LinkedList;
38 import java.util.Optional;
39
40
41
42
43
44
45 @HighFrequencyInvocation
46 @RequiredArgsConstructor
47 public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCExecutionUnit, T> {
48
49 private final DatabaseType protocolType;
50
51 private final ResourceMetaData resourceMetaData;
52
53 private final SQLStatement sqlStatement;
54
55 private final boolean isExceptionThrown;
56
57 private final ProcessEngine processEngine = new ProcessEngine();
58
59 @Override
60 public final Collection<T> execute(final Collection<JDBCExecutionUnit> executionUnits, final boolean isTrunkThread, final String processId) throws SQLException {
61
62 Collection<T> result = new LinkedList<>();
63 for (JDBCExecutionUnit each : executionUnits) {
64 T executeResult = execute(each, isTrunkThread, processId);
65 if (null != executeResult) {
66 result.add(executeResult);
67 }
68 }
69 return result;
70 }
71
72
73
74
75
76
77 private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTrunkThread, final String processId) throws SQLException {
78 SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
79 DatabaseType storageType = resourceMetaData.getStorageUnits().get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()).getStorageType();
80 ConnectionProperties connectionProps = resourceMetaData.getStorageUnits().get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()).getConnectionProperties();
81 SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
82 try {
83 SQLUnit sqlUnit = jdbcExecutionUnit.getExecutionUnit().getSqlUnit();
84 sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), connectionProps, isTrunkThread);
85 T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
86 sqlExecutionHook.finishSuccess();
87 processEngine.completeSQLUnitExecution(jdbcExecutionUnit, processId);
88 return result;
89 } catch (final SQLException ex) {
90 if (!storageType.equals(protocolType)) {
91 Optional<T> saneResult = getSaneResult(sqlStatement, ex);
92 if (saneResult.isPresent()) {
93 return isTrunkThread ? saneResult.get() : null;
94 }
95 }
96 sqlExecutionHook.finishFailure(ex);
97 SQLExecutorExceptionHandler.handleException(ex);
98 return null;
99 }
100 }
101
102 protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode, DatabaseType storageType) throws SQLException;
103
104 protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement, SQLException ex);
105 }