View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.frontend.opengauss.command;
19  
20  import lombok.AccessLevel;
21  import lombok.NoArgsConstructor;
22  import lombok.extern.slf4j.Slf4j;
23  import org.apache.shardingsphere.db.protocol.opengauss.packet.command.OpenGaussCommandPacketType;
24  import org.apache.shardingsphere.db.protocol.opengauss.packet.command.query.extended.bind.OpenGaussComBatchBindPacket;
25  import org.apache.shardingsphere.db.protocol.packet.command.CommandPacketType;
26  import org.apache.shardingsphere.db.protocol.packet.sql.SQLReceivedPacket;
27  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
28  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
29  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.PostgreSQLAggregatedCommandPacket;
30  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.PostgreSQLComBindPacket;
31  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.close.PostgreSQLComClosePacket;
32  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.describe.PostgreSQLComDescribePacket;
33  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.execute.PostgreSQLComExecutePacket;
34  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.parse.PostgreSQLComParsePacket;
35  import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.simple.PostgreSQLComQueryPacket;
36  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
37  import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
38  import org.apache.shardingsphere.proxy.frontend.opengauss.command.query.extended.bind.OpenGaussComBatchBindExecutor;
39  import org.apache.shardingsphere.proxy.frontend.opengauss.command.query.simple.OpenGaussComQueryExecutor;
40  import org.apache.shardingsphere.proxy.frontend.postgresql.command.PortalContext;
41  import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLComTerminationExecutor;
42  import org.apache.shardingsphere.proxy.frontend.postgresql.command.generic.PostgreSQLUnsupportedCommandExecutor;
43  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.PostgreSQLAggregatedBatchedStatementsCommandExecutor;
44  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.PostgreSQLAggregatedCommandExecutor;
45  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.bind.PostgreSQLComBindExecutor;
46  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.close.PostgreSQLComCloseExecutor;
47  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.describe.PostgreSQLComDescribeExecutor;
48  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.execute.PostgreSQLComExecuteExecutor;
49  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.flush.PostgreSQLComFlushExecutor;
50  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.parse.PostgreSQLComParseExecutor;
51  import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.extended.sync.PostgreSQLComSyncExecutor;
52  
53  import java.sql.SQLException;
54  import java.util.ArrayList;
55  import java.util.List;
56  
57  /**
58   * Command executor factory for openGauss.
59   */
60  @NoArgsConstructor(access = AccessLevel.PRIVATE)
61  @Slf4j
62  public final class OpenGaussCommandExecutorFactory {
63      
64      /**
65       * Create new instance of command executor.
66       *
67       * @param commandPacketType command packet type for PostgreSQL/openGauss
68       * @param commandPacket command packet for PostgreSQL/openGauss
69       * @param connectionSession connection session
70       * @param portalContext PostgreSQL portal context
71       * @return created instance
72       * @throws SQLException SQL exception
73       */
74      public static CommandExecutor newInstance(final CommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket,
75                                                final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
76          if (commandPacket instanceof SQLReceivedPacket) {
77              log.debug("Execute packet type: {}, sql: {}", commandPacketType, ((SQLReceivedPacket) commandPacket).getSQL());
78          } else {
79              log.debug("Execute packet type: {}", commandPacketType);
80          }
81          if (!(commandPacket instanceof PostgreSQLAggregatedCommandPacket)) {
82              return getCommandExecutor(commandPacketType, commandPacket, connectionSession, portalContext);
83          }
84          PostgreSQLAggregatedCommandPacket aggregatedCommandPacket = (PostgreSQLAggregatedCommandPacket) commandPacket;
85          if (aggregatedCommandPacket.isContainsBatchedStatements() && aggregatedCommandPacket.getPackets().stream().noneMatch(OpenGaussComBatchBindPacket.class::isInstance)) {
86              return new PostgreSQLAggregatedCommandExecutor(getExecutorsOfAggregatedBatchedStatements(aggregatedCommandPacket, connectionSession, portalContext));
87          }
88          List<CommandExecutor> result = new ArrayList<>(aggregatedCommandPacket.getPackets().size());
89          for (PostgreSQLCommandPacket each : aggregatedCommandPacket.getPackets()) {
90              result.add(getCommandExecutor((CommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
91          }
92          return new PostgreSQLAggregatedCommandExecutor(result);
93      }
94      
95      private static List<CommandExecutor> getExecutorsOfAggregatedBatchedStatements(final PostgreSQLAggregatedCommandPacket aggregatedCommandPacket,
96                                                                                     final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
97          List<PostgreSQLCommandPacket> packets = aggregatedCommandPacket.getPackets();
98          int batchPacketBeginIndex = aggregatedCommandPacket.getBatchPacketBeginIndex();
99          int batchPacketEndIndex = aggregatedCommandPacket.getBatchPacketEndIndex();
100         List<CommandExecutor> result = new ArrayList<>(batchPacketBeginIndex + packets.size() - batchPacketEndIndex);
101         for (int i = 0; i < batchPacketBeginIndex; i++) {
102             PostgreSQLCommandPacket each = packets.get(i);
103             result.add(getCommandExecutor((CommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
104         }
105         result.add(new PostgreSQLAggregatedBatchedStatementsCommandExecutor(connectionSession, packets.subList(batchPacketBeginIndex, batchPacketEndIndex + 1)));
106         for (int i = batchPacketEndIndex + 1; i < packets.size(); i++) {
107             PostgreSQLCommandPacket each = packets.get(i);
108             result.add(getCommandExecutor((CommandPacketType) each.getIdentifier(), each, connectionSession, portalContext));
109         }
110         return result;
111     }
112     
113     private static CommandExecutor getCommandExecutor(final CommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket,
114                                                       final ConnectionSession connectionSession, final PortalContext portalContext) throws SQLException {
115         if (OpenGaussCommandPacketType.BATCH_BIND_COMMAND == commandPacketType) {
116             return new OpenGaussComBatchBindExecutor((OpenGaussComBatchBindPacket) commandPacket, connectionSession);
117         }
118         switch ((PostgreSQLCommandPacketType) commandPacketType) {
119             case SIMPLE_QUERY:
120                 return new OpenGaussComQueryExecutor(portalContext, (PostgreSQLComQueryPacket) commandPacket, connectionSession);
121             case PARSE_COMMAND:
122                 return new PostgreSQLComParseExecutor((PostgreSQLComParsePacket) commandPacket, connectionSession);
123             case BIND_COMMAND:
124                 return new PostgreSQLComBindExecutor(portalContext, (PostgreSQLComBindPacket) commandPacket, connectionSession);
125             case DESCRIBE_COMMAND:
126                 return new PostgreSQLComDescribeExecutor(portalContext, (PostgreSQLComDescribePacket) commandPacket, connectionSession);
127             case EXECUTE_COMMAND:
128                 return new PostgreSQLComExecuteExecutor(portalContext, (PostgreSQLComExecutePacket) commandPacket);
129             case SYNC_COMMAND:
130                 return new PostgreSQLComSyncExecutor(connectionSession);
131             case CLOSE_COMMAND:
132                 return new PostgreSQLComCloseExecutor(portalContext, (PostgreSQLComClosePacket) commandPacket, connectionSession);
133             case FLUSH_COMMAND:
134                 return new PostgreSQLComFlushExecutor();
135             case TERMINATE:
136                 return new PostgreSQLComTerminationExecutor();
137             default:
138                 return new PostgreSQLUnsupportedCommandExecutor();
139         }
140     }
141 }