1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend.opengauss.command;
19
20 import lombok.AccessLevel;
21 import lombok.NoArgsConstructor;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.shardingsphere.db.protocol.opengauss.packet.command.OpenGaussCommandPacketType;
24 import org.apache.shardingsphere.db.protocol.opengauss.packet.command.query.extended.bind.OpenGaussComBatchBindPacket;
25 import org.apache.shardingsphere.db.protocol.packet.command.CommandPacketType;
26 import org.apache.shardingsphere.db.protocol.packet.sql.SQLReceivedPacket;
27 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
28 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
29 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.PostgreSQLAggregatedCommandPacket;
30 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.PostgreSQLComBindPacket;
31 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.close.PostgreSQLComClosePacket;
32 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.describe.PostgreSQLComDescribePacket;
33 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.execute.PostgreSQLComExecutePacket;
34 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.parse.PostgreSQLComParsePacket;
35 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.simple.PostgreSQLComQueryPacket;
36 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
37 import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
38 import org.apache.shardingsphere.proxy.frontend.opengauss.command.query.extended.bind.OpenGaussComBatchBindExecutor;
39 import org.apache.shardingsphere.proxy.frontend.opengauss.command.query.simple.OpenGaussComQueryExecutor;
40 import org.apache.shardingsphere.proxy.frontend.postgresql.command.PortalContext;
41 import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLComTerminationExecutor;
42 import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLUnsupportedCommandExecutor;
43 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.PostgreSQLAggregatedBatchedStatementsCommandExecutor;
44 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.PostgreSQLAggregatedCommandExecutor;
45 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.bind.PostgreSQLComBindExecutor;
46 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.close.PostgreSQLComCloseExecutor;
47 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.describe.PostgreSQLComDescribeExecutor;
48 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.execute.PostgreSQLComExecuteExecutor;
49 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.flush.PostgreSQLComFlushExecutor;
50 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.parse.PostgreSQLComParseExecutor;
51 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.sync.PostgreSQLComSyncExecutor;
52
53 import java.sql.SQLException;
54 import java.util.ArrayList;
55 import java.util.List;
56
57
58
59
60 @NoArgsConstructor(access = AccessLevel.PRIVATE)
61 @Slf4j
62 public final class OpenGaussCommandExecutorFactory {
63
64
65
66
67
68
69
70
71
72
73
74 public static CommandExecutor newInstance(final CommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket,
75 final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
76 if (commandPacket instanceof SQLReceivedPacket) {
77 log.debug("Execute packet type: {}, sql: {}", commandPacketType, ((SQLReceivedPacket) commandPacket).getSQL());
78 } else {
79 log.debug("Execute packet type: {}", commandPacketType);
80 }
81 if (!(commandPacket instanceof PostgreSQLAggregatedCommandPacket)) {
82 return getCommandExecutor(commandPacketType, commandPacket, connectionSession, portalContext);
83 }
84 PostgreSQLAggregatedCommandPacket aggregatedCommandPacket = (PostgreSQLAggregatedCommandPacket) commandPacket;
85 if (aggregatedCommandPacket.isContainsBatchedStatements() && aggregatedCommandPacket.getPackets().stream().noneMatch(OpenGaussComBatchBindPacket.class::isInstance)) {
86 return new PostgreSQLAggregatedCommandExecutor(getExecutorsOfAggregatedBatchedStatements(aggregatedCommandPacket, connectionSession, portalContext));
87 }
88 List<CommandExecutor> result = new ArrayList<>(aggregatedCommandPacket.getPackets().size());
89 for (PostgreSQLCommandPacket each : aggregatedCommandPacket.getPackets()) {
90 result.add(getCommandExecutor((CommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
91 }
92 return new PostgreSQLAggregatedCommandExecutor(result);
93 }
94
95 private static List<CommandExecutor> getExecutorsOfAggregatedBatchedStatements(final PostgreSQLAggregatedCommandPacket aggregatedCommandPacket,
96 final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
97 List<PostgreSQLCommandPacket> packets = aggregatedCommandPacket.getPackets();
98 int batchPacketBeginIndex = aggregatedCommandPacket.getBatchPacketBeginIndex();
99 int batchPacketEndIndex = aggregatedCommandPacket.getBatchPacketEndIndex();
100 List<CommandExecutor> result = new ArrayList<>(batchPacketBeginIndex + packets.size() - batchPacketEndIndex);
101 for (int i = 0; i < batchPacketBeginIndex; i++) {
102 PostgreSQLCommandPacket each = packets.get(i);
103 result.add(getCommandExecutor((CommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
104 }
105 result.add(new PostgreSQLAggregatedBatchedStatementsCommandExecutor(connectionSession, packets.subList(batchPacketBeginIndex, batchPacketEndIndex + 1)));
106 for (int i = batchPacketEndIndex + 1; i < packets.size(); i++) {
107 PostgreSQLCommandPacket each = packets.get(i);
108 result.add(getCommandExecutor((CommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
109 }
110 return result;
111 }
112
113 private static CommandExecutor getCommandExecutor(final CommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket,
114 final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
115 if (OpenGaussCommandPacketType.BATCH_BIND_COMMAND == commandPacketType) {
116 return new OpenGaussComBatchBindExecutor((OpenGaussComBatchBindPacket) commandPacket, connectionSession);
117 }
118 switch ((PostgreSQLCommandPacketType) commandPacketType) {
119 case SIMPLE_QUERY:
120 return new OpenGaussComQueryExecutor(portalContext, (PostgreSQLComQueryPacket) commandPacket, connectionSession);
121 case PARSE_COMMAND:
122 return new PostgreSQLComParseExecutor((PostgreSQLComParsePacket) commandPacket, connectionSession);
123 case BIND_COMMAND:
124 return new PostgreSQLComBindExecutor(portalContext, (PostgreSQLComBindPacket) commandPacket, connectionSession);
125 case DESCRIBE_COMMAND:
126 return new PostgreSQLComDescribeExecutor(portalContext, (PostgreSQLComDescribePacket) commandPacket, connectionSession);
127 case EXECUTE_COMMAND:
128 return new PostgreSQLComExecuteExecutor(portalContext, (PostgreSQLComExecutePacket) commandPacket);
129 case SYNC_COMMAND:
130 return new PostgreSQLComSyncExecutor(connectionSession);
131 case CLOSE_COMMAND:
132 return new PostgreSQLComCloseExecutor(portalContext, (PostgreSQLComClosePacket) commandPacket, connectionSession);
133 case FLUSH_COMMAND:
134 return new PostgreSQLComFlushExecutor();
135 case TERMINATE:
136 return new PostgreSQLComTerminationExecutor();
137 default:
138 return new PostgreSQLUnsupportedCommandExecutor();
139 }
140 }
141 }