View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
19  
20  import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
21  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
22  import org.postgresql.PGConnection;
23  import org.postgresql.PGProperty;
24  import org.postgresql.replication.LogSequenceNumber;
25  import org.postgresql.replication.PGReplicationStream;
26  
27  import java.sql.Connection;
28  import java.sql.DriverManager;
29  import java.sql.SQLException;
30  import java.util.Properties;
31  
32  /**
33   * PostgreSQL logical replication.
34   */
35  public final class PostgreSQLLogicalReplication {
36      
37      /**
38       * Create connection.
39       *
40       * @param pipelineDataSourceConfig pipeline data source configuration
41       * @return PostgreSQL connection
42       * @throws SQLException SQL exception
43       */
44      public Connection createConnection(final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
45          Properties props = new Properties();
46          PGProperty.USER.set(props, pipelineDataSourceConfig.getUsername());
47          PGProperty.PASSWORD.set(props, pipelineDataSourceConfig.getPassword());
48          PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6");
49          PGProperty.REPLICATION.set(props, "database");
50          PGProperty.PREFER_QUERY_MODE.set(props, "simple");
51          return DriverManager.getConnection(pipelineDataSourceConfig.getUrl(), props);
52      }
53      
54      /**
55       * Create PostgreSQL replication stream.
56       *
57       * @param connection connection
58       * @param slotName slot name
59       * @param startPosition start position
60       * @return replication stream
61       * @throws SQLException SQL exception
62       */
63      public PGReplicationStream createReplicationStream(final Connection connection, final String slotName, final BaseLogSequenceNumber startPosition) throws SQLException {
64          return connection.unwrap(PGConnection.class).getReplicationAPI()
65                  .replicationStream()
66                  .logical()
67                  .withStartPosition((LogSequenceNumber) startPosition.get())
68                  .withSlotName(slotName)
69                  .withSlotOption("include-xids", true)
70                  .withSlotOption("skip-empty-xacts", true)
71                  .start();
72      }
73  }