View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
19  
20  import org.apache.shardingsphere.driver.executor.engine.batch.preparedstatement.BatchExecutionUnit;
21  import org.apache.shardingsphere.driver.executor.engine.batch.preparedstatement.BatchPreparedStatementExecutor;
22  import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
23  import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
24  import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
25  import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
26  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
27  import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
28  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
29  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
30  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
31  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
32  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
33  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
34  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
35  import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
36  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
37  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
38  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
39  import org.apache.shardingsphere.infra.hint.HintValueContext;
40  import org.apache.shardingsphere.infra.hint.SQLHintUtils;
41  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
42  import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
43  import org.apache.shardingsphere.infra.parser.SQLParserEngine;
44  import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
45  import org.apache.shardingsphere.infra.session.query.QueryContext;
46  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
47  import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
48  import org.apache.shardingsphere.parser.rule.SQLParserRule;
49  import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
50  import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
51  import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
52  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
53  import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
54  import org.apache.shardingsphere.proxy.backend.response.header.update.MultiStatementsUpdateResponseHeader;
55  import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
56  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
57  import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
58  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
59  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.UpdateStatement;
60  
61  import java.sql.Connection;
62  import java.sql.SQLException;
63  import java.util.ArrayList;
64  import java.util.Arrays;
65  import java.util.Collection;
66  import java.util.Collections;
67  import java.util.LinkedList;
68  import java.util.List;
69  import java.util.regex.Pattern;
70  
71  /**
72   * Handler for MySQL multi statements.
73   */
74  public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
75      
76      private static final Pattern MULTI_INSERT_STATEMENTS = Pattern.compile(";(?=\\s*insert)", Pattern.CASE_INSENSITIVE);
77      
78      private static final Pattern MULTI_UPDATE_STATEMENTS = Pattern.compile(";(?=\\s*update)", Pattern.CASE_INSENSITIVE);
79      
80      private static final Pattern MULTI_DELETE_STATEMENTS = Pattern.compile(";(?=\\s*delete)", Pattern.CASE_INSENSITIVE);
81      
82      private final DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
83      
84      private final MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
85      
86      private final Collection<QueryContext> multiSQLQueryContexts = new LinkedList<>();
87      
88      private final ConnectionSession connectionSession;
89      
90      private final SQLStatement sqlStatementSample;
91      
92      private final BatchPreparedStatementExecutor batchExecutor;
93      
94      public MySQLMultiStatementsHandler(final ConnectionSession connectionSession, final SQLStatement sqlStatementSample, final String sql) {
95          connectionSession.getDatabaseConnectionManager().handleAutoCommit();
96          this.connectionSession = connectionSession;
97          this.sqlStatementSample = sqlStatementSample;
98          JDBCExecutor jdbcExecutor = new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), connectionSession.getConnectionContext());
99          batchExecutor = new BatchPreparedStatementExecutor(metaDataContexts.getMetaData().getDatabase(connectionSession.getUsedDatabaseName()), jdbcExecutor, connectionSession.getProcessId());
100         Pattern pattern = getPattern(sqlStatementSample);
101         SQLParserEngine sqlParserEngine = getSQLParserEngine();
102         for (String each : extractMultiStatements(pattern, sql)) {
103             SQLStatement eachSQLStatement = sqlParserEngine.parse(each, false);
104             multiSQLQueryContexts.add(createQueryContext(each, eachSQLStatement));
105         }
106     }
107     
108     private Pattern getPattern(final SQLStatement sqlStatementSample) {
109         if (sqlStatementSample instanceof InsertStatement) {
110             return MULTI_INSERT_STATEMENTS;
111         }
112         return sqlStatementSample instanceof UpdateStatement ? MULTI_UPDATE_STATEMENTS : MULTI_DELETE_STATEMENTS;
113     }
114     
115     private SQLParserEngine getSQLParserEngine() {
116         MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
117         SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
118         return sqlParserRule.getSQLParserEngine(databaseType);
119     }
120     
121     private List<String> extractMultiStatements(final Pattern pattern, final String sql) {
122         // TODO Multi statements should be split by SQL Parser instead of simple regexp.
123         return Arrays.asList(pattern.split(sql));
124     }
125     
126     private QueryContext createQueryContext(final String sql, final SQLStatement sqlStatement) {
127         HintValueContext hintValueContext = SQLHintUtils.extractHint(sql);
128         SQLStatementContext sqlStatementContext = new SQLBindEngine(
129                 metaDataContexts.getMetaData(), connectionSession.getCurrentDatabaseName(), hintValueContext).bind(databaseType, sqlStatement, Collections.emptyList());
130         return new QueryContext(sqlStatementContext, sql, Collections.emptyList(), hintValueContext, connectionSession.getConnectionContext(), metaDataContexts.getMetaData());
131     }
132     
133     @Override
134     public ResponseHeader execute() throws SQLException {
135         Collection<ShardingSphereRule> rules = metaDataContexts.getMetaData().getDatabase(connectionSession.getUsedDatabaseName()).getRuleMetaData().getRules();
136         int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
137         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine =
138                 new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connectionSession.getDatabaseConnectionManager(),
139                         (JDBCBackendStatement) connectionSession.getStatementManager(), new StatementOption(false), rules, metaDataContexts.getMetaData());
140         return executeMultiStatements(prepareEngine);
141     }
142     
143     private ResponseHeader executeMultiStatements(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
144         Collection<ExecutionContext> executionContexts = createExecutionContexts();
145         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
146                 prepareEngine.prepare(connectionSession.getUsedDatabaseName(), executionContexts.iterator().next().getRouteContext(), createExecutionUnits(),
147                         new ExecutionGroupReportContext(connectionSession.getProcessId(), connectionSession.getUsedDatabaseName(), connectionSession.getConnectionContext().getGrantee()));
148         batchExecutor.init(executionGroupContext);
149         executeAddBatch(executionGroupContext);
150         return new MultiStatementsUpdateResponseHeader(buildUpdateResponseHeaders(batchExecutor.executeBatch(multiSQLQueryContexts.iterator().next().getSqlStatementContext())));
151     }
152     
153     private List<ExecutionUnit> createExecutionUnits() {
154         List<ExecutionUnit> result = new ArrayList<>(batchExecutor.getBatchExecutionUnits().size());
155         for (BatchExecutionUnit each : batchExecutor.getBatchExecutionUnits()) {
156             ExecutionUnit executionUnit = each.getExecutionUnit();
157             result.add(executionUnit);
158         }
159         return result;
160     }
161     
162     private Collection<ExecutionContext> createExecutionContexts() {
163         Collection<ExecutionContext> result = new LinkedList<>();
164         for (QueryContext each : multiSQLQueryContexts) {
165             ExecutionContext executionContext = createExecutionContext(each);
166             batchExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits());
167             result.add(executionContext);
168         }
169         return result;
170     }
171     
172     private ExecutionContext createExecutionContext(final QueryContext queryContext) {
173         RuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData();
174         ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(connectionSession.getCurrentDatabaseName());
175         SQLAuditEngine.audit(queryContext, globalRuleMetaData, currentDatabase);
176         return new KernelProcessor().generateExecutionContext(queryContext, globalRuleMetaData, metaDataContexts.getMetaData().getProps());
177     }
178     
179     private void executeAddBatch(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext) throws SQLException {
180         for (ExecutionGroup<JDBCExecutionUnit> each : executionGroupContext.getInputGroups()) {
181             for (JDBCExecutionUnit unit : each.getInputs()) {
182                 unit.getStorageResource().addBatch(unit.getExecutionUnit().getSqlUnit().getSql());
183             }
184         }
185     }
186     
187     private Collection<UpdateResponseHeader> buildUpdateResponseHeaders(final int[] updateCounts) {
188         Collection<UpdateResponseHeader> result = new LinkedList<>();
189         for (int each : updateCounts) {
190             result.add(new UpdateResponseHeader(sqlStatementSample, Collections.singletonList(new UpdateResult(each, 0L))));
191         }
192         return result;
193     }
194 }