View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.sqlfederation.executor.enumerable;
19  
20  import lombok.RequiredArgsConstructor;
21  import lombok.SneakyThrows;
22  import org.apache.calcite.linq4j.AbstractEnumerable;
23  import org.apache.calcite.linq4j.Enumerable;
24  import org.apache.calcite.linq4j.Enumerator;
25  import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
26  import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
27  import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
28  import org.apache.shardingsphere.infra.database.core.metadata.database.system.SystemDatabase;
29  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
30  import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
31  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
32  import org.apache.shardingsphere.infra.exception.kernel.connection.SQLExecutionInterruptedException;
33  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
34  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
35  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
36  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
37  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
38  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
39  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
40  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
41  import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
42  import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
43  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
44  import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
45  import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
46  import org.apache.shardingsphere.infra.hint.HintValueContext;
47  import org.apache.shardingsphere.infra.merge.MergeEngine;
48  import org.apache.shardingsphere.infra.merge.result.MergedResult;
49  import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
50  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
51  import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
52  import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
53  import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
54  import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
55  import org.apache.shardingsphere.infra.metadata.user.Grantee;
56  import org.apache.shardingsphere.infra.parser.sql.SQLStatementParserEngine;
57  import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
58  import org.apache.shardingsphere.infra.session.query.QueryContext;
59  import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
60  import org.apache.shardingsphere.sqlfederation.executor.constant.EnumerableConstants;
61  import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
62  import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext;
63  import org.apache.shardingsphere.sqlfederation.executor.enumerator.JDBCRowEnumerator;
64  import org.apache.shardingsphere.sqlfederation.executor.enumerator.MemoryRowEnumerator;
65  import org.apache.shardingsphere.sqlfederation.executor.utils.StatisticsAssembleUtils;
66  import org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext;
67  import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.EmptyRowEnumerator;
68  import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutor;
69  import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutorContext;
70  
71  import java.sql.Connection;
72  import java.sql.PreparedStatement;
73  import java.sql.SQLException;
74  import java.sql.Statement;
75  import java.util.ArrayList;
76  import java.util.Collection;
77  import java.util.Collections;
78  import java.util.LinkedList;
79  import java.util.List;
80  import java.util.Optional;
81  import java.util.stream.Collectors;
82  
83  /**
84   * Enumerable scan executor.
85   */
86  @RequiredArgsConstructor
87  public final class EnumerableScanExecutor implements ScanExecutor {
88      
89      private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine;
90      
91      private final JDBCExecutor jdbcExecutor;
92      
93      private final JDBCExecutorCallback<? extends ExecuteResult> callback;
94      
95      private final OptimizerContext optimizerContext;
96      
97      private final SQLFederationExecutorContext executorContext;
98      
99      private final SQLFederationContext federationContext;
100     
101     private final RuleMetaData globalRuleMetaData;
102     
103     private final ShardingSphereStatistics statistics;
104     
105     private final ProcessEngine processEngine = new ProcessEngine();
106     
107     @Override
108     public Enumerable<Object> execute(final ShardingSphereTable table, final ScanExecutorContext scanContext) {
109         String databaseName = executorContext.getDatabaseName();
110         String schemaName = executorContext.getSchemaName();
111         DatabaseType databaseType = optimizerContext.getParserContext(databaseName).getDatabaseType();
112         if (new SystemDatabase(databaseType).getSystemSchemas().contains(schemaName)) {
113             return createMemoryEnumerable(databaseName, schemaName, table, databaseType);
114         }
115         QueryContext queryContext = createQueryContext(federationContext.getMetaData(), scanContext, databaseType, federationContext.getQueryContext().isUseCache());
116         ShardingSphereDatabase database = federationContext.getMetaData().getDatabase(databaseName);
117         ExecutionContext executionContext = new KernelProcessor().generateExecutionContext(queryContext, database, globalRuleMetaData, executorContext.getProps(), new ConnectionContext());
118         if (federationContext.isPreview()) {
119             federationContext.getPreviewExecutionUnits().addAll(executionContext.getExecutionUnits());
120             return createEmptyEnumerable();
121         }
122         return createJDBCEnumerable(queryContext, database, executionContext);
123     }
124     
125     private AbstractEnumerable<Object> createJDBCEnumerable(final QueryContext queryContext, final ShardingSphereDatabase database, final ExecutionContext context) {
126         return new AbstractEnumerable<Object>() {
127             
128             @SneakyThrows
129             @Override
130             public Enumerator<Object> enumerator() {
131                 computeConnectionOffsets(context);
132                 // TODO pass grantee from proxy and jdbc adapter
133                 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(context.getRouteContext(), executorContext.getConnectionOffsets(), context.getExecutionUnits(),
134                         new ExecutionGroupReportContext(federationContext.getProcessId(), database.getName(), new Grantee("", "")));
135                 setParameters(executionGroupContext.getInputGroups());
136                 ShardingSpherePreconditions.checkState(!ProcessRegistry.getInstance().get(federationContext.getProcessId()).isInterrupted(),
137                         SQLExecutionInterruptedException::new);
138                 processEngine.executeSQL(executionGroupContext, federationContext.getQueryContext());
139                 List<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
140                 MergeEngine mergeEngine = new MergeEngine(federationContext.getMetaData().getGlobalRuleMetaData(), database, executorContext.getProps(), new ConnectionContext());
141                 MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext());
142                 Collection<Statement> statements = getStatements(executionGroupContext.getInputGroups());
143                 return new JDBCRowEnumerator(mergedResult, queryResults.get(0).getMetaData(), statements);
144             }
145         };
146     }
147     
148     private void computeConnectionOffsets(final ExecutionContext context) {
149         for (ExecutionUnit each : context.getExecutionUnits()) {
150             if (executorContext.getConnectionOffsets().containsKey(each.getDataSourceName())) {
151                 int connectionOffset = executorContext.getConnectionOffsets().get(each.getDataSourceName());
152                 executorContext.getConnectionOffsets().put(each.getDataSourceName(), ++connectionOffset);
153             } else {
154                 executorContext.getConnectionOffsets().put(each.getDataSourceName(), 0);
155             }
156         }
157     }
158     
159     private Enumerable<Object> createMemoryEnumerable(final String databaseName, final String schemaName, final ShardingSphereTable table, final DatabaseType databaseType) {
160         if (databaseType instanceof OpenGaussDatabaseType && EnumerableConstants.SYSTEM_CATALOG_TABLES.contains(table.getName())) {
161             return createMemoryEnumerator(StatisticsAssembleUtils.assembleTableData(table, federationContext.getMetaData()), table, databaseType);
162         }
163         Optional<ShardingSphereTableData> tableData = Optional.ofNullable(statistics.getDatabase(databaseName))
164                 .map(optional -> optional.getSchema(schemaName)).map(optional -> optional.getTable(table.getName()));
165         return tableData.map(optional -> createMemoryEnumerator(optional, table, databaseType)).orElseGet(this::createEmptyEnumerable);
166     }
167     
168     private Enumerable<Object> createMemoryEnumerator(final ShardingSphereTableData tableData, final ShardingSphereTable table, final DatabaseType databaseType) {
169         return new AbstractEnumerable<Object>() {
170             
171             @Override
172             public Enumerator<Object> enumerator() {
173                 return new MemoryRowEnumerator(tableData.getRows(), table.getColumns().values(), databaseType);
174             }
175         };
176     }
177     
178     private Collection<Statement> getStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups) {
179         Collection<Statement> result = new LinkedList<>();
180         for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) {
181             for (JDBCExecutionUnit executionUnit : each.getInputs()) {
182                 result.add(executionUnit.getStorageResource());
183             }
184         }
185         return result;
186     }
187     
188     private void setParameters(final Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups) {
189         for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) {
190             for (JDBCExecutionUnit executionUnit : each.getInputs()) {
191                 if (!(executionUnit.getStorageResource() instanceof PreparedStatement)) {
192                     continue;
193                 }
194                 setParameters((PreparedStatement) executionUnit.getStorageResource(), executionUnit.getExecutionUnit().getSqlUnit().getParameters());
195             }
196         }
197     }
198     
199     @SneakyThrows(SQLException.class)
200     private void setParameters(final PreparedStatement preparedStatement, final List<Object> params) {
201         for (int i = 0; i < params.size(); i++) {
202             preparedStatement.setObject(i + 1, params.get(i));
203         }
204     }
205     
206     private QueryContext createQueryContext(final ShardingSphereMetaData metaData, final ScanExecutorContext sqlString, final DatabaseType databaseType, final boolean useCache) {
207         String sql = sqlString.getSql().replace(System.lineSeparator(), " ");
208         SQLStatement sqlStatement = new SQLStatementParserEngine(databaseType,
209                 optimizerContext.getSqlParserRule().getSqlStatementCache(), optimizerContext.getSqlParserRule().getParseTreeCache()).parse(sql, useCache);
210         List<Object> params = getParameters(sqlString.getParamIndexes());
211         HintValueContext hintValueContext = new HintValueContext();
212         SQLStatementContext sqlStatementContext = new SQLBindEngine(metaData, executorContext.getDatabaseName(), hintValueContext).bind(sqlStatement, params);
213         return new QueryContext(sqlStatementContext, sql, params, hintValueContext, useCache);
214     }
215     
216     private List<Object> getParameters(final int[] paramIndexes) {
217         if (null == paramIndexes) {
218             return Collections.emptyList();
219         }
220         List<Object> result = new ArrayList<>();
221         for (int each : paramIndexes) {
222             result.add(federationContext.getQueryContext().getParameters().get(each));
223         }
224         return result;
225     }
226     
227     private AbstractEnumerable<Object> createEmptyEnumerable() {
228         return new AbstractEnumerable<Object>() {
229             
230             @Override
231             public Enumerator<Object> enumerator() {
232                 return new EmptyRowEnumerator();
233             }
234         };
235     }
236 }