1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
19
20 import org.apache.shardingsphere.driver.executor.engine.batch.preparedstatement.BatchExecutionUnit;
21 import org.apache.shardingsphere.driver.executor.engine.batch.preparedstatement.BatchPreparedStatementExecutor;
22 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
23 import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
24 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
25 import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
26 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
27 import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
28 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
29 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
30 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
31 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
32 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
33 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
34 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
35 import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
36 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
37 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
38 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
39 import org.apache.shardingsphere.infra.hint.HintValueContext;
40 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
41 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
42 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
43 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
44 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
45 import org.apache.shardingsphere.infra.session.query.QueryContext;
46 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
47 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
48 import org.apache.shardingsphere.parser.rule.SQLParserRule;
49 import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
50 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
51 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
52 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
53 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
54 import org.apache.shardingsphere.proxy.backend.response.header.update.MultiStatementsUpdateResponseHeader;
55 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
56 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
57 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
58 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
59 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.UpdateStatement;
60
61 import java.sql.Connection;
62 import java.sql.SQLException;
63 import java.util.ArrayList;
64 import java.util.Arrays;
65 import java.util.Collection;
66 import java.util.Collections;
67 import java.util.LinkedList;
68 import java.util.List;
69 import java.util.regex.Pattern;
70
71
72
73
74 public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
75
76 private static final Pattern MULTI_INSERT_STATEMENTS = Pattern.compile(";(?=\\s*insert)", Pattern.CASE_INSENSITIVE);
77
78 private static final Pattern MULTI_UPDATE_STATEMENTS = Pattern.compile(";(?=\\s*update)", Pattern.CASE_INSENSITIVE);
79
80 private static final Pattern MULTI_DELETE_STATEMENTS = Pattern.compile(";(?=\\s*delete)", Pattern.CASE_INSENSITIVE);
81
82 private final DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
83
84 private final MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
85
86 private final Collection<QueryContext> multiSQLQueryContexts = new LinkedList<>();
87
88 private final ConnectionSession connectionSession;
89
90 private final SQLStatement sqlStatementSample;
91
92 private final BatchPreparedStatementExecutor batchExecutor;
93
94 public MySQLMultiStatementsHandler(final ConnectionSession connectionSession, final SQLStatement sqlStatementSample, final String sql) {
95 connectionSession.getDatabaseConnectionManager().handleAutoCommit();
96 this.connectionSession = connectionSession;
97 this.sqlStatementSample = sqlStatementSample;
98 JDBCExecutor jdbcExecutor = new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), connectionSession.getConnectionContext());
99 batchExecutor = new BatchPreparedStatementExecutor(metaDataContexts.getMetaData().getDatabase(connectionSession.getUsedDatabaseName()), jdbcExecutor, connectionSession.getProcessId());
100 Pattern pattern = getPattern(sqlStatementSample);
101 SQLParserEngine sqlParserEngine = getSQLParserEngine();
102 for (String each : extractMultiStatements(pattern, sql)) {
103 SQLStatement eachSQLStatement = sqlParserEngine.parse(each, false);
104 multiSQLQueryContexts.add(createQueryContext(each, eachSQLStatement));
105 }
106 }
107
108 private Pattern getPattern(final SQLStatement sqlStatementSample) {
109 if (sqlStatementSample instanceof InsertStatement) {
110 return MULTI_INSERT_STATEMENTS;
111 }
112 return sqlStatementSample instanceof UpdateStatement ? MULTI_UPDATE_STATEMENTS : MULTI_DELETE_STATEMENTS;
113 }
114
115 private SQLParserEngine getSQLParserEngine() {
116 MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
117 SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
118 return sqlParserRule.getSQLParserEngine(databaseType);
119 }
120
121 private List<String> extractMultiStatements(final Pattern pattern, final String sql) {
122
123 return Arrays.asList(pattern.split(sql));
124 }
125
126 private QueryContext createQueryContext(final String sql, final SQLStatement sqlStatement) {
127 HintValueContext hintValueContext = SQLHintUtils.extractHint(sql);
128 SQLStatementContext sqlStatementContext = new SQLBindEngine(
129 metaDataContexts.getMetaData(), connectionSession.getCurrentDatabaseName(), hintValueContext).bind(databaseType, sqlStatement, Collections.emptyList());
130 return new QueryContext(sqlStatementContext, sql, Collections.emptyList(), hintValueContext, connectionSession.getConnectionContext(), metaDataContexts.getMetaData());
131 }
132
133 @Override
134 public ResponseHeader execute() throws SQLException {
135 Collection<ShardingSphereRule> rules = metaDataContexts.getMetaData().getDatabase(connectionSession.getUsedDatabaseName()).getRuleMetaData().getRules();
136 int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
137 DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine =
138 new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connectionSession.getDatabaseConnectionManager(),
139 (JDBCBackendStatement) connectionSession.getStatementManager(), new StatementOption(false), rules, metaDataContexts.getMetaData());
140 return executeMultiStatements(prepareEngine);
141 }
142
143 private ResponseHeader executeMultiStatements(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
144 Collection<ExecutionContext> executionContexts = createExecutionContexts();
145 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
146 prepareEngine.prepare(connectionSession.getUsedDatabaseName(), executionContexts.iterator().next().getRouteContext(), createExecutionUnits(),
147 new ExecutionGroupReportContext(connectionSession.getProcessId(), connectionSession.getUsedDatabaseName(), connectionSession.getConnectionContext().getGrantee()));
148 batchExecutor.init(executionGroupContext);
149 executeAddBatch(executionGroupContext);
150 return new MultiStatementsUpdateResponseHeader(buildUpdateResponseHeaders(batchExecutor.executeBatch(multiSQLQueryContexts.iterator().next().getSqlStatementContext())));
151 }
152
153 private List<ExecutionUnit> createExecutionUnits() {
154 List<ExecutionUnit> result = new ArrayList<>(batchExecutor.getBatchExecutionUnits().size());
155 for (BatchExecutionUnit each : batchExecutor.getBatchExecutionUnits()) {
156 ExecutionUnit executionUnit = each.getExecutionUnit();
157 result.add(executionUnit);
158 }
159 return result;
160 }
161
162 private Collection<ExecutionContext> createExecutionContexts() {
163 Collection<ExecutionContext> result = new LinkedList<>();
164 for (QueryContext each : multiSQLQueryContexts) {
165 ExecutionContext executionContext = createExecutionContext(each);
166 batchExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits());
167 result.add(executionContext);
168 }
169 return result;
170 }
171
172 private ExecutionContext createExecutionContext(final QueryContext queryContext) {
173 RuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData();
174 ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(connectionSession.getCurrentDatabaseName());
175 SQLAuditEngine.audit(queryContext, globalRuleMetaData, currentDatabase);
176 return new KernelProcessor().generateExecutionContext(queryContext, globalRuleMetaData, metaDataContexts.getMetaData().getProps());
177 }
178
179 private void executeAddBatch(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext) throws SQLException {
180 for (ExecutionGroup<JDBCExecutionUnit> each : executionGroupContext.getInputGroups()) {
181 for (JDBCExecutionUnit unit : each.getInputs()) {
182 unit.getStorageResource().addBatch(unit.getExecutionUnit().getSqlUnit().getSql());
183 }
184 }
185 }
186
187 private Collection<UpdateResponseHeader> buildUpdateResponseHeaders(final int[] updateCounts) {
188 Collection<UpdateResponseHeader> result = new LinkedList<>();
189 for (int each : updateCounts) {
190 result.add(new UpdateResponseHeader(sqlStatementSample, Collections.singletonList(new UpdateResult(each, 0L))));
191 }
192 return result;
193 }
194 }