1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.simple;
19
20 import lombok.Getter;
21 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
22 import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
23 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription;
24 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLDataRowPacket;
25 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLEmptyQueryResponsePacket;
26 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
27 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.simple.PostgreSQLComQueryPacket;
28 import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
29 import org.apache.shardingsphere.db.protocol.postgresql.packet.handshake.PostgreSQLParameterStatusPacket;
30 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
31 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
32 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
33 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandlerFactory;
34 import org.apache.shardingsphere.proxy.backend.handler.ProxySQLComQueryParser;
35 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
36 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader;
37 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
38 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
39 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
40 import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
41 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
42 import org.apache.shardingsphere.proxy.frontend.postgresql.command.PortalContext;
43 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
44 import org.apache.shardingsphere.sql.parser.sql.common.segment.dal.VariableAssignSegment;
45 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
46 import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.EmptyStatement;
47 import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.SetStatement;
48 import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
49 import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackStatement;
50 import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
51
52 import java.sql.SQLException;
53 import java.util.ArrayList;
54 import java.util.Collection;
55 import java.util.Collections;
56 import java.util.LinkedList;
57
58
59
60
61 public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
62
63 private final PortalContext portalContext;
64
65 private final ProxyBackendHandler proxyBackendHandler;
66
67 @Getter
68 private volatile ResponseType responseType;
69
70 public PostgreSQLComQueryExecutor(final PortalContext portalContext, final PostgreSQLComQueryPacket packet, final ConnectionSession connectionSession) throws SQLException {
71 this.portalContext = portalContext;
72 DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "openGauss");
73 SQLStatement sqlStatement = ProxySQLComQueryParser.parse(packet.getSQL(), databaseType, connectionSession);
74 proxyBackendHandler = ProxyBackendHandlerFactory.newInstance(databaseType, packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
75 }
76
77 @Override
78 public Collection<DatabasePacket> execute() throws SQLException {
79 ResponseHeader responseHeader = proxyBackendHandler.execute();
80 if (responseHeader instanceof QueryResponseHeader) {
81 return Collections.singleton(createRowDescriptionPacket((QueryResponseHeader) responseHeader));
82 }
83 responseType = ResponseType.UPDATE;
84 return createUpdatePacket((UpdateResponseHeader) responseHeader);
85 }
86
87 private PostgreSQLRowDescriptionPacket createRowDescriptionPacket(final QueryResponseHeader queryResponseHeader) {
88 responseType = ResponseType.QUERY;
89 return new PostgreSQLRowDescriptionPacket(createColumnDescriptions(queryResponseHeader));
90 }
91
92 private Collection<PostgreSQLColumnDescription> createColumnDescriptions(final QueryResponseHeader queryResponseHeader) {
93 Collection<PostgreSQLColumnDescription> result = new LinkedList<>();
94 int columnIndex = 0;
95 for (QueryHeader each : queryResponseHeader.getQueryHeaders()) {
96 result.add(new PostgreSQLColumnDescription(each.getColumnLabel(), ++columnIndex, each.getColumnType(), each.getColumnLength(), each.getColumnTypeName()));
97 }
98 return result;
99 }
100
101 private Collection<DatabasePacket> createUpdatePacket(final UpdateResponseHeader updateResponseHeader) throws SQLException {
102 SQLStatement sqlStatement = updateResponseHeader.getSqlStatement();
103 if (sqlStatement instanceof CommitStatement || sqlStatement instanceof RollbackStatement) {
104 portalContext.closeAll();
105 }
106 if (sqlStatement instanceof SetStatement) {
107 return createParameterStatusResponse((SetStatement) sqlStatement);
108 }
109 return Collections.singletonList(sqlStatement instanceof EmptyStatement ? new PostgreSQLEmptyQueryResponsePacket()
110 : new PostgreSQLCommandCompletePacket(PostgreSQLCommand.valueOf(sqlStatement.getClass()).map(PostgreSQLCommand::getTag).orElse(""), updateResponseHeader.getUpdateCount()));
111 }
112
113 private Collection<DatabasePacket> createParameterStatusResponse(final SetStatement sqlStatement) {
114 Collection<DatabasePacket> result = new ArrayList<>(2);
115 result.add(new PostgreSQLCommandCompletePacket("SET", 0));
116 for (VariableAssignSegment each : sqlStatement.getVariableAssigns()) {
117 result.add(new PostgreSQLParameterStatusPacket(each.getVariable().getVariable(), IdentifierValue.getQuotedContent(each.getAssignValue())));
118 }
119 return result;
120 }
121
122 @Override
123 public boolean next() throws SQLException {
124 return proxyBackendHandler.next();
125 }
126
127 @Override
128 public PostgreSQLPacket getQueryRowPacket() throws SQLException {
129 return new PostgreSQLDataRowPacket(proxyBackendHandler.getRowData().getData());
130 }
131
132 @Override
133 public void close() throws SQLException {
134 proxyBackendHandler.close();
135 }
136 }