View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
19  
20  import lombok.extern.slf4j.Slf4j;
21  import org.apache.commons.codec.digest.DigestUtils;
22  import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
23  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
24  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
25  import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
26  import org.postgresql.replication.LogSequenceNumber;
27  
28  import javax.sql.DataSource;
29  import java.sql.Connection;
30  import java.sql.PreparedStatement;
31  import java.sql.ResultSet;
32  import java.sql.SQLException;
33  
34  /**
35   * Ingest position manager for PostgreSQL.
36   */
37  @Slf4j
38  public final class PostgreSQLIngestPositionManager implements DialectIngestPositionManager {
39      
40      private static final String SLOT_NAME_PREFIX = "pipeline";
41      
42      private static final String DECODE_PLUGIN = "test_decoding";
43      
44      private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
45      
46      @Override
47      public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
48          try (Connection connection = dataSource.getConnection()) {
49              createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix));
50              return getWalPosition(connection);
51          }
52      }
53      
54      @Override
55      public WALPosition init(final String data) {
56          return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(data)));
57      }
58      
59      private void createSlotIfNotExist(final Connection connection, final String slotName) throws SQLException {
60          if (isSlotExisting(connection, slotName)) {
61              log.info("createSlotIfNotExist, slot exist, slotName={}", slotName);
62              return;
63          }
64          String createSlotSQL = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
65          try (PreparedStatement preparedStatement = connection.prepareStatement(createSlotSQL)) {
66              preparedStatement.execute();
67          } catch (final SQLException ex) {
68              if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
69                  throw ex;
70              }
71          }
72      }
73      
74      private boolean isSlotExisting(final Connection connection, final String slotName) throws SQLException {
75          String checkSlotSQL = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
76          try (PreparedStatement preparedStatement = connection.prepareStatement(checkSlotSQL)) {
77              preparedStatement.setString(1, slotName);
78              preparedStatement.setString(2, DECODE_PLUGIN);
79              try (ResultSet resultSet = preparedStatement.executeQuery()) {
80                  return resultSet.next();
81              }
82          }
83      }
84      
85      private WALPosition getWalPosition(final Connection connection) throws SQLException {
86          try (
87                  PreparedStatement preparedStatement = connection.prepareStatement(getLogSequenceNumberSQL(connection));
88                  ResultSet resultSet = preparedStatement.executeQuery()) {
89              resultSet.next();
90              return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
91          }
92      }
93      
94      private String getLogSequenceNumberSQL(final Connection connection) throws SQLException {
95          if (9 == connection.getMetaData().getDatabaseMajorVersion() && 6 <= connection.getMetaData().getDatabaseMinorVersion()) {
96              return "SELECT PG_CURRENT_XLOG_LOCATION()";
97          }
98          if (10 <= connection.getMetaData().getDatabaseMajorVersion()) {
99              return "SELECT PG_CURRENT_WAL_LSN()";
100         }
101         throw new PipelineInternalException("Unsupported PostgreSQL version: " + connection.getMetaData().getDatabaseProductVersion());
102     }
103     
104     @Override
105     public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
106         try (Connection connection = dataSource.getConnection()) {
107             dropSlotIfExist(connection, slotNameSuffix);
108         }
109     }
110     
111     private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException {
112         String slotName = getUniqueSlotName(connection, slotNameSuffix);
113         if (!isSlotExisting(connection, slotName)) {
114             log.info("dropSlotIfExist, slot not exist, slotName={}", slotName);
115             return;
116         }
117         log.info("dropSlotIfExist, slot exist, slotName={}", slotName);
118         String dropSlotSQL = "SELECT pg_drop_replication_slot(?)";
119         try (PreparedStatement preparedStatement = connection.prepareStatement(dropSlotSQL)) {
120             preparedStatement.setString(1, slotName);
121             preparedStatement.execute();
122         }
123     }
124     
125     /**
126      * Get the unique slot name by connection.
127      *
128      * @param connection the connection
129      * @param slotNameSuffix slot name suffix
130      * @return the unique name by connection
131      * @throws SQLException failed when getCatalog
132      */
133     public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
134         // PostgreSQL slot name maximum length can't exceed 64,automatic truncation when the length exceeds the limit
135         String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
136         return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
137     }
138     
139     @Override
140     public String getDatabaseType() {
141         return "PostgreSQL";
142     }
143 }