View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.driver.jdbc.core.statement;
19  
20  import com.google.common.base.Strings;
21  import lombok.AccessLevel;
22  import lombok.Getter;
23  import org.apache.shardingsphere.driver.executor.DriverExecutor;
24  import org.apache.shardingsphere.driver.executor.batch.BatchStatementExecutor;
25  import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
26  import org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
27  import org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
28  import org.apache.shardingsphere.driver.jdbc.adapter.AbstractStatementAdapter;
29  import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
30  import org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
31  import org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
32  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
33  import org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
34  import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
35  import org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatementContext;
36  import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
37  import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
38  import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
39  import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
40  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
41  import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
42  import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
43  import org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
44  import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
45  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
46  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
47  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
48  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
49  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
50  import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
51  import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
52  import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
53  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
54  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
55  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
56  import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
57  import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
58  import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
59  import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
60  import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
61  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
62  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
63  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
64  import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
65  import org.apache.shardingsphere.infra.hint.HintValueContext;
66  import org.apache.shardingsphere.infra.hint.SQLHintUtils;
67  import org.apache.shardingsphere.infra.instance.InstanceContext;
68  import org.apache.shardingsphere.infra.merge.MergeEngine;
69  import org.apache.shardingsphere.infra.merge.result.MergedResult;
70  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
71  import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
72  import org.apache.shardingsphere.infra.metadata.user.Grantee;
73  import org.apache.shardingsphere.infra.route.context.RouteContext;
74  import org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
75  import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
76  import org.apache.shardingsphere.infra.session.query.QueryContext;
77  import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
78  import org.apache.shardingsphere.parser.rule.SQLParserRule;
79  import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
80  import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
81  import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
82  import org.apache.shardingsphere.traffic.engine.TrafficEngine;
83  import org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException;
84  import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
85  import org.apache.shardingsphere.traffic.rule.TrafficRule;
86  import org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
87  import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
88  
89  import java.sql.Connection;
90  import java.sql.ResultSet;
91  import java.sql.SQLException;
92  import java.sql.Statement;
93  import java.util.ArrayList;
94  import java.util.Collection;
95  import java.util.Collections;
96  import java.util.LinkedList;
97  import java.util.List;
98  import java.util.Optional;
99  import java.util.stream.Collectors;
100 
101 /**
102  * ShardingSphere statement.
103  */
104 @HighFrequencyInvocation
105 public final class ShardingSphereStatement extends AbstractStatementAdapter {
106     
107     @Getter
108     private final ShardingSphereConnection connection;
109     
110     private final MetaDataContexts metaDataContexts;
111     
112     private final List<Statement> statements;
113     
114     private final StatementOption statementOption;
115     
116     @Getter(AccessLevel.PROTECTED)
117     private final DriverExecutor executor;
118     
119     private final KernelProcessor kernelProcessor;
120     
121     private final TrafficRule trafficRule;
122     
123     @Getter(AccessLevel.PROTECTED)
124     private final StatementManager statementManager;
125     
126     private final BatchStatementExecutor batchStatementExecutor;
127     
128     private boolean returnGeneratedKeys;
129     
130     private ExecutionContext executionContext;
131     
132     private ResultSet currentResultSet;
133     
134     private String trafficInstanceId;
135     
136     private boolean useFederation;
137     
138     private String databaseName;
139     
140     public ShardingSphereStatement(final ShardingSphereConnection connection) {
141         this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
142     }
143     
144     public ShardingSphereStatement(final ShardingSphereConnection connection, final int resultSetType, final int resultSetConcurrency) {
145         this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
146     }
147     
148     public ShardingSphereStatement(final ShardingSphereConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
149         this.connection = connection;
150         metaDataContexts = connection.getContextManager().getMetaDataContexts();
151         statements = new LinkedList<>();
152         statementOption = new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
153         executor = new DriverExecutor(connection);
154         kernelProcessor = new KernelProcessor();
155         trafficRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
156         statementManager = new StatementManager();
157         batchStatementExecutor = new BatchStatementExecutor(this);
158         databaseName = connection.getDatabaseName();
159     }
160     
161     @Override
162     public ResultSet executeQuery(final String sql) throws SQLException {
163         if (Strings.isNullOrEmpty(sql)) {
164             throw new EmptySQLException().toSQLException();
165         }
166         ResultSet result;
167         try {
168             QueryContext queryContext = createQueryContext(sql);
169             handleAutoCommit(queryContext);
170             databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
171             connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
172             trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
173             if (null != trafficInstanceId) {
174                 JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
175                 return executor.getTrafficExecutor().execute(executionUnit, Statement::executeQuery);
176             }
177             useFederation = decide(queryContext,
178                     metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData());
179             if (useFederation) {
180                 return executeFederationQuery(queryContext);
181             }
182             executionContext = createExecutionContext(queryContext);
183             result = doExecuteQuery(executionContext);
184             // CHECKSTYLE:OFF
185         } catch (final RuntimeException ex) {
186             // CHECKSTYLE:ON
187             handleExceptionInTransaction(connection, metaDataContexts);
188             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
189         } finally {
190             currentResultSet = null;
191         }
192         currentResultSet = result;
193         return result;
194     }
195     
196     private ShardingSphereResultSet doExecuteQuery(final ExecutionContext executionContext) throws SQLException {
197         List<QueryResult> queryResults = executeQuery0(executionContext);
198         MergedResult mergedResult = mergeQuery(queryResults, executionContext.getSqlStatementContext());
199         boolean selectContainsEnhancedTable =
200                 executionContext.getSqlStatementContext() instanceof SelectStatementContext && ((SelectStatementContext) executionContext.getSqlStatementContext()).isContainsEnhancedTable();
201         return new ShardingSphereResultSet(getResultSets(), mergedResult, this, selectContainsEnhancedTable, executionContext);
202     }
203     
204     private boolean decide(final QueryContext queryContext, final ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
205         return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData);
206     }
207     
208     private Optional<String> getInstanceIdAndSet(final QueryContext queryContext) {
209         Optional<String> result = connection.getDatabaseConnectionManager().getConnectionContext().getTrafficInstanceId();
210         if (!result.isPresent()) {
211             result = getInstanceId(queryContext);
212         }
213         if (connection.isHoldTransaction() && result.isPresent()) {
214             connection.getDatabaseConnectionManager().getConnectionContext().setTrafficInstanceId(result.get());
215         }
216         return result;
217     }
218     
219     private Optional<String> getInstanceId(final QueryContext queryContext) {
220         InstanceContext instanceContext = connection.getContextManager().getInstanceContext();
221         return null != trafficRule && !trafficRule.getStrategyRules().isEmpty()
222                 ? new TrafficEngine(trafficRule, instanceContext).dispatch(queryContext, connection.isHoldTransaction())
223                 : Optional.empty();
224     }
225     
226     private List<QueryResult> executeQuery0(final ExecutionContext executionContext) throws SQLException {
227         if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) {
228             return executor.getRawExecutor().execute(
229                     createRawExecutionContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
230         }
231         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
232         cacheStatements(executionGroupContext.getInputGroups());
233         StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
234                 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), executionContext.getSqlStatementContext().getSqlStatement(),
235                 SQLExecutorExceptionHandler.isExceptionThrown());
236         return executor.getRegularExecutor().executeQuery(executionGroupContext, executionContext.getQueryContext(), callback);
237     }
238     
239     private ResultSet executeFederationQuery(final QueryContext queryContext) {
240         StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
241                 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), queryContext.getSqlStatementContext().getSqlStatement(),
242                 SQLExecutorExceptionHandler.isExceptionThrown());
243         SQLFederationContext context = new SQLFederationContext(false, queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
244         return executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(), callback, context);
245     }
246     
247     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine() {
248         int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
249         return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), statementManager, statementOption,
250                 metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(),
251                 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnits());
252     }
253     
254     @Override
255     public int executeUpdate(final String sql) throws SQLException {
256         try {
257             return executeUpdate0(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL), Statement::executeUpdate);
258             // CHECKSTYLE:OFF
259         } catch (final RuntimeException ex) {
260             // CHECKSTYLE:ON
261             handleExceptionInTransaction(connection, metaDataContexts);
262             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
263         } finally {
264             currentResultSet = null;
265         }
266     }
267     
268     @Override
269     public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
270         if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
271             returnGeneratedKeys = true;
272         }
273         try {
274             return executeUpdate0(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL, autoGeneratedKeys),
275                     (statement, actualSQL) -> statement.executeUpdate(actualSQL, autoGeneratedKeys));
276             // CHECKSTYLE:OFF
277         } catch (final RuntimeException ex) {
278             // CHECKSTYLE:ON
279             handleExceptionInTransaction(connection, metaDataContexts);
280             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
281         } finally {
282             currentResultSet = null;
283         }
284     }
285     
286     @Override
287     public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
288         returnGeneratedKeys = true;
289         try {
290             return executeUpdate0(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL, columnIndexes), (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnIndexes));
291             // CHECKSTYLE:OFF
292         } catch (final RuntimeException ex) {
293             // CHECKSTYLE:ON
294             handleExceptionInTransaction(connection, metaDataContexts);
295             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
296         } finally {
297             currentResultSet = null;
298         }
299     }
300     
301     @Override
302     public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
303         returnGeneratedKeys = true;
304         try {
305             return executeUpdate0(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL, columnNames), (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnNames));
306             // CHECKSTYLE:OFF
307         } catch (final RuntimeException ex) {
308             // CHECKSTYLE:ON
309             handleExceptionInTransaction(connection, metaDataContexts);
310             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
311         } finally {
312             currentResultSet = null;
313         }
314     }
315     
316     private int executeUpdate(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext, final ExecutionContext executionContext) throws SQLException {
317         return isNeedImplicitCommitTransaction(connection, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1)
318                 ? executeUpdateWithImplicitCommitTransaction(() -> useDriverToExecuteUpdate(updateCallback, sqlStatementContext, executionContext))
319                 : useDriverToExecuteUpdate(updateCallback, sqlStatementContext, executionContext);
320     }
321     
322     private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws SQLException {
323         QueryContext queryContext = createQueryContext(sql);
324         handleAutoCommit(queryContext);
325         databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
326         connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
327         trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
328         if (null != trafficInstanceId) {
329             JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
330             return executor.getTrafficExecutor().execute(executionUnit, trafficCallback);
331         }
332         executionContext = createExecutionContext(queryContext);
333         if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) {
334             Collection<ExecuteResult> results = executor.getRawExecutor().execute(createRawExecutionContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback());
335             return accumulate(results);
336         }
337         return executeUpdate(updateCallback, queryContext.getSqlStatementContext(), executionContext);
338     }
339     
340     private int executeUpdateWithImplicitCommitTransaction(final ImplicitTransactionCallback<Integer> callback) throws SQLException {
341         int result;
342         try {
343             connection.setAutoCommit(false);
344             result = callback.execute();
345             connection.commit();
346             // CHECKSTYLE:OFF
347         } catch (final RuntimeException ex) {
348             // CHECKSTYLE:ON
349             connection.rollback();
350             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
351         } finally {
352             connection.setAutoCommit(true);
353         }
354         return result;
355     }
356     
357     private int useDriverToExecuteUpdate(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext,
358                                          final ExecutionContext executionContext) throws SQLException {
359         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
360         cacheStatements(executionGroupContext.getInputGroups());
361         JDBCExecutorCallback<Integer> callback = createExecuteUpdateCallback(updateCallback, sqlStatementContext);
362         return executor.getRegularExecutor().executeUpdate(executionGroupContext,
363                 executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), callback);
364     }
365     
366     private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext) {
367         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
368         return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
369                 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatementContext.getSqlStatement(), isExceptionThrown) {
370             
371             @Override
372             protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
373                 return updateCallback.executeUpdate(sql, statement);
374             }
375             
376             @Override
377             protected Optional<Integer> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
378                 return Optional.empty();
379             }
380         };
381     }
382     
383     private int accumulate(final Collection<ExecuteResult> results) {
384         int result = 0;
385         for (ExecuteResult each : results) {
386             result += ((UpdateResult) each).getUpdateCount();
387         }
388         return result;
389     }
390     
391     @Override
392     public boolean execute(final String sql) throws SQLException {
393         try {
394             return execute0(sql, (actualSQL, statement) -> statement.execute(actualSQL), Statement::execute);
395             // CHECKSTYLE:OFF
396         } catch (final SQLException ex) {
397             // CHECKSTYLE:ON
398             handleExceptionInTransaction(connection, metaDataContexts);
399             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
400         }
401     }
402     
403     @Override
404     public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
405         try {
406             if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
407                 returnGeneratedKeys = true;
408             }
409             return execute0(sql, (actualSQL, statement) -> statement.execute(actualSQL, autoGeneratedKeys), (statement, actualSQL) -> statement.execute(actualSQL, autoGeneratedKeys));
410             // CHECKSTYLE:OFF
411         } catch (final SQLException ex) {
412             // CHECKSTYLE:ON
413             handleExceptionInTransaction(connection, metaDataContexts);
414             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
415         }
416     }
417     
418     @Override
419     public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
420         try {
421             returnGeneratedKeys = true;
422             return execute0(sql, (actualSQL, statement) -> statement.execute(actualSQL, columnIndexes), (statement, actualSQL) -> statement.execute(actualSQL, columnIndexes));
423             // CHECKSTYLE:OFF
424         } catch (final SQLException ex) {
425             // CHECKSTYLE:ON
426             handleExceptionInTransaction(connection, metaDataContexts);
427             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
428         }
429     }
430     
431     @Override
432     public boolean execute(final String sql, final String[] columnNames) throws SQLException {
433         try {
434             returnGeneratedKeys = true;
435             return execute0(sql, (actualSQL, statement) -> statement.execute(actualSQL, columnNames), (statement, actualSQL) -> statement.execute(actualSQL, columnNames));
436             // CHECKSTYLE:OFF
437         } catch (final SQLException ex) {
438             // CHECKSTYLE:ON
439             handleExceptionInTransaction(connection, metaDataContexts);
440             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
441         }
442     }
443     
444     private boolean execute0(final String sql, final ExecuteCallback executeCallback, final TrafficExecutorCallback<Boolean> trafficCallback) throws SQLException {
445         try {
446             QueryContext queryContext = createQueryContext(sql);
447             handleAutoCommit(queryContext);
448             databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
449             connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
450             trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
451             if (null != trafficInstanceId) {
452                 JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
453                 return executor.getTrafficExecutor().execute(executionUnit, trafficCallback);
454             }
455             useFederation = decide(queryContext,
456                     metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData());
457             if (useFederation) {
458                 ResultSet resultSet = executeFederationQuery(queryContext);
459                 return null != resultSet;
460             }
461             executionContext = createExecutionContext(queryContext);
462             if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) {
463                 Collection<ExecuteResult> results = executor.getRawExecutor().execute(createRawExecutionContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback());
464                 return results.iterator().next() instanceof QueryResult;
465             }
466             return executeWithExecutionContext(executeCallback, executionContext);
467         } finally {
468             currentResultSet = null;
469         }
470     }
471     
472     private void handleAutoCommit(final QueryContext queryContext) throws SQLException {
473         if (AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement())) {
474             connection.handleAutoCommit();
475         }
476     }
477     
478     private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException {
479         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
480         ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters()));
481         ExecutionGroupContext<JDBCExecutionUnit> context =
482                 prepareEngine.prepare(new RouteContext(), Collections.singletonList(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
483         return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
484     }
485     
486     private void clearStatements() throws SQLException {
487         for (Statement each : statements) {
488             each.close();
489         }
490         statements.clear();
491     }
492     
493     @Override
494     public void addBatch(final String sql) throws SQLException {
495         batchStatementExecutor.addBatch(sql);
496     }
497     
498     @Override
499     public void clearBatch() {
500         batchStatementExecutor.clear();
501     }
502     
503     @Override
504     public int[] executeBatch() throws SQLException {
505         return batchStatementExecutor.executeBatch();
506     }
507     
508     private QueryContext createQueryContext(final String originSQL) {
509         SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
510         String sql = SQLHintUtils.removeHint(originSQL);
511         HintValueContext hintValueContext = SQLHintUtils.extractHint(originSQL);
512         SQLStatement sqlStatement = sqlParserRule.getSQLParserEngine(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType()).parse(sql, false);
513         SQLStatementContext sqlStatementContext = new SQLBindEngine(metaDataContexts.getMetaData(), databaseName, hintValueContext).bind(sqlStatement, Collections.emptyList());
514         return new QueryContext(sqlStatementContext, sql, Collections.emptyList(), hintValueContext);
515     }
516     
517     private ExecutionContext createExecutionContext(final QueryContext queryContext) throws SQLException {
518         clearStatements();
519         RuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData();
520         ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(databaseName);
521         SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext());
522         return kernelProcessor.generateExecutionContext(queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(),
523                 connection.getDatabaseConnectionManager().getConnectionContext());
524     }
525     
526     private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException {
527         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
528         return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
529                 new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
530     }
531     
532     private ExecutionGroupContext<RawSQLExecutionUnit> createRawExecutionContext(final ExecutionContext executionContext) throws SQLException {
533         int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
534         return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
535                 .prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
536     }
537     
538     private boolean executeWithExecutionContext(final ExecuteCallback executeCallback, final ExecutionContext executionContext) throws SQLException {
539         return isNeedImplicitCommitTransaction(connection, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1)
540                 ? executeWithImplicitCommitTransaction(() -> useDriverToExecute(executeCallback, executionContext))
541                 : useDriverToExecute(executeCallback, executionContext);
542     }
543     
544     private boolean executeWithImplicitCommitTransaction(final ImplicitTransactionCallback<Boolean> callback) throws SQLException {
545         boolean result;
546         try {
547             connection.setAutoCommit(false);
548             result = callback.execute();
549             connection.commit();
550             // CHECKSTYLE:OFF
551         } catch (final Exception ex) {
552             // CHECKSTYLE:ON
553             connection.rollback();
554             throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
555         } finally {
556             connection.setAutoCommit(true);
557         }
558         return result;
559     }
560     
561     private boolean useDriverToExecute(final ExecuteCallback callback, final ExecutionContext executionContext) throws SQLException {
562         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
563         cacheStatements(executionGroupContext.getInputGroups());
564         JDBCExecutorCallback<Boolean> jdbcExecutorCallback = createExecuteCallback(callback, executionContext.getSqlStatementContext().getSqlStatement());
565         return executor.getRegularExecutor().execute(executionGroupContext,
566                 executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
567     }
568     
569     private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException {
570         for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
571             statements.addAll(each.getInputs().stream().map(JDBCExecutionUnit::getStorageResource).collect(Collectors.toList()));
572         }
573         replay();
574     }
575     
576     private JDBCExecutorCallback<Boolean> createExecuteCallback(final ExecuteCallback executeCallback, final SQLStatement sqlStatement) {
577         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
578         return new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
579                 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, isExceptionThrown) {
580             
581             @Override
582             protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
583                 return executeCallback.execute(sql, statement);
584             }
585             
586             @Override
587             protected Optional<Boolean> getSaneResult(final SQLStatement sqlStatement1, final SQLException ex) {
588                 return Optional.empty();
589             }
590         };
591     }
592     
593     private void replay() throws SQLException {
594         for (Statement each : statements) {
595             getMethodInvocationRecorder().replay(each);
596         }
597     }
598     
599     @Override
600     public ResultSet getResultSet() throws SQLException {
601         if (null != currentResultSet) {
602             return currentResultSet;
603         }
604         if (null != trafficInstanceId) {
605             return executor.getTrafficExecutor().getResultSet();
606         }
607         if (useFederation) {
608             return executor.getSqlFederationEngine().getResultSet();
609         }
610         if (executionContext.getSqlStatementContext() instanceof SelectStatementContext
611                 || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
612             List<ResultSet> resultSets = getResultSets();
613             if (resultSets.isEmpty()) {
614                 return currentResultSet;
615             }
616             SQLStatementContext sqlStatementContext = executionContext.getSqlStatementContext();
617             MergedResult mergedResult = mergeQuery(getQueryResults(resultSets), sqlStatementContext);
618             boolean selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable();
619             currentResultSet = new ShardingSphereResultSet(resultSets, mergedResult, this, selectContainsEnhancedTable, executionContext);
620         }
621         return currentResultSet;
622     }
623     
624     private List<ResultSet> getResultSets() throws SQLException {
625         List<ResultSet> result = new ArrayList<>(statements.size());
626         for (Statement each : statements) {
627             if (null != each.getResultSet()) {
628                 result.add(each.getResultSet());
629             }
630         }
631         return result;
632     }
633     
634     private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
635         List<QueryResult> result = new ArrayList<>(resultSets.size());
636         for (ResultSet each : resultSets) {
637             if (null != each) {
638                 result.add(new JDBCStreamQueryResult(each));
639             }
640         }
641         return result;
642     }
643     
644     private MergedResult mergeQuery(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
645         MergeEngine mergeEngine = new MergeEngine(metaDataContexts.getMetaData().getGlobalRuleMetaData(), metaDataContexts.getMetaData().getDatabase(databaseName),
646                 metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
647         return mergeEngine.merge(queryResults, sqlStatementContext);
648     }
649     
650     @SuppressWarnings("MagicConstant")
651     @Override
652     public int getResultSetType() {
653         return statementOption.getResultSetType();
654     }
655     
656     @SuppressWarnings("MagicConstant")
657     @Override
658     public int getResultSetConcurrency() {
659         return statementOption.getResultSetConcurrency();
660     }
661     
662     @Override
663     public int getResultSetHoldability() {
664         return statementOption.getResultSetHoldability();
665     }
666     
667     @Override
668     public boolean isAccumulate() {
669         for (DataNodeRuleAttribute each : metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(DataNodeRuleAttribute.class)) {
670             if (each.isNeedAccumulate(executionContext.getSqlStatementContext().getTablesContext().getTableNames())) {
671                 return true;
672             }
673         }
674         return false;
675     }
676     
677     @Override
678     public Collection<Statement> getRoutedStatements() {
679         return statements;
680     }
681     
682     @Override
683     public ResultSet getGeneratedKeys() throws SQLException {
684         Optional<GeneratedKeyContext> generatedKey = findGeneratedKey();
685         if (returnGeneratedKeys && generatedKey.isPresent() && !generatedKey.get().getGeneratedValues().isEmpty()) {
686             return new GeneratedKeysResultSet(getGeneratedKeysColumnName(generatedKey.get().getColumnName()), generatedKey.get().getGeneratedValues().iterator(), this);
687         }
688         Collection<Comparable<?>> generatedValues = new LinkedList<>();
689         for (Statement each : statements) {
690             ResultSet resultSet = each.getGeneratedKeys();
691             while (resultSet.next()) {
692                 generatedValues.add((Comparable<?>) resultSet.getObject(1));
693             }
694         }
695         String columnName = generatedKey.map(GeneratedKeyContext::getColumnName).orElse(null);
696         return new GeneratedKeysResultSet(getGeneratedKeysColumnName(columnName), generatedValues.iterator(), this);
697     }
698     
699     private Optional<GeneratedKeyContext> findGeneratedKey() {
700         return executionContext.getSqlStatementContext() instanceof InsertStatementContext
701                 ? ((InsertStatementContext) executionContext.getSqlStatementContext()).getGeneratedKeyContext()
702                 : Optional.empty();
703     }
704     
705     private String getGeneratedKeysColumnName(final String columnName) {
706         return metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType() instanceof MySQLDatabaseType ? "GENERATED_KEY" : columnName;
707     }
708 }