1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend.opengauss.command.query.simple;
19
20 import lombok.Getter;
21 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
22 import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
23 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
24 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLDataRowPacket;
25 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLEmptyQueryResponsePacket;
26 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
27 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.simple.PostgreSQLComQueryPacket;
28 import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
29 import org.apache.shardingsphere.db.protocol.postgresql.packet.handshake.PostgreSQLParameterStatusPacket;
30 import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter;
31 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
32 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
33 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
34 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandlerFactory;
35 import org.apache.shardingsphere.proxy.backend.handler.ProxySQLComQueryParser;
36 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
37 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader;
38 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
39 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
40 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
41 import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
42 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
43 import org.apache.shardingsphere.proxy.frontend.postgresql.command.PortalContext;
44 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
45 import org.apache.shardingsphere.sql.parser.statement.core.segment.dal.VariableAssignSegment;
46 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
47 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dal.EmptyStatement;
48 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dal.SetStatement;
49 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.tcl.CommitStatement;
50 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.tcl.RollbackStatement;
51
52 import java.sql.SQLException;
53 import java.util.ArrayList;
54 import java.util.Collection;
55 import java.util.Collections;
56 import java.util.LinkedList;
57
58
59
60
61 public final class OpenGaussComQueryExecutor implements QueryCommandExecutor {
62
63 private final PortalContext portalContext;
64
65 private final ProxyBackendHandler proxyBackendHandler;
66
67 @Getter
68 private volatile ResponseType responseType;
69
70 public OpenGaussComQueryExecutor(final PortalContext portalContext, final PostgreSQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
71 this.portalContext = portalContext;
72 DatabaseType protocolDatabaseType = TypedSPILoader.getService(DatabaseType.class, "openGauss");
73 DatabaseType parserDatabaseType = ProxySQLComQueryParser.getParserDatabaseType(protocolDatabaseType, connectionSession);
74 SQLStatement sqlStatement = ProxySQLComQueryParser.parse(packet.getSQL(), parserDatabaseType);
75 proxyBackendHandler = ProxyBackendHandlerFactory.newInstance(protocolDatabaseType, parserDatabaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
76 }
77
78 @Override
79 public Collection<DatabasePacket> execute() throws SQLException {
80 ResponseHeader responseHeader = proxyBackendHandler.execute();
81 if (responseHeader instanceof QueryResponseHeader) {
82 return Collections.singleton(createRowDescriptionPacket((QueryResponseHeader) responseHeader));
83 }
84 responseType = ResponseType.UPDATE;
85 return createUpdatePacket((UpdateResponseHeader) responseHeader);
86 }
87
88 private PostgreSQLRowDescriptionPacket createRowDescriptionPacket(final QueryResponseHeader queryResponseHeader) {
89 responseType = ResponseType.QUERY;
90 return new PostgreSQLRowDescriptionPacket(createColumnDescriptions(queryResponseHeader));
91 }
92
93 private Collection<PostgreSQLColumnDescription> createColumnDescriptions(final QueryResponseHeader queryResponseHeader) {
94 Collection<PostgreSQLColumnDescription> result = new LinkedList<>();
95 int columnIndex = 0;
96 for (QueryHeader each : queryResponseHeader.getQueryHeaders()) {
97 result.add(new PostgreSQLColumnDescription(each.getColumnLabel(), ++columnIndex, each.getColumnType(), each.getColumnLength(), each.getColumnTypeName()));
98 }
99 return result;
100 }
101
102 private Collection<DatabasePacket> createUpdatePacket(final UpdateResponseHeader updateResponseHeader) throws SQLException {
103 SQLStatement sqlStatement = updateResponseHeader.getSqlStatement();
104 if (sqlStatement instanceof CommitStatement || sqlStatement instanceof RollbackStatement) {
105 portalContext.closeAll();
106 }
107 if (sqlStatement instanceof SetStatement) {
108 return createParameterStatusResponse((SetStatement) sqlStatement);
109 }
110 return Collections.singletonList(sqlStatement instanceof EmptyStatement ? new PostgreSQLEmptyQueryResponsePacket()
111 : new PostgreSQLCommandCompletePacket(PostgreSQLCommand.valueOf(sqlStatement.getClass()).map(PostgreSQLCommand::getTag).orElse(""), updateResponseHeader.getUpdateCount()));
112 }
113
114 private Collection<DatabasePacket> createParameterStatusResponse(final SetStatement sqlStatement) {
115 Collection<DatabasePacket> result = new ArrayList<>(2);
116 result.add(new PostgreSQLCommandCompletePacket("SET", 0L));
117 for (VariableAssignSegment each : sqlStatement.getVariableAssigns()) {
118 result.add(new PostgreSQLParameterStatusPacket(each.getVariable().getVariable(), null == each.getAssignValue() ? null : QuoteCharacter.unwrapText(each.getAssignValue())));
119 }
120 return result;
121 }
122
123 @Override
124 public boolean next() throws SQLException {
125 return proxyBackendHandler.next();
126 }
127
128 @Override
129 public PostgreSQLPacket getQueryRowPacket() throws SQLException {
130 return new PostgreSQLDataRowPacket(proxyBackendHandler.getRowData().getData());
131 }
132
133 @Override
134 public void close() throws SQLException {
135 proxyBackendHandler.close();
136 }
137 }