1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.commons.codec.digest.DigestUtils;
22 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
23 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
24 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
25 import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
26 import org.opengauss.replication.LogSequenceNumber;
27
28 import javax.sql.DataSource;
29 import java.sql.CallableStatement;
30 import java.sql.Connection;
31 import java.sql.PreparedStatement;
32 import java.sql.ResultSet;
33 import java.sql.SQLException;
34 import java.util.Optional;
35
36
37
38
39
40 @Slf4j
41 public final class OpenGaussIngestPositionManager implements DialectIngestPositionManager {
42
43 private static final String SLOT_NAME_PREFIX = "pipeline";
44
45 private static final String DECODE_PLUGIN = "mppdb_decoding";
46
47 private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
48
49 @Override
50 public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
51 try (Connection connection = dataSource.getConnection()) {
52 createSlotIfNotExist(connection, slotNameSuffix);
53 return getWalPosition(connection);
54 }
55 }
56
57 @Override
58 public WALPosition init(final String data) {
59 return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(data)));
60 }
61
62
63
64
65
66
67
68
69 private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException {
70 String slotName = getUniqueSlotName(connection, slotNameSuffix);
71 Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, slotName);
72 if (!slotInfo.isPresent()) {
73 createSlotBySQL(connection, slotName);
74 return;
75 }
76 if (null == slotInfo.get().getDatabaseName()) {
77 dropSlotIfExist(connection, slotName);
78 createSlotBySQL(connection, slotName);
79 }
80 }
81
82 private Optional<ReplicationSlotInfo> getSlotInfo(final Connection connection, final String slotName) throws SQLException {
83 String sql = "SELECT slot_name, database FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
84 try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
85 preparedStatement.setString(1, slotName);
86 preparedStatement.setString(2, DECODE_PLUGIN);
87 try (ResultSet resultSet = preparedStatement.executeQuery()) {
88 if (!resultSet.next()) {
89 return Optional.empty();
90 }
91 return Optional.of(new ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2)));
92 }
93 }
94 }
95
96 private void createSlotBySQL(final Connection connection, final String slotName) throws SQLException {
97 String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
98 try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
99 preparedStatement.execute();
100 } catch (final SQLException ex) {
101 if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
102 throw ex;
103 }
104 }
105 }
106
107 private WALPosition getWalPosition(final Connection connection) throws SQLException {
108 try (
109 PreparedStatement preparedStatement = connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
110 ResultSet resultSet = preparedStatement.executeQuery()) {
111 resultSet.next();
112 return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
113 }
114 }
115
116 @Override
117 public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
118 try (Connection connection = dataSource.getConnection()) {
119 dropSlotIfExist(connection, getUniqueSlotName(connection, slotNameSuffix));
120 }
121 }
122
123 private void dropSlotIfExist(final Connection connection, final String slotName) throws SQLException {
124 if (!getSlotInfo(connection, slotName).isPresent()) {
125 log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
126 return;
127 }
128 String sql = String.format("select * from pg_drop_replication_slot('%s')", slotName);
129 try (CallableStatement callableStatement = connection.prepareCall(sql)) {
130 callableStatement.execute();
131 }
132 }
133
134
135
136
137
138
139
140
141
142 public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
143
144 String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
145 return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
146 }
147
148 @Override
149 public String getDatabaseType() {
150 return "openGauss";
151 }
152 }