View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
19  
20  import lombok.Getter;
21  import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
22  import org.apache.shardingsphere.database.protocol.mysql.constant.MySQLConstants;
23  import org.apache.shardingsphere.database.protocol.mysql.packet.MySQLPacket;
24  import org.apache.shardingsphere.database.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
25  import org.apache.shardingsphere.database.protocol.mysql.packet.command.query.text.MySQLTextResultSetRowPacket;
26  import org.apache.shardingsphere.database.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
27  import org.apache.shardingsphere.database.protocol.packet.DatabasePacket;
28  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
29  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
30  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandlerFactory;
31  import org.apache.shardingsphere.proxy.backend.handler.ProxySQLComQueryParser;
32  import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
33  import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
34  import org.apache.shardingsphere.proxy.backend.response.header.update.MultiStatementsUpdateResponseHeader;
35  import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
36  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
37  import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
38  import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
39  import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
40  import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
41  import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
42  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DeleteStatement;
43  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.InsertStatement;
44  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.UpdateStatement;
45  import org.apache.shardingsphere.sql.parser.statement.core.util.MultiSQLSplitter;
46  
47  import java.sql.SQLException;
48  import java.util.Collection;
49  import java.util.LinkedList;
50  
51  /**
52   * COM_QUERY command packet executor for MySQL.
53   */
54  public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
55      
56      private final ConnectionSession connectionSession;
57      
58      private final ProxyBackendHandler proxyBackendHandler;
59      
60      private final int characterSet;
61      
62      @Getter
63      private volatile ResponseType responseType;
64      
65      public MySQLComQueryPacketExecutor(final MySQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
66          this.connectionSession = connectionSession;
67          DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL");
68          SQLStatement sqlStatement = ProxySQLComQueryParser.parse(packet.getSQL(), databaseType, connectionSession);
69          proxyBackendHandler = areMultiStatements(connectionSession, sqlStatement, packet.getSQL())
70                  ? new MySQLMultiStatementsProxyBackendHandler(connectionSession, sqlStatement, packet.getSQL())
71                  : ProxyBackendHandlerFactory.newInstance(databaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
72          characterSet = connectionSession.getAttributeMap().attr(MySQLConstants.CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
73      }
74      
75      private boolean areMultiStatements(final ConnectionSession connectionSession, final SQLStatement sqlStatement, final String sql) {
76          return isMultiStatementsEnabled(connectionSession)
77                  && isSuitableMultiStatementsSQLStatement(sqlStatement)
78                  && MultiSQLSplitter.hasSameTypeMultiStatements(sqlStatement, MultiSQLSplitter.split(sql));
79      }
80      
81      private boolean isMultiStatementsEnabled(final ConnectionSession connectionSession) {
82          return connectionSession.getAttributeMap().hasAttr(MySQLConstants.OPTION_MULTI_STATEMENTS_ATTRIBUTE_KEY)
83                  && MySQLComSetOptionPacket.MYSQL_OPTION_MULTI_STATEMENTS_ON == connectionSession.getAttributeMap().attr(MySQLConstants.OPTION_MULTI_STATEMENTS_ATTRIBUTE_KEY).get();
84      }
85      
86      private boolean isSuitableMultiStatementsSQLStatement(final SQLStatement sqlStatement) {
87          return containsInsertOnDuplicateKey(sqlStatement) || sqlStatement instanceof UpdateStatement || sqlStatement instanceof DeleteStatement;
88      }
89      
90      private boolean containsInsertOnDuplicateKey(final SQLStatement sqlStatement) {
91          return sqlStatement instanceof InsertStatement && ((InsertStatement) sqlStatement).getOnDuplicateKeyColumns().isPresent();
92      }
93      
94      @Override
95      public Collection<DatabasePacket> execute() throws SQLException {
96          ResponseHeader responseHeader = proxyBackendHandler.execute();
97          if (responseHeader instanceof QueryResponseHeader) {
98              return processQuery((QueryResponseHeader) responseHeader);
99          }
100         responseType = ResponseType.UPDATE;
101         if (responseHeader instanceof MultiStatementsUpdateResponseHeader) {
102             return processMultiStatementsUpdate((MultiStatementsUpdateResponseHeader) responseHeader);
103         }
104         return processUpdate((UpdateResponseHeader) responseHeader);
105     }
106     
107     private Collection<DatabasePacket> processQuery(final QueryResponseHeader queryResponseHeader) {
108         responseType = ResponseType.QUERY;
109         return ResponsePacketBuilder.buildQueryResponsePackets(queryResponseHeader, characterSet, ServerStatusFlagCalculator.calculateFor(connectionSession, true));
110     }
111     
112     private Collection<DatabasePacket> processUpdate(final UpdateResponseHeader updateResponseHeader) {
113         return ResponsePacketBuilder.buildUpdateResponsePackets(updateResponseHeader, ServerStatusFlagCalculator.calculateFor(connectionSession, true));
114     }
115     
116     private Collection<DatabasePacket> processMultiStatementsUpdate(final MultiStatementsUpdateResponseHeader responseHeader) {
117         Collection<DatabasePacket> result = new LinkedList<>();
118         int index = 0;
119         for (UpdateResponseHeader each : responseHeader.getUpdateResponseHeaders()) {
120             boolean lastPacket = ++index == responseHeader.getUpdateResponseHeaders().size();
121             result.addAll(ResponsePacketBuilder.buildUpdateResponsePackets(each, ServerStatusFlagCalculator.calculateFor(connectionSession, lastPacket)));
122         }
123         return result;
124     }
125     
126     @Override
127     public boolean next() throws SQLException {
128         return proxyBackendHandler.next();
129     }
130     
131     @Override
132     public MySQLPacket getQueryRowPacket() throws SQLException {
133         return new MySQLTextResultSetRowPacket(proxyBackendHandler.getRowData().getData());
134     }
135     
136     @Override
137     public void close() throws SQLException {
138         proxyBackendHandler.close();
139     }
140 }