View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.sqlfederation.engine;
19  
20  import lombok.Getter;
21  import org.apache.calcite.adapter.enumerable.EnumerableInterpretable;
22  import org.apache.calcite.adapter.enumerable.EnumerableRel;
23  import org.apache.calcite.adapter.java.JavaTypeFactory;
24  import org.apache.calcite.config.CalciteConnectionConfig;
25  import org.apache.calcite.config.CalciteConnectionConfigImpl;
26  import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
27  import org.apache.calcite.linq4j.Enumerator;
28  import org.apache.calcite.prepare.CalciteCatalogReader;
29  import org.apache.calcite.runtime.Bindable;
30  import org.apache.calcite.schema.Schema;
31  import org.apache.calcite.schema.Table;
32  import org.apache.calcite.sql.validate.SqlValidator;
33  import org.apache.calcite.sql2rel.SqlToRelConverter;
34  import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
35  import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
36  import org.apache.shardingsphere.infra.datanode.DataNode;
37  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
38  import org.apache.shardingsphere.infra.exception.dialect.exception.syntax.table.NoSuchTableException;
39  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
40  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
41  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
42  import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
43  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
44  import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
45  import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
46  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
47  import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
48  import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
49  import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
50  import org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtils;
51  import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
52  import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
53  import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
54  import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationBindContext;
55  import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
56  import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext;
57  import org.apache.shardingsphere.sqlfederation.executor.enumerable.EnumerableScanExecutor;
58  import org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationCompilerEngine;
59  import org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationExecutionPlan;
60  import org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext;
61  import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerMetaData;
62  import org.apache.shardingsphere.sqlfederation.optimizer.exception.SQLFederationUnsupportedSQLException;
63  import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.SQLFederationTable;
64  import org.apache.shardingsphere.sqlfederation.optimizer.planner.cache.ExecutionPlanCacheKey;
65  import org.apache.shardingsphere.sqlfederation.optimizer.planner.util.SQLFederationPlannerUtils;
66  import org.apache.shardingsphere.sqlfederation.optimizer.statement.SQLStatementCompiler;
67  import org.apache.shardingsphere.sqlfederation.resultset.SQLFederationResultSet;
68  import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule;
69  import org.apache.shardingsphere.sqlfederation.spi.SQLFederationDecider;
70  
71  import java.sql.Connection;
72  import java.sql.ResultSet;
73  import java.sql.SQLException;
74  import java.util.Collection;
75  import java.util.Collections;
76  import java.util.HashMap;
77  import java.util.HashSet;
78  import java.util.List;
79  import java.util.Map;
80  import java.util.Map.Entry;
81  
82  /**
83   * SQL federation engine.
84   */
85  @Getter
86  public final class SQLFederationEngine implements AutoCloseable {
87      
88      private static final int DEFAULT_METADATA_VERSION = 0;
89      
90      private static final JavaTypeFactory DEFAULT_DATA_TYPE_FACTORY = new JavaTypeFactoryImpl();
91      
92      private final ProcessEngine processEngine = new ProcessEngine();
93      
94      @SuppressWarnings("rawtypes")
95      private final Map<ShardingSphereRule, SQLFederationDecider> deciders;
96      
97      private final String databaseName;
98      
99      private final String schemaName;
100     
101     private final ShardingSphereMetaData metaData;
102     
103     private final ShardingSphereStatistics statistics;
104     
105     private final JDBCExecutor jdbcExecutor;
106     
107     private final SQLFederationRule sqlFederationRule;
108     
109     private ResultSet resultSet;
110     
111     public SQLFederationEngine(final String databaseName, final String schemaName, final ShardingSphereMetaData metaData, final ShardingSphereStatistics statistics, final JDBCExecutor jdbcExecutor) {
112         deciders = OrderedSPILoader.getServices(SQLFederationDecider.class, metaData.getDatabase(databaseName).getRuleMetaData().getRules());
113         this.databaseName = databaseName;
114         this.schemaName = schemaName;
115         this.metaData = metaData;
116         this.statistics = statistics;
117         this.jdbcExecutor = jdbcExecutor;
118         sqlFederationRule = metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
119     }
120     
121     /**
122      * Decide use SQL federation or not.
123      *
124      * @param sqlStatementContext SQL statement context
125      * @param parameters SQL parameters
126      * @param database ShardingSphere database
127      * @param globalRuleMetaData global rule meta data
128      * @return use SQL federation or not
129      */
130     @SuppressWarnings({"unchecked", "rawtypes"})
131     public boolean decide(final SQLStatementContext sqlStatementContext, final List<Object> parameters, final ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
132         // TODO BEGIN: move this logic to SQLFederationDecider implement class when we remove sql federation type
133         if (isQuerySystemSchema(sqlStatementContext, database)) {
134             return true;
135         }
136         // TODO END
137         boolean sqlFederationEnabled = sqlFederationRule.getConfiguration().isSqlFederationEnabled();
138         if (!sqlFederationEnabled || !(sqlStatementContext instanceof SelectStatementContext)) {
139             return false;
140         }
141         boolean allQueryUseSQLFederation = sqlFederationRule.getConfiguration().isAllQueryUseSQLFederation();
142         if (allQueryUseSQLFederation) {
143             return true;
144         }
145         Collection<DataNode> includedDataNodes = new HashSet<>();
146         for (Entry<ShardingSphereRule, SQLFederationDecider> entry : deciders.entrySet()) {
147             boolean isUseSQLFederation = entry.getValue().decide((SelectStatementContext) sqlStatementContext, parameters, globalRuleMetaData, database, entry.getKey(), includedDataNodes);
148             if (isUseSQLFederation) {
149                 return true;
150             }
151         }
152         return false;
153     }
154     
155     private boolean isQuerySystemSchema(final SQLStatementContext sqlStatementContext, final ShardingSphereDatabase database) {
156         return sqlStatementContext instanceof SelectStatementContext
157                 && (SystemSchemaUtils.containsSystemSchema(sqlStatementContext.getDatabaseType(), sqlStatementContext.getTablesContext().getSchemaNames(), database)
158                         || SystemSchemaUtils.isOpenGaussSystemCatalogQuery(sqlStatementContext.getDatabaseType(),
159                                 ((SelectStatementContext) sqlStatementContext).getSqlStatement().getProjections().getProjections()));
160     }
161     
162     /**
163      * Execute query.
164      *
165      * @param prepareEngine prepare engine
166      * @param callback callback
167      * @param federationContext federation context
168      * @return result set
169      * @throws SQLFederationUnsupportedSQLException SQL federation unsupported SQL exception
170      */
171     public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
172                                   final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext) {
173         try {
174             String databaseName = federationContext.getQueryContext().getDatabaseNameFromSQLStatement().orElse(this.databaseName);
175             String schemaName = federationContext.getQueryContext().getSchemaNameFromSQLStatement().orElse(this.schemaName);
176             OptimizerMetaData optimizerMetaData = sqlFederationRule.getOptimizerContext().getMetaData(databaseName);
177             CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDialectProps());
178             CalciteCatalogReader catalogReader = SQLFederationPlannerUtils.createCatalogReader(schemaName, optimizerMetaData.getSchema(schemaName), DEFAULT_DATA_TYPE_FACTORY, connectionConfig);
179             SqlValidator validator = SQLFederationPlannerUtils.createSqlValidator(catalogReader, DEFAULT_DATA_TYPE_FACTORY,
180                     sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDatabaseType(), connectionConfig);
181             SqlToRelConverter converter = SQLFederationPlannerUtils.createSqlToRelConverter(catalogReader, validator, SQLFederationPlannerUtils.createRelOptCluster(DEFAULT_DATA_TYPE_FACTORY),
182                     sqlFederationRule.getOptimizerContext().getSqlParserRule(), sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDatabaseType(), true);
183             Schema sqlFederationSchema = catalogReader.getRootSchema().plus().getSubSchema(schemaName);
184             SQLFederationExecutionPlan executionPlan = compileQuery(prepareEngine, callback, federationContext, databaseName, schemaName, sqlFederationSchema, converter);
185             resultSet = executePlan(federationContext, executionPlan, validator, converter, sqlFederationSchema);
186             return resultSet;
187             // CHECKSTYLE:OFF
188         } catch (final Exception ex) {
189             // CHECKSTYLE:ON
190             throw new SQLFederationUnsupportedSQLException(federationContext.getQueryContext().getSql());
191         }
192     }
193     
194     private SQLFederationExecutionPlan compileQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final JDBCExecutorCallback<? extends ExecuteResult> callback,
195                                                     final SQLFederationContext federationContext, final String databaseName, final String schemaName, final Schema sqlFederationSchema,
196                                                     final SqlToRelConverter converter) {
197         SQLStatementContext sqlStatementContext = federationContext.getQueryContext().getSqlStatementContext();
198         ShardingSpherePreconditions.checkState(sqlStatementContext instanceof SelectStatementContext, () -> new IllegalArgumentException("SQL statement context must be select statement context."));
199         registerTableScanExecutor(sqlFederationSchema, prepareEngine, callback, federationContext, sqlFederationRule.getOptimizerContext(), databaseName, schemaName);
200         SQLStatementCompiler sqlStatementCompiler = new SQLStatementCompiler(converter);
201         SQLFederationCompilerEngine compilerEngine = new SQLFederationCompilerEngine(databaseName, schemaName, sqlFederationRule.getConfiguration().getExecutionPlanCache());
202         // TODO open useCache flag when ShardingSphereTable contains version
203         return compilerEngine.compile(buildCacheKey(federationContext, (SelectStatementContext) sqlStatementContext, sqlStatementCompiler, databaseName, schemaName), false);
204     }
205     
206     @SuppressWarnings("unchecked")
207     private ResultSet executePlan(final SQLFederationContext federationContext, final SQLFederationExecutionPlan executionPlan, final SqlValidator validator, final SqlToRelConverter converter,
208                                   final Schema sqlFederationSchema) {
209         try {
210             Bindable<Object> executablePlan = EnumerableInterpretable.toBindable(Collections.emptyMap(), null, (EnumerableRel) executionPlan.getPhysicalPlan(), EnumerableRel.Prefer.ARRAY);
211             Map<String, Object> params = createParameters(federationContext.getQueryContext().getParameters());
212             Enumerator<Object> enumerator = executablePlan.bind(new SQLFederationBindContext(validator, converter, params)).enumerator();
213             return new SQLFederationResultSet(enumerator, sqlFederationSchema, (SelectStatementContext) federationContext.getQueryContext().getSqlStatementContext(),
214                     executionPlan.getResultColumnType());
215         } finally {
216             processEngine.completeSQLExecution(federationContext.getProcessId());
217         }
218     }
219     
220     private ExecutionPlanCacheKey buildCacheKey(final SQLFederationContext federationContext, final SelectStatementContext selectStatementContext,
221                                                 final SQLStatementCompiler sqlStatementCompiler, final String databaseName, final String schemaName) {
222         ShardingSphereSchema schema = federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
223         ExecutionPlanCacheKey result =
224                 new ExecutionPlanCacheKey(federationContext.getQueryContext().getSql(), selectStatementContext.getSqlStatement(), selectStatementContext.getDatabaseType().getType(),
225                         sqlStatementCompiler);
226         for (String each : selectStatementContext.getTablesContext().getTableNames()) {
227             ShardingSphereTable table = schema.getTable(each);
228             ShardingSpherePreconditions.checkNotNull(table, () -> new NoSuchTableException(each));
229             // TODO replace DEFAULT_METADATA_VERSION with actual version in ShardingSphereTable
230             result.getTableMetaDataVersions().put(table.getName(), DEFAULT_METADATA_VERSION);
231         }
232         return result;
233     }
234     
235     private void registerTableScanExecutor(final Schema sqlFederationSchema, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
236                                            final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext,
237                                            final OptimizerContext optimizerContext, final String databaseName, final String schemaName) {
238         if (null == sqlFederationSchema) {
239             return;
240         }
241         SQLFederationExecutorContext executorContext = new SQLFederationExecutorContext(databaseName, schemaName, metaData.getProps());
242         EnumerableScanExecutor scanExecutor =
243                 new EnumerableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, executorContext, federationContext, metaData.getGlobalRuleMetaData(), statistics);
244         // TODO register only the required tables
245         for (ShardingSphereTable each : metaData.getDatabase(databaseName).getSchema(schemaName).getTables().values()) {
246             Table table = sqlFederationSchema.getTable(each.getName());
247             if (table instanceof SQLFederationTable) {
248                 ((SQLFederationTable) table).setScanExecutor(scanExecutor);
249             }
250         }
251     }
252     
253     private Map<String, Object> createParameters(final List<Object> params) {
254         Map<String, Object> result = new HashMap<>(params.size(), 1F);
255         int index = 0;
256         for (Object each : params) {
257             result.put("?" + index++, each);
258         }
259         return result;
260     }
261     
262     @Override
263     public void close() throws SQLException {
264         if (null != resultSet) {
265             resultSet.close();
266         }
267     }
268 }