View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
19  
20  import lombok.extern.slf4j.Slf4j;
21  import org.apache.commons.codec.digest.DigestUtils;
22  import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
23  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
24  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
25  import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
26  import org.opengauss.replication.LogSequenceNumber;
27  
28  import javax.sql.DataSource;
29  import java.sql.CallableStatement;
30  import java.sql.Connection;
31  import java.sql.PreparedStatement;
32  import java.sql.ResultSet;
33  import java.sql.SQLException;
34  import java.util.Optional;
35  
36  /**
37   * Ingest position manager for openGauss.
38   */
39  // TODO reuse PostgreSQLIngestPositionManager
40  @Slf4j
41  public final class OpenGaussIngestPositionManager implements DialectIngestPositionManager {
42      
43      private static final String SLOT_NAME_PREFIX = "pipeline";
44      
45      private static final String DECODE_PLUGIN = "mppdb_decoding";
46      
47      private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
48      
49      @Override
50      public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
51          try (Connection connection = dataSource.getConnection()) {
52              createSlotIfNotExist(connection, slotNameSuffix);
53              return getWalPosition(connection);
54          }
55      }
56      
57      @Override
58      public WALPosition init(final String data) {
59          return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(data)));
60      }
61      
62      /**
63       * Create logical replication slot if it does not exist.
64       *
65       * @param connection connection
66       * @param slotNameSuffix slotName suffix
67       * @throws SQLException SQL exception
68       */
69      private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException {
70          String slotName = getUniqueSlotName(connection, slotNameSuffix);
71          Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, slotName);
72          if (!slotInfo.isPresent()) {
73              createSlotBySQL(connection, slotName);
74              return;
75          }
76          if (null == slotInfo.get().getDatabaseName()) {
77              dropSlotIfExist(connection, slotName);
78              createSlotBySQL(connection, slotName);
79          }
80      }
81      
82      private Optional<ReplicationSlotInfo> getSlotInfo(final Connection connection, final String slotName) throws SQLException {
83          String sql = "SELECT slot_name, database FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
84          try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
85              preparedStatement.setString(1, slotName);
86              preparedStatement.setString(2, DECODE_PLUGIN);
87              try (ResultSet resultSet = preparedStatement.executeQuery()) {
88                  if (!resultSet.next()) {
89                      return Optional.empty();
90                  }
91                  return Optional.of(new ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2)));
92              }
93          }
94      }
95      
96      private void createSlotBySQL(final Connection connection, final String slotName) throws SQLException {
97          String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
98          try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
99              preparedStatement.execute();
100         } catch (final SQLException ex) {
101             if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
102                 throw ex;
103             }
104         }
105     }
106     
107     private WALPosition getWalPosition(final Connection connection) throws SQLException {
108         try (
109                 PreparedStatement preparedStatement = connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
110                 ResultSet resultSet = preparedStatement.executeQuery()) {
111             resultSet.next();
112             return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
113         }
114     }
115     
116     @Override
117     public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
118         try (Connection connection = dataSource.getConnection()) {
119             dropSlotIfExist(connection, getUniqueSlotName(connection, slotNameSuffix));
120         }
121     }
122     
123     private void dropSlotIfExist(final Connection connection, final String slotName) throws SQLException {
124         if (!getSlotInfo(connection, slotName).isPresent()) {
125             log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
126             return;
127         }
128         String sql = String.format("select * from pg_drop_replication_slot('%s')", slotName);
129         try (CallableStatement callableStatement = connection.prepareCall(sql)) {
130             callableStatement.execute();
131         }
132     }
133     
134     /**
135      * Get the unique slot name by connection.
136      *
137      * @param connection connection
138      * @param slotNameSuffix slot name suffix
139      * @return the unique name by connection
140      * @throws SQLException failed when getCatalog
141      */
142     public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
143         // same as PostgreSQL, but length over 64 will throw an exception directly
144         String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
145         return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
146     }
147     
148     @Override
149     public String getDatabaseType() {
150         return "openGauss";
151     }
152 }