1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.driver.jdbc.core.statement;
19
20 import com.google.common.base.Strings;
21 import lombok.AccessLevel;
22 import lombok.Getter;
23 import org.apache.shardingsphere.driver.executor.DriverExecutor;
24 import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
25 import org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
26 import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
27 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
28 import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
29 import org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
30 import org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
31 import org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
32 import org.apache.shardingsphere.driver.jdbc.core.statement.metadata.ShardingSphereParameterMetaData;
33 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
34 import org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
35 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
36 import org.apache.shardingsphere.infra.binder.context.aware.ParameterAware;
37 import org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
38 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
39 import org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatementContext;
40 import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
41 import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
42 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
43 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
44 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
45 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
46 import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
47 import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
48 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
49 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
50 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
51 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
52 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
53 import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
54 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
55 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
56 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
57 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
58 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
59 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
60 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
61 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
62 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
63 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
64 import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
65 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
66 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
67 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
68 import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
69 import org.apache.shardingsphere.infra.hint.HintManager;
70 import org.apache.shardingsphere.infra.hint.HintValueContext;
71 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
72 import org.apache.shardingsphere.infra.instance.InstanceContext;
73 import org.apache.shardingsphere.infra.merge.MergeEngine;
74 import org.apache.shardingsphere.infra.merge.result.MergedResult;
75 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
76 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
77 import org.apache.shardingsphere.infra.metadata.user.Grantee;
78 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
79 import org.apache.shardingsphere.infra.route.context.RouteContext;
80 import org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
81 import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
82 import org.apache.shardingsphere.infra.rule.attribute.resoure.StorageConnectorReusableRuleAttribute;
83 import org.apache.shardingsphere.infra.session.query.QueryContext;
84 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
85 import org.apache.shardingsphere.parser.rule.SQLParserRule;
86 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
87 import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
88 import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
89 import org.apache.shardingsphere.traffic.engine.TrafficEngine;
90 import org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException;
91 import org.apache.shardingsphere.traffic.rule.TrafficRule;
92 import org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
93 import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
94
95 import java.sql.Connection;
96 import java.sql.ParameterMetaData;
97 import java.sql.PreparedStatement;
98 import java.sql.ResultSet;
99 import java.sql.SQLException;
100 import java.sql.Statement;
101 import java.util.ArrayList;
102 import java.util.Collection;
103 import java.util.Collections;
104 import java.util.LinkedList;
105 import java.util.List;
106 import java.util.Map;
107 import java.util.Optional;
108 import java.util.stream.Collectors;
109
110
111
112
113 @HighFrequencyInvocation
114 public final class ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter {
115
116 @Getter
117 private final ShardingSphereConnection connection;
118
119 private final MetaDataContexts metaDataContexts;
120
121 private final String sql;
122
123 private final List<PreparedStatement> statements;
124
125 private final List<List<Object>> parameterSets;
126
127 private final SQLStatement sqlStatement;
128
129 private final SQLStatementContext sqlStatementContext;
130
131 private final String databaseName;
132
133 private final StatementOption statementOption;
134
135 @Getter
136 private final ParameterMetaData parameterMetaData;
137
138 @Getter(AccessLevel.PROTECTED)
139 private final DriverExecutor executor;
140
141 private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
142
143 private final Collection<Comparable<?>> generatedValues = new LinkedList<>();
144
145 private final KernelProcessor kernelProcessor;
146
147 private final boolean statementsCacheable;
148
149 private final TrafficRule trafficRule;
150
151 @Getter(AccessLevel.PROTECTED)
152 private final StatementManager statementManager;
153
154 @Getter
155 private final boolean selectContainsEnhancedTable;
156
157 private ExecutionContext executionContext;
158
159 private Map<String, Integer> columnLabelAndIndexMap;
160
161 private ResultSet currentResultSet;
162
163 private String trafficInstanceId;
164
165 private boolean useFederation;
166
167 private final HintValueContext hintValueContext;
168
169 private ResultSet currentBatchGeneratedKeysResultSet;
170
171 public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
172 this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false, null);
173 }
174
175 public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
176 this(connection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT, false, null);
177 }
178
179 public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
180 this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, Statement.RETURN_GENERATED_KEYS == autoGeneratedKeys, null);
181 }
182
183 public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql, final String[] columns) throws SQLException {
184 this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, true, columns);
185 }
186
187 public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency,
188 final int resultSetHoldability) throws SQLException {
189 this(connection, sql, resultSetType, resultSetConcurrency, resultSetHoldability, false, null);
190 }
191
192 private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
193 final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys,
194 final String[] columns) throws SQLException {
195 if (Strings.isNullOrEmpty(sql)) {
196 throw new EmptySQLException().toSQLException();
197 }
198 this.connection = connection;
199 metaDataContexts = connection.getContextManager().getMetaDataContexts();
200 hintValueContext = SQLHintUtils.extractHint(sql);
201 this.sql = SQLHintUtils.removeHint(sql);
202 statements = new ArrayList<>();
203 parameterSets = new ArrayList<>();
204 SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
205 SQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType());
206 sqlStatement = sqlParserEngine.parse(this.sql, true);
207 sqlStatementContext = new SQLBindEngine(metaDataContexts.getMetaData(), connection.getDatabaseName(), hintValueContext).bind(sqlStatement, Collections.emptyList());
208 databaseName = sqlStatementContext.getTablesContext().getDatabaseName().orElse(connection.getDatabaseName());
209 connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
210 parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
211 statementOption = returnGeneratedKeys ? new StatementOption(true, columns) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
212 executor = new DriverExecutor(connection);
213 JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext());
214 batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, databaseName, connection.getProcessId());
215 kernelProcessor = new KernelProcessor();
216 statementsCacheable = isStatementsCacheable(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData());
217 trafficRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
218 selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable();
219 statementManager = new StatementManager();
220 }
221
222 private boolean isStatementsCacheable(final RuleMetaData databaseRuleMetaData) {
223 return databaseRuleMetaData.getAttributes(StorageConnectorReusableRuleAttribute.class).size() == databaseRuleMetaData.getRules().size() && !HintManager.isInstantiated();
224 }
225
226 @Override
227 public ResultSet executeQuery() throws SQLException {
228 ResultSet result;
229 try {
230 if (statementsCacheable && !statements.isEmpty()) {
231 resetParameters();
232 return statements.iterator().next().executeQuery();
233 }
234 clearPrevious();
235 QueryContext queryContext = createQueryContext();
236 handleAutoCommit(queryContext);
237 trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
238 if (null != trafficInstanceId) {
239 JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
240 return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeQuery());
241 }
242 useFederation = decide(queryContext,
243 metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData());
244 if (useFederation) {
245 return executeFederationQuery(queryContext);
246 }
247 executionContext = createExecutionContext(queryContext);
248 result = doExecuteQuery(executionContext);
249
250 } catch (final RuntimeException ex) {
251
252 handleExceptionInTransaction(connection, metaDataContexts);
253 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
254 } finally {
255 clearBatch();
256 }
257 currentResultSet = result;
258 return result;
259 }
260
261 private ShardingSphereResultSet doExecuteQuery(final ExecutionContext executionContext) throws SQLException {
262 List<QueryResult> queryResults = executeQuery0(executionContext);
263 MergedResult mergedResult = mergeQuery(queryResults, executionContext.getSqlStatementContext());
264 List<ResultSet> resultSets = getResultSets();
265 if (null == columnLabelAndIndexMap) {
266 columnLabelAndIndexMap = ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(sqlStatementContext, selectContainsEnhancedTable, resultSets.get(0).getMetaData());
267 }
268 return new ShardingSphereResultSet(resultSets, mergedResult, this, selectContainsEnhancedTable, executionContext, columnLabelAndIndexMap);
269 }
270
271 private boolean decide(final QueryContext queryContext, final ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
272 return executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, globalRuleMetaData);
273 }
274
275 private void handleAutoCommit(final QueryContext queryContext) throws SQLException {
276 if (AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement())) {
277 connection.handleAutoCommit();
278 }
279 }
280
281 private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final QueryContext queryContext) throws SQLException {
282 DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
283 ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters()));
284 ExecutionGroupContext<JDBCExecutionUnit> context =
285 prepareEngine.prepare(new RouteContext(), Collections.singleton(executionUnit), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
286 ShardingSpherePreconditions.checkState(!context.getInputGroups().isEmpty() && !context.getInputGroups().iterator().next().getInputs().isEmpty(), EmptyTrafficExecutionUnitException::new);
287 return context.getInputGroups().iterator().next().getInputs().iterator().next();
288 }
289
290 private Optional<String> getInstanceIdAndSet(final QueryContext queryContext) {
291 Optional<String> result = connection.getDatabaseConnectionManager().getConnectionContext().getTrafficInstanceId();
292 if (!result.isPresent()) {
293 result = getInstanceId(queryContext);
294 }
295 if (connection.isHoldTransaction() && result.isPresent()) {
296 connection.getDatabaseConnectionManager().getConnectionContext().setTrafficInstanceId(result.get());
297 }
298 return result;
299 }
300
301 private Optional<String> getInstanceId(final QueryContext queryContext) {
302 InstanceContext instanceContext = connection.getContextManager().getInstanceContext();
303 return null != trafficRule && !trafficRule.getStrategyRules().isEmpty()
304 ? new TrafficEngine(trafficRule, instanceContext).dispatch(queryContext, connection.isHoldTransaction())
305 : Optional.empty();
306 }
307
308 private void resetParameters() throws SQLException {
309 parameterSets.clear();
310 parameterSets.add(getParameters());
311 replaySetParameter();
312 }
313
314 private List<QueryResult> executeQuery0(final ExecutionContext executionContext) throws SQLException {
315 if (hasRawExecutionRule()) {
316 return executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
317 executionContext.getQueryContext(), new RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
318 }
319 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
320 cacheStatements(executionGroupContext.getInputGroups());
321 return executor.getRegularExecutor().executeQuery(executionGroupContext, executionContext.getQueryContext(),
322 new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
323 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement,
324 SQLExecutorExceptionHandler.isExceptionThrown()));
325 }
326
327 private ResultSet executeFederationQuery(final QueryContext queryContext) {
328 PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
329 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
330 SQLFederationContext context = new SQLFederationContext(false, queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
331 return executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(), callback, context);
332 }
333
334 private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine() {
335 int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
336 return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), statementManager,
337 statementOption, metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(),
338 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnits());
339 }
340
341 @Override
342 public int executeUpdate() throws SQLException {
343 try {
344 if (statementsCacheable && !statements.isEmpty()) {
345 resetParameters();
346 return statements.iterator().next().executeUpdate();
347 }
348 clearPrevious();
349 QueryContext queryContext = createQueryContext();
350 handleAutoCommit(queryContext);
351 trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
352 if (null != trafficInstanceId) {
353 JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
354 return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());
355 }
356 executionContext = createExecutionContext(queryContext);
357 if (hasRawExecutionRule()) {
358 Collection<ExecuteResult> results =
359 executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback());
360 return accumulate(results);
361 }
362 return executeUpdateWithExecutionContext(executionContext);
363
364 } catch (final RuntimeException ex) {
365
366 handleExceptionInTransaction(connection, metaDataContexts);
367 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
368 } finally {
369 clearBatch();
370 }
371 }
372
373 private int useDriverToExecuteUpdate(final ExecutionContext executionContext) throws SQLException {
374 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
375 cacheStatements(executionGroupContext.getInputGroups());
376 return executor.getRegularExecutor().executeUpdate(executionGroupContext,
377 executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
378 }
379
380 private int accumulate(final Collection<ExecuteResult> results) {
381 int result = 0;
382 for (ExecuteResult each : results) {
383 result += ((UpdateResult) each).getUpdateCount();
384 }
385 return result;
386 }
387
388 private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {
389 boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
390 return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
391 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, isExceptionThrown) {
392
393 @Override
394 protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
395 return ((PreparedStatement) statement).executeUpdate();
396 }
397
398 @Override
399 protected Optional<Integer> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
400 return Optional.empty();
401 }
402 };
403 }
404
405 @Override
406 public boolean execute() throws SQLException {
407 try {
408 if (statementsCacheable && !statements.isEmpty()) {
409 resetParameters();
410 return statements.iterator().next().execute();
411 }
412 clearPrevious();
413 QueryContext queryContext = createQueryContext();
414 handleAutoCommit(queryContext);
415 trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
416 if (null != trafficInstanceId) {
417 JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
418 return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).execute());
419 }
420 useFederation = decide(queryContext,
421 metaDataContexts.getMetaData().getDatabase(databaseName), metaDataContexts.getMetaData().getGlobalRuleMetaData());
422 if (useFederation) {
423 ResultSet resultSet = executeFederationQuery(queryContext);
424 return null != resultSet;
425 }
426 executionContext = createExecutionContext(queryContext);
427 if (hasRawExecutionRule()) {
428 Collection<ExecuteResult> results =
429 executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext), executionContext.getQueryContext(), new RawSQLExecutorCallback());
430 return results.iterator().next() instanceof QueryResult;
431 }
432 return executeWithExecutionContext(executionContext);
433
434 } catch (final RuntimeException ex) {
435
436 handleExceptionInTransaction(connection, metaDataContexts);
437 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
438 } finally {
439 clearBatch();
440 }
441 }
442
443 private boolean hasRawExecutionRule() {
444 return !metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
445 }
446
447 private ExecutionGroupContext<RawSQLExecutionUnit> createRawExecutionGroupContext(final ExecutionContext executionContext) throws SQLException {
448 int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
449 return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
450 .prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
451 }
452
453 private boolean executeWithExecutionContext(final ExecutionContext executionContext) throws SQLException {
454 return isNeedImplicitCommitTransaction(connection, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1)
455 ? executeWithImplicitCommitTransaction(() -> useDriverToExecute(executionContext))
456 : useDriverToExecute(executionContext);
457 }
458
459 private boolean executeWithImplicitCommitTransaction(final ImplicitTransactionCallback<Boolean> callback) throws SQLException {
460 boolean result;
461 try {
462 connection.setAutoCommit(false);
463 result = callback.execute();
464 connection.commit();
465
466 } catch (final Exception ex) {
467
468 connection.rollback();
469 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
470 } finally {
471 connection.setAutoCommit(true);
472 }
473 return result;
474 }
475
476 private int executeUpdateWithExecutionContext(final ExecutionContext executionContext) throws SQLException {
477 return isNeedImplicitCommitTransaction(connection, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1)
478 ? executeUpdateWithImplicitCommitTransaction(() -> useDriverToExecuteUpdate(executionContext))
479 : useDriverToExecuteUpdate(executionContext);
480 }
481
482 private int executeUpdateWithImplicitCommitTransaction(final ImplicitTransactionCallback<Integer> callback) throws SQLException {
483 int result;
484 try {
485 connection.setAutoCommit(false);
486 result = callback.execute();
487 connection.commit();
488
489 } catch (final RuntimeException ex) {
490
491 connection.rollback();
492 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
493 } finally {
494 connection.setAutoCommit(true);
495 }
496 return result;
497 }
498
499 private boolean useDriverToExecute(final ExecutionContext executionContext) throws SQLException {
500 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(executionContext);
501 cacheStatements(executionGroupContext.getInputGroups());
502 return executor.getRegularExecutor().execute(executionGroupContext,
503 executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
504 }
505
506 private JDBCExecutorCallback<Boolean> createExecuteCallback() {
507 boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
508 return new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
509 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), sqlStatement, isExceptionThrown) {
510
511 @Override
512 protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
513 return ((PreparedStatement) statement).execute();
514 }
515
516 @Override
517 protected Optional<Boolean> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
518 return Optional.empty();
519 }
520 };
521 }
522
523 private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException {
524 DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
525 return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(),
526 new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", "")));
527 }
528
529 @Override
530 public ResultSet getResultSet() throws SQLException {
531 if (null != currentResultSet) {
532 return currentResultSet;
533 }
534 if (null != trafficInstanceId) {
535 return executor.getTrafficExecutor().getResultSet();
536 }
537 if (useFederation) {
538 return executor.getSqlFederationEngine().getResultSet();
539 }
540 if (executionContext.getSqlStatementContext() instanceof SelectStatementContext
541 || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
542 List<ResultSet> resultSets = getResultSets();
543 if (resultSets.isEmpty()) {
544 return currentResultSet;
545 }
546 SQLStatementContext sqlStatementContext = executionContext.getSqlStatementContext();
547 MergedResult mergedResult = mergeQuery(getQueryResults(resultSets), sqlStatementContext);
548 if (null == columnLabelAndIndexMap) {
549 columnLabelAndIndexMap = ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(sqlStatementContext, selectContainsEnhancedTable, resultSets.get(0).getMetaData());
550 }
551 currentResultSet = new ShardingSphereResultSet(resultSets, mergedResult, this, selectContainsEnhancedTable, executionContext, columnLabelAndIndexMap);
552 }
553 return currentResultSet;
554 }
555
556 private List<ResultSet> getResultSets() throws SQLException {
557 List<ResultSet> result = new ArrayList<>(statements.size());
558 for (Statement each : statements) {
559 if (null != each.getResultSet()) {
560 result.add(each.getResultSet());
561 }
562 }
563 return result;
564 }
565
566 private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
567 List<QueryResult> result = new ArrayList<>(resultSets.size());
568 for (ResultSet each : resultSets) {
569 if (null != each) {
570 result.add(new JDBCStreamQueryResult(each));
571 }
572 }
573 return result;
574 }
575
576 private ExecutionContext createExecutionContext(final QueryContext queryContext) {
577 RuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData();
578 ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(databaseName);
579 SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext());
580 ExecutionContext result = kernelProcessor.generateExecutionContext(
581 queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
582 findGeneratedKey(result).ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));
583 return result;
584 }
585
586 private ExecutionContext createExecutionContext(final QueryContext queryContext, final String trafficInstanceId) {
587 ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(queryContext.getSql(), queryContext.getParameters()));
588 return new ExecutionContext(queryContext, Collections.singletonList(executionUnit), new RouteContext());
589 }
590
591 private QueryContext createQueryContext() {
592 List<Object> params = new ArrayList<>(getParameters());
593 if (sqlStatementContext instanceof ParameterAware) {
594 ((ParameterAware) sqlStatementContext).setUpParameters(params);
595 }
596 return new QueryContext(sqlStatementContext, sql, params, hintValueContext, true);
597 }
598
599 private MergedResult mergeQuery(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
600 MergeEngine mergeEngine = new MergeEngine(metaDataContexts.getMetaData().getGlobalRuleMetaData(), metaDataContexts.getMetaData().getDatabase(databaseName),
601 metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
602 return mergeEngine.merge(queryResults, sqlStatementContext);
603 }
604
605 private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException {
606 for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
607 each.getInputs().forEach(eachInput -> {
608 statements.add((PreparedStatement) eachInput.getStorageResource());
609 parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters());
610 });
611 }
612 replay();
613 }
614
615 private void replay() throws SQLException {
616 replaySetParameter();
617 for (Statement each : statements) {
618 getMethodInvocationRecorder().replay(each);
619 }
620 }
621
622 private void replaySetParameter() throws SQLException {
623 for (int i = 0; i < statements.size(); i++) {
624 replaySetParameter(statements.get(i), parameterSets.get(i));
625 }
626 }
627
628 private void clearPrevious() {
629 statements.clear();
630 parameterSets.clear();
631 generatedValues.clear();
632 }
633
634 private Optional<GeneratedKeyContext> findGeneratedKey(final ExecutionContext executionContext) {
635 return executionContext.getSqlStatementContext() instanceof InsertStatementContext
636 ? ((InsertStatementContext) executionContext.getSqlStatementContext()).getGeneratedKeyContext()
637 : Optional.empty();
638 }
639
640 @Override
641 public ResultSet getGeneratedKeys() throws SQLException {
642 if (null != currentBatchGeneratedKeysResultSet) {
643 return currentBatchGeneratedKeysResultSet;
644 }
645 Optional<GeneratedKeyContext> generatedKey = findGeneratedKey(executionContext);
646 if (generatedKey.isPresent() && statementOption.isReturnGeneratedKeys() && !generatedValues.isEmpty()) {
647 return new GeneratedKeysResultSet(getGeneratedKeysColumnName(generatedKey.get().getColumnName()), generatedValues.iterator(), this);
648 }
649 for (PreparedStatement each : statements) {
650 ResultSet resultSet = each.getGeneratedKeys();
651 while (resultSet.next()) {
652 generatedValues.add((Comparable<?>) resultSet.getObject(1));
653 }
654 }
655 String columnName = generatedKey.map(GeneratedKeyContext::getColumnName).orElse(null);
656 return new GeneratedKeysResultSet(getGeneratedKeysColumnName(columnName), generatedValues.iterator(), this);
657 }
658
659 private String getGeneratedKeysColumnName(final String columnName) {
660 return metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType() instanceof MySQLDatabaseType ? "GENERATED_KEY" : columnName;
661 }
662
663 @Override
664 public void addBatch() {
665 try {
666 QueryContext queryContext = createQueryContext();
667 trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
668 executionContext = null == trafficInstanceId ? createExecutionContext(queryContext) : createExecutionContext(queryContext, trafficInstanceId);
669 batchPreparedStatementExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits());
670 } finally {
671 currentResultSet = null;
672 clearParameters();
673 }
674 }
675
676 @Override
677 public int[] executeBatch() throws SQLException {
678 if (null == executionContext) {
679 return new int[0];
680 }
681 try {
682
683 return doExecuteBatch(batchPreparedStatementExecutor);
684
685 } catch (final RuntimeException ex) {
686
687 handleExceptionInTransaction(connection, metaDataContexts);
688 throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
689 } finally {
690 clearBatch();
691 }
692 }
693
694 private int[] doExecuteBatch(final BatchPreparedStatementExecutor batchExecutor) throws SQLException {
695 initBatchPreparedStatementExecutor(batchExecutor);
696 int[] result = batchExecutor.executeBatch(executionContext.getSqlStatementContext());
697 if (statementOption.isReturnGeneratedKeys() && generatedValues.isEmpty()) {
698 List<Statement> batchPreparedStatementExecutorStatements = batchExecutor.getStatements();
699 for (Statement statement : batchPreparedStatementExecutorStatements) {
700 statements.add((PreparedStatement) statement);
701 }
702 currentBatchGeneratedKeysResultSet = getGeneratedKeys();
703 statements.clear();
704 }
705 return result;
706 }
707
708 private void initBatchPreparedStatementExecutor(final BatchPreparedStatementExecutor batchExecutor) throws SQLException {
709 DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, metaDataContexts.getMetaData().getProps()
710 .<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), connection.getDatabaseConnectionManager(), statementManager, statementOption,
711 metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(),
712 metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnits());
713 List<ExecutionUnit> executionUnits = new ArrayList<>(batchExecutor.getBatchExecutionUnits().size());
714 for (BatchExecutionUnit each : batchExecutor.getBatchExecutionUnits()) {
715 ExecutionUnit executionUnit = each.getExecutionUnit();
716 executionUnits.add(executionUnit);
717 }
718 batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(), executionUnits, new ExecutionGroupReportContext(connection.getProcessId(), databaseName, new Grantee("", ""))));
719 setBatchParametersForStatements(batchExecutor);
720 }
721
722 private void setBatchParametersForStatements(final BatchPreparedStatementExecutor batchExecutor) throws SQLException {
723 for (Statement each : batchExecutor.getStatements()) {
724 List<List<Object>> paramSet = batchExecutor.getParameterSet(each);
725 for (List<Object> eachParams : paramSet) {
726 replaySetParameter((PreparedStatement) each, eachParams);
727 ((PreparedStatement) each).addBatch();
728 }
729 }
730 }
731
732 @Override
733 public void clearBatch() {
734 currentResultSet = null;
735 batchPreparedStatementExecutor.clear();
736 clearParameters();
737 }
738
739 @SuppressWarnings("MagicConstant")
740 @Override
741 public int getResultSetType() {
742 return statementOption.getResultSetType();
743 }
744
745 @SuppressWarnings("MagicConstant")
746 @Override
747 public int getResultSetConcurrency() {
748 return statementOption.getResultSetConcurrency();
749 }
750
751 @Override
752 public int getResultSetHoldability() {
753 return statementOption.getResultSetHoldability();
754 }
755
756 @Override
757 public boolean isAccumulate() {
758 for (DataNodeRuleAttribute each : metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(DataNodeRuleAttribute.class)) {
759 if (each.isNeedAccumulate(executionContext.getSqlStatementContext().getTablesContext().getTableNames())) {
760 return true;
761 }
762 }
763 return false;
764 }
765
766 @Override
767 public Collection<PreparedStatement> getRoutedStatements() {
768 return statements;
769 }
770 }