1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
19
20 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
21 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
22 import org.postgresql.PGConnection;
23 import org.postgresql.PGProperty;
24 import org.postgresql.replication.LogSequenceNumber;
25 import org.postgresql.replication.PGReplicationStream;
26
27 import java.sql.Connection;
28 import java.sql.DriverManager;
29 import java.sql.SQLException;
30 import java.util.Properties;
31
32
33
34
35 public final class PostgreSQLLogicalReplication {
36
37
38
39
40
41
42
43
44 public Connection createConnection(final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
45 Properties props = new Properties();
46 PGProperty.USER.set(props, pipelineDataSourceConfig.getUsername());
47 PGProperty.PASSWORD.set(props, pipelineDataSourceConfig.getPassword());
48 PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6");
49 PGProperty.REPLICATION.set(props, "database");
50 PGProperty.PREFER_QUERY_MODE.set(props, "simple");
51 return DriverManager.getConnection(pipelineDataSourceConfig.getUrl(), props);
52 }
53
54
55
56
57
58
59
60
61
62
63 public PGReplicationStream createReplicationStream(final Connection connection, final String slotName, final BaseLogSequenceNumber startPosition) throws SQLException {
64 return connection.unwrap(PGConnection.class).getReplicationAPI()
65 .replicationStream()
66 .logical()
67 .withStartPosition((LogSequenceNumber) startPosition.get())
68 .withSlotName(slotName)
69 .withSlotOption("include-xids", true)
70 .withSlotOption("skip-empty-xacts", true)
71 .start();
72 }
73 }