1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.sqlfederation.engine;
19
20 import lombok.Getter;
21 import org.apache.calcite.adapter.enumerable.EnumerableInterpretable;
22 import org.apache.calcite.adapter.enumerable.EnumerableRel;
23 import org.apache.calcite.adapter.java.JavaTypeFactory;
24 import org.apache.calcite.config.CalciteConnectionConfig;
25 import org.apache.calcite.config.CalciteConnectionConfigImpl;
26 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
27 import org.apache.calcite.linq4j.Enumerator;
28 import org.apache.calcite.prepare.CalciteCatalogReader;
29 import org.apache.calcite.runtime.Bindable;
30 import org.apache.calcite.schema.Schema;
31 import org.apache.calcite.schema.Table;
32 import org.apache.calcite.sql.validate.SqlValidator;
33 import org.apache.calcite.sql2rel.SqlToRelConverter;
34 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
35 import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
36 import org.apache.shardingsphere.infra.datanode.DataNode;
37 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
38 import org.apache.shardingsphere.infra.exception.dialect.exception.syntax.table.NoSuchTableException;
39 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
40 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
41 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
42 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
43 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
44 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
45 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
46 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
47 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
48 import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
49 import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
50 import org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtils;
51 import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
52 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
53 import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
54 import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationBindContext;
55 import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
56 import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext;
57 import org.apache.shardingsphere.sqlfederation.executor.enumerable.EnumerableScanExecutor;
58 import org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationCompilerEngine;
59 import org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationExecutionPlan;
60 import org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext;
61 import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerMetaData;
62 import org.apache.shardingsphere.sqlfederation.optimizer.exception.SQLFederationUnsupportedSQLException;
63 import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.SQLFederationTable;
64 import org.apache.shardingsphere.sqlfederation.optimizer.planner.cache.ExecutionPlanCacheKey;
65 import org.apache.shardingsphere.sqlfederation.optimizer.planner.util.SQLFederationPlannerUtils;
66 import org.apache.shardingsphere.sqlfederation.optimizer.statement.SQLStatementCompiler;
67 import org.apache.shardingsphere.sqlfederation.resultset.SQLFederationResultSet;
68 import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule;
69 import org.apache.shardingsphere.sqlfederation.spi.SQLFederationDecider;
70
71 import java.sql.Connection;
72 import java.sql.ResultSet;
73 import java.sql.SQLException;
74 import java.util.Collection;
75 import java.util.Collections;
76 import java.util.HashMap;
77 import java.util.HashSet;
78 import java.util.List;
79 import java.util.Map;
80 import java.util.Map.Entry;
81
82
83
84
85 @Getter
86 public final class SQLFederationEngine implements AutoCloseable {
87
88 private static final int DEFAULT_METADATA_VERSION = 0;
89
90 private static final JavaTypeFactory DEFAULT_DATA_TYPE_FACTORY = new JavaTypeFactoryImpl();
91
92 private final ProcessEngine processEngine = new ProcessEngine();
93
94 @SuppressWarnings("rawtypes")
95 private final Map<ShardingSphereRule, SQLFederationDecider> deciders;
96
97 private final String databaseName;
98
99 private final String schemaName;
100
101 private final ShardingSphereMetaData metaData;
102
103 private final ShardingSphereStatistics statistics;
104
105 private final JDBCExecutor jdbcExecutor;
106
107 private final SQLFederationRule sqlFederationRule;
108
109 private ResultSet resultSet;
110
111 public SQLFederationEngine(final String databaseName, final String schemaName, final ShardingSphereMetaData metaData, final ShardingSphereStatistics statistics, final JDBCExecutor jdbcExecutor) {
112 deciders = OrderedSPILoader.getServices(SQLFederationDecider.class, metaData.getDatabase(databaseName).getRuleMetaData().getRules());
113 this.databaseName = databaseName;
114 this.schemaName = schemaName;
115 this.metaData = metaData;
116 this.statistics = statistics;
117 this.jdbcExecutor = jdbcExecutor;
118 sqlFederationRule = metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
119 }
120
121
122
123
124
125
126
127
128
129
130 @SuppressWarnings({"unchecked", "rawtypes"})
131 public boolean decide(final SQLStatementContext sqlStatementContext, final List<Object> parameters, final ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
132
133 if (isQuerySystemSchema(sqlStatementContext, database)) {
134 return true;
135 }
136
137 boolean sqlFederationEnabled = sqlFederationRule.getConfiguration().isSqlFederationEnabled();
138 if (!sqlFederationEnabled || !(sqlStatementContext instanceof SelectStatementContext)) {
139 return false;
140 }
141 boolean allQueryUseSQLFederation = sqlFederationRule.getConfiguration().isAllQueryUseSQLFederation();
142 if (allQueryUseSQLFederation) {
143 return true;
144 }
145 Collection<DataNode> includedDataNodes = new HashSet<>();
146 for (Entry<ShardingSphereRule, SQLFederationDecider> entry : deciders.entrySet()) {
147 boolean isUseSQLFederation = entry.getValue().decide((SelectStatementContext) sqlStatementContext, parameters, globalRuleMetaData, database, entry.getKey(), includedDataNodes);
148 if (isUseSQLFederation) {
149 return true;
150 }
151 }
152 return false;
153 }
154
155 private boolean isQuerySystemSchema(final SQLStatementContext sqlStatementContext, final ShardingSphereDatabase database) {
156 return sqlStatementContext instanceof SelectStatementContext
157 && (SystemSchemaUtils.containsSystemSchema(sqlStatementContext.getDatabaseType(), sqlStatementContext.getTablesContext().getSchemaNames(), database)
158 || SystemSchemaUtils.isOpenGaussSystemCatalogQuery(sqlStatementContext.getDatabaseType(),
159 ((SelectStatementContext) sqlStatementContext).getSqlStatement().getProjections().getProjections()));
160 }
161
162
163
164
165
166
167
168
169
170
171 public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
172 final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext) {
173 try {
174 String databaseName = federationContext.getQueryContext().getDatabaseNameFromSQLStatement().orElse(this.databaseName);
175 String schemaName = federationContext.getQueryContext().getSchemaNameFromSQLStatement().orElse(this.schemaName);
176 OptimizerMetaData optimizerMetaData = sqlFederationRule.getOptimizerContext().getMetaData(databaseName);
177 CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDialectProps());
178 CalciteCatalogReader catalogReader = SQLFederationPlannerUtils.createCatalogReader(schemaName, optimizerMetaData.getSchema(schemaName), DEFAULT_DATA_TYPE_FACTORY, connectionConfig);
179 SqlValidator validator = SQLFederationPlannerUtils.createSqlValidator(catalogReader, DEFAULT_DATA_TYPE_FACTORY,
180 sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDatabaseType(), connectionConfig);
181 SqlToRelConverter converter = SQLFederationPlannerUtils.createSqlToRelConverter(catalogReader, validator, SQLFederationPlannerUtils.createRelOptCluster(DEFAULT_DATA_TYPE_FACTORY),
182 sqlFederationRule.getOptimizerContext().getSqlParserRule(), sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDatabaseType(), true);
183 Schema sqlFederationSchema = catalogReader.getRootSchema().plus().getSubSchema(schemaName);
184 SQLFederationExecutionPlan executionPlan = compileQuery(prepareEngine, callback, federationContext, databaseName, schemaName, sqlFederationSchema, converter);
185 resultSet = executePlan(federationContext, executionPlan, validator, converter, sqlFederationSchema);
186 return resultSet;
187
188 } catch (final Exception ex) {
189
190 throw new SQLFederationUnsupportedSQLException(federationContext.getQueryContext().getSql());
191 }
192 }
193
194 private SQLFederationExecutionPlan compileQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final JDBCExecutorCallback<? extends ExecuteResult> callback,
195 final SQLFederationContext federationContext, final String databaseName, final String schemaName, final Schema sqlFederationSchema,
196 final SqlToRelConverter converter) {
197 SQLStatementContext sqlStatementContext = federationContext.getQueryContext().getSqlStatementContext();
198 ShardingSpherePreconditions.checkState(sqlStatementContext instanceof SelectStatementContext, () -> new IllegalArgumentException("SQL statement context must be select statement context."));
199 registerTableScanExecutor(sqlFederationSchema, prepareEngine, callback, federationContext, sqlFederationRule.getOptimizerContext(), databaseName, schemaName);
200 SQLStatementCompiler sqlStatementCompiler = new SQLStatementCompiler(converter);
201 SQLFederationCompilerEngine compilerEngine = new SQLFederationCompilerEngine(databaseName, schemaName, sqlFederationRule.getConfiguration().getExecutionPlanCache());
202
203 return compilerEngine.compile(buildCacheKey(federationContext, (SelectStatementContext) sqlStatementContext, sqlStatementCompiler, databaseName, schemaName), false);
204 }
205
206 @SuppressWarnings("unchecked")
207 private ResultSet executePlan(final SQLFederationContext federationContext, final SQLFederationExecutionPlan executionPlan, final SqlValidator validator, final SqlToRelConverter converter,
208 final Schema sqlFederationSchema) {
209 try {
210 Bindable<Object> executablePlan = EnumerableInterpretable.toBindable(Collections.emptyMap(), null, (EnumerableRel) executionPlan.getPhysicalPlan(), EnumerableRel.Prefer.ARRAY);
211 Map<String, Object> params = createParameters(federationContext.getQueryContext().getParameters());
212 Enumerator<Object> enumerator = executablePlan.bind(new SQLFederationBindContext(validator, converter, params)).enumerator();
213 return new SQLFederationResultSet(enumerator, sqlFederationSchema, (SelectStatementContext) federationContext.getQueryContext().getSqlStatementContext(),
214 executionPlan.getResultColumnType());
215 } finally {
216 processEngine.completeSQLExecution(federationContext.getProcessId());
217 }
218 }
219
220 private ExecutionPlanCacheKey buildCacheKey(final SQLFederationContext federationContext, final SelectStatementContext selectStatementContext,
221 final SQLStatementCompiler sqlStatementCompiler, final String databaseName, final String schemaName) {
222 ShardingSphereSchema schema = federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
223 ExecutionPlanCacheKey result =
224 new ExecutionPlanCacheKey(federationContext.getQueryContext().getSql(), selectStatementContext.getSqlStatement(), selectStatementContext.getDatabaseType().getType(),
225 sqlStatementCompiler);
226 for (String each : selectStatementContext.getTablesContext().getTableNames()) {
227 ShardingSphereTable table = schema.getTable(each);
228 ShardingSpherePreconditions.checkNotNull(table, () -> new NoSuchTableException(each));
229
230 result.getTableMetaDataVersions().put(table.getName(), DEFAULT_METADATA_VERSION);
231 }
232 return result;
233 }
234
235 private void registerTableScanExecutor(final Schema sqlFederationSchema, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
236 final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext,
237 final OptimizerContext optimizerContext, final String databaseName, final String schemaName) {
238 if (null == sqlFederationSchema) {
239 return;
240 }
241 SQLFederationExecutorContext executorContext = new SQLFederationExecutorContext(databaseName, schemaName, metaData.getProps());
242 EnumerableScanExecutor scanExecutor =
243 new EnumerableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, executorContext, federationContext, metaData.getGlobalRuleMetaData(), statistics);
244
245 for (ShardingSphereTable each : metaData.getDatabase(databaseName).getSchema(schemaName).getTables().values()) {
246 Table table = sqlFederationSchema.getTable(each.getName());
247 if (table instanceof SQLFederationTable) {
248 ((SQLFederationTable) table).setScanExecutor(scanExecutor);
249 }
250 }
251 }
252
253 private Map<String, Object> createParameters(final List<Object> params) {
254 Map<String, Object> result = new HashMap<>(params.size(), 1F);
255 int index = 0;
256 for (Object each : params) {
257 result.put("?" + index++, each);
258 }
259 return result;
260 }
261
262 @Override
263 public void close() throws SQLException {
264 if (null != resultSet) {
265 resultSet.close();
266 }
267 }
268 }