View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.backend.handler.distsql.rul;
19  
20  import com.google.common.base.Preconditions;
21  import lombok.Setter;
22  import org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorConnectionContextAware;
23  import org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
24  import org.apache.shardingsphere.distsql.handler.engine.DistSQLConnectionContext;
25  import org.apache.shardingsphere.distsql.handler.engine.query.DistSQLQueryExecutor;
26  import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
27  import org.apache.shardingsphere.distsql.statement.rul.sql.PreviewStatement;
28  import org.apache.shardingsphere.infra.binder.context.aware.CursorDefinitionAware;
29  import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
30  import org.apache.shardingsphere.infra.binder.context.statement.ddl.CursorStatementContext;
31  import org.apache.shardingsphere.infra.binder.context.type.CursorAvailable;
32  import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
33  import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
34  import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
35  import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
36  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
37  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
38  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
39  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
40  import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
41  import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
42  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
43  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
44  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
45  import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
46  import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
47  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
48  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
49  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
50  import org.apache.shardingsphere.infra.hint.HintValueContext;
51  import org.apache.shardingsphere.infra.hint.SQLHintUtils;
52  import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
53  import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
54  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
55  import org.apache.shardingsphere.infra.session.query.QueryContext;
56  import org.apache.shardingsphere.mode.manager.ContextManager;
57  import org.apache.shardingsphere.parser.rule.SQLParserRule;
58  import org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
59  import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
60  import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
61  import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
62  import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
63  import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
64  
65  import java.sql.Connection;
66  import java.sql.SQLException;
67  import java.sql.Statement;
68  import java.util.Arrays;
69  import java.util.Collection;
70  import java.util.Collections;
71  import java.util.Optional;
72  import java.util.stream.Collectors;
73  
74  /**
75   * Preview executor.
76   */
77  @Setter
78  public final class PreviewExecutor implements DistSQLQueryExecutor<PreviewStatement>, DistSQLExecutorDatabaseAware, DistSQLExecutorConnectionContextAware {
79      
80      private ShardingSphereDatabase database;
81      
82      private DistSQLConnectionContext connectionContext;
83      
84      @Override
85      public Collection<String> getColumnNames(final PreviewStatement sqlStatement) {
86          return Arrays.asList("data_source_name", "actual_sql");
87      }
88      
89      @Override
90      public Collection<LocalDataQueryResultRow> getRows(final PreviewStatement sqlStatement, final ContextManager contextManager) throws SQLException {
91          ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
92          String toBePreviewedSQL = SQLHintUtils.removeHint(sqlStatement.getSql());
93          HintValueContext hintValueContext = SQLHintUtils.extractHint(sqlStatement.getSql());
94          SQLStatement toBePreviewedStatement = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(database.getProtocolType()).parse(toBePreviewedSQL, false);
95          SQLStatementContext toBePreviewedStatementContext = new SQLBindEngine(metaData, database.getName(), hintValueContext).bind(toBePreviewedStatement, Collections.emptyList());
96          QueryContext queryContext = new QueryContext(toBePreviewedStatementContext, toBePreviewedSQL, Collections.emptyList(), hintValueContext);
97          if (toBePreviewedStatementContext instanceof CursorAvailable && toBePreviewedStatementContext instanceof CursorDefinitionAware) {
98              setUpCursorDefinition(toBePreviewedStatementContext);
99          }
100         ShardingSpherePreconditions.checkState(database.isComplete(), () -> new EmptyRuleException(database.getName()));
101         String schemaName = queryContext.getSqlStatementContext().getTablesContext().getSchemaName()
102                 .orElseGet(() -> new DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(database.getName()));
103         Collection<ExecutionUnit> executionUnits = getExecutionUnits(contextManager, schemaName, metaData, queryContext);
104         return executionUnits.stream().map(each -> new LocalDataQueryResultRow(each.getDataSourceName(), each.getSqlUnit().getSql())).collect(Collectors.toList());
105     }
106     
107     private Collection<ExecutionUnit> getExecutionUnits(final ContextManager contextManager, final String schemaName, final ShardingSphereMetaData metaData,
108                                                         final QueryContext queryContext) {
109         JDBCExecutor jdbcExecutor = new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), connectionContext.getConnectionContext());
110         SQLFederationEngine federationEngine = new SQLFederationEngine(database.getName(), schemaName, metaData, contextManager.getMetaDataContexts().getStatistics(), jdbcExecutor);
111         if (federationEngine.decide(queryContext.getSqlStatementContext(), queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
112             return getFederationExecutionUnits(queryContext, metaData, federationEngine);
113         }
114         return new KernelProcessor().generateExecutionContext(queryContext, database, metaData.getGlobalRuleMetaData(), metaData.getProps(), connectionContext.getConnectionContext())
115                 .getExecutionUnits();
116     }
117     
118     private void setUpCursorDefinition(final SQLStatementContext toBePreviewedStatementContext) {
119         if (!((CursorAvailable) toBePreviewedStatementContext).getCursorName().isPresent()) {
120             return;
121         }
122         String cursorName = ((CursorAvailable) toBePreviewedStatementContext).getCursorName().get().getIdentifier().getValue().toLowerCase();
123         CursorStatementContext cursorStatementContext = (CursorStatementContext) connectionContext.getConnectionContext().getCursorContext().getCursorDefinitions().get(cursorName);
124         Preconditions.checkNotNull(cursorStatementContext, "Cursor %s does not exist.", cursorName);
125         ((CursorDefinitionAware) toBePreviewedStatementContext).setUpCursorDefinition(cursorStatementContext);
126     }
127     
128     private Collection<ExecutionUnit> getFederationExecutionUnits(final QueryContext queryContext, final ShardingSphereMetaData metaData, final SQLFederationEngine federationEngine) {
129         SQLStatement sqlStatement = queryContext.getSqlStatementContext().getSqlStatement();
130         // TODO move dialect MySQLInsertStatement into database type module @zhangliang
131         boolean isReturnGeneratedKeys = sqlStatement instanceof MySQLInsertStatement;
132         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaData.getProps());
133         SQLFederationContext context = new SQLFederationContext(true, queryContext, metaData,
134                 ((ProxyDatabaseConnectionManager) connectionContext.getDatabaseConnectionManager()).getConnectionSession().getProcessId());
135         federationEngine.executeQuery(prepareEngine, createPreviewCallback(sqlStatement), context);
136         return context.getPreviewExecutionUnits();
137     }
138     
139     private JDBCExecutorCallback<ExecuteResult> createPreviewCallback(final SQLStatement sqlStatement) {
140         return new JDBCExecutorCallback<ExecuteResult>(database.getProtocolType(), database.getResourceMetaData(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown()) {
141             
142             @Override
143             protected ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
144                 return new JDBCStreamQueryResult(statement.executeQuery(sql));
145             }
146             
147             @Override
148             protected Optional<ExecuteResult> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
149                 return Optional.empty();
150             }
151         };
152     }
153     
154     @SuppressWarnings({"unchecked", "rawtypes"})
155     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final ConfigurationProperties props) {
156         int maxConnectionsSizePerQuery = props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
157         return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connectionContext.getDatabaseConnectionManager(),
158                 connectionContext.getExecutorStatementManager(), new StatementOption(isReturnGeneratedKeys), database.getRuleMetaData().getRules(), database.getResourceMetaData().getStorageUnits());
159     }
160     
161     @Override
162     public Class<PreviewStatement> getType() {
163         return PreviewStatement.class;
164     }
165 }