View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.frontend.opengauss.command.query.simple;
19  
20  import lombok.Getter;
21  import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
22  import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
23  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
24  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLDataRowPacket;
25  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLEmptyQueryResponsePacket;
26  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
27  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.simple.PostgreSQLComQueryPacket;
28  import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
29  import org.apache.shardingsphere.db.protocol.postgresql.packet.handshake.PostgreSQLParameterStatusPacket;
30  import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter;
31  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
32  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
33  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
34  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandlerFactory;
35  import org.apache.shardingsphere.proxy.backend.handler.ProxySQLComQueryParser;
36  import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
37  import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader;
38  import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
39  import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
40  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
41  import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
42  import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
43  import org.apache.shardingsphere.proxy.frontend.postgresql.command.PortalContext;
44  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
45  import org.apache.shardingsphere.sql.parser.statement.core.segment.dal.VariableAssignSegment;
46  import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
47  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dal.EmptyStatement;
48  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dal.SetStatement;
49  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.tcl.CommitStatement;
50  import org.apache.shardingsphere.sql.parser.statement.core.statement.type.tcl.RollbackStatement;
51  
52  import java.sql.SQLException;
53  import java.util.ArrayList;
54  import java.util.Collection;
55  import java.util.Collections;
56  import java.util.LinkedList;
57  
58  /**
59   * Command query executor for openGauss.
60   */
61  public final class OpenGaussComQueryExecutor implements QueryCommandExecutor {
62      
63      private final PortalContext portalContext;
64      
65      private final ProxyBackendHandler proxyBackendHandler;
66      
67      @Getter
68      private volatile ResponseType responseType;
69      
70      public OpenGaussComQueryExecutor(final PortalContext portalContext, final PostgreSQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
71          this.portalContext = portalContext;
72          DatabaseType protocolDatabaseType = TypedSPILoader.getService(DatabaseType.class, "openGauss");
73          DatabaseType parserDatabaseType = ProxySQLComQueryParser.getParserDatabaseType(protocolDatabaseType, connectionSession);
74          SQLStatement sqlStatement = ProxySQLComQueryParser.parse(packet.getSQL(), parserDatabaseType);
75          proxyBackendHandler = ProxyBackendHandlerFactory.newInstance(protocolDatabaseType, parserDatabaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
76      }
77      
78      @Override
79      public Collection<DatabasePacket> execute() throws SQLException {
80          ResponseHeader responseHeader = proxyBackendHandler.execute();
81          if (responseHeader instanceof QueryResponseHeader) {
82              return Collections.singleton(createRowDescriptionPacket((QueryResponseHeader) responseHeader));
83          }
84          responseType = ResponseType.UPDATE;
85          return createUpdatePacket((UpdateResponseHeader) responseHeader);
86      }
87      
88      private PostgreSQLRowDescriptionPacket createRowDescriptionPacket(final QueryResponseHeader queryResponseHeader) {
89          responseType = ResponseType.QUERY;
90          return new PostgreSQLRowDescriptionPacket(createColumnDescriptions(queryResponseHeader));
91      }
92      
93      private Collection<PostgreSQLColumnDescription> createColumnDescriptions(final QueryResponseHeader queryResponseHeader) {
94          Collection<PostgreSQLColumnDescription> result = new LinkedList<>();
95          int columnIndex = 0;
96          for (QueryHeader each : queryResponseHeader.getQueryHeaders()) {
97              result.add(new PostgreSQLColumnDescription(each.getColumnLabel(), ++columnIndex, each.getColumnType(), each.getColumnLength(), each.getColumnTypeName()));
98          }
99          return result;
100     }
101     
102     private Collection<DatabasePacket> createUpdatePacket(final UpdateResponseHeader updateResponseHeader) throws SQLException {
103         SQLStatement sqlStatement = updateResponseHeader.getSqlStatement();
104         if (sqlStatement instanceof CommitStatement || sqlStatement instanceof RollbackStatement) {
105             portalContext.closeAll();
106         }
107         if (sqlStatement instanceof SetStatement) {
108             return createParameterStatusResponse((SetStatement) sqlStatement);
109         }
110         return Collections.singletonList(sqlStatement instanceof EmptyStatement ? new PostgreSQLEmptyQueryResponsePacket()
111                 : new PostgreSQLCommandCompletePacket(PostgreSQLCommand.valueOf(sqlStatement.getClass()).map(PostgreSQLCommand::getTag).orElse(""), updateResponseHeader.getUpdateCount()));
112     }
113     
114     private Collection<DatabasePacket> createParameterStatusResponse(final SetStatement sqlStatement) {
115         Collection<DatabasePacket> result = new ArrayList<>(2);
116         result.add(new PostgreSQLCommandCompletePacket("SET", 0L));
117         for (VariableAssignSegment each : sqlStatement.getVariableAssigns()) {
118             result.add(new PostgreSQLParameterStatusPacket(each.getVariable().getVariable(), null == each.getAssignValue() ? null : QuoteCharacter.unwrapText(each.getAssignValue())));
119         }
120         return result;
121     }
122     
123     @Override
124     public boolean next() throws SQLException {
125         return proxyBackendHandler.next();
126     }
127     
128     @Override
129     public PostgreSQLPacket getQueryRowPacket() throws SQLException {
130         return new PostgreSQLDataRowPacket(proxyBackendHandler.getRowData().getData());
131     }
132     
133     @Override
134     public void close() throws SQLException {
135         proxyBackendHandler.close();
136     }
137 }