View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.backend.connector;
19  
20  import com.google.common.base.Strings;
21  import lombok.Getter;
22  import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
23  import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
24  import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData;
25  import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.transaction.DialectTransactionOption;
26  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
27  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
28  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
29  import org.apache.shardingsphere.infra.exception.dialect.exception.transaction.TableModifyInTransactionException;
30  import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
31  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
32  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
33  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
34  import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
35  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
36  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
37  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
38  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
39  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
40  import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
41  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
42  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
43  import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
44  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
45  import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
46  import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
47  import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
48  import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
49  import org.apache.shardingsphere.infra.session.query.QueryContext;
50  import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
51  import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
52  import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.ProxyJDBCExecutor;
53  import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
54  import org.apache.shardingsphere.proxy.backend.connector.sane.SaneQueryResultEngine;
55  import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
56  import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
57  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
58  import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
59  import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
60  import org.apache.shardingsphere.sql.parser.statement.core.enums.TransactionIsolationLevel;
61  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.CloseStatement;
62  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.CursorStatement;
63  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.DDLStatement;
64  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.FetchStatement;
65  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.MoveStatement;
66  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.TruncateStatement;
67  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
68  import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
69  import org.apache.shardingsphere.transaction.api.TransactionType;
70  import org.apache.shardingsphere.transaction.spi.TransactionHook;
71  
72  import java.sql.Connection;
73  import java.sql.SQLException;
74  import java.util.Collection;
75  import java.util.Collections;
76  import java.util.List;
77  import java.util.Map;
78  import java.util.Map.Entry;
79  import java.util.Optional;
80  
81  /**
82   * Proxy SQL Executor.
83   */
84  public final class ProxySQLExecutor {
85      
86      private final String type;
87      
88      private final ProxyDatabaseConnectionManager databaseConnectionManager;
89      
90      private final ProxyJDBCExecutor regularExecutor;
91      
92      private final RawExecutor rawExecutor;
93      
94      @Getter
95      private final SQLFederationEngine sqlFederationEngine;
96      
97      @SuppressWarnings("rawtypes")
98      private final Map<ShardingSphereRule, TransactionHook> transactionHooks = OrderedSPILoader.getServices(
99              TransactionHook.class, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules());
100     
101     public ProxySQLExecutor(final String type, final ProxyDatabaseConnectionManager databaseConnectionManager, final DatabaseConnector databaseConnector, final QueryContext queryContext) {
102         this.type = type;
103         this.databaseConnectionManager = databaseConnectionManager;
104         ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
105         ConnectionContext connectionContext = databaseConnectionManager.getConnectionSession().getConnectionContext();
106         JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connectionContext);
107         regularExecutor = new ProxyJDBCExecutor(type, databaseConnectionManager.getConnectionSession(), databaseConnector, jdbcExecutor);
108         rawExecutor = new RawExecutor(executorEngine, connectionContext);
109         MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
110         String currentDatabaseName = Strings.isNullOrEmpty(databaseConnectionManager.getConnectionSession().getCurrentDatabaseName())
111                 ? databaseConnectionManager.getConnectionSession().getUsedDatabaseName()
112                 : databaseConnectionManager.getConnectionSession().getCurrentDatabaseName();
113         String currentSchemaName = getSchemaName(queryContext.getSqlStatementContext(), metaDataContexts.getMetaData().getDatabase(currentDatabaseName));
114         sqlFederationEngine = new SQLFederationEngine(currentDatabaseName, currentSchemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
115     }
116     
117     private String getSchemaName(final SQLStatementContext sqlStatementContext, final ShardingSphereDatabase database) {
118         String defaultSchemaName = new DatabaseTypeRegistry(sqlStatementContext.getDatabaseType()).getDefaultSchemaName(database.getName());
119         return sqlStatementContext.getTablesContext().getSchemaName().orElse(defaultSchemaName);
120     }
121     
122     /**
123      * Check execute prerequisites.
124      *
125      * @param executionContext execution context
126      */
127     public void checkExecutePrerequisites(final ExecutionContext executionContext) {
128         ShardingSpherePreconditions.checkState(
129                 isValidExecutePrerequisites(executionContext.getSqlStatementContext()), () -> new TableModifyInTransactionException(getTableName(executionContext)));
130     }
131     
132     private boolean isValidExecutePrerequisites(final SQLStatementContext sqlStatementContext) {
133         return !(sqlStatementContext.getSqlStatement() instanceof DDLStatement)
134                 || isSupportDDLInTransaction(sqlStatementContext.getDatabaseType(), (DDLStatement) sqlStatementContext.getSqlStatement());
135     }
136     
137     private boolean isSupportDDLInTransaction(final DatabaseType databaseType, final DDLStatement sqlStatement) {
138         DialectTransactionOption transactionOption = new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getTransactionOption();
139         boolean isDDLWithoutMetaDataChanged = isDDLWithoutMetaDataChanged(sqlStatement);
140         if (isInXATransaction()) {
141             return transactionOption.isSupportDDLInXATransaction() && (isDDLWithoutMetaDataChanged || transactionOption.isSupportMetaDataRefreshInTransaction());
142         }
143         if (isInLocalTransaction()) {
144             return transactionOption.isSupportMetaDataRefreshInTransaction() || isDDLWithoutMetaDataChanged;
145         }
146         return true;
147     }
148     
149     private boolean isInXATransaction() {
150         TransactionType transactionType = TransactionUtils.getTransactionType(databaseConnectionManager.getConnectionSession().getConnectionContext().getTransactionContext());
151         TransactionStatus transactionStatus = databaseConnectionManager.getConnectionSession().getTransactionStatus();
152         return TransactionType.XA == transactionType && transactionStatus.isInTransaction();
153     }
154     
155     private boolean isInLocalTransaction() {
156         return databaseConnectionManager.getConnectionSession().getTransactionStatus().isInTransaction();
157     }
158     
159     // TODO should be removed after metadata refresh supported for all database.
160     private boolean isDDLWithoutMetaDataChanged(final DDLStatement sqlStatement) {
161         return isCursorStatement(sqlStatement) || sqlStatement instanceof TruncateStatement;
162     }
163     
164     private boolean isCursorStatement(final DDLStatement sqlStatement) {
165         return sqlStatement instanceof CursorStatement || sqlStatement instanceof CloseStatement || sqlStatement instanceof MoveStatement || sqlStatement instanceof FetchStatement;
166     }
167     
168     private String getTableName(final ExecutionContext executionContext) {
169         return executionContext.getSqlStatementContext().getTablesContext().getSimpleTables().isEmpty()
170                 ? "unknown_table"
171                 : executionContext.getSqlStatementContext().getTablesContext().getSimpleTables().iterator().next().getTableName().getIdentifier().getValue();
172     }
173     
174     /**
175      * Execute SQL.
176      *
177      * @param executionContext execution context
178      * @return execute results
179      * @throws SQLException SQL exception
180      */
181     public List<ExecuteResult> execute(final ExecutionContext executionContext) throws SQLException {
182         String databaseName = databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
183         Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getRuleMetaData().getRules();
184         int maxConnectionsSizePerQuery = ProxyContext.getInstance()
185                 .getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
186         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(executionContext.getSqlStatementContext().getDatabaseType()).getDialectDatabaseMetaData();
187         boolean isReturnGeneratedKeys = executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement
188                 && dialectDatabaseMetaData.getGeneratedKeyOption().isSupportReturnGeneratedKeys();
189         return hasRawExecutionRule(rules)
190                 ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery)
191                 : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
192     }
193     
194     private boolean hasRawExecutionRule(final Collection<ShardingSphereRule> rules) {
195         for (ShardingSphereRule each : rules) {
196             if (each.getAttributes().findAttribute(RawExecutionRuleAttribute.class).isPresent()) {
197                 return true;
198             }
199         }
200         return false;
201     }
202     
203     private List<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
204         RawExecutionPrepareEngine prepareEngine = new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules);
205         ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext;
206         try {
207             String databaseName = databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
208             executionGroupContext = prepareEngine.prepare(databaseName, executionContext.getRouteContext(), executionContext.getExecutionUnits(),
209                     new ExecutionGroupReportContext(databaseConnectionManager.getConnectionSession().getProcessId(),
210                             databaseName, databaseConnectionManager.getConnectionSession().getConnectionContext().getGrantee()));
211         } catch (final SQLException ex) {
212             return getSaneExecuteResults(executionContext, ex);
213         }
214         // TODO handle query header
215         return rawExecutor.execute(executionGroupContext, executionContext.getQueryContext(), new RawSQLExecutorCallback());
216     }
217     
218     private List<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
219                                                    final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
220         JDBCBackendStatement statementManager = (JDBCBackendStatement) databaseConnectionManager.getConnectionSession().getStatementManager();
221         String databaseName = databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
222         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
223                 type, maxConnectionsSizePerQuery, databaseConnectionManager, statementManager, new StatementOption(isReturnGeneratedKeys), rules,
224                 ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData());
225         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
226         try {
227             executionGroupContext = prepareEngine.prepare(databaseName, executionContext.getRouteContext(), executionContext.getExecutionUnits(),
228                     new ExecutionGroupReportContext(databaseConnectionManager.getConnectionSession().getProcessId(),
229                             databaseName, databaseConnectionManager.getConnectionSession().getConnectionContext().getGrantee()));
230         } catch (final SQLException ex) {
231             return getSaneExecuteResults(executionContext, ex);
232         }
233         executeTransactionHooksBeforeExecuteSQL(databaseConnectionManager.getConnectionSession());
234         return regularExecutor.execute(executionContext.getQueryContext(), executionGroupContext, isReturnGeneratedKeys, isExceptionThrown);
235     }
236     
237     @SuppressWarnings({"unchecked", "rawtypes"})
238     private void executeTransactionHooksBeforeExecuteSQL(final ConnectionSession connectionSession) throws SQLException {
239         if (!getTransactionContext(connectionSession).isInTransaction()) {
240             return;
241         }
242         for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
243             entry.getValue().beforeExecuteSQL(entry.getKey(), ProxyContext.getInstance().getDatabaseType(),
244                     connectionSession.getDatabaseConnectionManager().getCachedConnections().values(), getTransactionContext(connectionSession),
245                     connectionSession.getIsolationLevel().orElse(TransactionIsolationLevel.READ_COMMITTED));
246         }
247     }
248     
249     private TransactionConnectionContext getTransactionContext(final ConnectionSession connectionSession) {
250         return connectionSession.getDatabaseConnectionManager().getConnectionSession().getConnectionContext().getTransactionContext();
251     }
252     
253     private List<ExecuteResult> getSaneExecuteResults(final ExecutionContext executionContext, final SQLException originalException) throws SQLException {
254         DatabaseType databaseType = ProxyContext.getInstance().getContextManager().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName()).getProtocolType();
255         Optional<ExecuteResult> executeResult = new SaneQueryResultEngine(databaseType).getSaneQueryResult(executionContext.getSqlStatementContext().getSqlStatement(), originalException);
256         return executeResult.map(Collections::singletonList).orElseThrow(() -> originalException);
257     }
258 }