1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.driver.jdbc.core.statement;
19
20 import com.google.common.base.Strings;
21 import lombok.AccessLevel;
22 import lombok.Getter;
23 import org.apache.shardingsphere.driver.executor.DriverExecutor;
24 import org.apache.shardingsphere.driver.executor.batch.BatchStatementExecutor;
25 import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
26 import org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
27 import org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
28 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractStatementAdapter;
29 import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
30 import org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
31 import org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
32 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
33 import org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
34 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
35 import org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatementContext;
36 import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
37 import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
38 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
39 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
40 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
41 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
42 import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
43 import org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
44 import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
45 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
46 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
47 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
48 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
49 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
50 import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
51 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
52 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
53 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
54 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
55 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
56 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
57 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
58 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
59 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
60 import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
61 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
62 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
63 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
64 import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
65 import org.apache.shardingsphere.infra.hint.HintValueContext;
66 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
67 import org.apache.shardingsphere.infra.instance.InstanceContext;
68 import org.apache.shardingsphere.infra.merge.MergeEngine;
69 import org.apache.shardingsphere.infra.merge.result.MergedResult;
70 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
71 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
72 import org.apache.shardingsphere.infra.metadata.user.Grantee;
73 import org.apache.shardingsphere.infra.route.context.RouteContext;
74 import org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
75 import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
76 import org.apache.shardingsphere.infra.session.query.QueryContext;
77 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
78 import org.apache.shardingsphere.parser.rule.SQLParserRule;
79 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
80 import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
81 import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
82 import org.apache.shardingsphere.traffic.engine.TrafficEngine;
83 import org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException;
84 import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
85 import org.apache.shardingsphere.traffic.rule.TrafficRule;
86 import org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
87 import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
88
89 import java.sql.Connection;
90 import java.sql.ResultSet;
91 import java.sql.SQLException;
92 import java.sql.Statement;
93 import java.util.ArrayList;
94 import java.util.Collection;
95 import java.util.Collections;
96 import java.util.LinkedList;
97 import java.util.List;
98 import java.util.Optional;
99 import java.util.stream.Collectors;
100
101
102
103
104 @HighFrequencyInvocation
105 public final class ShardingSphereStatement extends AbstractStatementAdapter {
106
107 @Getter
108 private final ShardingSphereConnection connection;
109
110 private final MetaDataContexts metaDataContexts;
111
112 private final List<Statement> statements;
113
114 private final StatementOption statementOption;
115
116 @Getter(AccessLevel.PROTECTED)
117 private final DriverExecutor executor;
118
119 private final KernelProcessor kernelProcessor;
120
121 private final TrafficRule trafficRule;
122
123 @Getter(AccessLevel.PROTECTED)
124 private final StatementManager statementManager;
125
126 private final BatchStatementExecutor batchStatementExecutor;
127
128 private boolean returnGeneratedKeys;
129
130 private ExecutionContext executionContext;
131
132 private ResultSet currentResultSet;
133
134 private String trafficInstanceId;
135
136 private boolean useFederation;
137
138 private String databaseName;
139
/**
 * Create a statement using the default result set options:
 * forward-only type, read-only concurrency and cursors held over commit.
 *
 * @param connection ShardingSphere connection this statement is created on
 */
public ShardingSphereStatement(final ShardingSphereConnection connection) {
    this(
            connection,
            ResultSet.TYPE_FORWARD_ONLY,
            ResultSet.CONCUR_READ_ONLY,
            ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
143
/**
 * Create a statement with an explicit result set type and concurrency;
 * holdability defaults to {@link ResultSet#HOLD_CURSORS_OVER_COMMIT}.
 *
 * @param connection ShardingSphere connection this statement is created on
 * @param resultSetType result set type
 * @param resultSetConcurrency result set concurrency
 */
public ShardingSphereStatement(final ShardingSphereConnection connection, final int resultSetType, final int resultSetConcurrency) {
    this(
            connection,
            resultSetType,
            resultSetConcurrency,
            ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
147
/**
 * Create a statement with explicit result set type, concurrency and holdability.
 *
 * @param connection ShardingSphere connection this statement is created on
 * @param resultSetType result set type
 * @param resultSetConcurrency result set concurrency
 * @param resultSetHoldability result set holdability
 */
public ShardingSphereStatement(final ShardingSphereConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
    this.connection = connection;
    this.metaDataContexts = connection.getContextManager().getMetaDataContexts();
    this.statements = new LinkedList<>();
    this.statementOption = new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
    this.executor = new DriverExecutor(connection);
    this.kernelProcessor = new KernelProcessor();
    // The traffic rule is resolved once from the global rules captured above.
    this.trafficRule = this.metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
    this.statementManager = new StatementManager();
    this.batchStatementExecutor = new BatchStatementExecutor(this);
    // Start from the connection's database; each execution may overwrite it from the SQL.
    this.databaseName = connection.getDatabaseName();
}
160
161 @Override
162 public ResultSet executeQuery(final String sql) throws SQLException {
163 if (Strings.isNullOrEmpty(sql)) {
164 throw new EmptySQLException().toSQLException();
165 }
166 ResultSet result;
167 try {
168 QueryContext queryContext = createQueryContext(sql);
169 handleAutoCommit(queryContext);
170 databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
171 connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
172 trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
173 if (null != trafficInstanceId) {
174 JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
175 return executor.getTrafficExecutor().execute(executionUnit, Statement::executeQuery);
176 }
177 useFederation = decide(queryContext,
178 metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData());
179 if (useFederation) {
180 return executeFederationQuery(queryContext);
181 }
182 executionContext = createExecutionContext(queryContext);
183 result = doExecuteQuery(executionContext);
184
185 } catch (final RuntimeException ex) {
186
187 handleExceptionInTransaction(connection, metaDataContexts);
188 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
189 } finally {
190 currentResultSet = null;
191 }
192 currentResultSet = result;
193 return result;
194 }
195
196 private ShardingSphereResultSet doExecuteQuery(final ExecutionContext executionContext) throws SQLException {
197 List<QueryResult> queryResults = executeQuery0(executionContext);
198 MergedResult mergedResult = mergeQuery(queryResults, executionContext.getSqlStatementContext());
199 boolean selectContainsEnhancedTable =
200 executionContext.getSqlStatementContext() instanceof SelectStatementContext && ((SelectStatementContext) executionContext.getSqlStatementContext()).isContainsEnhancedTable();
201 return new ShardingSphereResultSet(getResultSets(), mergedResult, this, selectContainsEnhancedTable, executionContext);
202 }
203
204 private boolean decide(final QueryContext queryContext, final ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
205 return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData);
206 }
207
208 private Optional<String> getInstanceIdAndSet(final QueryContext queryContext) {
209 Optional<String> result = connection.getDatabaseConnectionManager().getConnectionContext().getTrafficInstanceId();
210 if (!result.isPresent()) {
211 result = getInstanceId(queryContext);
212 }
213 if (connection.isHoldTransaction() && result.isPresent()) {
214 connection.getDatabaseConnectionManager().getConnectionContext().setTrafficInstanceId(result.get());
215 }
216 return result;
217 }
218
219 private Optional<String> getInstanceId(final QueryContext queryContext) {
220 InstanceContext instanceContext = connection.getContextManager().getInstanceContext();
221 return null != trafficRule && !trafficRule.getStrategyRules().isEmpty()
222 ? new TrafficEngine(trafficRule, instanceContext).dispatch(queryContext, connection.isHoldTransaction())
223 : Optional.empty();
224 }
225
226 private List<QueryResult> executeQuery0(final ExecutionContext executionContext) throws SQLException {
227 if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) {
228 return executor.getRawExecutor().execute(
229 createRawExecutionContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
230 }
231 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
232 cacheStatements(executionGroupContext.getInputGroups());
233 StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
234 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), executionContext.getSqlStatementContext().getSqlStatement(),
235 SQLExecutorExceptionHandler.isExceptionThrown());
236 return executor.getRegularExecutor().executeQuery(executionGroupContext, executionContext.getQueryContext(), callback);
237 }
238
239 private ResultSet executeFederationQuery(final QueryContext queryContext) {
240 StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
241 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), queryContext.getSqlStatementContext().getSqlStatement(),
242 SQLExecutorExceptionHandler.isExceptionThrown());
243 SQLFederationContext context = new SQLFederationContext(false, queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
244 return executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(), callback, context);
245 }
246
247 private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine() {
248 int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
249 return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), statementManager, statementOption,
250 metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(),
251 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnits());
252 }
253
254 @Override
255 public int executeUpdate(final String sql) throws SQLException {
256 try {
257 return executeUpdate0(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL), Statement::executeUpdate);
258
259 } catch (final RuntimeException ex) {
260
261 handleExceptionInTransaction(connection, metaDataContexts);
262 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
263 } finally {
264 currentResultSet = null;
265 }
266 }
267
268 @Override
269 public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
270 if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
271 returnGeneratedKeys = true;
272 }
273 try {
274 return executeUpdate0(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL, autoGeneratedKeys),
275 (statement, actualSQL) -> statement.executeUpdate(actualSQL, autoGeneratedKeys));
276
277 } catch (final RuntimeException ex) {
278
279 handleExceptionInTransaction(connection, metaDataContexts);
280 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
281 } finally {
282 currentResultSet = null;
283 }
284 }
285
286 @Override
287 public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
288 returnGeneratedKeys = true;
289 try {
290 return executeUpdate0(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL, columnIndexes), (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnIndexes));
291
292 } catch (final RuntimeException ex) {
293
294 handleExceptionInTransaction(connection, metaDataContexts);
295 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
296 } finally {
297 currentResultSet = null;
298 }
299 }
300
301 @Override
302 public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
303 returnGeneratedKeys = true;
304 try {
305 return executeUpdate0(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL, columnNames), (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnNames));
306
307 } catch (final RuntimeException ex) {
308
309 handleExceptionInTransaction(connection, metaDataContexts);
310 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
311 } finally {
312 currentResultSet = null;
313 }
314 }
315
316 private int executeUpdate(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext, final ExecutionContext executionContext) throws SQLException {
317 return isNeedImplicitCommitTransaction(connection, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1)
318 ? executeUpdateWithImplicitCommitTransaction(() -> useDriverToExecuteUpdate(updateCallback, sqlStatementContext, executionContext))
319 : useDriverToExecuteUpdate(updateCallback, sqlStatementContext, executionContext);
320 }
321
322 private int executeUpdate0(final String sql, final ExecuteUpdateCallback updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws SQLException {
323 QueryContext queryContext = createQueryContext(sql);
324 handleAutoCommit(queryContext);
325 databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
326 connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
327 trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
328 if (null != trafficInstanceId) {
329 JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
330 return executor.getTrafficExecutor().execute(executionUnit, trafficCallback);
331 }
332 executionContext = createExecutionContext(queryContext);
333 if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) {
334 Collection<ExecuteResult> results = executor.getRawExecutor().execute(createRawExecutionContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback());
335 return accumulate(results);
336 }
337 return executeUpdate(updateCallback, queryContext.getSqlStatementContext(), executionContext);
338 }
339
340 private int executeUpdateWithImplicitCommitTransaction(final ImplicitTransactionCallback<Integer> callback) throws SQLException {
341 int result;
342 try {
343 connection.setAutoCommit(false);
344 result = callback.execute();
345 connection.commit();
346
347 } catch (final RuntimeException ex) {
348
349 connection.rollback();
350 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
351 } finally {
352 connection.setAutoCommit(true);
353 }
354 return result;
355 }
356
357 private int useDriverToExecuteUpdate(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext,
358 final ExecutionContext executionContext) throws SQLException {
359 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
360 cacheStatements(executionGroupContext.getInputGroups());
361 JDBCExecutorCallback<Integer> callback = createExecuteUpdateCallback(updateCallback, sqlStatementContext);
362 return executor.getRegularExecutor().executeUpdate(executionGroupContext,
363 executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), callback);
364 }
365
366 private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final ExecuteUpdateCallback updateCallback, final SQLStatementContext sqlStatementContext) {
367 boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
368 return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
369 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatementContext.getSqlStatement(), isExceptionThrown) {
370
371 @Override
372 protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
373 return updateCallback.executeUpdate(sql, statement);
374 }
375
376 @Override
377 protected Optional<Integer> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
378 return Optional.empty();
379 }
380 };
381 }
382
383 private int accumulate(final Collection<ExecuteResult> results) {
384 int result = 0;
385 for (ExecuteResult each : results) {
386 result += ((UpdateResult) each).getUpdateCount();
387 }
388 return result;
389 }
390
391 @Override
392 public boolean execute(final String sql) throws SQLException {
393 try {
394 return execute0(sql, (actualSQL, statement) -> statement.execute(actualSQL), Statement::execute);
395
396 } catch (final SQLException ex) {
397
398 handleExceptionInTransaction(connection, metaDataContexts);
399 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
400 }
401 }
402
403 @Override
404 public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
405 try {
406 if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
407 returnGeneratedKeys = true;
408 }
409 return execute0(sql, (actualSQL, statement) -> statement.execute(actualSQL, autoGeneratedKeys), (statement, actualSQL) -> statement.execute(actualSQL, autoGeneratedKeys));
410
411 } catch (final SQLException ex) {
412
413 handleExceptionInTransaction(connection, metaDataContexts);
414 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
415 }
416 }
417
418 @Override
419 public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
420 try {
421 returnGeneratedKeys = true;
422 return execute0(sql, (actualSQL, statement) -> statement.execute(actualSQL, columnIndexes), (statement, actualSQL) -> statement.execute(actualSQL, columnIndexes));
423
424 } catch (final SQLException ex) {
425
426 handleExceptionInTransaction(connection, metaDataContexts);
427 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
428 }
429 }
430
431 @Override
432 public boolean execute(final String sql, final String[] columnNames) throws SQLException {
433 try {
434 returnGeneratedKeys = true;
435 return execute0(sql, (actualSQL, statement) -> statement.execute(actualSQL, columnNames), (statement, actualSQL) -> statement.execute(actualSQL, columnNames));
436
437 } catch (final SQLException ex) {
438
439 handleExceptionInTransaction(connection, metaDataContexts);
440 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
441 }
442 }
443
444 private boolean execute0(final String sql, final ExecuteCallback executeCallback, final TrafficExecutorCallback<Boolean> trafficCallback) throws SQLException {
445 try {
446 QueryContext queryContext = createQueryContext(sql);
447 handleAutoCommit(queryContext);
448 databaseName = queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
449 connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
450 trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
451 if (null != trafficInstanceId) {
452 JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
453 return executor.getTrafficExecutor().execute(executionUnit, trafficCallback);
454 }
455 useFederation = decide(queryContext,
456 metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData());
457 if (useFederation) {
458 ResultSet resultSet = executeFederationQuery(queryContext);
459 return null != resultSet;
460 }
461 executionContext = createExecutionContext(queryContext);
462 if (!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()) {
463 Collection<ExecuteResult> results = executor.getRawExecutor().execute(createRawExecutionContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback());
464 return results.iterator().next() instanceof QueryResult;
465 }
466 return executeWithExecutionContext(executeCallback, executionContext);
467 } finally {
468 currentResultSet = null;
469 }
470 }
471
472 private void handleAutoCommit(final QueryContext queryContext) throws SQLException {
473 if (AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement())) {
474 connection.handleAutoCommit();
475 }
476 }
477
478 private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException {
479 DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
480 ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters()));
481 ExecutionGroupContext<JDBCExecutionUnit> context =
482 prepareEngine.prepare(new RouteContext(), Collections.singletonList(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
483 return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
484 }
485
486 private void clearStatements() throws SQLException {
487 for (Statement each : statements) {
488 each.close();
489 }
490 statements.clear();
491 }
492
493 @Override
494 public void addBatch(final String sql) throws SQLException {
495 batchStatementExecutor.addBatch(sql);
496 }
497
498 @Override
499 public void clearBatch() {
500 batchStatementExecutor.clear();
501 }
502
503 @Override
504 public int[] executeBatch() throws SQLException {
505 return batchStatementExecutor.executeBatch();
506 }
507
508 private QueryContext createQueryContext(final String originSQL) {
509 SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
510 String sql = SQLHintUtils.removeHint(originSQL);
511 HintValueContext hintValueContext = SQLHintUtils.extractHint(originSQL);
512 SQLStatement sqlStatement = sqlParserRule.getSQLParserEngine(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType()).parse(sql, false);
513 SQLStatementContext sqlStatementContext = new SQLBindEngine(metaDataContexts.getMetaData(), databaseName, hintValueContext).bind(sqlStatement, Collections.emptyList());
514 return new QueryContext(sqlStatementContext, sql, Collections.emptyList(), hintValueContext);
515 }
516
517 private ExecutionContext createExecutionContext(final QueryContext queryContext) throws SQLException {
518 clearStatements();
519 RuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData();
520 ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(databaseName);
521 SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext());
522 return kernelProcessor.generateExecutionContext(queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(),
523 connection.getDatabaseConnectionManager().getConnectionContext());
524 }
525
526 private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException {
527 DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
528 return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
529 new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
530 }
531
532 private ExecutionGroupContext<RawSQLExecutionUnit> createRawExecutionContext(final ExecutionContext executionContext) throws SQLException {
533 int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
534 return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
535 .prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
536 }
537
538 private boolean executeWithExecutionContext(final ExecuteCallback executeCallback, final ExecutionContext executionContext) throws SQLException {
539 return isNeedImplicitCommitTransaction(connection, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1)
540 ? executeWithImplicitCommitTransaction(() -> useDriverToExecute(executeCallback, executionContext))
541 : useDriverToExecute(executeCallback, executionContext);
542 }
543
544 private boolean executeWithImplicitCommitTransaction(final ImplicitTransactionCallback<Boolean> callback) throws SQLException {
545 boolean result;
546 try {
547 connection.setAutoCommit(false);
548 result = callback.execute();
549 connection.commit();
550
551 } catch (final Exception ex) {
552
553 connection.rollback();
554 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
555 } finally {
556 connection.setAutoCommit(true);
557 }
558 return result;
559 }
560
561 private boolean useDriverToExecute(final ExecuteCallback callback, final ExecutionContext executionContext) throws SQLException {
562 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
563 cacheStatements(executionGroupContext.getInputGroups());
564 JDBCExecutorCallback<Boolean> jdbcExecutorCallback = createExecuteCallback(callback, executionContext.getSqlStatementContext().getSqlStatement());
565 return executor.getRegularExecutor().execute(executionGroupContext,
566 executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
567 }
568
569 private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException {
570 for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
571 statements.addAll(each.getInputs().stream().map(JDBCExecutionUnit::getStorageResource).collect(Collectors.toList()));
572 }
573 replay();
574 }
575
576 private JDBCExecutorCallback<Boolean> createExecuteCallback(final ExecuteCallback executeCallback, final SQLStatement sqlStatement) {
577 boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
578 return new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
579 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, isExceptionThrown) {
580
581 @Override
582 protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
583 return executeCallback.execute(sql, statement);
584 }
585
586 @Override
587 protected Optional<Boolean> getSaneResult(final SQLStatement sqlStatement1, final SQLException ex) {
588 return Optional.empty();
589 }
590 };
591 }
592
593 private void replay() throws SQLException {
594 for (Statement each : statements) {
595 getMethodInvocationRecorder().replay(each);
596 }
597 }
598
599 @Override
600 public ResultSet getResultSet() throws SQLException {
601 if (null != currentResultSet) {
602 return currentResultSet;
603 }
604 if (null != trafficInstanceId) {
605 return executor.getTrafficExecutor().getResultSet();
606 }
607 if (useFederation) {
608 return executor.getSqlFederationEngine().getResultSet();
609 }
610 if (executionContext.getSqlStatementContext() instanceof SelectStatementContext
611 || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
612 List<ResultSet> resultSets = getResultSets();
613 if (resultSets.isEmpty()) {
614 return currentResultSet;
615 }
616 SQLStatementContext sqlStatementContext = executionContext.getSqlStatementContext();
617 MergedResult mergedResult = mergeQuery(getQueryResults(resultSets), sqlStatementContext);
618 boolean selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable();
619 currentResultSet = new ShardingSphereResultSet(resultSets, mergedResult, this, selectContainsEnhancedTable, executionContext);
620 }
621 return currentResultSet;
622 }
623
624 private List<ResultSet> getResultSets() throws SQLException {
625 List<ResultSet> result = new ArrayList<>(statements.size());
626 for (Statement each : statements) {
627 if (null != each.getResultSet()) {
628 result.add(each.getResultSet());
629 }
630 }
631 return result;
632 }
633
634 private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
635 List<QueryResult> result = new ArrayList<>(resultSets.size());
636 for (ResultSet each : resultSets) {
637 if (null != each) {
638 result.add(new JDBCStreamQueryResult(each));
639 }
640 }
641 return result;
642 }
643
644 private MergedResult mergeQuery(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
645 MergeEngine mergeEngine = new MergeEngine(metaDataContexts.getMetaData().getGlobalRuleMetaData(), metaDataContexts.getMetaData().getDatabase(databaseName),
646 metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
647 return mergeEngine.merge(queryResults, sqlStatementContext);
648 }
649
650 @SuppressWarnings("MagicConstant")
651 @Override
652 public int getResultSetType() {
653 return statementOption.getResultSetType();
654 }
655
656 @SuppressWarnings("MagicConstant")
657 @Override
658 public int getResultSetConcurrency() {
659 return statementOption.getResultSetConcurrency();
660 }
661
662 @Override
663 public int getResultSetHoldability() {
664 return statementOption.getResultSetHoldability();
665 }
666
667 @Override
668 public boolean isAccumulate() {
669 for (DataNodeRuleAttribute each : metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(DataNodeRuleAttribute.class)) {
670 if (each.isNeedAccumulate(executionContext.getSqlStatementContext().getTablesContext().getTableNames())) {
671 return true;
672 }
673 }
674 return false;
675 }
676
677 @Override
678 public Collection<Statement> getRoutedStatements() {
679 return statements;
680 }
681
682 @Override
683 public ResultSet getGeneratedKeys() throws SQLException {
684 Optional<GeneratedKeyContext> generatedKey = findGeneratedKey();
685 if (returnGeneratedKeys && generatedKey.isPresent() && !generatedKey.get().getGeneratedValues().isEmpty()) {
686 return new GeneratedKeysResultSet(getGeneratedKeysColumnName(generatedKey.get().getColumnName()), generatedKey.get().getGeneratedValues().iterator(), this);
687 }
688 Collection<Comparable<?>> generatedValues = new LinkedList<>();
689 for (Statement each : statements) {
690 ResultSet resultSet = each.getGeneratedKeys();
691 while (resultSet.next()) {
692 generatedValues.add((Comparable<?>) resultSet.getObject(1));
693 }
694 }
695 String columnName = generatedKey.map(GeneratedKeyContext::getColumnName).orElse(null);
696 return new GeneratedKeysResultSet(getGeneratedKeysColumnName(columnName), generatedValues.iterator(), this);
697 }
698
699 private Optional<GeneratedKeyContext> findGeneratedKey() {
700 return executionContext.getSqlStatementContext() instanceof InsertStatementContext
701 ? ((InsertStatementContext) executionContext.getSqlStatementContext()).getGeneratedKeyContext()
702 : Optional.empty();
703 }
704
705 private String getGeneratedKeysColumnName(final String columnName) {
706 return metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType() instanceof MySQLDatabaseType ? "GENERATED_KEY" : columnName;
707 }
708 }