1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
19
20 import lombok.Getter;
21 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
22 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
23 import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
24 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.MySQLTextResultSetRowPacket;
25 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
26 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
27 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
28 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
29 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
30 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandlerFactory;
31 import org.apache.shardingsphere.proxy.backend.handler.ProxySQLComQueryParser;
32 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
33 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
34 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
35 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
36 import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
37 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
38 import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
39 import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
40 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
41 import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
42 import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
43
44 import java.sql.SQLException;
45 import java.util.Collection;
46
47
48
49
50 public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
51
52 private final ConnectionSession connectionSession;
53
54 private final ProxyBackendHandler proxyBackendHandler;
55
56 private final int characterSet;
57
58 @Getter
59 private volatile ResponseType responseType;
60
61 public MySQLComQueryPacketExecutor(final MySQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
62 this.connectionSession = connectionSession;
63 DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
64 SQLStatement sqlStatement = ProxySQLComQueryParser.parse(packet.getSQL(), databaseType, connectionSession);
65 proxyBackendHandler = areMultiStatements(connectionSession, sqlStatement, packet.getSQL()) ? new MySQLMultiStatementsHandler(connectionSession, sqlStatement, packet.getSQL())
66 : ProxyBackendHandlerFactory.newInstance(databaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
67 characterSet = connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
68 }
69
70 private boolean areMultiStatements(final ConnectionSession connectionSession, final SQLStatement sqlStatement, final String sql) {
71
72 return isMultiStatementsEnabled(connectionSession) && isSuitableMultiStatementsSQLStatement(sqlStatement) && sql.contains(";");
73 }
74
75 private boolean isMultiStatementsEnabled(final ConnectionSession connectionSession) {
76 return connectionSession.getAttributeMap().hasAttr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS)
77 && MySQLComSetOptionPacket.MYSQL_OPTION_MULTI_STATEMENTS_ON == connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS).get();
78 }
79
80 private boolean isSuitableMultiStatementsSQLStatement(final SQLStatement sqlStatement) {
81 return sqlStatement instanceof UpdateStatement || sqlStatement instanceof DeleteStatement;
82 }
83
84 @Override
85 public Collection<DatabasePacket> execute() throws SQLException {
86 ResponseHeader responseHeader = proxyBackendHandler.execute();
87 if (responseHeader instanceof QueryResponseHeader) {
88 return processQuery((QueryResponseHeader) responseHeader);
89 }
90 responseType = ResponseType.UPDATE;
91 return processUpdate((UpdateResponseHeader) responseHeader);
92 }
93
94 private Collection<DatabasePacket> processQuery(final QueryResponseHeader queryResponseHeader) {
95 responseType = ResponseType.QUERY;
96 return ResponsePacketBuilder.buildQueryResponsePackets(queryResponseHeader, characterSet, ServerStatusFlagCalculator.calculateFor(connectionSession));
97 }
98
99 private Collection<DatabasePacket> processUpdate(final UpdateResponseHeader updateResponseHeader) {
100 return ResponsePacketBuilder.buildUpdateResponsePackets(updateResponseHeader, ServerStatusFlagCalculator.calculateFor(connectionSession));
101 }
102
103 @Override
104 public boolean next() throws SQLException {
105 return proxyBackendHandler.next();
106 }
107
108 @Override
109 public MySQLPacket getQueryRowPacket() throws SQLException {
110 return new MySQLTextResultSetRowPacket(proxyBackendHandler.getRowData().getData());
111 }
112
113 @Override
114 public void close() throws SQLException {
115 proxyBackendHandler.close();
116 }
117 }