View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
19  
20  import lombok.SneakyThrows;
21  import lombok.extern.slf4j.Slf4j;
22  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
23  import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
24  import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
25  import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
26  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
27  import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
28  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
29  import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
30  import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
31  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
32  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
33  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
34  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
35  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
36  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLTimestampUtils;
37  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.TestDecodingPlugin;
38  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
39  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
40  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
41  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
42  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
43  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
44  import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
45  import org.postgresql.jdbc.PgConnection;
46  import org.postgresql.replication.PGReplicationStream;
47  
48  import java.nio.ByteBuffer;
49  import java.sql.Connection;
50  import java.sql.SQLException;
51  import java.util.ArrayList;
52  import java.util.Collections;
53  import java.util.LinkedList;
54  import java.util.List;
55  import java.util.concurrent.atomic.AtomicInteger;
56  import java.util.concurrent.atomic.AtomicReference;
57  
58  /**
59   * PostgreSQL WAL dumper.
60   */
61  @HighFrequencyInvocation
62  @Slf4j
63  public final class PostgreSQLWALDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper {
64      
65      private final IncrementalDumperContext dumperContext;
66      
67      private final AtomicReference<WALPosition> walPosition;
68      
69      private final PipelineChannel channel;
70      
71      private final WALEventConverter walEventConverter;
72      
73      private final PostgreSQLLogicalReplication logicalReplication;
74      
75      private final boolean decodeWithTX;
76      
77      private List<AbstractRowEvent> rowEvents = new LinkedList<>();
78      
79      public PostgreSQLWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position,
80                                 final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
81          ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
82                  () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
83          this.dumperContext = dumperContext;
84          walPosition = new AtomicReference<>((WALPosition) position);
85          this.channel = channel;
86          walEventConverter = new WALEventConverter(dumperContext, metaDataLoader);
87          logicalReplication = new PostgreSQLLogicalReplication();
88          this.decodeWithTX = dumperContext.isDecodeWithTX();
89      }
90      
91      @SneakyThrows(InterruptedException.class)
92      @Override
93      protected void runBlocking() {
94          AtomicInteger reconnectTimes = new AtomicInteger();
95          while (isRunning()) {
96              try {
97                  dump();
98                  break;
99              } catch (final SQLException ex) {
100                 int times = reconnectTimes.incrementAndGet();
101                 log.error("Connect failed, reconnect times={}", times, ex);
102                 if (isRunning()) {
103                     Thread.sleep(5000);
104                 }
105                 if (times >= 5) {
106                     throw new IngestException(ex);
107                 }
108             }
109         }
110     }
111     
112     @SneakyThrows(InterruptedException.class)
113     private void dump() throws SQLException {
114         // TODO use unified PgConnection
115         try (
116                 Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig());
117                 PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLIngestPositionManager.getUniqueSlotName(connection, dumperContext.getJobId()),
118                         walPosition.get().getLogSequenceNumber())) {
119             PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
120             DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
121             while (isRunning()) {
122                 ByteBuffer message = stream.readPending();
123                 if (null == message) {
124                     Thread.sleep(10L);
125                     continue;
126                 }
127                 AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
128                 if (decodeWithTX) {
129                     processEventWithTX(event);
130                 } else {
131                     processEventIgnoreTX(event);
132                 }
133                 walPosition.set(new WALPosition(event.getLogSequenceNumber()));
134             }
135         }
136     }
137     
138     private void processEventWithTX(final AbstractWALEvent event) {
139         if (event instanceof BeginTXEvent) {
140             rowEvents = new ArrayList<>();
141             return;
142         }
143         if (event instanceof AbstractRowEvent) {
144             rowEvents.add((AbstractRowEvent) event);
145             return;
146         }
147         if (event instanceof CommitTXEvent) {
148             List<Record> records = new LinkedList<>();
149             for (AbstractWALEvent each : rowEvents) {
150                 records.add(walEventConverter.convert(each));
151             }
152             records.add(walEventConverter.convert(event));
153             channel.push(records);
154         }
155     }
156     
157     private void processEventIgnoreTX(final AbstractWALEvent event) {
158         if (event instanceof BeginTXEvent) {
159             return;
160         }
161         channel.push(Collections.singletonList(walEventConverter.convert(event)));
162     }
163     
164     @Override
165     protected void doStop() {
166     }
167 }