1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
19
20 import lombok.SneakyThrows;
21 import lombok.extern.slf4j.Slf4j;
22 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
23 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
24 import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
25 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
26 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper;
27 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
28 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
29 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
30 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
31 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
32 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
33 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
34 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
35 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
36 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLTimestampUtils;
37 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.TestDecodingPlugin;
38 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
39 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
40 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
41 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
42 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
43 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
44 import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
45 import org.postgresql.jdbc.PgConnection;
46 import org.postgresql.replication.PGReplicationStream;
47
48 import java.nio.ByteBuffer;
49 import java.sql.Connection;
50 import java.sql.SQLException;
51 import java.util.ArrayList;
52 import java.util.Collections;
53 import java.util.LinkedList;
54 import java.util.List;
55 import java.util.concurrent.atomic.AtomicInteger;
56 import java.util.concurrent.atomic.AtomicReference;
57
58
59
60
61 @HighFrequencyInvocation
62 @Slf4j
63 public final class PostgreSQLWALDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper {
64
65 private final IncrementalDumperContext dumperContext;
66
67 private final AtomicReference<WALPosition> walPosition;
68
69 private final PipelineChannel channel;
70
71 private final WALEventConverter walEventConverter;
72
73 private final PostgreSQLLogicalReplication logicalReplication;
74
75 private final boolean decodeWithTX;
76
77 private List<AbstractRowEvent> rowEvents = new LinkedList<>();
78
79 public PostgreSQLWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position,
80 final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
81 ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()),
82 () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
83 this.dumperContext = dumperContext;
84 walPosition = new AtomicReference<>((WALPosition) position);
85 this.channel = channel;
86 walEventConverter = new WALEventConverter(dumperContext, metaDataLoader);
87 logicalReplication = new PostgreSQLLogicalReplication();
88 this.decodeWithTX = dumperContext.isDecodeWithTX();
89 }
90
91 @SneakyThrows(InterruptedException.class)
92 @Override
93 protected void runBlocking() {
94 AtomicInteger reconnectTimes = new AtomicInteger();
95 while (isRunning()) {
96 try {
97 dump();
98 break;
99 } catch (final SQLException ex) {
100 int times = reconnectTimes.incrementAndGet();
101 log.error("Connect failed, reconnect times={}", times, ex);
102 if (isRunning()) {
103 Thread.sleep(5000);
104 }
105 if (times >= 5) {
106 throw new IngestException(ex);
107 }
108 }
109 }
110 }
111
112 @SneakyThrows(InterruptedException.class)
113 private void dump() throws SQLException {
114
115 try (
116 Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig());
117 PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLIngestPositionManager.getUniqueSlotName(connection, dumperContext.getJobId()),
118 walPosition.get().getLogSequenceNumber())) {
119 PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
120 DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
121 while (isRunning()) {
122 ByteBuffer message = stream.readPending();
123 if (null == message) {
124 Thread.sleep(10L);
125 continue;
126 }
127 AbstractWALEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
128 if (decodeWithTX) {
129 processEventWithTX(event);
130 } else {
131 processEventIgnoreTX(event);
132 }
133 walPosition.set(new WALPosition(event.getLogSequenceNumber()));
134 }
135 }
136 }
137
138 private void processEventWithTX(final AbstractWALEvent event) {
139 if (event instanceof BeginTXEvent) {
140 rowEvents = new ArrayList<>();
141 return;
142 }
143 if (event instanceof AbstractRowEvent) {
144 rowEvents.add((AbstractRowEvent) event);
145 return;
146 }
147 if (event instanceof CommitTXEvent) {
148 List<Record> records = new LinkedList<>();
149 for (AbstractWALEvent each : rowEvents) {
150 records.add(walEventConverter.convert(each));
151 }
152 records.add(walEventConverter.convert(event));
153 channel.push(records);
154 }
155 }
156
157 private void processEventIgnoreTX(final AbstractWALEvent event) {
158 if (event instanceof BeginTXEvent) {
159 return;
160 }
161 channel.push(Collections.singletonList(walEventConverter.convert(event)));
162 }
163
164 @Override
165 protected void doStop() {
166 }
167 }