1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
19
20 import lombok.Getter;
21 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
22 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
23 import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
24 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.MySQLTextResultSetRowPacket;
25 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
26 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
27 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
28 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
29 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
30 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandlerFactory;
31 import org.apache.shardingsphere.proxy.backend.handler.ProxySQLComQueryParser;
32 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
33 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
34 import org.apache.shardingsphere.proxy.backend.response.header.update.MultiStatementsUpdateResponseHeader;
35 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
36 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
37 import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
38 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
39 import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
40 import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
41 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
42 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DeleteStatement;
43 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
44 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.UpdateStatement;
45
46 import java.sql.SQLException;
47 import java.util.Collection;
48 import java.util.LinkedList;
49
50
51
52
53 public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
54
55 private final ConnectionSession connectionSession;
56
57 private final ProxyBackendHandler proxyBackendHandler;
58
59 private final int characterSet;
60
61 @Getter
62 private volatile ResponseType responseType;
63
64 public MySQLComQueryPacketExecutor(final MySQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
65 this.connectionSession = connectionSession;
66 DatabaseType protocolDatabaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
67 DatabaseType parserDatabaseType = ProxySQLComQueryParser.getParserDatabaseType(protocolDatabaseType, connectionSession);
68 SQLStatement sqlStatement = ProxySQLComQueryParser.parse(packet.getSQL(), parserDatabaseType);
69 proxyBackendHandler = areMultiStatements(connectionSession, sqlStatement, packet.getSQL()) ? new MySQLMultiStatementsHandler(connectionSession, sqlStatement, packet.getSQL())
70 : ProxyBackendHandlerFactory.newInstance(protocolDatabaseType, parserDatabaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
71 characterSet = connectionSession.getAttributeMap().attr(MySQLConstants.CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
72 }
73
74 private boolean areMultiStatements(final ConnectionSession connectionSession, final SQLStatement sqlStatement, final String sql) {
75
76 return isMultiStatementsEnabled(connectionSession) && isSuitableMultiStatementsSQLStatement(sqlStatement) && sql.contains(";");
77 }
78
79 private boolean isMultiStatementsEnabled(final ConnectionSession connectionSession) {
80 return connectionSession.getAttributeMap().hasAttr(MySQLConstants.OPTION_MULTI_STATEMENTS_ATTRIBUTE_KEY)
81 && MySQLComSetOptionPacket.MYSQL_OPTION_MULTI_STATEMENTS_ON == connectionSession.getAttributeMap().attr(MySQLConstants.OPTION_MULTI_STATEMENTS_ATTRIBUTE_KEY).get();
82 }
83
84 private boolean isSuitableMultiStatementsSQLStatement(final SQLStatement sqlStatement) {
85 return containsInsertOnDuplicateKey(sqlStatement) || sqlStatement instanceof UpdateStatement || sqlStatement instanceof DeleteStatement;
86 }
87
88 private boolean containsInsertOnDuplicateKey(final SQLStatement sqlStatement) {
89 return sqlStatement instanceof InsertStatement && ((InsertStatement) sqlStatement).getOnDuplicateKeyColumns().isPresent();
90 }
91
92 @Override
93 public Collection<DatabasePacket> execute() throws SQLException {
94 ResponseHeader responseHeader = proxyBackendHandler.execute();
95 if (responseHeader instanceof QueryResponseHeader) {
96 return processQuery((QueryResponseHeader) responseHeader);
97 }
98 responseType = ResponseType.UPDATE;
99 if (responseHeader instanceof MultiStatementsUpdateResponseHeader) {
100 return processMultiStatementsUpdate((MultiStatementsUpdateResponseHeader) responseHeader);
101 }
102 return processUpdate((UpdateResponseHeader) responseHeader);
103 }
104
105 private Collection<DatabasePacket> processQuery(final QueryResponseHeader queryResponseHeader) {
106 responseType = ResponseType.QUERY;
107 return ResponsePacketBuilder.buildQueryResponsePackets(queryResponseHeader, characterSet, ServerStatusFlagCalculator.calculateFor(connectionSession, true));
108 }
109
110 private Collection<DatabasePacket> processUpdate(final UpdateResponseHeader updateResponseHeader) {
111 return ResponsePacketBuilder.buildUpdateResponsePackets(updateResponseHeader, ServerStatusFlagCalculator.calculateFor(connectionSession, true));
112 }
113
114 private Collection<DatabasePacket> processMultiStatementsUpdate(final MultiStatementsUpdateResponseHeader responseHeader) {
115 Collection<DatabasePacket> result = new LinkedList<>();
116 int index = 0;
117 for (UpdateResponseHeader each : responseHeader.getUpdateResponseHeaders()) {
118 boolean lastPacket = ++index == responseHeader.getUpdateResponseHeaders().size();
119 result.addAll(ResponsePacketBuilder.buildUpdateResponsePackets(each, ServerStatusFlagCalculator.calculateFor(connectionSession, lastPacket)));
120 }
121 return result;
122 }
123
124 @Override
125 public boolean next() throws SQLException {
126 return proxyBackendHandler.next();
127 }
128
129 @Override
130 public MySQLPacket getQueryRowPacket() throws SQLException {
131 return new MySQLTextResultSetRowPacket(proxyBackendHandler.getRowData().getData());
132 }
133
134 @Override
135 public void close() throws SQLException {
136 proxyBackendHandler.close();
137 }
138 }