1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
19
20 import com.google.common.base.Preconditions;
21 import lombok.RequiredArgsConstructor;
22 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
23 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
24 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
25 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
26 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
27 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
28 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
29 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
30 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
31 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
32 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
33
34 import java.math.BigDecimal;
35 import java.nio.ByteBuffer;
36 import java.nio.charset.StandardCharsets;
37 import java.sql.Date;
38 import java.sql.SQLException;
39 import java.util.LinkedList;
40 import java.util.List;
41
42
43
44
45 @HighFrequencyInvocation
46 @RequiredArgsConstructor
47 public final class TestDecodingPlugin implements DecodingPlugin {
48
49 private final BaseTimestampUtils timestampUtils;
50
51 @Override
52 public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
53 AbstractWALEvent result;
54 String type = readEventType(data);
55 if (type.startsWith("BEGIN")) {
56 result = new BeginTXEvent(Long.parseLong(readNextSegment(data)), null);
57 } else if (type.startsWith("COMMIT")) {
58 result = new CommitTXEvent(Long.parseLong(readNextSegment(data)), null);
59 } else {
60 result = "table".equals(type) ? readTableEvent(data) : new PlaceholderEvent();
61 }
62 result.setLogSequenceNumber(logSequenceNumber);
63 return result;
64 }
65
66 private String readEventType(final ByteBuffer data) {
67 return readNextSegment(data);
68 }
69
70 private AbstractRowEvent readTableEvent(final ByteBuffer data) {
71 String tableName = readTableName(data);
72 String rowEventType = readRowEventType(data);
73 PipelineSQLOperationType type;
74 try {
75 type = PipelineSQLOperationType.valueOf(rowEventType);
76 } catch (final IllegalArgumentException ex) {
77 throw new IngestException("Unknown rowEventType: " + rowEventType);
78 }
79 AbstractRowEvent result;
80 switch (type) {
81 case INSERT:
82 result = readWriteRowEvent(data);
83 break;
84 case UPDATE:
85 result = readUpdateRowEvent(data);
86 break;
87 case DELETE:
88 result = readDeleteRowEvent(data);
89 break;
90 default:
91 throw new IngestException("Unknown rowEventType: " + rowEventType);
92 }
93 String[] tableMetaData = tableName.split("\\.");
94 result.setSchemaName(tableMetaData[0]);
95 result.setTableName(tableMetaData[1].substring(0, tableMetaData[1].length() - 1));
96 return result;
97 }
98
99 private AbstractRowEvent readWriteRowEvent(final ByteBuffer data) {
100 WriteRowEvent result = new WriteRowEvent();
101 List<Object> afterColumns = new LinkedList<>();
102 while (data.hasRemaining()) {
103 afterColumns.add(readColumn(data));
104 }
105 result.setAfterRow(afterColumns);
106 return result;
107 }
108
109 private AbstractRowEvent readUpdateRowEvent(final ByteBuffer data) {
110 UpdateRowEvent result = new UpdateRowEvent();
111 List<Object> afterColumns = new LinkedList<>();
112 while (data.hasRemaining()) {
113 afterColumns.add(readColumn(data));
114 }
115 result.setAfterRow(afterColumns);
116 return result;
117 }
118
119 private AbstractRowEvent readDeleteRowEvent(final ByteBuffer data) {
120 DeleteRowEvent result = new DeleteRowEvent();
121 List<Object> afterColumns = new LinkedList<>();
122 while (data.hasRemaining()) {
123 afterColumns.add(readColumn(data));
124 }
125 result.setPrimaryKeys(afterColumns);
126 return result;
127 }
128
129 private String readTableName(final ByteBuffer data) {
130 return readNextSegment(data);
131 }
132
133 private String readRowEventType(final ByteBuffer data) {
134 String result = readNextSegment(data);
135 return result.substring(0, result.length() - 1);
136 }
137
138 private Object readColumn(final ByteBuffer data) {
139 readColumnName(data);
140 String columnType = readColumnType(data);
141 data.get();
142 return readColumnData(data, columnType);
143 }
144
145 private String readColumnName(final ByteBuffer data) {
146 StringBuilder eventType = new StringBuilder();
147 while (data.hasRemaining()) {
148 char c = (char) data.get();
149 if ('[' == c) {
150 return eventType.toString();
151 }
152 eventType.append(c);
153 }
154 return eventType.toString();
155 }
156
157 private String readColumnType(final ByteBuffer data) {
158 StringBuilder eventType = new StringBuilder();
159 while (data.hasRemaining()) {
160 char c = (char) data.get();
161 if (']' == c) {
162 return eventType.toString();
163 }
164 eventType.append(c);
165 }
166 return eventType.toString();
167 }
168
169 private Object readColumnData(final ByteBuffer data, final String columnType) {
170 data.mark();
171 if ('n' == data.get() && data.remaining() >= 3 && 'u' == data.get() && 'l' == data.get()) {
172 if (data.hasRemaining()) {
173 data.get();
174 }
175 return null;
176 }
177 data.reset();
178 if (columnType.startsWith("numeric")) {
179 return new BigDecimal(readNextSegment(data));
180 }
181 if (columnType.startsWith("bit") || columnType.startsWith("bit varying")) {
182 return readNextSegment(data);
183 }
184 switch (columnType) {
185 case "smallint":
186 return Short.parseShort(readNextSegment(data));
187 case "integer":
188 return Integer.parseInt(readNextSegment(data));
189 case "bigint":
190 return Long.parseLong(readNextSegment(data));
191 case "real":
192 return Float.parseFloat(readNextSegment(data));
193 case "double precision":
194 return Double.parseDouble(readNextSegment(data));
195 case "boolean":
196 return Boolean.parseBoolean(readNextSegment(data));
197 case "time without time zone":
198 try {
199 return timestampUtils.toTime(null, readNextString(data));
200 } catch (final SQLException ex) {
201 throw new DecodingException(ex);
202 }
203 case "date":
204 return Date.valueOf(readNextString(data));
205 case "timestamp without time zone":
206 try {
207 return timestampUtils.toTimestamp(null, readNextString(data));
208 } catch (final SQLException ex) {
209 throw new DecodingException(ex);
210 }
211 case "bytea":
212 return decodeHex(readNextString(data).substring(2));
213 case "json":
214 case "jsonb":
215 return readNextJson(data);
216 default:
217 return readNextString(data);
218 }
219 }
220
221 private String readNextSegment(final ByteBuffer data) {
222 StringBuilder eventType = new StringBuilder();
223 while (data.hasRemaining()) {
224 char c = (char) data.get();
225 if (' ' == c) {
226 return eventType.toString();
227 }
228 eventType.append(c);
229 }
230 return eventType.toString();
231 }
232
233 private String readNextJson(final ByteBuffer data) {
234 data.get();
235 int offset = 0;
236 int startPosition = data.position();
237 int level = 0;
238 while (data.hasRemaining()) {
239 offset++;
240 char c = (char) data.get();
241 if ('{' == c) {
242 level++;
243 } else if ('}' == c) {
244 level--;
245 if (0 != level) {
246 continue;
247 }
248 if ('\'' != data.get()) {
249 throw new IngestException("Read json data unexpected exception");
250 }
251 if (data.hasRemaining()) {
252 data.get();
253 }
254 return readStringSegment(data, startPosition, offset).replace("''", "'");
255 }
256 }
257 return null;
258 }
259
260 private String readStringSegment(final ByteBuffer data, final int startPosition, final int offset) {
261 byte[] result = new byte[offset];
262 for (int i = 0; i < offset; i++) {
263 result[i] = data.get(startPosition + i);
264 }
265 return new String(result, StandardCharsets.UTF_8);
266 }
267
268 private String readNextString(final ByteBuffer data) {
269 int offset = 0;
270 data.get();
271 int startPosition = data.position();
272 while (data.hasRemaining()) {
273 char c = (char) data.get();
274 offset++;
275 if ('\'' == c) {
276 if (!data.hasRemaining()) {
277 offset--;
278 return readStringSegment(data, startPosition, offset).replace("''", "'");
279 }
280 char c2 = (char) data.get();
281 if ('\'' == c2) {
282 offset++;
283 continue;
284 }
285 if (' ' == c2) {
286 offset--;
287 return readStringSegment(data, startPosition, offset).replace("''", "'");
288 }
289 }
290 }
291 return readStringSegment(data, startPosition, offset);
292 }
293
294 private byte[] decodeHex(final String hexString) {
295 int dataLength = hexString.length();
296 Preconditions.checkArgument(0 == (dataLength & 1), "Illegal hex data `%s`", hexString);
297 if (0 == dataLength) {
298 return new byte[0];
299 }
300 byte[] result = new byte[dataLength >>> 1];
301 for (int i = 0; i < dataLength; i += 2) {
302 result[i >>> 1] = decodeHexByte(hexString, i);
303 }
304 return result;
305 }
306
307 private byte decodeHexByte(final String hexString, final int index) {
308 int firstHexChar = Character.digit(hexString.charAt(index), 16);
309 int secondHexChar = Character.digit(hexString.charAt(index + 1), 16);
310 Preconditions.checkArgument(-1 != firstHexChar && -1 != secondHexChar, "Illegal hex byte `%s` in index `%d`", hexString, index);
311 return (byte) ((firstHexChar << 4) + secondHexChar);
312 }
313 }