View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.frontend.postgresql.command;
19  
20  import lombok.AccessLevel;
21  import lombok.NoArgsConstructor;
22  import lombok.extern.slf4j.Slf4j;
23  import org.apache.shardingsphere.db.protocol.packet.sql.SQLReceivedPacket;
24  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
25  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
26  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.PostgreSQLAggregatedCommandPacket;
27  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.PostgreSQLComBindPacket;
28  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.close.PostgreSQLComClosePacket;
29  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.describe.PostgreSQLComDescribePacket;
30  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.execute.PostgreSQLComExecutePacket;
31  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.parse.PostgreSQLComParsePacket;
32  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.simple.PostgreSQLComQueryPacket;
33  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
34  import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
35  import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLComTerminationExecutor;
36  import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLUnsupportedCommandExecutor;
37  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.PostgreSQLAggregatedBatchedStatementsCommandExecutor;
38  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.PostgreSQLAggregatedCommandExecutor;
39  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.bind.PostgreSQLComBindExecutor;
40  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.close.PostgreSQLComCloseExecutor;
41  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.describe.PostgreSQLComDescribeExecutor;
42  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.execute.PostgreSQLComExecuteExecutor;
43  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.flush.PostgreSQLComFlushExecutor;
44  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.parse.PostgreSQLComParseExecutor;
45  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.sync.PostgreSQLComSyncExecutor;
46  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.simple.PostgreSQLComQueryExecutor;
47  
48  import java.sql.SQLException;
49  import java.util.ArrayList;
50  import java.util.List;
51  
52  /**
53   * Command executor factory for PostgreSQL.
54   */
55  @NoArgsConstructor(access = AccessLevel.PRIVATE)
56  @Slf4j
57  public final class PostgreSQLCommandExecutorFactory {
58      
59      /**
60       * Create new instance of command executor.
61       *
62       * @param commandPacketType command packet type for PostgreSQL
63       * @param commandPacket command packet for PostgreSQL
64       * @param connectionSession connection session
65       * @param portalContext PostgreSQL portal context
66       * @return created instance
67       * @throws SQLException SQL exception
68       */
69      public static CommandExecutor newInstance(final PostgreSQLCommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket,
70                                                final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
71          if (commandPacket instanceof SQLReceivedPacket) {
72              log.debug("Execute packet type: {}, sql: {}", commandPacketType, ((SQLReceivedPacket) commandPacket).getSQL());
73          } else {
74              log.debug("Execute packet type: {}", commandPacketType);
75          }
76          if (!(commandPacket instanceof PostgreSQLAggregatedCommandPacket)) {
77              return getCommandExecutor(commandPacketType, commandPacket, connectionSession, portalContext);
78          }
79          PostgreSQLAggregatedCommandPacket aggregatedCommandPacket = (PostgreSQLAggregatedCommandPacket) commandPacket;
80          if (aggregatedCommandPacket.isContainsBatchedStatements()) {
81              return new PostgreSQLAggregatedCommandExecutor(getExecutorsOfAggregatedBatchedStatements(aggregatedCommandPacket, connectionSession, portalContext));
82          }
83          List<CommandExecutor> result = new ArrayList<>(aggregatedCommandPacket.getPackets().size());
84          for (PostgreSQLCommandPacket each : aggregatedCommandPacket.getPackets()) {
85              result.add(getCommandExecutor((PostgreSQLCommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
86          }
87          return new PostgreSQLAggregatedCommandExecutor(result);
88      }
89      
90      private static List<CommandExecutor> getExecutorsOfAggregatedBatchedStatements(final PostgreSQLAggregatedCommandPacket aggregatedCommandPacket,
91                                                                                     final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
92          List<PostgreSQLCommandPacket> packets = aggregatedCommandPacket.getPackets();
93          int batchPacketBeginIndex = aggregatedCommandPacket.getBatchPacketBeginIndex();
94          int batchPacketEndIndex = aggregatedCommandPacket.getBatchPacketEndIndex();
95          List<CommandExecutor> result = new ArrayList<>(batchPacketBeginIndex + packets.size() - batchPacketEndIndex);
96          for (int i = 0; i < batchPacketBeginIndex; i++) {
97              PostgreSQLCommandPacket each = packets.get(i);
98              result.add(getCommandExecutor((PostgreSQLCommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
99          }
100         result.add(new PostgreSQLAggregatedBatchedStatementsCommandExecutor(connectionSession, packets.subList(batchPacketBeginIndex, batchPacketEndIndex + 1)));
101         for (int i = batchPacketEndIndex + 1; i < packets.size(); i++) {
102             PostgreSQLCommandPacket each = packets.get(i);
103             result.add(getCommandExecutor((PostgreSQLCommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
104         }
105         return result;
106     }
107     
108     private static CommandExecutor getCommandExecutor(final PostgreSQLCommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket,
109                                                       final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
110         switch (commandPacketType) {
111             case SIMPLE_QUERY:
112                 return new PostgreSQLComQueryExecutor(portalContext, (PostgreSQLComQueryPacket) commandPacket, connectionSession);
113             case PARSE_COMMAND:
114                 return new PostgreSQLComParseExecutor((PostgreSQLComParsePacket) commandPacket, connectionSession);
115             case BIND_COMMAND:
116                 return new PostgreSQLComBindExecutor(portalContext, (PostgreSQLComBindPacket) commandPacket, connectionSession);
117             case DESCRIBE_COMMAND:
118                 return new PostgreSQLComDescribeExecutor(portalContext, (PostgreSQLComDescribePacket) commandPacket, connectionSession);
119             case EXECUTE_COMMAND:
120                 return new PostgreSQLComExecuteExecutor(portalContext, (PostgreSQLComExecutePacket) commandPacket);
121             case SYNC_COMMAND:
122                 return new PostgreSQLComSyncExecutor(connectionSession);
123             case CLOSE_COMMAND:
124                 return new PostgreSQLComCloseExecutor(portalContext, (PostgreSQLComClosePacket) commandPacket, connectionSession);
125             case FLUSH_COMMAND:
126                 return new PostgreSQLComFlushExecutor();
127             case TERMINATE:
128                 return new PostgreSQLComTerminationExecutor();
129             default:
130                 return new PostgreSQLUnsupportedCommandExecutor();
131         }
132     }
133 }