1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.backend.handler.distsql.rul;
19
20 import com.google.common.base.Preconditions;
21 import lombok.Setter;
22 import org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorConnectionContextAware;
23 import org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
24 import org.apache.shardingsphere.distsql.handler.engine.DistSQLConnectionContext;
25 import org.apache.shardingsphere.distsql.handler.engine.query.DistSQLQueryExecutor;
26 import org.apache.shardingsphere.distsql.statement.rul.sql.PreviewStatement;
27 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
28 import org.apache.shardingsphere.infra.binder.context.statement.type.ddl.CursorHeldSQLStatementContext;
29 import org.apache.shardingsphere.infra.binder.context.statement.type.ddl.CursorStatementContext;
30 import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
31 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
32 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
33 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
34 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
35 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
36 import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.EmptyRuleException;
37 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
38 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
39 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
40 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
41 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
42 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
43 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
44 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
45 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
46 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
47 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
48 import org.apache.shardingsphere.infra.hint.HintValueContext;
49 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
50 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
51 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
52 import org.apache.shardingsphere.infra.session.query.QueryContext;
53 import org.apache.shardingsphere.mode.manager.ContextManager;
54 import org.apache.shardingsphere.parser.rule.SQLParserRule;
55 import org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
56 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
57 import org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.cursor.CursorNameSegment;
58 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
59 import org.apache.shardingsphere.sql.parser.statement.core.statement.attribute.type.CursorSQLStatementAttribute;
60 import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
61 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
62
63 import java.sql.Connection;
64 import java.sql.SQLException;
65 import java.sql.Statement;
66 import java.util.Arrays;
67 import java.util.Collection;
68 import java.util.Collections;
69 import java.util.Optional;
70 import java.util.stream.Collectors;
71
72
73
74
75 @Setter
76 public final class PreviewExecutor implements DistSQLQueryExecutor<PreviewStatement>, DistSQLExecutorDatabaseAware, DistSQLExecutorConnectionContextAware {
77
78 private ShardingSphereDatabase database;
79
80 private DistSQLConnectionContext connectionContext;
81
82 @Override
83 public Collection<String> getColumnNames(final PreviewStatement sqlStatement) {
84 return Arrays.asList("data_source_name", "actual_sql");
85 }
86
87 @Override
88 public Collection<LocalDataQueryResultRow> getRows(final PreviewStatement sqlStatement, final ContextManager contextManager) throws SQLException {
89 ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
90 String toBePreviewedSQL = sqlStatement.getSql();
91 SQLStatement toBePreviewedStatement = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(database.getProtocolType()).parse(toBePreviewedSQL, false);
92 HintValueContext hintValueContext = connectionContext.getQueryContext().getHintValueContext();
93 hintValueContext.setSkipMetadataValidate(true);
94 String currentDatabaseName = connectionContext.getQueryContext().getConnectionContext().getCurrentDatabaseName().orElse(null);
95 SQLStatementContext toBePreviewedStatementContext = new SQLBindEngine(
96 metaData, currentDatabaseName, hintValueContext).bind(database.getProtocolType(), toBePreviewedStatement, Collections.emptyList());
97 QueryContext queryContext = new QueryContext(
98 toBePreviewedStatementContext, toBePreviewedSQL, Collections.emptyList(), hintValueContext, connectionContext.getQueryContext().getConnectionContext(), metaData);
99 if (toBePreviewedStatementContext.getSqlStatement().getAttributes().findAttribute(CursorSQLStatementAttribute.class).isPresent()
100 && toBePreviewedStatementContext instanceof CursorHeldSQLStatementContext) {
101 setUpCursorDefinition((CursorHeldSQLStatementContext) toBePreviewedStatementContext);
102 }
103 ShardingSpherePreconditions.checkState(database.isComplete(), () -> new EmptyRuleException(database.getName()));
104 String schemaName = getSchemaName(queryContext.getSqlStatementContext(), database);
105 Collection<ExecutionUnit> executionUnits = getExecutionUnits(contextManager, schemaName, metaData, queryContext);
106 return executionUnits.stream().map(each -> new LocalDataQueryResultRow(each.getDataSourceName(), each.getSqlUnit().getSql())).collect(Collectors.toList());
107 }
108
109 private String getSchemaName(final SQLStatementContext sqlStatementContext, final ShardingSphereDatabase database) {
110 String defaultSchemaName = new DatabaseTypeRegistry(sqlStatementContext.getDatabaseType()).getDefaultSchemaName(database.getName());
111 return sqlStatementContext.getTablesContext().getSchemaName().orElse(defaultSchemaName);
112 }
113
114 private Collection<ExecutionUnit> getExecutionUnits(final ContextManager contextManager, final String schemaName, final ShardingSphereMetaData metaData,
115 final QueryContext queryContext) {
116 JDBCExecutor jdbcExecutor = new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), connectionContext.getQueryContext().getConnectionContext());
117 SQLFederationEngine federationEngine = new SQLFederationEngine(database.getName(), schemaName, metaData, contextManager.getMetaDataContexts().getStatistics(), jdbcExecutor);
118 if (federationEngine.decide(queryContext, metaData.getGlobalRuleMetaData())) {
119 return getFederationExecutionUnits(queryContext, metaData, federationEngine);
120 }
121 return new KernelProcessor().generateExecutionContext(queryContext, metaData.getGlobalRuleMetaData(), metaData.getProps()).getExecutionUnits();
122 }
123
124 private void setUpCursorDefinition(final CursorHeldSQLStatementContext toBePreviewedStatementContext) {
125 Optional<CursorNameSegment> cursorNameSegment = toBePreviewedStatementContext.getSqlStatement().getAttributes().getAttribute(CursorSQLStatementAttribute.class).getCursorName();
126 if (!cursorNameSegment.isPresent()) {
127 return;
128 }
129 String cursorName = cursorNameSegment.get().getIdentifier().getValue().toLowerCase();
130 CursorStatementContext cursorStatementContext = connectionContext.getQueryContext().getConnectionContext().getCursorContext().getCursorStatementContexts().get(cursorName);
131 Preconditions.checkNotNull(cursorStatementContext, "Cursor %s does not exist.", cursorName);
132 toBePreviewedStatementContext.setCursorStatementContext(cursorStatementContext);
133 }
134
135 private Collection<ExecutionUnit> getFederationExecutionUnits(final QueryContext queryContext, final ShardingSphereMetaData metaData, final SQLFederationEngine federationEngine) {
136 SQLStatement sqlStatement = queryContext.getSqlStatementContext().getSqlStatement();
137 DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(metaData);
138 SQLFederationContext context = new SQLFederationContext(true, queryContext, metaData,
139 ((ProxyDatabaseConnectionManager) connectionContext.getDatabaseConnectionManager()).getConnectionSession().getProcessId());
140 federationEngine.executeQuery(prepareEngine, createPreviewCallback(sqlStatement), context);
141 return context.getPreviewExecutionUnits();
142 }
143
144 private JDBCExecutorCallback<ExecuteResult> createPreviewCallback(final SQLStatement sqlStatement) {
145 return new JDBCExecutorCallback<ExecuteResult>(database.getProtocolType(), database.getResourceMetaData(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown()) {
146
147 @Override
148 protected ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
149 return new JDBCStreamQueryResult(statement.executeQuery(sql));
150 }
151
152 @Override
153 protected Optional<ExecuteResult> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
154 return Optional.empty();
155 }
156 };
157 }
158
159 @SuppressWarnings({"unchecked", "rawtypes"})
160 private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine(final ShardingSphereMetaData metaData) {
161 int maxConnectionsSizePerQuery = metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
162 return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connectionContext.getDatabaseConnectionManager(),
163 connectionContext.getExecutorStatementManager(), new StatementOption(false), database.getRuleMetaData().getRules(), metaData);
164 }
165
166 @Override
167 public Class<PreviewStatement> getType() {
168 return PreviewStatement.class;
169 }
170 }