1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.sqlfederation.engine;
19
20 import com.google.common.base.Joiner;
21 import lombok.Getter;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.calcite.plan.Convention;
24 import org.apache.calcite.plan.RelOptUtil;
25 import org.apache.calcite.schema.SchemaPlus;
26 import org.apache.calcite.sql.SqlExplainLevel;
27 import org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
28 import org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
29 import org.apache.shardingsphere.database.exception.core.exception.syntax.table.NoSuchTableException;
30 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
31 import org.apache.shardingsphere.infra.binder.context.statement.type.dal.ExplainStatementContext;
32 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
33 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
34 import org.apache.shardingsphere.infra.datanode.DataNode;
35 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
36 import org.apache.shardingsphere.infra.exception.kernel.connection.SQLExecutionInterruptedException;
37 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
38 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
39 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
40 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
41 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
42 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
43 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
44 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
45 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
46 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
47 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
48 import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
49 import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
50 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
51 import org.apache.shardingsphere.infra.session.query.QueryContext;
52 import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
53 import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
54 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
55 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dal.ExplainStatement;
56 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.SelectStatement;
57 import org.apache.shardingsphere.sqlfederation.compiler.SQLFederationCompilerEngine;
58 import org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan;
59 import org.apache.shardingsphere.sqlfederation.compiler.compiler.SQLStatementCompiler;
60 import org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
61 import org.apache.shardingsphere.sqlfederation.compiler.exception.SQLFederationUnsupportedSQLException;
62 import org.apache.shardingsphere.sqlfederation.compiler.planner.cache.ExecutionPlanCacheKey;
63 import org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter;
64 import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
65 import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor;
66 import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessorFactory;
67 import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule;
68 import org.apache.shardingsphere.sqlfederation.spi.SQLFederationDecider;
69
70 import java.sql.Connection;
71 import java.sql.ResultSet;
72 import java.sql.SQLException;
73 import java.sql.SQLIntegrityConstraintViolationException;
74 import java.util.Arrays;
75 import java.util.Collection;
76 import java.util.Collections;
77 import java.util.HashSet;
78 import java.util.LinkedList;
79 import java.util.List;
80 import java.util.Map;
81 import java.util.Map.Entry;
82 import java.util.Optional;
83
84
85
86
87 @Getter
88 @Slf4j
89 public final class SQLFederationEngine implements AutoCloseable {
90
91 private static final Collection<Class<?>> NEED_THROW_EXCEPTION_TYPES = Arrays.asList(SQLExecutionInterruptedException.class, SQLIntegrityConstraintViolationException.class);
92
93 private static final int MAX_ERROR_MESSAGE_LENGTH = 5000;
94
95 private final ProcessEngine processEngine = new ProcessEngine();
96
97 @SuppressWarnings("rawtypes")
98 private final Map<ShardingSphereRule, SQLFederationDecider> deciders;
99
100 private final String currentDatabaseName;
101
102 private final String currentSchemaName;
103
104 private final SQLFederationRule sqlFederationRule;
105
106 private final SQLFederationProcessor processor;
107
108 private QueryContext queryContext;
109
110 private SchemaPlus schemaPlus;
111
112 private ResultSet resultSet;
113
114 public SQLFederationEngine(final String currentDatabaseName, final String currentSchemaName, final ShardingSphereMetaData metaData,
115 final ShardingSphereStatistics statistics, final JDBCExecutor jdbcExecutor) {
116 deciders = OrderedSPILoader.getServices(SQLFederationDecider.class, metaData.getDatabase(currentDatabaseName).getRuleMetaData().getRules());
117 this.currentDatabaseName = currentDatabaseName;
118 this.currentSchemaName = currentSchemaName;
119 sqlFederationRule = metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
120 processor = SQLFederationProcessorFactory.getInstance().newInstance(statistics, jdbcExecutor);
121 }
122
123
124
125
126
127
128 public boolean isSQLFederationEnabled() {
129 return sqlFederationRule.getConfiguration().isSqlFederationEnabled();
130 }
131
132
133
134
135
136
137
138
139 @SuppressWarnings({"unchecked", "rawtypes"})
140 public boolean decide(final QueryContext queryContext, final RuleMetaData globalRuleMetaData) {
141 SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext();
142 if (!isSQLFederationEnabled() || !isSupportedSQLStatementContext(sqlStatementContext)) {
143 return false;
144 }
145 boolean allQueryUseSQLFederation = sqlFederationRule.getConfiguration().isAllQueryUseSQLFederation();
146 if (allQueryUseSQLFederation) {
147 return true;
148 }
149 Collection<String> databaseNames = sqlStatementContext.getTablesContext().getDatabaseNames();
150 if (databaseNames.size() > 1) {
151 return true;
152 }
153 ShardingSphereDatabase usedDatabase = queryContext.getUsedDatabase();
154 Collection<DataNode> includedDataNodes = new HashSet<>();
155 for (Entry<ShardingSphereRule, SQLFederationDecider> entry : deciders.entrySet()) {
156 boolean isUseSQLFederation = entry.getValue().decide(sqlStatementContext, queryContext.getParameters(), globalRuleMetaData, usedDatabase, entry.getKey(), includedDataNodes);
157 if (isUseSQLFederation) {
158 return true;
159 }
160 }
161 return false;
162 }
163
164 private boolean isSupportedSQLStatementContext(final SQLStatementContext sqlStatementContext) {
165 return isSupportedSQLStatement(sqlStatementContext instanceof ExplainStatementContext
166 ? ((ExplainStatementContext) sqlStatementContext).getSqlStatement().getExplainableSQLStatement()
167 : sqlStatementContext.getSqlStatement());
168 }
169
170 private boolean isSupportedSQLStatement(final SQLStatement sqlStatement) {
171 return sqlStatement instanceof SelectStatement;
172 }
173
174
175
176
177
178
179
180
181
182 public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
183 final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext) {
184 return execute0(prepareEngine, callback, federationContext);
185 }
186
187 private ResultSet execute0(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
188 final JDBCExecutorCallback<? extends ExecuteResult> queryCallback, final SQLFederationContext federationContext) {
189 queryContext = federationContext.getQueryContext();
190 logSQL(queryContext, federationContext.getMetaData().getProps());
191 try {
192 processEngine.executeSQL(new ExecutionGroupContext<>(Collections.emptyList(),
193 new ExecutionGroupReportContext(federationContext.getProcessId(), currentDatabaseName, queryContext.getConnectionContext().getGrantee())), queryContext);
194 SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext() instanceof ExplainStatementContext
195 ? ((ExplainStatementContext) queryContext.getSqlStatementContext()).getExplainableSQLStatementContext()
196 : queryContext.getSqlStatementContext();
197 CompilerContext compilerContext = sqlFederationRule.getCompilerContext();
198 SQLFederationRelConverter converter = new SQLFederationRelConverter(compilerContext, getSchemaPath(sqlStatementContext),
199 sqlStatementContext.getSqlStatement().getDatabaseType(), processor.getConvention());
200 schemaPlus = converter.getSchemaPlus();
201 processor.prepare(prepareEngine, queryCallback, currentDatabaseName, currentSchemaName, federationContext, compilerContext, schemaPlus);
202 SQLFederationExecutionPlan executionPlan = compileQuery(converter, currentDatabaseName,
203 currentSchemaName, federationContext, sqlStatementContext, queryContext.getSql(), processor.getConvention());
204 logExecutionPlan(executionPlan, federationContext.getMetaData().getProps());
205 resultSet = processor.executePlan(prepareEngine, queryCallback, executionPlan, converter, federationContext, schemaPlus);
206 return resultSet;
207
208 } catch (final Exception ex) {
209
210 String errorMessage = splitErrorMessage(ex);
211 log.error("SQL Federation execute failed, sql {}, parameters {}, reason {}", queryContext.getSql(), queryContext.getParameters(), errorMessage);
212 closeResources(federationContext);
213 if (NEED_THROW_EXCEPTION_TYPES.stream().anyMatch(each -> each.isAssignableFrom(ex.getClass()))) {
214 throw ex;
215 }
216 throw new SQLFederationUnsupportedSQLException(queryContext.getSql(), errorMessage);
217 }
218 }
219
220 private String splitErrorMessage(final Exception ex) {
221 return null == ex.getMessage() ? "" : ex.getMessage().substring(0, Math.min(ex.getMessage().length(), MAX_ERROR_MESSAGE_LENGTH));
222 }
223
224 private void closeResources(final SQLFederationContext federationContext) {
225 try {
226 processEngine.completeSQLExecution(federationContext.getProcessId());
227 close();
228
229 } catch (final Exception ex) {
230
231 log.warn("Failed to close SQL federation engine resources: {}", ex.getMessage());
232 }
233 }
234
235 private void logSQL(final QueryContext queryContext, final ConfigurationProperties props) {
236 if (!props.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
237 return;
238 }
239 if (queryContext.getParameters().isEmpty()) {
240 log.info("SQL Federation Logic SQL: {} ::: {} ::: {}", queryContext.getSql(), currentDatabaseName, currentSchemaName);
241 } else {
242 log.info("SQL Federation Logic SQL: {} ::: {} ::: {} ::: {}", queryContext.getSql(), queryContext.getParameters(), currentDatabaseName, currentSchemaName);
243 }
244 }
245
246 private void logExecutionPlan(final SQLFederationExecutionPlan executionPlan, final ConfigurationProperties props) {
247 if (props.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
248 log.info("SQL Federation Execution Plan: {}", RelOptUtil.toString(executionPlan.getPhysicalPlan(), SqlExplainLevel.ALL_ATTRIBUTES).replaceAll(", id = \\d+", ""));
249 }
250 }
251
252 private List<String> getSchemaPath(final SQLStatementContext sqlStatementContext) {
253 DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sqlStatementContext.getSqlStatement().getDatabaseType()).getDialectDatabaseMetaData();
254
255 if (dialectDatabaseMetaData.getSchemaOption().getDefaultSchema().isPresent()) {
256 return sqlStatementContext.getTablesContext().getSimpleTables().stream().anyMatch(each -> each.getOwner().isPresent())
257 ? Collections.singletonList(currentDatabaseName)
258 : Arrays.asList(currentDatabaseName, currentSchemaName);
259 }
260 return Collections.singletonList(currentDatabaseName);
261 }
262
263 private SQLFederationExecutionPlan compileQuery(final SQLFederationRelConverter converter, final String databaseName, final String schemaName, final SQLFederationContext federationContext,
264 final SQLStatementContext sqlStatementContext, final String sql, final Convention convention) {
265 SQLStatementCompiler sqlStatementCompiler = new SQLStatementCompiler(converter, convention);
266 SQLFederationCompilerEngine compilerEngine = new SQLFederationCompilerEngine(databaseName, schemaName, sqlFederationRule.getConfiguration().getExecutionPlanCache());
267 return compilerEngine.compile(buildCacheKey(federationContext, sqlStatementContext, sql, sqlStatementCompiler), false);
268 }
269
270 private ExecutionPlanCacheKey buildCacheKey(final SQLFederationContext federationContext, final SQLStatementContext sqlStatementContext,
271 final String sql, final SQLStatementCompiler sqlStatementCompiler) {
272 ExecutionPlanCacheKey result = new ExecutionPlanCacheKey(sql, sqlStatementContext.getSqlStatement(), sqlStatementCompiler);
273 Collection<SimpleTableSegment> tableSegments = sqlStatementContext.getTablesContext().getSimpleTables();
274 for (SimpleTableSegment each : tableSegments) {
275 String originalDatabase = each.getTableName().getTableBoundInfo().map(optional -> optional.getOriginalDatabase().getValue()).orElse(currentDatabaseName);
276 String originalSchema = each.getTableName().getTableBoundInfo().map(optional -> optional.getOriginalSchema().getValue()).orElse(currentSchemaName);
277 ShardingSphereTable table = federationContext.getMetaData().getDatabase(originalDatabase).getSchema(originalSchema).getTable(each.getTableName().getIdentifier().getValue());
278 ShardingSpherePreconditions.checkNotNull(table, () -> new NoSuchTableException(each.getTableName().getIdentifier().getValue()));
279 result.getTableMetaDataVersions().put(Joiner.on(".").join(Arrays.asList(originalDatabase, originalSchema, table.getName())), 0);
280 }
281 return result;
282 }
283
284
285
286
287
288
289 public ResultSet getResultSet() {
290 SQLStatement sqlStatement = queryContext.getSqlStatementContext().getSqlStatement();
291 return sqlStatement instanceof SelectStatement || sqlStatement instanceof ExplainStatement ? resultSet : null;
292 }
293
294 @Override
295 public void close() throws SQLException {
296 Collection<SQLException> result = new LinkedList<>();
297 closeResultSet().ifPresent(result::add);
298 release();
299 if (result.isEmpty()) {
300 return;
301 }
302 SQLException ex = new SQLException();
303 result.forEach(ex::setNextException);
304 throw ex;
305 }
306
307 private Optional<SQLException> closeResultSet() {
308 try {
309 if (null != resultSet && !resultSet.isClosed()) {
310 resultSet.close();
311 }
312 } catch (final SQLException ex) {
313 return Optional.of(ex);
314 }
315 return Optional.empty();
316 }
317
318 private void release() {
319 if (null != queryContext && null != schemaPlus) {
320 processor.release(currentDatabaseName, currentSchemaName, queryContext, schemaPlus);
321 }
322 }
323 }