1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend.postgresql.command;
19
20 import lombok.AccessLevel;
21 import lombok.NoArgsConstructor;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.shardingsphere.db.protocol.packet.sql.SQLReceivedPacket;
24 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
25 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
26 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.PostgreSQLAggregatedCommandPacket;
27 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.PostgreSQLComBindPacket;
28 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.close.PostgreSQLComClosePacket;
29 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.describe.PostgreSQLComDescribePacket;
30 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.execute.PostgreSQLComExecutePacket;
31 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.parse.PostgreSQLComParsePacket;
32 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.simple.PostgreSQLComQueryPacket;
33 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
34 import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
35 import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLComTerminationExecutor;
36 import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLUnsupportedCommandExecutor;
37 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.PostgreSQLAggregatedBatchedStatementsCommandExecutor;
38 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.PostgreSQLAggregatedCommandExecutor;
39 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.bind.PostgreSQLComBindExecutor;
40 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.close.PostgreSQLComCloseExecutor;
41 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.describe.PostgreSQLComDescribeExecutor;
42 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.execute.PostgreSQLComExecuteExecutor;
43 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.flush.PostgreSQLComFlushExecutor;
44 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.parse.PostgreSQLComParseExecutor;
45 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.sync.PostgreSQLComSyncExecutor;
46 import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.simple.PostgreSQLComQueryExecutor;
47
48 import java.sql.SQLException;
49 import java.util.ArrayList;
50 import java.util.List;
51
52
53
54
55 @NoArgsConstructor(access = AccessLevel.PRIVATE)
56 @Slf4j
57 public final class PostgreSQLCommandExecutorFactory {
58
59
60
61
62
63
64
65
66
67
68
69 public static CommandExecutor newInstance(final PostgreSQLCommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket,
70 final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
71 if (commandPacket instanceof SQLReceivedPacket) {
72 log.debug("Execute packet type: {}, sql: {}", commandPacketType, ((SQLReceivedPacket) commandPacket).getSQL());
73 } else {
74 log.debug("Execute packet type: {}", commandPacketType);
75 }
76 if (!(commandPacket instanceof PostgreSQLAggregatedCommandPacket)) {
77 return getCommandExecutor(commandPacketType, commandPacket, connectionSession, portalContext);
78 }
79 PostgreSQLAggregatedCommandPacket aggregatedCommandPacket = (PostgreSQLAggregatedCommandPacket) commandPacket;
80 if (aggregatedCommandPacket.isContainsBatchedStatements()) {
81 return new PostgreSQLAggregatedCommandExecutor(getExecutorsOfAggregatedBatchedStatements(aggregatedCommandPacket, connectionSession, portalContext));
82 }
83 List<CommandExecutor> result = new ArrayList<>(aggregatedCommandPacket.getPackets().size());
84 for (PostgreSQLCommandPacket each : aggregatedCommandPacket.getPackets()) {
85 result.add(getCommandExecutor((PostgreSQLCommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
86 }
87 return new PostgreSQLAggregatedCommandExecutor(result);
88 }
89
90 private static List<CommandExecutor> getExecutorsOfAggregatedBatchedStatements(final PostgreSQLAggregatedCommandPacket aggregatedCommandPacket,
91 final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
92 List<PostgreSQLCommandPacket> packets = aggregatedCommandPacket.getPackets();
93 int batchPacketBeginIndex = aggregatedCommandPacket.getBatchPacketBeginIndex();
94 int batchPacketEndIndex = aggregatedCommandPacket.getBatchPacketEndIndex();
95 List<CommandExecutor> result = new ArrayList<>(batchPacketBeginIndex + packets.size() - batchPacketEndIndex);
96 for (int i = 0; i < batchPacketBeginIndex; i++) {
97 PostgreSQLCommandPacket each = packets.get(i);
98 result.add(getCommandExecutor((PostgreSQLCommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
99 }
100 result.add(new PostgreSQLAggregatedBatchedStatementsCommandExecutor(connectionSession, packets.subList(batchPacketBeginIndex, batchPacketEndIndex + 1)));
101 for (int i = batchPacketEndIndex + 1; i < packets.size(); i++) {
102 PostgreSQLCommandPacket each = packets.get(i);
103 result.add(getCommandExecutor((PostgreSQLCommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
104 }
105 return result;
106 }
107
108 private static CommandExecutor getCommandExecutor(final PostgreSQLCommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket,
109 final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
110 switch (commandPacketType) {
111 case SIMPLE_QUERY:
112 return new PostgreSQLComQueryExecutor(portalContext, (PostgreSQLComQueryPacket) commandPacket, connectionSession);
113 case PARSE_COMMAND:
114 return new PostgreSQLComParseExecutor((PostgreSQLComParsePacket) commandPacket, connectionSession);
115 case BIND_COMMAND:
116 return new PostgreSQLComBindExecutor(portalContext, (PostgreSQLComBindPacket) commandPacket, connectionSession);
117 case DESCRIBE_COMMAND:
118 return new PostgreSQLComDescribeExecutor(portalContext, (PostgreSQLComDescribePacket) commandPacket, connectionSession);
119 case EXECUTE_COMMAND:
120 return new PostgreSQLComExecuteExecutor(portalContext, (PostgreSQLComExecutePacket) commandPacket);
121 case SYNC_COMMAND:
122 return new PostgreSQLComSyncExecutor(connectionSession);
123 case CLOSE_COMMAND:
124 return new PostgreSQLComCloseExecutor(portalContext, (PostgreSQLComClosePacket) commandPacket, connectionSession);
125 case FLUSH_COMMAND:
126 return new PostgreSQLComFlushExecutor();
127 case TERMINATE:
128 return new PostgreSQLComTerminationExecutor();
129 default:
130 return new PostgreSQLUnsupportedCommandExecutor();
131 }
132 }
133 }