View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
19  
20  import lombok.Getter;
21  import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
22  import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
23  import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
24  import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.MySQLTextResultSetRowPacket;
25  import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
26  import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
27  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
28  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
29  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
30  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandlerFactory;
31  import org.apache.shardingsphere.proxy.backend.handler.ProxySQLComQueryParser;
32  import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
33  import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
34  import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
35  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
36  import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
37  import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
38  import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
39  import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
40  import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
41  import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
42  import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
43  
44  import java.sql.SQLException;
45  import java.util.Collection;
46  
47  /**
48   * COM_QUERY command packet executor for MySQL.
49   */
50  public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
51      
52      private final ConnectionSession connectionSession;
53      
54      private final ProxyBackendHandler proxyBackendHandler;
55      
56      private final int characterSet;
57      
58      @Getter
59      private volatile ResponseType responseType;
60      
61      public MySQLComQueryPacketExecutor(final MySQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
62          this.connectionSession = connectionSession;
63          DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
64          SQLStatement sqlStatement = ProxySQLComQueryParser.parse(packet.getSQL(), databaseType, connectionSession);
65          proxyBackendHandler = areMultiStatements(connectionSession, sqlStatement, packet.getSQL()) ? new MySQLMultiStatementsHandler(connectionSession, sqlStatement, packet.getSQL())
66                  : ProxyBackendHandlerFactory.newInstance(databaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
67          characterSet = connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
68      }
69      
70      private boolean areMultiStatements(final ConnectionSession connectionSession, final SQLStatement sqlStatement, final String sql) {
71          // TODO Multi statements should be identified by SQL Parser instead of checking if sql contains ";".
72          return isMultiStatementsEnabled(connectionSession) && isSuitableMultiStatementsSQLStatement(sqlStatement) && sql.contains(";");
73      }
74      
75      private boolean isMultiStatementsEnabled(final ConnectionSession connectionSession) {
76          return connectionSession.getAttributeMap().hasAttr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS)
77                  && MySQLComSetOptionPacket.MYSQL_OPTION_MULTI_STATEMENTS_ON == connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS).get();
78      }
79      
80      private boolean isSuitableMultiStatementsSQLStatement(final SQLStatement sqlStatement) {
81          return sqlStatement instanceof UpdateStatement || sqlStatement instanceof DeleteStatement;
82      }
83      
84      @Override
85      public Collection<DatabasePacket> execute() throws SQLException {
86          ResponseHeader responseHeader = proxyBackendHandler.execute();
87          if (responseHeader instanceof QueryResponseHeader) {
88              return processQuery((QueryResponseHeader) responseHeader);
89          }
90          responseType = ResponseType.UPDATE;
91          return processUpdate((UpdateResponseHeader) responseHeader);
92      }
93      
94      private Collection<DatabasePacket> processQuery(final QueryResponseHeader queryResponseHeader) {
95          responseType = ResponseType.QUERY;
96          return ResponsePacketBuilder.buildQueryResponsePackets(queryResponseHeader, characterSet, ServerStatusFlagCalculator.calculateFor(connectionSession));
97      }
98      
99      private Collection<DatabasePacket> processUpdate(final UpdateResponseHeader updateResponseHeader) {
100         return ResponsePacketBuilder.buildUpdateResponsePackets(updateResponseHeader, ServerStatusFlagCalculator.calculateFor(connectionSession));
101     }
102     
103     @Override
104     public boolean next() throws SQLException {
105         return proxyBackendHandler.next();
106     }
107     
108     @Override
109     public MySQLPacket getQueryRowPacket() throws SQLException {
110         return new MySQLTextResultSetRowPacket(proxyBackendHandler.getRowData().getData());
111     }
112     
113     @Override
114     public void close() throws SQLException {
115         proxyBackendHandler.close();
116     }
117 }