View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.backend.connector;
19  
20  import lombok.Getter;
21  import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
22  import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
23  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
24  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
25  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
26  import org.apache.shardingsphere.infra.exception.dialect.exception.transaction.TableModifyInTransactionException;
27  import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
28  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
29  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
30  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
31  import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
32  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
33  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
34  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
35  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
36  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
37  import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
38  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
39  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
40  import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
41  import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
42  import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
43  import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
44  import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
45  import org.apache.shardingsphere.infra.session.query.QueryContext;
46  import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
47  import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
48  import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.ProxyJDBCExecutor;
49  import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
50  import org.apache.shardingsphere.proxy.backend.connector.sane.SaneQueryResultEngine;
51  import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
52  import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
53  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
54  import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
55  import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
56  import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
57  import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.CloseStatement;
58  import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
59  import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.FetchStatement;
60  import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.MoveStatement;
61  import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.TruncateStatement;
62  import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
63  import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.OpenGaussStatement;
64  import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussCursorStatement;
65  import org.apache.shardingsphere.sql.parser.sql.dialect.statement.postgresql.PostgreSQLStatement;
66  import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
67  import org.apache.shardingsphere.transaction.api.TransactionType;
68  import org.apache.shardingsphere.transaction.spi.TransactionHook;
69  
70  import java.sql.Connection;
71  import java.sql.SQLException;
72  import java.util.Collection;
73  import java.util.Collections;
74  import java.util.List;
75  import java.util.Optional;
76  
77  /**
78   * Proxy SQL Executor.
79   */
80  public final class ProxySQLExecutor {
81      
82      private final String type;
83      
84      private final ProxyDatabaseConnectionManager databaseConnectionManager;
85      
86      private final ProxyJDBCExecutor regularExecutor;
87      
88      private final RawExecutor rawExecutor;
89      
90      @Getter
91      private final SQLFederationEngine sqlFederationEngine;
92      
93      private final Collection<TransactionHook> transactionHooks = ShardingSphereServiceLoader.getServiceInstances(TransactionHook.class);
94      
95      public ProxySQLExecutor(final String type, final ProxyDatabaseConnectionManager databaseConnectionManager, final DatabaseConnector databaseConnector, final QueryContext queryContext) {
96          this.type = type;
97          this.databaseConnectionManager = databaseConnectionManager;
98          ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
99          ConnectionContext connectionContext = databaseConnectionManager.getConnectionSession().getConnectionContext();
100         JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connectionContext);
101         regularExecutor = new ProxyJDBCExecutor(type, databaseConnectionManager.getConnectionSession(), databaseConnector, jdbcExecutor);
102         rawExecutor = new RawExecutor(executorEngine, connectionContext);
103         MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
104         String databaseName = databaseConnectionManager.getConnectionSession().getDatabaseName();
105         String schemaName = queryContext.getSqlStatementContext().getTablesContext().getSchemaName()
106                 .orElseGet(() -> new DatabaseTypeRegistry(queryContext.getSqlStatementContext().getDatabaseType()).getDefaultSchemaName(databaseName));
107         sqlFederationEngine = new SQLFederationEngine(databaseName, schemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
108     }
109     
110     /**
111      * Check execute prerequisites.
112      *
113      * @param executionContext execution context
114      */
115     public void checkExecutePrerequisites(final ExecutionContext executionContext) {
116         ShardingSpherePreconditions.checkState(isValidExecutePrerequisites(executionContext), () -> new TableModifyInTransactionException(getTableName(executionContext)));
117     }
118     
119     private boolean isValidExecutePrerequisites(final ExecutionContext executionContext) {
120         return !isExecuteDDLInXATransaction(executionContext.getSqlStatementContext().getSqlStatement())
121                 && !isExecuteDDLInPostgreSQLOpenGaussTransaction(executionContext.getSqlStatementContext().getSqlStatement());
122     }
123     
124     private boolean isExecuteDDLInXATransaction(final SQLStatement sqlStatement) {
125         TransactionType transactionType = TransactionUtils.getTransactionType(databaseConnectionManager.getConnectionSession().getConnectionContext().getTransactionContext());
126         TransactionStatus transactionStatus = databaseConnectionManager.getConnectionSession().getTransactionStatus();
127         return TransactionType.XA == transactionType && transactionStatus.isInTransaction() && isUnsupportedDDLStatement(sqlStatement);
128     }
129     
130     private boolean isExecuteDDLInPostgreSQLOpenGaussTransaction(final SQLStatement sqlStatement) {
131         // TODO implement DDL statement commit/rollback in PostgreSQL/openGauss transaction
132         boolean isPostgreSQLOpenGaussStatement = isPostgreSQLOrOpenGaussStatement(sqlStatement);
133         boolean isSupportedStatement = isSupportedSQLStatement(sqlStatement);
134         return sqlStatement instanceof DDLStatement
135                 && !isSupportedStatement && isPostgreSQLOpenGaussStatement && databaseConnectionManager.getConnectionSession().getTransactionStatus().isInTransaction();
136     }
137     
138     private boolean isSupportedSQLStatement(final SQLStatement sqlStatement) {
139         return isCursorStatement(sqlStatement) || sqlStatement instanceof TruncateStatement;
140     }
141     
142     private boolean isCursorStatement(final SQLStatement sqlStatement) {
143         return sqlStatement instanceof OpenGaussCursorStatement
144                 || sqlStatement instanceof CloseStatement || sqlStatement instanceof MoveStatement || sqlStatement instanceof FetchStatement;
145     }
146     
147     private boolean isUnsupportedDDLStatement(final SQLStatement sqlStatement) {
148         if (isPostgreSQLOrOpenGaussStatement(sqlStatement) && isSupportedSQLStatement(sqlStatement)) {
149             return false;
150         }
151         return sqlStatement instanceof DDLStatement;
152     }
153     
154     private boolean isPostgreSQLOrOpenGaussStatement(final SQLStatement sqlStatement) {
155         return sqlStatement instanceof PostgreSQLStatement || sqlStatement instanceof OpenGaussStatement;
156     }
157     
158     private String getTableName(final ExecutionContext executionContext) {
159         return executionContext.getSqlStatementContext() instanceof TableAvailable && !((TableAvailable) executionContext.getSqlStatementContext()).getAllTables().isEmpty()
160                 ? ((TableAvailable) executionContext.getSqlStatementContext()).getAllTables().iterator().next().getTableName().getIdentifier().getValue()
161                 : "unknown_table";
162     }
163     
164     /**
165      * Execute SQL.
166      *
167      * @param executionContext execution context
168      * @return execute results
169      * @throws SQLException SQL exception
170      */
171     public List<ExecuteResult> execute(final ExecutionContext executionContext) throws SQLException {
172         String databaseName = databaseConnectionManager.getConnectionSession().getDatabaseName();
173         Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getRuleMetaData().getRules();
174         int maxConnectionsSizePerQuery = ProxyContext.getInstance()
175                 .getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
176         boolean isReturnGeneratedKeys = executionContext.getSqlStatementContext().getSqlStatement() instanceof MySQLInsertStatement;
177         return hasRawExecutionRule(rules) ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery)
178                 : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
179     }
180     
181     private boolean hasRawExecutionRule(final Collection<ShardingSphereRule> rules) {
182         for (ShardingSphereRule each : rules) {
183             if (each.getAttributes().findAttribute(RawExecutionRuleAttribute.class).isPresent()) {
184                 return true;
185             }
186         }
187         return false;
188     }
189     
190     private List<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
191         RawExecutionPrepareEngine prepareEngine = new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules);
192         ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext;
193         try {
194             executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
195                     new ExecutionGroupReportContext(databaseConnectionManager.getConnectionSession().getProcessId(),
196                             databaseConnectionManager.getConnectionSession().getDatabaseName(), databaseConnectionManager.getConnectionSession().getGrantee()));
197         } catch (final SQLException ex) {
198             return getSaneExecuteResults(executionContext, ex);
199         }
200         // TODO handle query header
201         return rawExecutor.execute(executionGroupContext, executionContext.getQueryContext(), new RawSQLExecutorCallback());
202     }
203     
204     private List<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
205                                                    final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
206         JDBCBackendStatement statementManager = (JDBCBackendStatement) databaseConnectionManager.getConnectionSession().getStatementManager();
207         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
208                 type, maxConnectionsSizePerQuery, databaseConnectionManager, statementManager, new StatementOption(isReturnGeneratedKeys), rules,
209                 ProxyContext.getInstance().getContextManager().getDatabase(databaseConnectionManager.getConnectionSession().getDatabaseName()).getResourceMetaData().getStorageUnits());
210         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
211         try {
212             executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
213                     new ExecutionGroupReportContext(databaseConnectionManager.getConnectionSession().getProcessId(),
214                             databaseConnectionManager.getConnectionSession().getDatabaseName(), databaseConnectionManager.getConnectionSession().getGrantee()));
215         } catch (final SQLException ex) {
216             return getSaneExecuteResults(executionContext, ex);
217         }
218         executeTransactionHooksBeforeExecuteSQL(databaseConnectionManager.getConnectionSession());
219         return regularExecutor.execute(executionContext.getQueryContext(), executionGroupContext, isReturnGeneratedKeys, isExceptionThrown);
220     }
221     
222     private void executeTransactionHooksBeforeExecuteSQL(final ConnectionSession connectionSession) throws SQLException {
223         if (!getTransactionContext(connectionSession).isInTransaction()) {
224             return;
225         }
226         for (TransactionHook each : transactionHooks) {
227             each.beforeExecuteSQL(connectionSession.getDatabaseConnectionManager().getCachedConnections().values(), getTransactionContext(connectionSession), connectionSession.getIsolationLevel());
228         }
229     }
230     
231     private TransactionConnectionContext getTransactionContext(final ConnectionSession connectionSession) {
232         return connectionSession.getDatabaseConnectionManager().getConnectionSession().getConnectionContext().getTransactionContext();
233     }
234     
235     private List<ExecuteResult> getSaneExecuteResults(final ExecutionContext executionContext, final SQLException originalException) throws SQLException {
236         DatabaseType databaseType = ProxyContext.getInstance().getContextManager().getDatabase(databaseConnectionManager.getConnectionSession().getDatabaseName()).getProtocolType();
237         Optional<ExecuteResult> executeResult = new SaneQueryResultEngine(databaseType).getSaneQueryResult(executionContext.getSqlStatementContext().getSqlStatement(), originalException);
238         return executeResult.map(Collections::singletonList).orElseThrow(() -> originalException);
239     }
240 }