1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.driver.jdbc.core.statement;
19
20 import lombok.AccessLevel;
21 import lombok.Getter;
22 import org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
23 import org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteCallback;
24 import org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteUpdateCallback;
25 import org.apache.shardingsphere.driver.executor.engine.batch.statement.BatchStatementExecutor;
26 import org.apache.shardingsphere.driver.executor.engine.facade.DriverExecutorFacade;
27 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractStatementAdapter;
28 import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
29 import org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
30 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
31 import org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
32 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
33 import org.apache.shardingsphere.infra.binder.context.statement.type.dml.InsertStatementContext;
34 import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
35 import org.apache.shardingsphere.infra.database.core.keygen.GeneratedKeyColumnProvider;
36 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
37 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
38 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
39 import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
40 import org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
41 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
42 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
43 import org.apache.shardingsphere.infra.hint.HintValueContext;
44 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
45 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
46 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
47 import org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
48 import org.apache.shardingsphere.infra.session.query.QueryContext;
49 import org.apache.shardingsphere.parser.rule.SQLParserRule;
50 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
51
52 import java.sql.ResultSet;
53 import java.sql.SQLException;
54 import java.sql.Statement;
55 import java.util.Collection;
56 import java.util.Collections;
57 import java.util.LinkedList;
58 import java.util.List;
59 import java.util.Optional;
60
61
62
63
64 @HighFrequencyInvocation
65 public final class ShardingSphereStatement extends AbstractStatementAdapter {
66
67 @Getter
68 private final ShardingSphereConnection connection;
69
70 private final ShardingSphereMetaData metaData;
71
72 private final StatementOption statementOption;
73
74 @Getter(AccessLevel.PROTECTED)
75 private final StatementManager statementManager;
76
77 private final DriverExecutorFacade driverExecutorFacade;
78
79 private final BatchStatementExecutor batchStatementExecutor;
80
81 private final List<Statement> statements;
82
83 private String usedDatabaseName;
84
85 private SQLStatementContext sqlStatementContext;
86
87 private boolean returnGeneratedKeys;
88
89 private ResultSet currentResultSet;
90
91 public ShardingSphereStatement(final ShardingSphereConnection connection) {
92 this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
93 }
94
95 public ShardingSphereStatement(final ShardingSphereConnection connection, final int resultSetType, final int resultSetConcurrency) {
96 this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
97 }
98
99 public ShardingSphereStatement(final ShardingSphereConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
100 this.connection = connection;
101 metaData = connection.getContextManager().getMetaDataContexts().getMetaData();
102 statementOption = new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
103 statementManager = new StatementManager();
104 connection.getStatementManagers().add(statementManager);
105 driverExecutorFacade = new DriverExecutorFacade(connection, statementOption, statementManager, JDBCDriverType.STATEMENT);
106 batchStatementExecutor = new BatchStatementExecutor(this);
107 statements = new LinkedList<>();
108 usedDatabaseName = connection.getCurrentDatabaseName();
109 }
110
111 @Override
112 public ResultSet executeQuery(final String sql) throws SQLException {
113 QueryContext queryContext = createQueryContext(sql);
114 try {
115 prepareExecute(queryContext);
116 ShardingSphereDatabase usedDatabase = metaData.getDatabase(usedDatabaseName);
117 currentResultSet = driverExecutorFacade.executeQuery(usedDatabase, metaData, queryContext, this, null,
118 (StatementAddCallback<Statement>) (statements, parameterSets) -> this.statements.addAll(statements), this::replay);
119 return currentResultSet;
120
121 } catch (final RuntimeException | SQLException ex) {
122
123 handleExceptionInTransaction(connection, metaData);
124 currentResultSet = null;
125 throw SQLExceptionTransformEngine.toSQLException(ex, metaData.getDatabase(usedDatabaseName).getProtocolType());
126 }
127 }
128
129 @Override
130 public int executeUpdate(final String sql) throws SQLException {
131 try {
132 return executeUpdate(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL));
133
134 } catch (final RuntimeException | SQLException ex) {
135
136 handleExceptionInTransaction(connection, metaData);
137 throw SQLExceptionTransformEngine.toSQLException(ex, metaData.getDatabase(usedDatabaseName).getProtocolType());
138 }
139 }
140
141 @Override
142 public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
143 if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
144 currentResultSet = null;
145 returnGeneratedKeys = true;
146 }
147 try {
148 return executeUpdate(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL, autoGeneratedKeys));
149
150 } catch (final RuntimeException | SQLException ex) {
151
152 handleExceptionInTransaction(connection, metaData);
153 throw SQLExceptionTransformEngine.toSQLException(ex, metaData.getDatabase(usedDatabaseName).getProtocolType());
154 }
155 }
156
157 @Override
158 public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
159 returnGeneratedKeys = true;
160 try {
161 return executeUpdate(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL, columnIndexes));
162
163 } catch (final RuntimeException | SQLException ex) {
164
165 handleExceptionInTransaction(connection, metaData);
166 throw SQLExceptionTransformEngine.toSQLException(ex, metaData.getDatabase(usedDatabaseName).getProtocolType());
167 }
168 }
169
170 @Override
171 public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
172 returnGeneratedKeys = true;
173 try {
174 return executeUpdate(sql, (actualSQL, statement) -> statement.executeUpdate(actualSQL, columnNames));
175
176 } catch (final RuntimeException | SQLException ex) {
177
178 handleExceptionInTransaction(connection, metaData);
179 throw SQLExceptionTransformEngine.toSQLException(ex, metaData.getDatabase(usedDatabaseName).getProtocolType());
180 }
181 }
182
183 private int executeUpdate(final String sql, final StatementExecuteUpdateCallback updateCallback) throws SQLException {
184 currentResultSet = null;
185 QueryContext queryContext = createQueryContext(sql);
186 prepareExecute(queryContext);
187 ShardingSphereDatabase usedDatabase = metaData.getDatabase(usedDatabaseName);
188 return driverExecutorFacade.executeUpdate(usedDatabase, metaData, queryContext,
189 updateCallback, (StatementAddCallback<Statement>) (statements, parameterSets) -> this.statements.addAll(statements), this::replay);
190 }
191
192 @Override
193 public boolean execute(final String sql) throws SQLException {
194 try {
195 return execute(sql, (actualSQL, statement) -> statement.execute(actualSQL));
196
197 } catch (final SQLException ex) {
198
199 handleExceptionInTransaction(connection, metaData);
200 throw SQLExceptionTransformEngine.toSQLException(ex, metaData.getDatabase(usedDatabaseName).getProtocolType());
201 }
202 }
203
204 @Override
205 public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
206 try {
207 if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
208 returnGeneratedKeys = true;
209 }
210 return execute(sql, (actualSQL, statement) -> statement.execute(actualSQL, autoGeneratedKeys));
211
212 } catch (final SQLException ex) {
213
214 handleExceptionInTransaction(connection, metaData);
215 throw SQLExceptionTransformEngine.toSQLException(ex, metaData.getDatabase(usedDatabaseName).getProtocolType());
216 }
217 }
218
219 @Override
220 public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
221 try {
222 returnGeneratedKeys = true;
223 return execute(sql, (actualSQL, statement) -> statement.execute(actualSQL, columnIndexes));
224
225 } catch (final SQLException ex) {
226
227 handleExceptionInTransaction(connection, metaData);
228 throw SQLExceptionTransformEngine.toSQLException(ex, metaData.getDatabase(usedDatabaseName).getProtocolType());
229 }
230 }
231
232 @Override
233 public boolean execute(final String sql, final String[] columnNames) throws SQLException {
234 try {
235 returnGeneratedKeys = true;
236 return execute(sql, (actualSQL, statement) -> statement.execute(actualSQL, columnNames));
237
238 } catch (final SQLException ex) {
239
240 handleExceptionInTransaction(connection, metaData);
241 throw SQLExceptionTransformEngine.toSQLException(ex, metaData.getDatabase(usedDatabaseName).getProtocolType());
242 }
243 }
244
245 private boolean execute(final String sql, final StatementExecuteCallback statementExecuteCallback) throws SQLException {
246 currentResultSet = null;
247 QueryContext queryContext = createQueryContext(sql);
248 prepareExecute(queryContext);
249 ShardingSphereDatabase usedDatabase = metaData.getDatabase(usedDatabaseName);
250 return driverExecutorFacade.execute(usedDatabase, metaData, queryContext, statementExecuteCallback,
251 (StatementAddCallback<Statement>) (statements, parameterSets) -> this.statements.addAll(statements), this::replay);
252 }
253
254 private QueryContext createQueryContext(final String originSQL) throws SQLException {
255 ShardingSpherePreconditions.checkNotEmpty(originSQL, () -> new EmptySQLException().toSQLException());
256 HintValueContext hintValueContext = SQLHintUtils.extractHint(originSQL);
257 String sql = SQLHintUtils.removeHint(originSQL);
258 DatabaseType databaseType = metaData.getDatabase(usedDatabaseName).getProtocolType();
259 SQLStatement sqlStatement = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(databaseType).parse(sql, false);
260 SQLStatementContext sqlStatementContext = new SQLBindEngine(metaData, connection.getCurrentDatabaseName(), hintValueContext).bind(databaseType, sqlStatement, Collections.emptyList());
261 return new QueryContext(sqlStatementContext, sql, Collections.emptyList(), hintValueContext, connection.getDatabaseConnectionManager().getConnectionContext(), metaData);
262 }
263
264 private void prepareExecute(final QueryContext queryContext) throws SQLException {
265 handleAutoCommitBeforeExecution(queryContext.getSqlStatementContext().getSqlStatement(), connection);
266 sqlStatementContext = queryContext.getSqlStatementContext();
267 ShardingSpherePreconditions.checkNotNull(sqlStatementContext, () -> new IllegalStateException("Statement context can not be null"));
268 usedDatabaseName = sqlStatementContext.getTablesContext().getDatabaseName().orElse(connection.getCurrentDatabaseName());
269 connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabaseName(connection.getCurrentDatabaseName());
270 clearStatements();
271 }
272
273 private void clearStatements() throws SQLException {
274 for (Statement each : statements) {
275 each.close();
276 }
277 statements.clear();
278 }
279
280 private void replay() throws SQLException {
281 for (Statement each : statements) {
282 getMethodInvocationRecorder().replay(each);
283 }
284 }
285
286 @Override
287 public void addBatch(final String sql) throws SQLException {
288 batchStatementExecutor.addBatch(sql);
289 }
290
291 @Override
292 public void clearBatch() {
293 batchStatementExecutor.clear();
294 }
295
296 @Override
297 public int[] executeBatch() throws SQLException {
298 return batchStatementExecutor.executeBatch();
299 }
300
301 @Override
302 public ResultSet getResultSet() throws SQLException {
303 if (null != currentResultSet) {
304 return currentResultSet;
305 }
306 driverExecutorFacade.getResultSet(metaData.getDatabase(usedDatabaseName), sqlStatementContext, this, statements).ifPresent(optional -> currentResultSet = optional);
307 return currentResultSet;
308 }
309
310 @SuppressWarnings("MagicConstant")
311 @Override
312 public int getResultSetType() {
313 return statementOption.getResultSetType();
314 }
315
316 @SuppressWarnings("MagicConstant")
317 @Override
318 public int getResultSetConcurrency() {
319 return statementOption.getResultSetConcurrency();
320 }
321
322 @Override
323 public int getResultSetHoldability() {
324 return statementOption.getResultSetHoldability();
325 }
326
327 @Override
328 public boolean isAccumulate() {
329 for (DataNodeRuleAttribute each : metaData.getDatabase(usedDatabaseName).getRuleMetaData().getAttributes(DataNodeRuleAttribute.class)) {
330 if (each.isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames())) {
331 return true;
332 }
333 }
334 return false;
335 }
336
337 @Override
338 public Collection<Statement> getRoutedStatements() {
339 return statements;
340 }
341
342 @Override
343 public ResultSet getGeneratedKeys() throws SQLException {
344 Optional<GeneratedKeyContext> generatedKey = findGeneratedKey();
345 if (returnGeneratedKeys && generatedKey.isPresent() && !generatedKey.get().getGeneratedValues().isEmpty()) {
346 return new GeneratedKeysResultSet(getGeneratedKeysColumnName(generatedKey.get().getColumnName()), generatedKey.get().getGeneratedValues().iterator(), this);
347 }
348 Collection<Comparable<?>> generatedValues = new LinkedList<>();
349 for (Statement each : statements) {
350 ResultSet resultSet = each.getGeneratedKeys();
351 while (resultSet.next()) {
352 generatedValues.add((Comparable<?>) resultSet.getObject(1));
353 }
354 }
355 String columnName = generatedKey.map(GeneratedKeyContext::getColumnName).orElse(null);
356 return new GeneratedKeysResultSet(getGeneratedKeysColumnName(columnName), generatedValues.iterator(), this);
357 }
358
359 private Optional<GeneratedKeyContext> findGeneratedKey() {
360 return sqlStatementContext instanceof InsertStatementContext ? ((InsertStatementContext) sqlStatementContext).getGeneratedKeyContext() : Optional.empty();
361 }
362
363 private String getGeneratedKeysColumnName(final String columnName) {
364 return DatabaseTypedSPILoader.findService(GeneratedKeyColumnProvider.class, metaData.getDatabase(usedDatabaseName).getProtocolType())
365 .map(GeneratedKeyColumnProvider::getColumnName).orElse(columnName);
366 }
367
368 @Override
369 protected void closeExecutor() throws SQLException {
370 driverExecutorFacade.close();
371 }
372 }