1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.backend.connector;
19
20 import com.google.common.base.Strings;
21 import lombok.Getter;
22 import org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
23 import org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.transaction.DialectTransactionOption;
24 import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
25 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
26 import org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
27 import org.apache.shardingsphere.database.exception.core.exception.transaction.TableModifyInTransactionException;
28 import org.apache.shardingsphere.infra.binder.context.segment.table.TablesContext;
29 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
30 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
31 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
32 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
33 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
34 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
35 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
36 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
37 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
38 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
39 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
40 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
41 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
42 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
43 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
44 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
45 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
46 import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
47 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
48 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
49 import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
50 import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
51 import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
52 import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
53 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
54 import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.ProxyJDBCExecutor;
55 import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
56 import org.apache.shardingsphere.proxy.backend.connector.sane.DialectSaneQueryResultEngine;
57 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
58 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
59 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
60 import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
61 import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
62 import org.apache.shardingsphere.sql.parser.statement.core.enums.TransactionIsolationLevel;
63 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
64 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.CloseStatement;
65 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.CursorStatement;
66 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.DDLStatement;
67 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.FetchStatement;
68 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.MoveStatement;
69 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.TruncateStatement;
70 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
71 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
72 import org.apache.shardingsphere.transaction.api.TransactionType;
73 import org.apache.shardingsphere.transaction.spi.TransactionHook;
74
75 import java.sql.Connection;
76 import java.sql.SQLException;
77 import java.util.Collection;
78 import java.util.Collections;
79 import java.util.List;
80 import java.util.Map;
81 import java.util.Map.Entry;
82 import java.util.Optional;
83
84
85
86
87 public final class ProxySQLExecutor {
88
89 private final JDBCDriverType type;
90
91 private final ProxyDatabaseConnectionManager databaseConnectionManager;
92
93 private final ProxyJDBCExecutor regularExecutor;
94
95 private final RawExecutor rawExecutor;
96
97 @Getter
98 private final SQLFederationEngine sqlFederationEngine;
99
100 @SuppressWarnings("rawtypes")
101 private final Map<ShardingSphereRule, TransactionHook> transactionHooks = OrderedSPILoader.getServices(
102 TransactionHook.class, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules());
103
104 public ProxySQLExecutor(final JDBCDriverType type,
105 final ProxyDatabaseConnectionManager databaseConnectionManager, final DatabaseProxyConnector databaseProxyConnector, final SQLStatementContext sqlStatementContext) {
106 this.type = type;
107 this.databaseConnectionManager = databaseConnectionManager;
108 ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
109 ConnectionContext connectionContext = databaseConnectionManager.getConnectionSession().getConnectionContext();
110 JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connectionContext);
111 regularExecutor = new ProxyJDBCExecutor(type, databaseConnectionManager.getConnectionSession(), databaseProxyConnector, jdbcExecutor);
112 rawExecutor = new RawExecutor(executorEngine, connectionContext);
113 MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
114 String currentDatabaseName = Strings.isNullOrEmpty(databaseConnectionManager.getConnectionSession().getCurrentDatabaseName())
115 ? databaseConnectionManager.getConnectionSession().getUsedDatabaseName()
116 : databaseConnectionManager.getConnectionSession().getCurrentDatabaseName();
117 String currentSchemaName = getSchemaName(sqlStatementContext, metaDataContexts.getMetaData().getDatabase(currentDatabaseName));
118 sqlFederationEngine = new SQLFederationEngine(currentDatabaseName, currentSchemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
119 }
120
121 private String getSchemaName(final SQLStatementContext sqlStatementContext, final ShardingSphereDatabase database) {
122 String defaultSchemaName = new DatabaseTypeRegistry(sqlStatementContext.getSqlStatement().getDatabaseType()).getDefaultSchemaName(database.getName());
123 return sqlStatementContext.getTablesContext().getSchemaName().orElse(defaultSchemaName);
124 }
125
126
127
128
129
130
131 public void checkExecutePrerequisites(final SQLStatementContext sqlStatementContext) {
132 ShardingSpherePreconditions.checkState(
133 isValidExecutePrerequisites(sqlStatementContext.getSqlStatement()), () -> new TableModifyInTransactionException(getTableName(sqlStatementContext.getTablesContext())));
134 }
135
136 private boolean isValidExecutePrerequisites(final SQLStatement sqlStatement) {
137 return !(sqlStatement instanceof DDLStatement) || isSupportDDLInTransaction(sqlStatement.getDatabaseType(), (DDLStatement) sqlStatement);
138 }
139
140 private boolean isSupportDDLInTransaction(final DatabaseType databaseType, final DDLStatement sqlStatement) {
141 DialectTransactionOption transactionOption = new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getTransactionOption();
142 boolean isDDLWithoutMetaDataChanged = isDDLWithoutMetaDataChanged(sqlStatement);
143 if (isInXATransaction()) {
144 return transactionOption.isSupportDDLInXATransaction() && (isDDLWithoutMetaDataChanged || transactionOption.isSupportMetaDataRefreshInTransaction());
145 }
146 if (isInLocalTransaction()) {
147 return transactionOption.isSupportMetaDataRefreshInTransaction() || isDDLWithoutMetaDataChanged;
148 }
149 return true;
150 }
151
152 private boolean isInXATransaction() {
153 TransactionType transactionType = TransactionUtils.getTransactionType(databaseConnectionManager.getConnectionSession().getConnectionContext().getTransactionContext());
154 TransactionStatus transactionStatus = databaseConnectionManager.getConnectionSession().getTransactionStatus();
155 return TransactionType.XA == transactionType && transactionStatus.isInTransaction();
156 }
157
158 private boolean isInLocalTransaction() {
159 return databaseConnectionManager.getConnectionSession().getTransactionStatus().isInTransaction();
160 }
161
162
163 private boolean isDDLWithoutMetaDataChanged(final DDLStatement sqlStatement) {
164 return isCursorStatement(sqlStatement) || sqlStatement instanceof TruncateStatement;
165 }
166
167 private boolean isCursorStatement(final DDLStatement sqlStatement) {
168 return sqlStatement instanceof CursorStatement || sqlStatement instanceof CloseStatement || sqlStatement instanceof MoveStatement || sqlStatement instanceof FetchStatement;
169 }
170
171 private String getTableName(final TablesContext tablesContext) {
172 return tablesContext.getSimpleTables().isEmpty() ? "unknown_table" : tablesContext.getSimpleTables().iterator().next().getTableName().getIdentifier().getValue();
173 }
174
175
176
177
178
179
180
181
182 public List<ExecuteResult> execute(final ExecutionContext executionContext) throws SQLException {
183 String databaseName = databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
184 Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getRuleMetaData().getRules();
185 int maxConnectionsSizePerQuery = ProxyContext.getInstance()
186 .getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
187 DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(executionContext.getSqlStatementContext().getSqlStatement().getDatabaseType()).getDialectDatabaseMetaData();
188 boolean isReturnGeneratedKeys = executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement && dialectDatabaseMetaData.getGeneratedKeyOption().isPresent();
189 return hasRawExecutionRule(rules)
190 ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery)
191 : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
192 }
193
194 private boolean hasRawExecutionRule(final Collection<ShardingSphereRule> rules) {
195 for (ShardingSphereRule each : rules) {
196 if (each.getAttributes().findAttribute(RawExecutionRuleAttribute.class).isPresent()) {
197 return true;
198 }
199 }
200 return false;
201 }
202
203 private List<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
204 RawExecutionPrepareEngine prepareEngine = new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules);
205 ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext;
206 try {
207 String databaseName = databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
208 executionGroupContext = prepareEngine.prepare(databaseName, executionContext, executionContext.getExecutionUnits(),
209 new ExecutionGroupReportContext(databaseConnectionManager.getConnectionSession().getProcessId(),
210 databaseName, databaseConnectionManager.getConnectionSession().getConnectionContext().getGrantee()));
211 } catch (final SQLException ex) {
212 return getSaneExecuteResults(executionContext, ex);
213 }
214
215 return rawExecutor.execute(executionGroupContext, executionContext.getQueryContext(), new RawSQLExecutorCallback());
216 }
217
218 private List<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
219 final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
220 JDBCBackendStatement statementManager = (JDBCBackendStatement) databaseConnectionManager.getConnectionSession().getStatementManager();
221 String databaseName = databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
222 DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
223 type, maxConnectionsSizePerQuery, databaseConnectionManager, statementManager, new StatementOption(isReturnGeneratedKeys), rules,
224 ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData());
225 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
226 try {
227 executionGroupContext = prepareEngine.prepare(databaseName, executionContext, executionContext.getExecutionUnits(),
228 new ExecutionGroupReportContext(databaseConnectionManager.getConnectionSession().getProcessId(),
229 databaseName, databaseConnectionManager.getConnectionSession().getConnectionContext().getGrantee()));
230 } catch (final SQLException ex) {
231 return getSaneExecuteResults(executionContext, ex);
232 }
233 executeTransactionHooksBeforeExecuteSQL(databaseConnectionManager.getConnectionSession());
234 return regularExecutor.execute(executionContext.getQueryContext(), executionGroupContext, isReturnGeneratedKeys, isExceptionThrown);
235 }
236
237 @SuppressWarnings({"unchecked", "rawtypes"})
238 private void executeTransactionHooksBeforeExecuteSQL(final ConnectionSession connectionSession) throws SQLException {
239 if (!getTransactionContext(connectionSession).isInTransaction()) {
240 return;
241 }
242 for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
243 entry.getValue().beforeExecuteSQL(entry.getKey(), ProxyContext.getInstance().getContextManager().getDatabaseType(),
244 connectionSession.getDatabaseConnectionManager().getCachedConnections().values(), getTransactionContext(connectionSession),
245 connectionSession.getIsolationLevel().orElse(TransactionIsolationLevel.READ_COMMITTED));
246 }
247 }
248
249 private TransactionConnectionContext getTransactionContext(final ConnectionSession connectionSession) {
250 return connectionSession.getDatabaseConnectionManager().getConnectionSession().getConnectionContext().getTransactionContext();
251 }
252
253 private List<ExecuteResult> getSaneExecuteResults(final ExecutionContext executionContext, final SQLException originalException) throws SQLException {
254 DatabaseType databaseType = ProxyContext.getInstance().getContextManager().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName()).getProtocolType();
255 Optional<ExecuteResult> executeResult = DatabaseTypedSPILoader.findService(DialectSaneQueryResultEngine.class, databaseType)
256 .flatMap(optional -> optional.getSaneQueryResult(executionContext.getSqlStatementContext().getSqlStatement(), originalException));
257 return executeResult.map(Collections::singletonList).orElseThrow(() -> originalException);
258 }
259 }