1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc;
19
20 import lombok.RequiredArgsConstructor;
21 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
22 import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
23 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
24 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
25 import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
26 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
27 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
28 import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
29 import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
30 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
31 import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
32 import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
33 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
34
35 import java.sql.SQLException;
36 import java.sql.Statement;
37 import java.util.Collection;
38 import java.util.LinkedList;
39 import java.util.Optional;
40
41
42
43
44
45
46 @HighFrequencyInvocation
47 @RequiredArgsConstructor
48 public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCExecutionUnit, T> {
49
50 private final DatabaseType protocolType;
51
52 private final ResourceMetaData resourceMetaData;
53
54 private final SQLStatement sqlStatement;
55
56 private final boolean isExceptionThrown;
57
58 private final ProcessEngine processEngine = new ProcessEngine();
59
60 @Override
61 public final Collection<T> execute(final Collection<JDBCExecutionUnit> executionUnits, final boolean isTrunkThread, final String processId) throws SQLException {
62
63 Collection<T> result = new LinkedList<>();
64 for (JDBCExecutionUnit each : executionUnits) {
65 T executeResult = execute(each, isTrunkThread, processId);
66 if (null != executeResult) {
67 result.add(executeResult);
68 }
69 }
70 return result;
71 }
72
73
74
75
76
77
78 private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTrunkThread, final String processId) throws SQLException {
79 SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
80 String dataSourceName = jdbcExecutionUnit.getExecutionUnit().getDataSourceName();
81
82 StorageUnit storageUnit = resourceMetaData.getStorageUnits().containsKey(dataSourceName)
83 ? resourceMetaData.getStorageUnits().get(dataSourceName)
84 : resourceMetaData.getStorageUnits().values().iterator().next();
85 DatabaseType storageType = storageUnit.getStorageType();
86 ConnectionProperties connectionProps = storageUnit.getConnectionProperties();
87 SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
88 try {
89 SQLUnit sqlUnit = jdbcExecutionUnit.getExecutionUnit().getSqlUnit();
90 sqlExecutionHook.start(dataSourceName, sqlUnit.getSql(), sqlUnit.getParameters(), connectionProps, isTrunkThread);
91 T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
92 sqlExecutionHook.finishSuccess();
93 processEngine.completeSQLUnitExecution(jdbcExecutionUnit, processId);
94 return result;
95 } catch (final SQLException ex) {
96 if (!storageType.equals(protocolType)) {
97 Optional<T> saneResult = getSaneResult(sqlStatement, ex);
98 if (saneResult.isPresent()) {
99 return isTrunkThread ? saneResult.get() : null;
100 }
101 }
102 sqlExecutionHook.finishFailure(ex);
103 SQLExecutorExceptionHandler.handleException(ex);
104 return null;
105 }
106 }
107
108 protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode, DatabaseType storageType) throws SQLException;
109
110 protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement, SQLException ex);
111 }