1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.sqlfederation.executor.enumerable;
19
20 import lombok.RequiredArgsConstructor;
21 import lombok.SneakyThrows;
22 import org.apache.calcite.linq4j.AbstractEnumerable;
23 import org.apache.calcite.linq4j.Enumerable;
24 import org.apache.calcite.linq4j.Enumerator;
25 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
26 import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
27 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
28 import org.apache.shardingsphere.infra.database.core.metadata.database.system.SystemDatabase;
29 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
30 import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
31 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
32 import org.apache.shardingsphere.infra.exception.kernel.connection.SQLExecutionInterruptedException;
33 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
34 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
35 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
36 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
37 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
38 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
39 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
40 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
41 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
42 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
43 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
44 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
45 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
46 import org.apache.shardingsphere.infra.hint.HintValueContext;
47 import org.apache.shardingsphere.infra.merge.MergeEngine;
48 import org.apache.shardingsphere.infra.merge.result.MergedResult;
49 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
50 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
51 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
52 import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
53 import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
54 import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
55 import org.apache.shardingsphere.infra.metadata.user.Grantee;
56 import org.apache.shardingsphere.infra.parser.sql.SQLStatementParserEngine;
57 import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
58 import org.apache.shardingsphere.infra.session.query.QueryContext;
59 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
60 import org.apache.shardingsphere.sqlfederation.executor.constant.EnumerableConstants;
61 import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
62 import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext;
63 import org.apache.shardingsphere.sqlfederation.executor.enumerator.JDBCRowEnumerator;
64 import org.apache.shardingsphere.sqlfederation.executor.enumerator.MemoryRowEnumerator;
65 import org.apache.shardingsphere.sqlfederation.executor.utils.StatisticsAssembleUtils;
66 import org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext;
67 import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.EmptyRowEnumerator;
68 import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutor;
69 import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutorContext;
70
71 import java.sql.Connection;
72 import java.sql.PreparedStatement;
73 import java.sql.SQLException;
74 import java.sql.Statement;
75 import java.util.ArrayList;
76 import java.util.Collection;
77 import java.util.Collections;
78 import java.util.LinkedList;
79 import java.util.List;
80 import java.util.Optional;
81 import java.util.stream.Collectors;
82
83
84
85
86 @RequiredArgsConstructor
87 public final class EnumerableScanExecutor implements ScanExecutor {
88
89 private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine;
90
91 private final JDBCExecutor jdbcExecutor;
92
93 private final JDBCExecutorCallback<? extends ExecuteResult> callback;
94
95 private final OptimizerContext optimizerContext;
96
97 private final SQLFederationExecutorContext executorContext;
98
99 private final SQLFederationContext federationContext;
100
101 private final RuleMetaData globalRuleMetaData;
102
103 private final ShardingSphereStatistics statistics;
104
105 private final ProcessEngine processEngine = new ProcessEngine();
106
107 @Override
108 public Enumerable<Object> execute(final ShardingSphereTable table, final ScanExecutorContext scanContext) {
109 String databaseName = executorContext.getDatabaseName();
110 String schemaName = executorContext.getSchemaName();
111 DatabaseType databaseType = optimizerContext.getParserContext(databaseName).getDatabaseType();
112 if (new SystemDatabase(databaseType).getSystemSchemas().contains(schemaName)) {
113 return createMemoryEnumerable(databaseName, schemaName, table, databaseType);
114 }
115 QueryContext queryContext = createQueryContext(federationContext.getMetaData(), scanContext, databaseType, federationContext.getQueryContext().isUseCache());
116 ShardingSphereDatabase database = federationContext.getMetaData().getDatabase(databaseName);
117 ExecutionContext executionContext = new KernelProcessor().generateExecutionContext(queryContext, database, globalRuleMetaData, executorContext.getProps(), new ConnectionContext());
118 if (federationContext.isPreview()) {
119 federationContext.getPreviewExecutionUnits().addAll(executionContext.getExecutionUnits());
120 return createEmptyEnumerable();
121 }
122 return createJDBCEnumerable(queryContext, database, executionContext);
123 }
124
125 private AbstractEnumerable<Object> createJDBCEnumerable(final QueryContext queryContext, final ShardingSphereDatabase database, final ExecutionContext context) {
126 return new AbstractEnumerable<Object>() {
127
128 @SneakyThrows
129 @Override
130 public Enumerator<Object> enumerator() {
131 computeConnectionOffsets(context);
132
133 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(context.getRouteContext(), executorContext.getConnectionOffsets(), context.getExecutionUnits(),
134 new ExecutionGroupReportContext(federationContext.getProcessId(), database.getName(), new Grantee("", "")));
135 setParameters(executionGroupContext.getInputGroups());
136 ShardingSpherePreconditions.checkState(!ProcessRegistry.getInstance().get(federationContext.getProcessId()).isInterrupted(),
137 SQLExecutionInterruptedException::new);
138 processEngine.executeSQL(executionGroupContext, federationContext.getQueryContext());
139 List<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
140 MergeEngine mergeEngine = new MergeEngine(federationContext.getMetaData().getGlobalRuleMetaData(), database, executorContext.getProps(), new ConnectionContext());
141 MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext());
142 Collection<Statement> statements = getStatements(executionGroupContext.getInputGroups());
143 return new JDBCRowEnumerator(mergedResult, queryResults.get(0).getMetaData(), statements);
144 }
145 };
146 }
147
148 private void computeConnectionOffsets(final ExecutionContext context) {
149 for (ExecutionUnit each : context.getExecutionUnits()) {
150 if (executorContext.getConnectionOffsets().containsKey(each.getDataSourceName())) {
151 int connectionOffset = executorContext.getConnectionOffsets().get(each.getDataSourceName());
152 executorContext.getConnectionOffsets().put(each.getDataSourceName(), ++connectionOffset);
153 } else {
154 executorContext.getConnectionOffsets().put(each.getDataSourceName(), 0);
155 }
156 }
157 }
158
159 private Enumerable<Object> createMemoryEnumerable(final String databaseName, final String schemaName, final ShardingSphereTable table, final DatabaseType databaseType) {
160 if (databaseType instanceof OpenGaussDatabaseType && EnumerableConstants.SYSTEM_CATALOG_TABLES.contains(table.getName())) {
161 return createMemoryEnumerator(StatisticsAssembleUtils.assembleTableData(table, federationContext.getMetaData()), table, databaseType);
162 }
163 Optional<ShardingSphereTableData> tableData = Optional.ofNullable(statistics.getDatabase(databaseName))
164 .map(optional -> optional.getSchema(schemaName)).map(optional -> optional.getTable(table.getName()));
165 return tableData.map(optional -> createMemoryEnumerator(optional, table, databaseType)).orElseGet(this::createEmptyEnumerable);
166 }
167
168 private Enumerable<Object> createMemoryEnumerator(final ShardingSphereTableData tableData, final ShardingSphereTable table, final DatabaseType databaseType) {
169 return new AbstractEnumerable<Object>() {
170
171 @Override
172 public Enumerator<Object> enumerator() {
173 return new MemoryRowEnumerator(tableData.getRows(), table.getColumns().values(), databaseType);
174 }
175 };
176 }
177
178 private Collection<Statement> getStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups) {
179 Collection<Statement> result = new LinkedList<>();
180 for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) {
181 for (JDBCExecutionUnit executionUnit : each.getInputs()) {
182 result.add(executionUnit.getStorageResource());
183 }
184 }
185 return result;
186 }
187
188 private void setParameters(final Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups) {
189 for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) {
190 for (JDBCExecutionUnit executionUnit : each.getInputs()) {
191 if (!(executionUnit.getStorageResource() instanceof PreparedStatement)) {
192 continue;
193 }
194 setParameters((PreparedStatement) executionUnit.getStorageResource(), executionUnit.getExecutionUnit().getSqlUnit().getParameters());
195 }
196 }
197 }
198
199 @SneakyThrows(SQLException.class)
200 private void setParameters(final PreparedStatement preparedStatement, final List<Object> params) {
201 for (int i = 0; i < params.size(); i++) {
202 preparedStatement.setObject(i + 1, params.get(i));
203 }
204 }
205
206 private QueryContext createQueryContext(final ShardingSphereMetaData metaData, final ScanExecutorContext sqlString, final DatabaseType databaseType, final boolean useCache) {
207 String sql = sqlString.getSql().replace(System.lineSeparator(), " ");
208 SQLStatement sqlStatement = new SQLStatementParserEngine(databaseType,
209 optimizerContext.getSqlParserRule().getSqlStatementCache(), optimizerContext.getSqlParserRule().getParseTreeCache()).parse(sql, useCache);
210 List<Object> params = getParameters(sqlString.getParamIndexes());
211 HintValueContext hintValueContext = new HintValueContext();
212 SQLStatementContext sqlStatementContext = new SQLBindEngine(metaData, executorContext.getDatabaseName(), hintValueContext).bind(sqlStatement, params);
213 return new QueryContext(sqlStatementContext, sql, params, hintValueContext, useCache);
214 }
215
216 private List<Object> getParameters(final int[] paramIndexes) {
217 if (null == paramIndexes) {
218 return Collections.emptyList();
219 }
220 List<Object> result = new ArrayList<>();
221 for (int each : paramIndexes) {
222 result.add(federationContext.getQueryContext().getParameters().get(each));
223 }
224 return result;
225 }
226
227 private AbstractEnumerable<Object> createEmptyEnumerable() {
228 return new AbstractEnumerable<Object>() {
229
230 @Override
231 public Enumerator<Object> enumerator() {
232 return new EmptyRowEnumerator();
233 }
234 };
235 }
236 }