View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
19  
20  import lombok.Getter;
21  import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
22  import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
23  import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
24  import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.MySQLTextResultSetRowPacket;
25  import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
26  import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
27  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
28  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
29  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
30  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandlerFactory;
31  import org.apache.shardingsphere.proxy.backend.handler.ProxySQLComQueryParser;
32  import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
33  import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
34  import org.apache.shardingsphere.proxy.backend.response.header.update.MultiStatementsUpdateResponseHeader;
35  import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
36  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
37  import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
38  import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
39  import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
40  import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
41  import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
42  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DeleteStatement;
43  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
44  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.UpdateStatement;
45  
46  import java.sql.SQLException;
47  import java.util.Collection;
48  import java.util.LinkedList;
49  
50  /**
51   * COM_QUERY command packet executor for MySQL.
52   */
53  public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
54      
55      private final ConnectionSession connectionSession;
56      
57      private final ProxyBackendHandler proxyBackendHandler;
58      
59      private final int characterSet;
60      
61      @Getter
62      private volatile ResponseType responseType;
63      
64      public MySQLComQueryPacketExecutor(final MySQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
65          this.connectionSession = connectionSession;
66          DatabaseType protocolDatabaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
67          DatabaseType parserDatabaseType = ProxySQLComQueryParser.getParserDatabaseType(protocolDatabaseType, connectionSession);
68          SQLStatement sqlStatement = ProxySQLComQueryParser.parse(packet.getSQL(), parserDatabaseType);
69          proxyBackendHandler = areMultiStatements(connectionSession, sqlStatement, packet.getSQL()) ? new MySQLMultiStatementsHandler(connectionSession, sqlStatement, packet.getSQL())
70                  : ProxyBackendHandlerFactory.newInstance(protocolDatabaseType, parserDatabaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
71          characterSet = connectionSession.getAttributeMap().attr(MySQLConstants.CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
72      }
73      
74      private boolean areMultiStatements(final ConnectionSession connectionSession, final SQLStatement sqlStatement, final String sql) {
75          // TODO Multi statements should be identified by SQL Parser instead of checking if sql contains ";".
76          return isMultiStatementsEnabled(connectionSession) && isSuitableMultiStatementsSQLStatement(sqlStatement) && sql.contains(";");
77      }
78      
79      private boolean isMultiStatementsEnabled(final ConnectionSession connectionSession) {
80          return connectionSession.getAttributeMap().hasAttr(MySQLConstants.OPTION_MULTI_STATEMENTS_ATTRIBUTE_KEY)
81                  && MySQLComSetOptionPacket.MYSQL_OPTION_MULTI_STATEMENTS_ON == connectionSession.getAttributeMap().attr(MySQLConstants.OPTION_MULTI_STATEMENTS_ATTRIBUTE_KEY).get();
82      }
83      
84      private boolean isSuitableMultiStatementsSQLStatement(final SQLStatement sqlStatement) {
85          return containsInsertOnDuplicateKey(sqlStatement) || sqlStatement instanceof UpdateStatement || sqlStatement instanceof DeleteStatement;
86      }
87      
88      private boolean containsInsertOnDuplicateKey(final SQLStatement sqlStatement) {
89          return sqlStatement instanceof InsertStatement && ((InsertStatement) sqlStatement).getOnDuplicateKeyColumns().isPresent();
90      }
91      
92      @Override
93      public Collection<DatabasePacket> execute() throws SQLException {
94          ResponseHeader responseHeader = proxyBackendHandler.execute();
95          if (responseHeader instanceof QueryResponseHeader) {
96              return processQuery((QueryResponseHeader) responseHeader);
97          }
98          responseType = ResponseType.UPDATE;
99          if (responseHeader instanceof MultiStatementsUpdateResponseHeader) {
100             return processMultiStatementsUpdate((MultiStatementsUpdateResponseHeader) responseHeader);
101         }
102         return processUpdate((UpdateResponseHeader) responseHeader);
103     }
104     
105     private Collection<DatabasePacket> processQuery(final QueryResponseHeader queryResponseHeader) {
106         responseType = ResponseType.QUERY;
107         return ResponsePacketBuilder.buildQueryResponsePackets(queryResponseHeader, characterSet, ServerStatusFlagCalculator.calculateFor(connectionSession, true));
108     }
109     
110     private Collection<DatabasePacket> processUpdate(final UpdateResponseHeader updateResponseHeader) {
111         return ResponsePacketBuilder.buildUpdateResponsePackets(updateResponseHeader, ServerStatusFlagCalculator.calculateFor(connectionSession, true));
112     }
113     
114     private Collection<DatabasePacket> processMultiStatementsUpdate(final MultiStatementsUpdateResponseHeader responseHeader) {
115         Collection<DatabasePacket> result = new LinkedList<>();
116         int index = 0;
117         for (UpdateResponseHeader each : responseHeader.getUpdateResponseHeaders()) {
118             boolean lastPacket = ++index == responseHeader.getUpdateResponseHeaders().size();
119             result.addAll(ResponsePacketBuilder.buildUpdateResponsePackets(each, ServerStatusFlagCalculator.calculateFor(connectionSession, lastPacket)));
120         }
121         return result;
122     }
123     
124     @Override
125     public boolean next() throws SQLException {
126         return proxyBackendHandler.next();
127     }
128     
129     @Override
130     public MySQLPacket getQueryRowPacket() throws SQLException {
131         return new MySQLTextResultSetRowPacket(proxyBackendHandler.getRowData().getData());
132     }
133     
134     @Override
135     public void close() throws SQLException {
136         proxyBackendHandler.close();
137     }
138 }