1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode;
19
20 import com.google.common.base.Preconditions;
21 import lombok.RequiredArgsConstructor;
22 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
23 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
24 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
25 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseTimestampUtils;
26 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingException;
27 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
28 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
29 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
30 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
31 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
32 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
33 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
34 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
35 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
36 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
37 import org.apache.shardingsphere.infra.util.json.JsonUtils;
38 import org.opengauss.util.PGInterval;
39 import org.opengauss.util.PGobject;
40
41 import java.math.BigDecimal;
42 import java.nio.ByteBuffer;
43 import java.nio.charset.StandardCharsets;
44 import java.sql.Date;
45 import java.sql.SQLException;
46 import java.util.ArrayList;
47 import java.util.List;
48
49
50
51
52 @HighFrequencyInvocation
53 @RequiredArgsConstructor
54 public final class MppdbDecodingPlugin implements DecodingPlugin {
55
56 private final BaseTimestampUtils timestampUtils;
57
58 private final boolean decodeWithTX;
59
60 private final boolean decodeParallelly;
61
62 @Override
63 public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
64 AbstractWALEvent result;
65 byte[] bytes = new byte[data.remaining()];
66 data.get(bytes);
67 String dataText = new String(bytes, StandardCharsets.UTF_8);
68 if (decodeWithTX) {
69 result = decodeDataWithTX(dataText);
70 } else {
71 result = decodeDataIgnoreTX(dataText);
72 }
73 result.setLogSequenceNumber(logSequenceNumber);
74 return result;
75 }
76
77 private AbstractWALEvent decodeDataWithTX(final String dataText) {
78 if (decodeParallelly) {
79 return decodeParallelly(dataText);
80 } else {
81 return decodeSerially(dataText);
82 }
83 }
84
85 private AbstractWALEvent decodeSerially(final String dataText) {
86 AbstractWALEvent result = new PlaceholderEvent();
87 if (dataText.startsWith("BEGIN")) {
88 int beginIndex = dataText.indexOf("BEGIN") + "BEGIN".length() + 1;
89 result = new BeginTXEvent(Long.parseLong(dataText.substring(beginIndex)), null);
90 } else if (dataText.startsWith("COMMIT")) {
91 int commitBeginIndex = dataText.indexOf("COMMIT") + "COMMIT".length() + 1;
92 int csnBeginIndex = dataText.indexOf("CSN") + "CSN".length() + 1;
93 result = new CommitTXEvent(Long.parseLong(dataText.substring(commitBeginIndex, dataText.indexOf(' ', commitBeginIndex))), Long.parseLong(dataText.substring(csnBeginIndex)));
94 } else if (dataText.startsWith("{")) {
95 result = readTableEvent(dataText);
96 }
97 return result;
98 }
99
100 private AbstractWALEvent decodeParallelly(final String dataText) {
101 AbstractWALEvent result = new PlaceholderEvent();
102 if (dataText.startsWith("BEGIN")) {
103 int beginIndex = dataText.indexOf("CSN:") + "CSN:".length() + 1;
104 int firstLsnIndex = dataText.indexOf("first_lsn");
105 long csn = firstLsnIndex > 0 ? Long.parseLong(dataText.substring(beginIndex, firstLsnIndex - 1)) : 0L;
106 result = new BeginTXEvent(null, csn);
107 } else if (dataText.startsWith("commit") || dataText.startsWith("COMMIT")) {
108 int beginIndex = dataText.indexOf("xid:") + "xid:".length() + 1;
109 result = new CommitTXEvent(Long.parseLong(dataText.substring(beginIndex)), null);
110 } else if (dataText.startsWith("{")) {
111 result = readTableEvent(dataText);
112 }
113 return result;
114 }
115
116 private AbstractWALEvent decodeDataIgnoreTX(final String dataText) {
117 return dataText.startsWith("{") ? readTableEvent(dataText) : new PlaceholderEvent();
118 }
119
120 private AbstractRowEvent readTableEvent(final String mppData) {
121 MppTableData mppTableData;
122 mppTableData = JsonUtils.fromJsonString(mppData, MppTableData.class);
123 String rowEventType = mppTableData.getOpType();
124 PipelineSQLOperationType type;
125 try {
126 type = PipelineSQLOperationType.valueOf(rowEventType);
127 } catch (final IllegalArgumentException ex) {
128 throw new IngestException("Unknown rowEventType: " + rowEventType);
129 }
130 AbstractRowEvent result;
131 switch (type) {
132 case INSERT:
133 result = readWriteRowEvent(mppTableData);
134 break;
135 case UPDATE:
136 result = readUpdateRowEvent(mppTableData);
137 break;
138 case DELETE:
139 result = readDeleteRowEvent(mppTableData);
140 break;
141 default:
142 throw new IngestException("Unknown rowEventType: " + rowEventType);
143 }
144 String[] tableMetaData = mppTableData.getTableName().split("\\.");
145 result.setSchemaName(tableMetaData[0]);
146 result.setTableName(tableMetaData[1]);
147 return result;
148 }
149
150 private AbstractRowEvent readWriteRowEvent(final MppTableData data) {
151 WriteRowEvent result = new WriteRowEvent();
152 result.setAfterRow(getColumnDataFromMppDataEvent(data));
153 return result;
154 }
155
156 private AbstractRowEvent readUpdateRowEvent(final MppTableData data) {
157 UpdateRowEvent result = new UpdateRowEvent();
158 result.setAfterRow(getColumnDataFromMppDataEvent(data));
159 return result;
160 }
161
162 private AbstractRowEvent readDeleteRowEvent(final MppTableData data) {
163 DeleteRowEvent result = new DeleteRowEvent();
164 result.setPrimaryKeys(getDeleteColumnDataFromMppDataEvent(data));
165 return result;
166 }
167
168 private List<Object> getColumnDataFromMppDataEvent(final MppTableData data) {
169 List<Object> result = new ArrayList<>(data.getColumnsType().length);
170 for (int i = 0; i < data.getColumnsType().length; i++) {
171 result.add(readColumnData(data.getColumnsVal()[i], data.getColumnsType()[i]));
172 }
173 return result;
174 }
175
176 private List<Object> getDeleteColumnDataFromMppDataEvent(final MppTableData data) {
177 List<Object> result = new ArrayList<>(data.getOldKeysType().length);
178 for (int i = 0; i < data.getOldKeysType().length; i++) {
179 result.add(readColumnData(data.getOldKeysVal()[i], data.getOldKeysType()[i]));
180 }
181 return result;
182 }
183
184 private Object readColumnData(final String data, final String columnType) {
185 if ("null".equals(data)) {
186 return null;
187 }
188 if (columnType.startsWith("numeric")) {
189 return new BigDecimal(data);
190 }
191 if (columnType.startsWith("bit")) {
192 return decodeString(data.substring(1));
193 }
194 switch (columnType) {
195 case "tinyint":
196 case "smallint":
197 case "integer":
198 return Integer.parseInt(data);
199 case "bigint":
200 return Long.parseLong(data);
201 case "real":
202 return Float.parseFloat(data);
203 case "double precision":
204 return Double.parseDouble(data);
205 case "boolean":
206 return Boolean.parseBoolean(data);
207 case "time without time zone":
208 case "time with time zone":
209 try {
210 return timestampUtils.toTime(null, decodeString(data));
211 } catch (final SQLException ex) {
212 throw new DecodingException(ex);
213 }
214 case "date":
215 return Date.valueOf(decodeString(data));
216 case "timestamp without time zone":
217 case "timestamp with time zone":
218 case "smalldatetime":
219 try {
220 return timestampUtils.toTimestamp(null, decodeString(data));
221 } catch (final SQLException ex) {
222 throw new DecodingException(ex);
223 }
224 case "bytea":
225 case "blob":
226 return decodeBytea(data);
227 case "raw":
228 case "reltime":
229 case "int4range":
230 case "int8range":
231 case "numrange":
232 case "tsrange":
233 case "tstzrange":
234 case "daterange":
235 return decodePgObject(data, columnType);
236 case "money":
237 return decodeMoney(data);
238 case "interval":
239 return decodeInterval(data);
240 case "character varying":
241 case "text":
242 case "character":
243 case "nvarchar2":
244 case "tsquery":
245 default:
246 return decodeString(data).replace("''", "'");
247 }
248 }
249
250 private PGobject decodeInterval(final String data) {
251 try {
252 return new PGInterval(decodeString(data));
253 } catch (final SQLException ignored) {
254 return null;
255 }
256 }
257
258 private PGobject decodePgObject(final String data, final String type) {
259 try {
260 PGobject result = new PGobject();
261 result.setType(type);
262 result.setValue(decodeString(data));
263 return result;
264 } catch (final SQLException ignored) {
265 return null;
266 }
267 }
268
269 private Object decodeBytea(final String data) {
270 return decodeHex(decodeString(data).substring(2));
271 }
272
273 private String decodeMoney(final String data) {
274 String result = decodeString(data);
275 return '$' == result.charAt(0) ? result.substring(1) : result;
276 }
277
278 private String decodeString(final String data) {
279 if (data.length() > 1) {
280 int begin = '\'' == data.charAt(0) ? 1 : 0;
281 int end = data.length() + (data.charAt(data.length() - 1) == '\'' ? -1 : 0);
282 return data.substring(begin, end);
283 }
284 return data;
285 }
286
287 private byte[] decodeHex(final String hexString) {
288 int dataLength = hexString.length();
289 Preconditions.checkArgument(0 == (dataLength & 1), "Illegal hex data `%s`", hexString);
290 if (0 == dataLength) {
291 return new byte[0];
292 }
293 byte[] result = new byte[dataLength >>> 1];
294 for (int i = 0; i < dataLength; i += 2) {
295 result[i >>> 1] = decodeHexByte(hexString, i);
296 }
297 return result;
298 }
299
300 private byte decodeHexByte(final String hexString, final int index) {
301 int firstHexChar = Character.digit(hexString.charAt(index), 16);
302 int secondHexChar = Character.digit(hexString.charAt(index + 1), 16);
303 Preconditions.checkArgument(-1 != firstHexChar && -1 != secondHexChar, "Illegal hex byte `%s` in index `%d`", hexString, index);
304 return (byte) ((firstHexChar << 4) + secondHexChar);
305 }
306 }