1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.backend.connector;
19
20 import com.google.common.base.Preconditions;
21 import org.apache.shardingsphere.infra.binder.context.aware.CursorDefinitionAware;
22 import org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
23 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
24 import org.apache.shardingsphere.infra.binder.context.statement.ddl.CloseStatementContext;
25 import org.apache.shardingsphere.infra.binder.context.statement.ddl.CursorStatementContext;
26 import org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatementContext;
27 import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
28 import org.apache.shardingsphere.infra.binder.context.type.CursorAvailable;
29 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
30 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
31 import org.apache.shardingsphere.infra.connection.refresher.MetaDataRefreshEngine;
32 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
33 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
34 import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
35 import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.EmptyStorageUnitException;
36 import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
37 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
38 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
39 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
40 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
41 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.metadata.JDBCQueryResultMetaData;
42 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
43 import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
44 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
45 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
46 import org.apache.shardingsphere.infra.merge.MergeEngine;
47 import org.apache.shardingsphere.infra.merge.result.MergedResult;
48 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
49 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
50 import org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtils;
51 import org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
52 import org.apache.shardingsphere.infra.session.query.QueryContext;
53 import org.apache.shardingsphere.mode.manager.ContextManager;
54 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
55 import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallback;
56 import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
57 import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
58 import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.BackendTransactionManager;
59 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
60 import org.apache.shardingsphere.proxy.backend.handler.data.DatabaseBackendHandler;
61 import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
62 import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
63 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
64 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader;
65 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
66 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
67 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
68 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
69 import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
70 import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
71 import org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
72 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
73 import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
74 import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
75 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
76 import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
77 import org.apache.shardingsphere.transaction.api.TransactionType;
78 import org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
79
80 import java.sql.Connection;
81 import java.sql.ResultSet;
82 import java.sql.SQLException;
83 import java.sql.Statement;
84 import java.util.ArrayList;
85 import java.util.Collection;
86 import java.util.Collections;
87 import java.util.LinkedList;
88 import java.util.List;
89 import java.util.Optional;
90 import java.util.concurrent.ConcurrentHashMap;
91
92
93
94
95 public final class DatabaseConnector implements DatabaseBackendHandler {
96
97 private final ProxySQLExecutor proxySQLExecutor;
98
99 private final Collection<Statement> cachedStatements = Collections.newSetFromMap(new ConcurrentHashMap<>());
100
101 private final Collection<ResultSet> cachedResultSets = Collections.newSetFromMap(new ConcurrentHashMap<>());
102
103 private final String driverType;
104
105 private final ShardingSphereDatabase database;
106
107 private final boolean selectContainsEnhancedTable;
108
109 private final QueryContext queryContext;
110
111 private final ProxyDatabaseConnectionManager databaseConnectionManager;
112
113 private List<QueryHeader> queryHeaders;
114
115 private MergedResult mergedResult;
116
117 public DatabaseConnector(final String driverType, final ShardingSphereDatabase database, final QueryContext queryContext, final ProxyDatabaseConnectionManager databaseConnectionManager) {
118 SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext();
119 failedIfBackendNotReady(databaseConnectionManager.getConnectionSession(), sqlStatementContext);
120 this.driverType = driverType;
121 this.database = database;
122 this.queryContext = queryContext;
123 this.selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable();
124 this.databaseConnectionManager = databaseConnectionManager;
125 if (sqlStatementContext instanceof CursorAvailable) {
126 prepareCursorStatementContext((CursorAvailable) sqlStatementContext, databaseConnectionManager.getConnectionSession());
127 }
128 proxySQLExecutor = new ProxySQLExecutor(driverType, databaseConnectionManager, this, queryContext);
129 }
130
131 private void failedIfBackendNotReady(final ConnectionSession connectionSession, final SQLStatementContext sqlStatementContext) {
132 ShardingSphereDatabase database = ProxyContext.getInstance().getContextManager().getDatabase(connectionSession.getDatabaseName());
133 boolean isSystemSchema = SystemSchemaUtils.containsSystemSchema(sqlStatementContext.getDatabaseType(), sqlStatementContext.getTablesContext().getSchemaNames(), database);
134 ShardingSpherePreconditions.checkState(isSystemSchema || database.containsDataSource(), () -> new EmptyStorageUnitException(connectionSession.getDatabaseName()));
135 if (!isSystemSchema && !database.isComplete()) {
136 throw new EmptyRuleException(connectionSession.getDatabaseName());
137 }
138 }
139
140
141
142
143
144
145 public void add(final Statement statement) {
146 cachedStatements.add(statement);
147 }
148
149
150
151
152
153
154 public void add(final ResultSet resultSet) {
155 cachedResultSets.add(resultSet);
156 }
157
158
159
160
161
162
163
164 @Override
165 public ResponseHeader execute() throws SQLException {
166 MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
167 if (proxySQLExecutor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
168 ResultSet resultSet = doExecuteFederation(queryContext, metaDataContexts);
169 return processExecuteFederation(resultSet, metaDataContexts);
170 }
171 ExecutionContext executionContext = generateExecutionContext();
172 return isNeedImplicitCommitTransaction(executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getExecutionUnits().size() > 1)
173 ? doExecuteWithImplicitCommitTransaction(() -> doExecute(executionContext))
174 : doExecute(executionContext);
175 }
176
177 private ExecutionContext generateExecutionContext() {
178 ShardingSphereMetaData metaData = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData();
179 return new KernelProcessor().generateExecutionContext(queryContext, database, metaData.getGlobalRuleMetaData(), metaData.getProps(),
180 databaseConnectionManager.getConnectionSession().getConnectionContext());
181 }
182
183 private boolean isNeedImplicitCommitTransaction(final SQLStatement sqlStatement, final boolean multiExecutionUnits) {
184 if (!databaseConnectionManager.getConnectionSession().isAutoCommit()) {
185 return false;
186 }
187 TransactionType transactionType = TransactionUtils.getTransactionType(databaseConnectionManager.getConnectionSession().getConnectionContext().getTransactionContext());
188 TransactionStatus transactionStatus = databaseConnectionManager.getConnectionSession().getTransactionStatus();
189 if (!TransactionType.isDistributedTransaction(transactionType) || transactionStatus.isInTransaction()) {
190 return false;
191 }
192 return isWriteDMLStatement(sqlStatement) && multiExecutionUnits;
193 }
194
195 private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
196 return sqlStatement instanceof DMLStatement && !(sqlStatement instanceof SelectStatement);
197 }
198
199 private <T> T doExecuteWithImplicitCommitTransaction(final ImplicitTransactionCallback<T> callback) throws SQLException {
200 T result;
201 BackendTransactionManager transactionManager = new BackendTransactionManager(databaseConnectionManager);
202 try {
203 transactionManager.begin();
204 result = callback.execute();
205 transactionManager.commit();
206
207 } catch (final Exception ex) {
208
209 transactionManager.rollback();
210 String databaseName = databaseConnectionManager.getConnectionSession().getDatabaseName();
211 throw SQLExceptionTransformEngine.toSQLException(ex, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getProtocolType());
212 }
213 return result;
214 }
215
216 @SuppressWarnings({"unchecked", "rawtypes"})
217 private ResponseHeader doExecute(final ExecutionContext executionContext) throws SQLException {
218 if (executionContext.getExecutionUnits().isEmpty()) {
219 return new UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement());
220 }
221 proxySQLExecutor.checkExecutePrerequisites(executionContext);
222 List result = proxySQLExecutor.execute(executionContext);
223 refreshMetaData(executionContext);
224 Object executeResultSample = result.iterator().next();
225 return executeResultSample instanceof QueryResult ? processExecuteQuery(queryContext.getSqlStatementContext(), result, (QueryResult) executeResultSample)
226 : processExecuteUpdate(executionContext, result);
227 }
228
229 private ResultSet doExecuteFederation(final QueryContext queryContext, final MetaDataContexts metaDataContexts) {
230 boolean isReturnGeneratedKeys = queryContext.getSqlStatementContext().getSqlStatement() instanceof MySQLInsertStatement;
231 ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getDatabaseName());
232 DatabaseType protocolType = database.getProtocolType();
233 ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(driverType, protocolType, database.getResourceMetaData(),
234 queryContext.getSqlStatementContext().getSqlStatement(), this, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
235 DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaDataContexts);
236 SQLFederationContext context = new SQLFederationContext(false, queryContext, metaDataContexts.getMetaData(), databaseConnectionManager.getConnectionSession().getProcessId());
237 return proxySQLExecutor.getSqlFederationEngine().executeQuery(prepareEngine, callback, context);
238 }
239
240 private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final MetaDataContexts metaData) {
241 int maxConnectionsSizePerQuery = metaData.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
242 JDBCBackendStatement statementManager = (JDBCBackendStatement) databaseConnectionManager.getConnectionSession().getStatementManager();
243 return new DriverExecutionPrepareEngine<>(driverType, maxConnectionsSizePerQuery, databaseConnectionManager, statementManager,
244 new StatementOption(isReturnGeneratedKeys), metaData.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getDatabaseName()).getRuleMetaData().getRules(),
245 metaData.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getDatabaseName()).getResourceMetaData().getStorageUnits());
246 }
247
248 private ResponseHeader processExecuteFederation(final ResultSet resultSet, final MetaDataContexts metaDataContexts) throws SQLException {
249 int columnCount = resultSet.getMetaData().getColumnCount();
250 queryHeaders = new ArrayList<>(columnCount);
251 ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getDatabaseName());
252 QueryHeaderBuilderEngine queryHeaderBuilderEngine = new QueryHeaderBuilderEngine(null == database ? null : database.getProtocolType());
253 for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
254 queryHeaders.add(queryHeaderBuilderEngine.build(new JDBCQueryResultMetaData(resultSet.getMetaData()), database, columnIndex));
255 }
256 mergedResult = new IteratorStreamMergedResult(Collections.singletonList(new JDBCStreamQueryResult(resultSet)));
257 return new QueryResponseHeader(queryHeaders);
258 }
259
260 private void prepareCursorStatementContext(final CursorAvailable statementContext, final ConnectionSession connectionSession) {
261 if (statementContext.getCursorName().isPresent()) {
262 String cursorName = statementContext.getCursorName().get().getIdentifier().getValue().toLowerCase();
263 prepareCursorStatementContext(statementContext, connectionSession, cursorName);
264 }
265 if (statementContext instanceof CloseStatementContext && ((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) {
266 connectionSession.getConnectionContext().clearCursorContext();
267 }
268 }
269
270 private void prepareCursorStatementContext(final CursorAvailable statementContext, final ConnectionSession connectionSession, final String cursorName) {
271 if (statementContext instanceof CursorStatementContext) {
272 connectionSession.getConnectionContext().getCursorContext().getCursorDefinitions().put(cursorName, (CursorStatementContext) statementContext);
273 }
274 if (statementContext instanceof CursorDefinitionAware) {
275 CursorStatementContext cursorStatementContext = (CursorStatementContext) connectionSession.getConnectionContext().getCursorContext().getCursorDefinitions().get(cursorName);
276 Preconditions.checkArgument(null != cursorStatementContext, "Cursor %s does not exist.", cursorName);
277 ((CursorDefinitionAware) statementContext).setUpCursorDefinition(cursorStatementContext);
278 }
279 if (statementContext instanceof CloseStatementContext) {
280 connectionSession.getConnectionContext().getCursorContext().removeCursor(cursorName);
281 }
282 }
283
284 private void refreshMetaData(final ExecutionContext executionContext) throws SQLException {
285 ContextManager contextManager = ProxyContext.getInstance().getContextManager();
286 new MetaDataRefreshEngine(contextManager.getInstanceContext().getModeContextManager(), database,
287 contextManager.getMetaDataContexts().getMetaData().getProps()).refresh(executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
288 }
289
290 private QueryResponseHeader processExecuteQuery(final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults, final QueryResult queryResultSample) throws SQLException {
291 queryHeaders = createQueryHeaders(sqlStatementContext, queryResultSample);
292 mergedResult = mergeQuery(sqlStatementContext, queryResults);
293 return new QueryResponseHeader(queryHeaders);
294 }
295
296 private List<QueryHeader> createQueryHeaders(final SQLStatementContext sqlStatementContext, final QueryResult queryResultSample) throws SQLException {
297 int columnCount = getColumnCount(sqlStatementContext, queryResultSample);
298 List<QueryHeader> result = new ArrayList<>(columnCount);
299 QueryHeaderBuilderEngine queryHeaderBuilderEngine = new QueryHeaderBuilderEngine(database.getProtocolType());
300 for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
301 result.add(createQueryHeader(queryHeaderBuilderEngine, sqlStatementContext, queryResultSample, database, columnIndex));
302 }
303 return result;
304 }
305
306 private int getColumnCount(final SQLStatementContext sqlStatementContext, final QueryResult queryResultSample) throws SQLException {
307 return selectContainsEnhancedTable && hasSelectExpandProjections(sqlStatementContext)
308 ? ((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().size()
309 : queryResultSample.getMetaData().getColumnCount();
310 }
311
312 private boolean hasSelectExpandProjections(final SQLStatementContext sqlStatementContext) {
313 return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
314 }
315
316 private QueryHeader createQueryHeader(final QueryHeaderBuilderEngine queryHeaderBuilderEngine, final SQLStatementContext sqlStatementContext,
317 final QueryResult queryResultSample, final ShardingSphereDatabase database, final int columnIndex) throws SQLException {
318 return selectContainsEnhancedTable && hasSelectExpandProjections(sqlStatementContext)
319 ? queryHeaderBuilderEngine.build(((SelectStatementContext) sqlStatementContext).getProjectionsContext(), queryResultSample.getMetaData(), database, columnIndex)
320 : queryHeaderBuilderEngine.build(queryResultSample.getMetaData(), database, columnIndex);
321 }
322
323 private MergedResult mergeQuery(final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
324 ShardingSphereMetaData metaData = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData();
325 MergeEngine mergeEngine = new MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(), databaseConnectionManager.getConnectionSession().getConnectionContext());
326 return mergeEngine.merge(queryResults, sqlStatementContext);
327 }
328
329 private UpdateResponseHeader processExecuteUpdate(final ExecutionContext executionContext, final Collection<UpdateResult> updateResults) {
330 Optional<GeneratedKeyContext> generatedKeyContext = executionContext.getSqlStatementContext() instanceof InsertStatementContext
331 ? ((InsertStatementContext) executionContext.getSqlStatementContext()).getGeneratedKeyContext()
332 : Optional.empty();
333 Collection<Comparable<?>> autoIncrementGeneratedValues =
334 generatedKeyContext.filter(GeneratedKeyContext::isSupportAutoIncrement).map(GeneratedKeyContext::getGeneratedValues).orElseGet(Collections::emptyList);
335 UpdateResponseHeader result = new UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement(), updateResults, autoIncrementGeneratedValues);
336 mergeUpdateCount(executionContext.getSqlStatementContext(), result);
337 return result;
338 }
339
340 private void mergeUpdateCount(final SQLStatementContext sqlStatementContext, final UpdateResponseHeader response) {
341 if (isNeedAccumulate(sqlStatementContext)) {
342 response.mergeUpdateCount();
343 }
344 }
345
346 private boolean isNeedAccumulate(final SQLStatementContext sqlStatementContext) {
347 Collection<DataNodeRuleAttribute> ruleAttributes = database.getRuleMetaData().getAttributes(DataNodeRuleAttribute.class);
348 return !ruleAttributes.isEmpty() && ruleAttributes.iterator().next().isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames());
349 }
350
351
352
353
354
355
356
357 @Override
358 public boolean next() throws SQLException {
359 return null != mergedResult && mergedResult.next();
360 }
361
362
363
364
365
366
367
368 @Override
369 public QueryResponseRow getRowData() throws SQLException {
370 List<QueryResponseCell> cells = new ArrayList<>(queryHeaders.size());
371 for (int columnIndex = 1; columnIndex <= queryHeaders.size(); columnIndex++) {
372 Object data = mergedResult.getValue(columnIndex, Object.class);
373 cells.add(new QueryResponseCell(queryHeaders.get(columnIndex - 1).getColumnType(), data, queryHeaders.get(columnIndex - 1).getColumnTypeName()));
374 }
375 return new QueryResponseRow(cells);
376 }
377
378 @Override
379 public void close() throws SQLException {
380 Collection<SQLException> result = new LinkedList<>();
381 result.addAll(closeResultSets());
382 result.addAll(closeStatements());
383 closeSQLFederationEngine().ifPresent(result::add);
384 if (result.isEmpty()) {
385 return;
386 }
387 SQLException ex = new SQLException();
388 result.forEach(ex::setNextException);
389 throw ex;
390 }
391
392 private Collection<SQLException> closeResultSets() {
393 Collection<SQLException> result = new LinkedList<>();
394 for (ResultSet each : cachedResultSets) {
395 try {
396 each.close();
397 } catch (final SQLException ex) {
398 result.add(ex);
399 }
400 }
401 cachedResultSets.clear();
402 return result;
403 }
404
405 private Collection<SQLException> closeStatements() {
406 Collection<SQLException> result = new LinkedList<>();
407 for (Statement each : cachedStatements) {
408 try {
409 each.cancel();
410 each.close();
411 } catch (final SQLException ex) {
412 result.add(ex);
413 }
414 }
415 cachedStatements.clear();
416 return result;
417 }
418
419 private Optional<SQLException> closeSQLFederationEngine() {
420 if (null != proxySQLExecutor.getSqlFederationEngine()) {
421 try {
422 proxySQLExecutor.getSqlFederationEngine().close();
423 } catch (final SQLException ex) {
424 return Optional.of(ex);
425 }
426 }
427 return Optional.empty();
428 }
429 }