1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.commons.codec.digest.DigestUtils;
22 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
23 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
24 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
25 import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
26 import org.postgresql.replication.LogSequenceNumber;
27
28 import javax.sql.DataSource;
29 import java.sql.Connection;
30 import java.sql.PreparedStatement;
31 import java.sql.ResultSet;
32 import java.sql.SQLException;
33
34
35
36
37 @Slf4j
38 public final class PostgreSQLIngestPositionManager implements DialectIngestPositionManager {
39
40 private static final String SLOT_NAME_PREFIX = "pipeline";
41
42 private static final String DECODE_PLUGIN = "test_decoding";
43
44 private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
45
46 @Override
47 public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
48 try (Connection connection = dataSource.getConnection()) {
49 createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix));
50 return getWalPosition(connection);
51 }
52 }
53
54 @Override
55 public WALPosition init(final String data) {
56 return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(data)));
57 }
58
59 private void createSlotIfNotExist(final Connection connection, final String slotName) throws SQLException {
60 if (isSlotExisting(connection, slotName)) {
61 log.info("createSlotIfNotExist, slot exist, slotName={}", slotName);
62 return;
63 }
64 String createSlotSQL = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
65 try (PreparedStatement preparedStatement = connection.prepareStatement(createSlotSQL)) {
66 preparedStatement.execute();
67 } catch (final SQLException ex) {
68 if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
69 throw ex;
70 }
71 }
72 }
73
74 private boolean isSlotExisting(final Connection connection, final String slotName) throws SQLException {
75 String checkSlotSQL = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
76 try (PreparedStatement preparedStatement = connection.prepareStatement(checkSlotSQL)) {
77 preparedStatement.setString(1, slotName);
78 preparedStatement.setString(2, DECODE_PLUGIN);
79 try (ResultSet resultSet = preparedStatement.executeQuery()) {
80 return resultSet.next();
81 }
82 }
83 }
84
85 private WALPosition getWalPosition(final Connection connection) throws SQLException {
86 try (
87 PreparedStatement preparedStatement = connection.prepareStatement(getLogSequenceNumberSQL(connection));
88 ResultSet resultSet = preparedStatement.executeQuery()) {
89 resultSet.next();
90 return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
91 }
92 }
93
94 private String getLogSequenceNumberSQL(final Connection connection) throws SQLException {
95 if (9 == connection.getMetaData().getDatabaseMajorVersion() && 6 <= connection.getMetaData().getDatabaseMinorVersion()) {
96 return "SELECT PG_CURRENT_XLOG_LOCATION()";
97 }
98 if (10 <= connection.getMetaData().getDatabaseMajorVersion()) {
99 return "SELECT PG_CURRENT_WAL_LSN()";
100 }
101 throw new PipelineInternalException("Unsupported PostgreSQL version: " + connection.getMetaData().getDatabaseProductVersion());
102 }
103
104 @Override
105 public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
106 try (Connection connection = dataSource.getConnection()) {
107 dropSlotIfExist(connection, slotNameSuffix);
108 }
109 }
110
111 private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException {
112 String slotName = getUniqueSlotName(connection, slotNameSuffix);
113 if (!isSlotExisting(connection, slotName)) {
114 log.info("dropSlotIfExist, slot not exist, slotName={}", slotName);
115 return;
116 }
117 log.info("dropSlotIfExist, slot exist, slotName={}", slotName);
118 String dropSlotSQL = "SELECT pg_drop_replication_slot(?)";
119 try (PreparedStatement preparedStatement = connection.prepareStatement(dropSlotSQL)) {
120 preparedStatement.setString(1, slotName);
121 preparedStatement.execute();
122 }
123 }
124
125
126
127
128
129
130
131
132
133 public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
134
135 String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
136 return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
137 }
138
139 @Override
140 public String getDatabaseType() {
141 return "PostgreSQL";
142 }
143 }