View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.sqlfederation.engine;
19  
20  import com.google.common.base.Joiner;
21  import lombok.Getter;
22  import lombok.extern.slf4j.Slf4j;
23  import org.apache.calcite.plan.Convention;
24  import org.apache.calcite.plan.RelOptUtil;
25  import org.apache.calcite.schema.SchemaPlus;
26  import org.apache.calcite.sql.SqlExplainLevel;
27  import org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
28  import org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
29  import org.apache.shardingsphere.database.exception.core.exception.syntax.table.NoSuchTableException;
30  import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
31  import org.apache.shardingsphere.infra.binder.context.statement.type.dal.ExplainStatementContext;
32  import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
33  import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
34  import org.apache.shardingsphere.infra.datanode.DataNode;
35  import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
36  import org.apache.shardingsphere.infra.exception.kernel.connection.SQLExecutionInterruptedException;
37  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
38  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
39  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
40  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
41  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
42  import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
43  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
44  import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
45  import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
46  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
47  import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
48  import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
49  import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
50  import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
51  import org.apache.shardingsphere.infra.session.query.QueryContext;
52  import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
53  import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
54  import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
55  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dal.ExplainStatement;
56  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.SelectStatement;
57  import org.apache.shardingsphere.sqlfederation.compiler.SQLFederationCompilerEngine;
58  import org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan;
59  import org.apache.shardingsphere.sqlfederation.compiler.compiler.SQLStatementCompiler;
60  import org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
61  import org.apache.shardingsphere.sqlfederation.compiler.exception.SQLFederationUnsupportedSQLException;
62  import org.apache.shardingsphere.sqlfederation.compiler.planner.cache.ExecutionPlanCacheKey;
63  import org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter;
64  import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
65  import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor;
66  import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessorFactory;
67  import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule;
68  import org.apache.shardingsphere.sqlfederation.spi.SQLFederationDecider;
69  
70  import java.sql.Connection;
71  import java.sql.ResultSet;
72  import java.sql.SQLException;
73  import java.sql.SQLIntegrityConstraintViolationException;
74  import java.util.Arrays;
75  import java.util.Collection;
76  import java.util.Collections;
77  import java.util.HashSet;
78  import java.util.LinkedList;
79  import java.util.List;
80  import java.util.Map;
81  import java.util.Map.Entry;
82  import java.util.Optional;
83  
84  /**
85   * SQL federation engine.
86   */
87  @Getter
88  @Slf4j
89  public final class SQLFederationEngine implements AutoCloseable {
90      
91      private static final Collection<Class<?>> NEED_THROW_EXCEPTION_TYPES = Arrays.asList(SQLExecutionInterruptedException.class, SQLIntegrityConstraintViolationException.class);
92      
93      private static final int MAX_ERROR_MESSAGE_LENGTH = 5000;
94      
95      private final ProcessEngine processEngine = new ProcessEngine();
96      
97      @SuppressWarnings("rawtypes")
98      private final Map<ShardingSphereRule, SQLFederationDecider> deciders;
99      
100     private final String currentDatabaseName;
101     
102     private final String currentSchemaName;
103     
104     private final SQLFederationRule sqlFederationRule;
105     
106     private final SQLFederationProcessor processor;
107     
108     private QueryContext queryContext;
109     
110     private SchemaPlus schemaPlus;
111     
112     private ResultSet resultSet;
113     
114     public SQLFederationEngine(final String currentDatabaseName, final String currentSchemaName, final ShardingSphereMetaData metaData,
115                                final ShardingSphereStatistics statistics, final JDBCExecutor jdbcExecutor) {
116         deciders = OrderedSPILoader.getServices(SQLFederationDecider.class, metaData.getDatabase(currentDatabaseName).getRuleMetaData().getRules());
117         this.currentDatabaseName = currentDatabaseName;
118         this.currentSchemaName = currentSchemaName;
119         sqlFederationRule = metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
120         processor = SQLFederationProcessorFactory.getInstance().newInstance(statistics, jdbcExecutor);
121     }
122     
123     /**
124      * Judge whether SQL federation enabled.
125      *
126      * @return SQL federation enabled or disabled
127      */
128     public boolean isSQLFederationEnabled() {
129         return sqlFederationRule.getConfiguration().isSqlFederationEnabled();
130     }
131     
132     /**
133      * Decide use SQL federation or not.
134      *
135      * @param queryContext query context
136      * @param globalRuleMetaData global rule meta data
137      * @return use SQL federation or not
138      */
139     @SuppressWarnings({"unchecked", "rawtypes"})
140     public boolean decide(final QueryContext queryContext, final RuleMetaData globalRuleMetaData) {
141         SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext();
142         if (!isSQLFederationEnabled() || !isSupportedSQLStatementContext(sqlStatementContext)) {
143             return false;
144         }
145         boolean allQueryUseSQLFederation = sqlFederationRule.getConfiguration().isAllQueryUseSQLFederation();
146         if (allQueryUseSQLFederation) {
147             return true;
148         }
149         Collection<String> databaseNames = sqlStatementContext.getTablesContext().getDatabaseNames();
150         if (databaseNames.size() > 1) {
151             return true;
152         }
153         ShardingSphereDatabase usedDatabase = queryContext.getUsedDatabase();
154         Collection<DataNode> includedDataNodes = new HashSet<>();
155         for (Entry<ShardingSphereRule, SQLFederationDecider> entry : deciders.entrySet()) {
156             boolean isUseSQLFederation = entry.getValue().decide(sqlStatementContext, queryContext.getParameters(), globalRuleMetaData, usedDatabase, entry.getKey(), includedDataNodes);
157             if (isUseSQLFederation) {
158                 return true;
159             }
160         }
161         return false;
162     }
163     
164     private boolean isSupportedSQLStatementContext(final SQLStatementContext sqlStatementContext) {
165         return isSupportedSQLStatement(sqlStatementContext instanceof ExplainStatementContext
166                 ? ((ExplainStatementContext) sqlStatementContext).getSqlStatement().getExplainableSQLStatement()
167                 : sqlStatementContext.getSqlStatement());
168     }
169     
170     private boolean isSupportedSQLStatement(final SQLStatement sqlStatement) {
171         return sqlStatement instanceof SelectStatement;
172     }
173     
174     /**
175      * Execute query.
176      *
177      * @param prepareEngine prepare engine
178      * @param callback callback
179      * @param federationContext federation context
180      * @return result set
181      */
182     public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
183                                   final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext) {
184         return execute0(prepareEngine, callback, federationContext);
185     }
186     
187     private ResultSet execute0(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
188                                final JDBCExecutorCallback<? extends ExecuteResult> queryCallback, final SQLFederationContext federationContext) {
189         queryContext = federationContext.getQueryContext();
190         logSQL(queryContext, federationContext.getMetaData().getProps());
191         try {
192             processEngine.executeSQL(new ExecutionGroupContext<>(Collections.emptyList(),
193                     new ExecutionGroupReportContext(federationContext.getProcessId(), currentDatabaseName, queryContext.getConnectionContext().getGrantee())), queryContext);
194             SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext() instanceof ExplainStatementContext
195                     ? ((ExplainStatementContext) queryContext.getSqlStatementContext()).getExplainableSQLStatementContext()
196                     : queryContext.getSqlStatementContext();
197             CompilerContext compilerContext = sqlFederationRule.getCompilerContext();
198             SQLFederationRelConverter converter = new SQLFederationRelConverter(compilerContext, getSchemaPath(sqlStatementContext),
199                     sqlStatementContext.getSqlStatement().getDatabaseType(), processor.getConvention());
200             schemaPlus = converter.getSchemaPlus();
201             processor.prepare(prepareEngine, queryCallback, currentDatabaseName, currentSchemaName, federationContext, compilerContext, schemaPlus);
202             SQLFederationExecutionPlan executionPlan = compileQuery(converter, currentDatabaseName,
203                     currentSchemaName, federationContext, sqlStatementContext, queryContext.getSql(), processor.getConvention());
204             logExecutionPlan(executionPlan, federationContext.getMetaData().getProps());
205             resultSet = processor.executePlan(prepareEngine, queryCallback, executionPlan, converter, federationContext, schemaPlus);
206             return resultSet;
207             // CHECKSTYLE:OFF
208         } catch (final Exception ex) {
209             // CHECKSTYLE:ON
210             String errorMessage = splitErrorMessage(ex);
211             log.error("SQL Federation execute failed, sql {}, parameters {}, reason {}", queryContext.getSql(), queryContext.getParameters(), errorMessage);
212             closeResources(federationContext);
213             if (NEED_THROW_EXCEPTION_TYPES.stream().anyMatch(each -> each.isAssignableFrom(ex.getClass()))) {
214                 throw ex;
215             }
216             throw new SQLFederationUnsupportedSQLException(queryContext.getSql(), errorMessage);
217         }
218     }
219     
220     private String splitErrorMessage(final Exception ex) {
221         return null == ex.getMessage() ? "" : ex.getMessage().substring(0, Math.min(ex.getMessage().length(), MAX_ERROR_MESSAGE_LENGTH));
222     }
223     
224     private void closeResources(final SQLFederationContext federationContext) {
225         try {
226             processEngine.completeSQLExecution(federationContext.getProcessId());
227             close();
228             // CHECKSTYLE:OFF
229         } catch (final Exception ex) {
230             // CHECKSTYLE:ON
231             log.warn("Failed to close SQL federation engine resources: {}", ex.getMessage());
232         }
233     }
234     
235     private void logSQL(final QueryContext queryContext, final ConfigurationProperties props) {
236         if (!props.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
237             return;
238         }
239         if (queryContext.getParameters().isEmpty()) {
240             log.info("SQL Federation Logic SQL: {} ::: {} ::: {}", queryContext.getSql(), currentDatabaseName, currentSchemaName);
241         } else {
242             log.info("SQL Federation Logic SQL: {} ::: {} ::: {} ::: {}", queryContext.getSql(), queryContext.getParameters(), currentDatabaseName, currentSchemaName);
243         }
244     }
245     
246     private void logExecutionPlan(final SQLFederationExecutionPlan executionPlan, final ConfigurationProperties props) {
247         if (props.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
248             log.info("SQL Federation Execution Plan: {}", RelOptUtil.toString(executionPlan.getPhysicalPlan(), SqlExplainLevel.ALL_ATTRIBUTES).replaceAll(", id = \\d+", ""));
249         }
250     }
251     
252     private List<String> getSchemaPath(final SQLStatementContext sqlStatementContext) {
253         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sqlStatementContext.getSqlStatement().getDatabaseType()).getDialectDatabaseMetaData();
254         // TODO set default schema according to search path result
255         if (dialectDatabaseMetaData.getSchemaOption().getDefaultSchema().isPresent()) {
256             return sqlStatementContext.getTablesContext().getSimpleTables().stream().anyMatch(each -> each.getOwner().isPresent())
257                     ? Collections.singletonList(currentDatabaseName)
258                     : Arrays.asList(currentDatabaseName, currentSchemaName);
259         }
260         return Collections.singletonList(currentDatabaseName);
261     }
262     
263     private SQLFederationExecutionPlan compileQuery(final SQLFederationRelConverter converter, final String databaseName, final String schemaName, final SQLFederationContext federationContext,
264                                                     final SQLStatementContext sqlStatementContext, final String sql, final Convention convention) {
265         SQLStatementCompiler sqlStatementCompiler = new SQLStatementCompiler(converter, convention);
266         SQLFederationCompilerEngine compilerEngine = new SQLFederationCompilerEngine(databaseName, schemaName, sqlFederationRule.getConfiguration().getExecutionPlanCache());
267         return compilerEngine.compile(buildCacheKey(federationContext, sqlStatementContext, sql, sqlStatementCompiler), false);
268     }
269     
270     private ExecutionPlanCacheKey buildCacheKey(final SQLFederationContext federationContext, final SQLStatementContext sqlStatementContext,
271                                                 final String sql, final SQLStatementCompiler sqlStatementCompiler) {
272         ExecutionPlanCacheKey result = new ExecutionPlanCacheKey(sql, sqlStatementContext.getSqlStatement(), sqlStatementCompiler);
273         Collection<SimpleTableSegment> tableSegments = sqlStatementContext.getTablesContext().getSimpleTables();
274         for (SimpleTableSegment each : tableSegments) {
275             String originalDatabase = each.getTableName().getTableBoundInfo().map(optional -> optional.getOriginalDatabase().getValue()).orElse(currentDatabaseName);
276             String originalSchema = each.getTableName().getTableBoundInfo().map(optional -> optional.getOriginalSchema().getValue()).orElse(currentSchemaName);
277             ShardingSphereTable table = federationContext.getMetaData().getDatabase(originalDatabase).getSchema(originalSchema).getTable(each.getTableName().getIdentifier().getValue());
278             ShardingSpherePreconditions.checkNotNull(table, () -> new NoSuchTableException(each.getTableName().getIdentifier().getValue()));
279             result.getTableMetaDataVersions().put(Joiner.on(".").join(Arrays.asList(originalDatabase, originalSchema, table.getName())), 0);
280         }
281         return result;
282     }
283     
284     /**
285      * Get result set.
286      *
287      * @return result set
288      */
289     public ResultSet getResultSet() {
290         SQLStatement sqlStatement = queryContext.getSqlStatementContext().getSqlStatement();
291         return sqlStatement instanceof SelectStatement || sqlStatement instanceof ExplainStatement ? resultSet : null;
292     }
293     
294     @Override
295     public void close() throws SQLException {
296         Collection<SQLException> result = new LinkedList<>();
297         closeResultSet().ifPresent(result::add);
298         release();
299         if (result.isEmpty()) {
300             return;
301         }
302         SQLException ex = new SQLException();
303         result.forEach(ex::setNextException);
304         throw ex;
305     }
306     
307     private Optional<SQLException> closeResultSet() {
308         try {
309             if (null != resultSet && !resultSet.isClosed()) {
310                 resultSet.close();
311             }
312         } catch (final SQLException ex) {
313             return Optional.of(ex);
314         }
315         return Optional.empty();
316     }
317     
318     private void release() {
319         if (null != queryContext && null != schemaPlus) {
320             processor.release(currentDatabaseName, currentSchemaName, queryContext, schemaPlus);
321         }
322     }
323 }