1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
19
20 import lombok.Getter;
21 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
22 import org.apache.shardingsphere.database.protocol.mysql.constant.MySQLConstants;
23 import org.apache.shardingsphere.database.protocol.mysql.packet.MySQLPacket;
24 import org.apache.shardingsphere.database.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
25 import org.apache.shardingsphere.database.protocol.mysql.packet.command.query.text.MySQLTextResultSetRowPacket;
26 import org.apache.shardingsphere.database.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
27 import org.apache.shardingsphere.database.protocol.packet.DatabasePacket;
28 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
29 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
30 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandlerFactory;
31 import org.apache.shardingsphere.proxy.backend.handler.ProxySQLComQueryParser;
32 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
33 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
34 import org.apache.shardingsphere.proxy.backend.response.header.update.MultiStatementsUpdateResponseHeader;
35 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
36 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
37 import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
38 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
39 import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
40 import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
41 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
42 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DeleteStatement;
43 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
44 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.UpdateStatement;
45 import org.apache.shardingsphere.sql.parser.statement.core.util.MultiSQLSplitter;
46
47 import java.sql.SQLException;
48 import java.util.Collection;
49 import java.util.LinkedList;
50
51
52
53
54 public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
55
56 private final ConnectionSession connectionSession;
57
58 private final ProxyBackendHandler proxyBackendHandler;
59
60 private final int characterSet;
61
62 @Getter
63 private volatile ResponseType responseType;
64
65 public MySQLComQueryPacketExecutor(final MySQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
66 this.connectionSession = connectionSession;
67 DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
68 SQLStatement sqlStatement = ProxySQLComQueryParser.parse(packet.getSQL(), databaseType, connectionSession);
69 proxyBackendHandler = areMultiStatements(connectionSession, sqlStatement, packet.getSQL())
70 ? new MySQLMultiStatementsProxyBackendHandler(connectionSession, sqlStatement, packet.getSQL())
71 : ProxyBackendHandlerFactory.newInstance(databaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
72 characterSet = connectionSession.getAttributeMap().attr(MySQLConstants.CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
73 }
74
75 private boolean areMultiStatements(final ConnectionSession connectionSession, final SQLStatement sqlStatement, final String sql) {
76 return isMultiStatementsEnabled(connectionSession)
77 && isSuitableMultiStatementsSQLStatement(sqlStatement)
78 && MultiSQLSplitter.hasSameTypeMultiStatements(sqlStatement, MultiSQLSplitter.split(sql));
79 }
80
81 private boolean isMultiStatementsEnabled(final ConnectionSession connectionSession) {
82 return connectionSession.getAttributeMap().hasAttr(MySQLConstants.OPTION_MULTI_STATEMENTS_ATTRIBUTE_KEY)
83 && MySQLComSetOptionPacket.MYSQL_OPTION_MULTI_STATEMENTS_ON == connectionSession.getAttributeMap().attr(MySQLConstants.OPTION_MULTI_STATEMENTS_ATTRIBUTE_KEY).get();
84 }
85
86 private boolean isSuitableMultiStatementsSQLStatement(final SQLStatement sqlStatement) {
87 return containsInsertOnDuplicateKey(sqlStatement) || sqlStatement instanceof UpdateStatement || sqlStatement instanceof DeleteStatement;
88 }
89
90 private boolean containsInsertOnDuplicateKey(final SQLStatement sqlStatement) {
91 return sqlStatement instanceof InsertStatement && ((InsertStatement) sqlStatement).getOnDuplicateKeyColumns().isPresent();
92 }
93
94 @Override
95 public Collection<DatabasePacket> execute() throws SQLException {
96 ResponseHeader responseHeader = proxyBackendHandler.execute();
97 if (responseHeader instanceof QueryResponseHeader) {
98 return processQuery((QueryResponseHeader) responseHeader);
99 }
100 responseType = ResponseType.UPDATE;
101 if (responseHeader instanceof MultiStatementsUpdateResponseHeader) {
102 return processMultiStatementsUpdate((MultiStatementsUpdateResponseHeader) responseHeader);
103 }
104 return processUpdate((UpdateResponseHeader) responseHeader);
105 }
106
107 private Collection<DatabasePacket> processQuery(final QueryResponseHeader queryResponseHeader) {
108 responseType = ResponseType.QUERY;
109 return ResponsePacketBuilder.buildQueryResponsePackets(queryResponseHeader, characterSet, ServerStatusFlagCalculator.calculateFor(connectionSession, true));
110 }
111
112 private Collection<DatabasePacket> processUpdate(final UpdateResponseHeader updateResponseHeader) {
113 return ResponsePacketBuilder.buildUpdateResponsePackets(updateResponseHeader, ServerStatusFlagCalculator.calculateFor(connectionSession, true));
114 }
115
116 private Collection<DatabasePacket> processMultiStatementsUpdate(final MultiStatementsUpdateResponseHeader responseHeader) {
117 Collection<DatabasePacket> result = new LinkedList<>();
118 int index = 0;
119 for (UpdateResponseHeader each : responseHeader.getUpdateResponseHeaders()) {
120 boolean lastPacket = ++index == responseHeader.getUpdateResponseHeaders().size();
121 result.addAll(ResponsePacketBuilder.buildUpdateResponsePackets(each, ServerStatusFlagCalculator.calculateFor(connectionSession, lastPacket)));
122 }
123 return result;
124 }
125
126 @Override
127 public boolean next() throws SQLException {
128 return proxyBackendHandler.next();
129 }
130
131 @Override
132 public MySQLPacket getQueryRowPacket() throws SQLException {
133 return new MySQLTextResultSetRowPacket(proxyBackendHandler.getRowData().getData());
134 }
135
136 @Override
137 public void close() throws SQLException {
138 proxyBackendHandler.close();
139 }
140 }