1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.backend.connector;
19
20 import lombok.Getter;
21 import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
22 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
23 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
24 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
25 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
26 import org.apache.shardingsphere.infra.exception.dialect.exception.transaction.TableModifyInTransactionException;
27 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
28 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
29 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
30 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
31 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
32 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
33 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
34 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
35 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
36 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
37 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
38 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
39 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
40 import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
41 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
42 import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
43 import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
44 import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
45 import org.apache.shardingsphere.infra.session.query.QueryContext;
46 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
47 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
48 import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.ProxyJDBCExecutor;
49 import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
50 import org.apache.shardingsphere.proxy.backend.connector.sane.SaneQueryResultEngine;
51 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
52 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
53 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
54 import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
55 import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
56 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
57 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.CloseStatement;
58 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
59 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.FetchStatement;
60 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.MoveStatement;
61 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.TruncateStatement;
62 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
63 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.OpenGaussStatement;
64 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussCursorStatement;
65 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.postgresql.PostgreSQLStatement;
66 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
67 import org.apache.shardingsphere.transaction.api.TransactionType;
68 import org.apache.shardingsphere.transaction.spi.TransactionHook;
69
70 import java.sql.Connection;
71 import java.sql.SQLException;
72 import java.util.Collection;
73 import java.util.Collections;
74 import java.util.List;
75 import java.util.Optional;
76
77
78
79
80 public final class ProxySQLExecutor {
81
82 private final String type;
83
84 private final ProxyDatabaseConnectionManager databaseConnectionManager;
85
86 private final ProxyJDBCExecutor regularExecutor;
87
88 private final RawExecutor rawExecutor;
89
90 @Getter
91 private final SQLFederationEngine sqlFederationEngine;
92
93 private final Collection<TransactionHook> transactionHooks = ShardingSphereServiceLoader.getServiceInstances(TransactionHook.class);
94
95 public ProxySQLExecutor(final String type, final ProxyDatabaseConnectionManager databaseConnectionManager, final DatabaseConnector databaseConnector, final QueryContext queryContext) {
96 this.type = type;
97 this.databaseConnectionManager = databaseConnectionManager;
98 ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
99 ConnectionContext connectionContext = databaseConnectionManager.getConnectionSession().getConnectionContext();
100 JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connectionContext);
101 regularExecutor = new ProxyJDBCExecutor(type, databaseConnectionManager.getConnectionSession(), databaseConnector, jdbcExecutor);
102 rawExecutor = new RawExecutor(executorEngine, connectionContext);
103 MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
104 String databaseName = databaseConnectionManager.getConnectionSession().getDatabaseName();
105 String schemaName = queryContext.getSqlStatementContext().getTablesContext().getSchemaName()
106 .orElseGet(() -> new DatabaseTypeRegistry(queryContext.getSqlStatementContext().getDatabaseType()).getDefaultSchemaName(databaseName));
107 sqlFederationEngine = new SQLFederationEngine(databaseName, schemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
108 }
109
110
111
112
113
114
115 public void checkExecutePrerequisites(final ExecutionContext executionContext) {
116 ShardingSpherePreconditions.checkState(isValidExecutePrerequisites(executionContext), () -> new TableModifyInTransactionException(getTableName(executionContext)));
117 }
118
119 private boolean isValidExecutePrerequisites(final ExecutionContext executionContext) {
120 return !isExecuteDDLInXATransaction(executionContext.getSqlStatementContext().getSqlStatement())
121 && !isExecuteDDLInPostgreSQLOpenGaussTransaction(executionContext.getSqlStatementContext().getSqlStatement());
122 }
123
124 private boolean isExecuteDDLInXATransaction(final SQLStatement sqlStatement) {
125 TransactionType transactionType = TransactionUtils.getTransactionType(databaseConnectionManager.getConnectionSession().getConnectionContext().getTransactionContext());
126 TransactionStatus transactionStatus = databaseConnectionManager.getConnectionSession().getTransactionStatus();
127 return TransactionType.XA == transactionType && transactionStatus.isInTransaction() && isUnsupportedDDLStatement(sqlStatement);
128 }
129
130 private boolean isExecuteDDLInPostgreSQLOpenGaussTransaction(final SQLStatement sqlStatement) {
131
132 boolean isPostgreSQLOpenGaussStatement = isPostgreSQLOrOpenGaussStatement(sqlStatement);
133 boolean isSupportedStatement = isSupportedSQLStatement(sqlStatement);
134 return sqlStatement instanceof DDLStatement
135 && !isSupportedStatement && isPostgreSQLOpenGaussStatement && databaseConnectionManager.getConnectionSession().getTransactionStatus().isInTransaction();
136 }
137
138 private boolean isSupportedSQLStatement(final SQLStatement sqlStatement) {
139 return isCursorStatement(sqlStatement) || sqlStatement instanceof TruncateStatement;
140 }
141
142 private boolean isCursorStatement(final SQLStatement sqlStatement) {
143 return sqlStatement instanceof OpenGaussCursorStatement
144 || sqlStatement instanceof CloseStatement || sqlStatement instanceof MoveStatement || sqlStatement instanceof FetchStatement;
145 }
146
147 private boolean isUnsupportedDDLStatement(final SQLStatement sqlStatement) {
148 if (isPostgreSQLOrOpenGaussStatement(sqlStatement) && isSupportedSQLStatement(sqlStatement)) {
149 return false;
150 }
151 return sqlStatement instanceof DDLStatement;
152 }
153
154 private boolean isPostgreSQLOrOpenGaussStatement(final SQLStatement sqlStatement) {
155 return sqlStatement instanceof PostgreSQLStatement || sqlStatement instanceof OpenGaussStatement;
156 }
157
158 private String getTableName(final ExecutionContext executionContext) {
159 return executionContext.getSqlStatementContext() instanceof TableAvailable && !((TableAvailable) executionContext.getSqlStatementContext()).getAllTables().isEmpty()
160 ? ((TableAvailable) executionContext.getSqlStatementContext()).getAllTables().iterator().next().getTableName().getIdentifier().getValue()
161 : "unknown_table";
162 }
163
164
165
166
167
168
169
170
171 public List<ExecuteResult> execute(final ExecutionContext executionContext) throws SQLException {
172 String databaseName = databaseConnectionManager.getConnectionSession().getDatabaseName();
173 Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getRuleMetaData().getRules();
174 int maxConnectionsSizePerQuery = ProxyContext.getInstance()
175 .getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
176 boolean isReturnGeneratedKeys = executionContext.getSqlStatementContext().getSqlStatement() instanceof MySQLInsertStatement;
177 return hasRawExecutionRule(rules) ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery)
178 : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
179 }
180
181 private boolean hasRawExecutionRule(final Collection<ShardingSphereRule> rules) {
182 for (ShardingSphereRule each : rules) {
183 if (each.getAttributes().findAttribute(RawExecutionRuleAttribute.class).isPresent()) {
184 return true;
185 }
186 }
187 return false;
188 }
189
190 private List<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
191 RawExecutionPrepareEngine prepareEngine = new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules);
192 ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext;
193 try {
194 executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
195 new ExecutionGroupReportContext(databaseConnectionManager.getConnectionSession().getProcessId(),
196 databaseConnectionManager.getConnectionSession().getDatabaseName(), databaseConnectionManager.getConnectionSession().getGrantee()));
197 } catch (final SQLException ex) {
198 return getSaneExecuteResults(executionContext, ex);
199 }
200
201 return rawExecutor.execute(executionGroupContext, executionContext.getQueryContext(), new RawSQLExecutorCallback());
202 }
203
204 private List<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
205 final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
206 JDBCBackendStatement statementManager = (JDBCBackendStatement) databaseConnectionManager.getConnectionSession().getStatementManager();
207 DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
208 type, maxConnectionsSizePerQuery, databaseConnectionManager, statementManager, new StatementOption(isReturnGeneratedKeys), rules,
209 ProxyContext.getInstance().getContextManager().getDatabase(databaseConnectionManager.getConnectionSession().getDatabaseName()).getResourceMetaData().getStorageUnits());
210 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
211 try {
212 executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
213 new ExecutionGroupReportContext(databaseConnectionManager.getConnectionSession().getProcessId(),
214 databaseConnectionManager.getConnectionSession().getDatabaseName(), databaseConnectionManager.getConnectionSession().getGrantee()));
215 } catch (final SQLException ex) {
216 return getSaneExecuteResults(executionContext, ex);
217 }
218 executeTransactionHooksBeforeExecuteSQL(databaseConnectionManager.getConnectionSession());
219 return regularExecutor.execute(executionContext.getQueryContext(), executionGroupContext, isReturnGeneratedKeys, isExceptionThrown);
220 }
221
222 private void executeTransactionHooksBeforeExecuteSQL(final ConnectionSession connectionSession) throws SQLException {
223 if (!getTransactionContext(connectionSession).isInTransaction()) {
224 return;
225 }
226 for (TransactionHook each : transactionHooks) {
227 each.beforeExecuteSQL(connectionSession.getDatabaseConnectionManager().getCachedConnections().values(), getTransactionContext(connectionSession), connectionSession.getIsolationLevel());
228 }
229 }
230
231 private TransactionConnectionContext getTransactionContext(final ConnectionSession connectionSession) {
232 return connectionSession.getDatabaseConnectionManager().getConnectionSession().getConnectionContext().getTransactionContext();
233 }
234
235 private List<ExecuteResult> getSaneExecuteResults(final ExecutionContext executionContext, final SQLException originalException) throws SQLException {
236 DatabaseType databaseType = ProxyContext.getInstance().getContextManager().getDatabase(databaseConnectionManager.getConnectionSession().getDatabaseName()).getProtocolType();
237 Optional<ExecuteResult> executeResult = new SaneQueryResultEngine(databaseType).getSaneQueryResult(executionContext.getSqlStatementContext().getSqlStatement(), originalException);
238 return executeResult.map(Collections::singletonList).orElseThrow(() -> originalException);
239 }
240 }