View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.frontend.opengauss.command.query.simple;
19  
20  import lombok.Getter;
21  import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
22  import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
23  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
24  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLDataRowPacket;
25  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLEmptyQueryResponsePacket;
26  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
27  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.simple.PostgreSQLComQueryPacket;
28  import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
29  import org.apache.shardingsphere.db.protocol.postgresql.packet.handshake.PostgreSQLParameterStatusPacket;
30  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
31  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
32  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
33  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandlerFactory;
34  import org.apache.shardingsphere.proxy.backend.handler.ProxySQLComQueryParser;
35  import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
36  import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader;
37  import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
38  import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
39  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
40  import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
41  import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
42  import org.apache.shardingsphere.proxy.frontend.postgresql.command.PortalContext;
43  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
44  import org.apache.shardingsphere.sql.parser.sql.common.segment.dal.VariableAssignSegment;
45  import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
46  import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.EmptyStatement;
47  import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.SetStatement;
48  import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
49  import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackStatement;
50  import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
51  
52  import java.sql.SQLException;
53  import java.util.ArrayList;
54  import java.util.Collection;
55  import java.util.Collections;
56  import java.util.LinkedList;
57  
58  /**
59   * Command query executor for openGauss.
60   */
61  public final class OpenGaussComQueryExecutor implements QueryCommandExecutor {
62      
63      private final PortalContext portalContext;
64      
65      private final ProxyBackendHandler proxyBackendHandler;
66      
67      @Getter
68      private volatile ResponseType responseType;
69      
70      public OpenGaussComQueryExecutor(final PortalContext portalContext, final PostgreSQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
71          this.portalContext = portalContext;
72          DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "openGauss");
73          SQLStatement sqlStatement = ProxySQLComQueryParser.parse(packet.getSQL(), databaseType, connectionSession);
74          proxyBackendHandler = ProxyBackendHandlerFactory.newInstance(databaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
75      }
76      
77      @Override
78      public Collection<DatabasePacket> execute() throws SQLException {
79          ResponseHeader responseHeader = proxyBackendHandler.execute();
80          if (responseHeader instanceof QueryResponseHeader) {
81              return Collections.singleton(createRowDescriptionPacket((QueryResponseHeader) responseHeader));
82          }
83          responseType = ResponseType.UPDATE;
84          return createUpdatePacket((UpdateResponseHeader) responseHeader);
85      }
86      
87      private PostgreSQLRowDescriptionPacket createRowDescriptionPacket(final QueryResponseHeader queryResponseHeader) {
88          responseType = ResponseType.QUERY;
89          return new PostgreSQLRowDescriptionPacket(createColumnDescriptions(queryResponseHeader));
90      }
91      
92      private Collection<PostgreSQLColumnDescription> createColumnDescriptions(final QueryResponseHeader queryResponseHeader) {
93          Collection<PostgreSQLColumnDescription> result = new LinkedList<>();
94          int columnIndex = 0;
95          for (QueryHeader each : queryResponseHeader.getQueryHeaders()) {
96              result.add(new PostgreSQLColumnDescription(each.getColumnLabel(), ++columnIndex, each.getColumnType(), each.getColumnLength(), each.getColumnTypeName()));
97          }
98          return result;
99      }
100     
101     private Collection<DatabasePacket> createUpdatePacket(final UpdateResponseHeader updateResponseHeader) throws SQLException {
102         SQLStatement sqlStatement = updateResponseHeader.getSqlStatement();
103         if (sqlStatement instanceof CommitStatement || sqlStatement instanceof RollbackStatement) {
104             portalContext.closeAll();
105         }
106         if (sqlStatement instanceof SetStatement) {
107             return createParameterStatusResponse((SetStatement) sqlStatement);
108         }
109         return Collections.singletonList(sqlStatement instanceof EmptyStatement ? new PostgreSQLEmptyQueryResponsePacket()
110                 : new PostgreSQLCommandCompletePacket(PostgreSQLCommand.valueOf(sqlStatement.getClass()).map(PostgreSQLCommand::getTag).orElse(""), updateResponseHeader.getUpdateCount()));
111     }
112     
113     private Collection<DatabasePacket> createParameterStatusResponse(final SetStatement sqlStatement) {
114         Collection<DatabasePacket> result = new ArrayList<>(2);
115         result.add(new PostgreSQLCommandCompletePacket("SET", 0));
116         for (VariableAssignSegment each : sqlStatement.getVariableAssigns()) {
117             result.add(new PostgreSQLParameterStatusPacket(each.getVariable().getVariable(), IdentifierValue.getQuotedContent(each.getAssignValue())));
118         }
119         return result;
120     }
121     
122     @Override
123     public boolean next() throws SQLException {
124         return proxyBackendHandler.next();
125     }
126     
127     @Override
128     public PostgreSQLPacket getQueryRowPacket() throws SQLException {
129         return new PostgreSQLDataRowPacket(proxyBackendHandler.getRowData().getData());
130     }
131     
132     @Override
133     public void close() throws SQLException {
134         proxyBackendHandler.close();
135     }
136 }