View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.backend.connector;
19  
20  import com.google.common.base.Strings;
21  import lombok.Getter;
22  import org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
23  import org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.transaction.DialectTransactionOption;
24  import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
25  import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
26  import org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
27  import org.apache.shardingsphere.database.exception.core.exception.transaction.TableModifyInTransactionException;
28  import org.apache.shardingsphere.infra.binder.context.segment.table.TablesContext;
29  import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
30  import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
31  import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
32  import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
33  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
34  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
35  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
36  import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
37  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
38  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
39  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
40  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
41  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
42  import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
43  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
44  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
45  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
46  import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
47  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
48  import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
49  import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
50  import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
51  import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
52  import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
53  import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
54  import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.ProxyJDBCExecutor;
55  import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
56  import org.apache.shardingsphere.proxy.backend.connector.sane.DialectSaneQueryResultEngine;
57  import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
58  import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
59  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
60  import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
61  import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
62  import org.apache.shardingsphere.sql.parser.statement.core.enums.TransactionIsolationLevel;
63  import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
64  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.CloseStatement;
65  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.CursorStatement;
66  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.DDLStatement;
67  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.FetchStatement;
68  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.MoveStatement;
69  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.ddl.TruncateStatement;
70  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
71  import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
72  import org.apache.shardingsphere.transaction.api.TransactionType;
73  import org.apache.shardingsphere.transaction.spi.TransactionHook;
74  
75  import java.sql.Connection;
76  import java.sql.SQLException;
77  import java.util.Collection;
78  import java.util.Collections;
79  import java.util.List;
80  import java.util.Map;
81  import java.util.Map.Entry;
82  import java.util.Optional;
83  
84  /**
85   * Proxy SQL Executor.
86   */
87  public final class ProxySQLExecutor {
88      
89      private final JDBCDriverType type;
90      
91      private final ProxyDatabaseConnectionManager databaseConnectionManager;
92      
93      private final ProxyJDBCExecutor regularExecutor;
94      
95      private final RawExecutor rawExecutor;
96      
97      @Getter
98      private final SQLFederationEngine sqlFederationEngine;
99      
100     @SuppressWarnings("rawtypes")
101     private final Map<ShardingSphereRule, TransactionHook> transactionHooks = OrderedSPILoader.getServices(
102             TransactionHook.class, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules());
103     
104     public ProxySQLExecutor(final JDBCDriverType type,
105                             final ProxyDatabaseConnectionManager databaseConnectionManager, final DatabaseProxyConnector databaseProxyConnector, final SQLStatementContext sqlStatementContext) {
106         this.type = type;
107         this.databaseConnectionManager = databaseConnectionManager;
108         ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
109         ConnectionContext connectionContext = databaseConnectionManager.getConnectionSession().getConnectionContext();
110         JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connectionContext);
111         regularExecutor = new ProxyJDBCExecutor(type, databaseConnectionManager.getConnectionSession(), databaseProxyConnector, jdbcExecutor);
112         rawExecutor = new RawExecutor(executorEngine, connectionContext);
113         MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
114         String currentDatabaseName = Strings.isNullOrEmpty(databaseConnectionManager.getConnectionSession().getCurrentDatabaseName())
115                 ? databaseConnectionManager.getConnectionSession().getUsedDatabaseName()
116                 : databaseConnectionManager.getConnectionSession().getCurrentDatabaseName();
117         String currentSchemaName = getSchemaName(sqlStatementContext, metaDataContexts.getMetaData().getDatabase(currentDatabaseName));
118         sqlFederationEngine = new SQLFederationEngine(currentDatabaseName, currentSchemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
119     }
120     
121     private String getSchemaName(final SQLStatementContext sqlStatementContext, final ShardingSphereDatabase database) {
122         String defaultSchemaName = new DatabaseTypeRegistry(sqlStatementContext.getSqlStatement().getDatabaseType()).getDefaultSchemaName(database.getName());
123         return sqlStatementContext.getTablesContext().getSchemaName().orElse(defaultSchemaName);
124     }
125     
126     /**
127      * Check execute prerequisites.
128      *
129      * @param sqlStatementContext execution context
130      */
131     public void checkExecutePrerequisites(final SQLStatementContext sqlStatementContext) {
132         ShardingSpherePreconditions.checkState(
133                 isValidExecutePrerequisites(sqlStatementContext.getSqlStatement()), () -> new TableModifyInTransactionException(getTableName(sqlStatementContext.getTablesContext())));
134     }
135     
136     private boolean isValidExecutePrerequisites(final SQLStatement sqlStatement) {
137         return !(sqlStatement instanceof DDLStatement) || isSupportDDLInTransaction(sqlStatement.getDatabaseType(), (DDLStatement) sqlStatement);
138     }
139     
140     private boolean isSupportDDLInTransaction(final DatabaseType databaseType, final DDLStatement sqlStatement) {
141         DialectTransactionOption transactionOption = new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getTransactionOption();
142         boolean isDDLWithoutMetaDataChanged = isDDLWithoutMetaDataChanged(sqlStatement);
143         if (isInXATransaction()) {
144             return transactionOption.isSupportDDLInXATransaction() && (isDDLWithoutMetaDataChanged || transactionOption.isSupportMetaDataRefreshInTransaction());
145         }
146         if (isInLocalTransaction()) {
147             return transactionOption.isSupportMetaDataRefreshInTransaction() || isDDLWithoutMetaDataChanged;
148         }
149         return true;
150     }
151     
152     private boolean isInXATransaction() {
153         TransactionType transactionType = TransactionUtils.getTransactionType(databaseConnectionManager.getConnectionSession().getConnectionContext().getTransactionContext());
154         TransactionStatus transactionStatus = databaseConnectionManager.getConnectionSession().getTransactionStatus();
155         return TransactionType.XA == transactionType && transactionStatus.isInTransaction();
156     }
157     
158     private boolean isInLocalTransaction() {
159         return databaseConnectionManager.getConnectionSession().getTransactionStatus().isInTransaction();
160     }
161     
162     // TODO should be removed after metadata refresh supported for all database.
163     private boolean isDDLWithoutMetaDataChanged(final DDLStatement sqlStatement) {
164         return isCursorStatement(sqlStatement) || sqlStatement instanceof TruncateStatement;
165     }
166     
167     private boolean isCursorStatement(final DDLStatement sqlStatement) {
168         return sqlStatement instanceof CursorStatement || sqlStatement instanceof CloseStatement || sqlStatement instanceof MoveStatement || sqlStatement instanceof FetchStatement;
169     }
170     
171     private String getTableName(final TablesContext tablesContext) {
172         return tablesContext.getSimpleTables().isEmpty() ? "unknown_table" : tablesContext.getSimpleTables().iterator().next().getTableName().getIdentifier().getValue();
173     }
174     
175     /**
176      * Execute SQL.
177      *
178      * @param executionContext execution context
179      * @return execute results
180      * @throws SQLException SQL exception
181      */
182     public List<ExecuteResult> execute(final ExecutionContext executionContext) throws SQLException {
183         String databaseName = databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
184         Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getRuleMetaData().getRules();
185         int maxConnectionsSizePerQuery = ProxyContext.getInstance()
186                 .getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
187         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(executionContext.getSqlStatementContext().getSqlStatement().getDatabaseType()).getDialectDatabaseMetaData();
188         boolean isReturnGeneratedKeys = executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement && dialectDatabaseMetaData.getGeneratedKeyOption().isPresent();
189         return hasRawExecutionRule(rules)
190                 ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery)
191                 : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
192     }
193     
194     private boolean hasRawExecutionRule(final Collection<ShardingSphereRule> rules) {
195         for (ShardingSphereRule each : rules) {
196             if (each.getAttributes().findAttribute(RawExecutionRuleAttribute.class).isPresent()) {
197                 return true;
198             }
199         }
200         return false;
201     }
202     
203     private List<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
204         RawExecutionPrepareEngine prepareEngine = new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules);
205         ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext;
206         try {
207             String databaseName = databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
208             executionGroupContext = prepareEngine.prepare(databaseName, executionContext, executionContext.getExecutionUnits(),
209                     new ExecutionGroupReportContext(databaseConnectionManager.getConnectionSession().getProcessId(),
210                             databaseName, databaseConnectionManager.getConnectionSession().getConnectionContext().getGrantee()));
211         } catch (final SQLException ex) {
212             return getSaneExecuteResults(executionContext, ex);
213         }
214         // TODO handle query header
215         return rawExecutor.execute(executionGroupContext, executionContext.getQueryContext(), new RawSQLExecutorCallback());
216     }
217     
218     private List<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
219                                                    final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
220         JDBCBackendStatement statementManager = (JDBCBackendStatement) databaseConnectionManager.getConnectionSession().getStatementManager();
221         String databaseName = databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
222         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
223                 type, maxConnectionsSizePerQuery, databaseConnectionManager, statementManager, new StatementOption(isReturnGeneratedKeys), rules,
224                 ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData());
225         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
226         try {
227             executionGroupContext = prepareEngine.prepare(databaseName, executionContext, executionContext.getExecutionUnits(),
228                     new ExecutionGroupReportContext(databaseConnectionManager.getConnectionSession().getProcessId(),
229                             databaseName, databaseConnectionManager.getConnectionSession().getConnectionContext().getGrantee()));
230         } catch (final SQLException ex) {
231             return getSaneExecuteResults(executionContext, ex);
232         }
233         executeTransactionHooksBeforeExecuteSQL(databaseConnectionManager.getConnectionSession());
234         return regularExecutor.execute(executionContext.getQueryContext(), executionGroupContext, isReturnGeneratedKeys, isExceptionThrown);
235     }
236     
237     @SuppressWarnings({"unchecked", "rawtypes"})
238     private void executeTransactionHooksBeforeExecuteSQL(final ConnectionSession connectionSession) throws SQLException {
239         if (!getTransactionContext(connectionSession).isInTransaction()) {
240             return;
241         }
242         for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
243             entry.getValue().beforeExecuteSQL(entry.getKey(), ProxyContext.getInstance().getContextManager().getDatabaseType(),
244                     connectionSession.getDatabaseConnectionManager().getCachedConnections().values(), getTransactionContext(connectionSession),
245                     connectionSession.getIsolationLevel().orElse(TransactionIsolationLevel.READ_COMMITTED));
246         }
247     }
248     
249     private TransactionConnectionContext getTransactionContext(final ConnectionSession connectionSession) {
250         return connectionSession.getDatabaseConnectionManager().getConnectionSession().getConnectionContext().getTransactionContext();
251     }
252     
253     private List<ExecuteResult> getSaneExecuteResults(final ExecutionContext executionContext, final SQLException originalException) throws SQLException {
254         DatabaseType databaseType = ProxyContext.getInstance().getContextManager().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName()).getProtocolType();
255         Optional<ExecuteResult> executeResult = DatabaseTypedSPILoader.findService(DialectSaneQueryResultEngine.class, databaseType)
256                 .flatMap(optional -> optional.getSaneQueryResult(executionContext.getSqlStatementContext().getSqlStatement(), originalException));
257         return executeResult.map(Collections::singletonList).orElseThrow(() -> originalException);
258     }
259 }