View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
19  
20  import com.google.common.base.Preconditions;
21  import lombok.RequiredArgsConstructor;
22  import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
23  import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
24  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
25  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
26  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
27  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
28  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
29  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
30  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
31  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
32  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
33  
34  import java.math.BigDecimal;
35  import java.nio.ByteBuffer;
36  import java.nio.charset.StandardCharsets;
37  import java.sql.Date;
38  import java.sql.SQLException;
39  import java.util.LinkedList;
40  import java.util.List;
41  
42  /**
43   * Test decoding plugin.
44   */
45  @HighFrequencyInvocation
46  @RequiredArgsConstructor
47  public final class TestDecodingPlugin implements DecodingPlugin {
48      
49      private final BaseTimestampUtils timestampUtils;
50      
51      @Override
52      public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
53          AbstractWALEvent result;
54          String type = readEventType(data);
55          if (type.startsWith("BEGIN")) {
56              result = new BeginTXEvent(Long.parseLong(readNextSegment(data)), null);
57          } else if (type.startsWith("COMMIT")) {
58              result = new CommitTXEvent(Long.parseLong(readNextSegment(data)), null);
59          } else {
60              result = "table".equals(type) ? readTableEvent(data) : new PlaceholderEvent();
61          }
62          result.setLogSequenceNumber(logSequenceNumber);
63          return result;
64      }
65      
66      private String readEventType(final ByteBuffer data) {
67          return readNextSegment(data);
68      }
69      
70      private AbstractRowEvent readTableEvent(final ByteBuffer data) {
71          String tableName = readTableName(data);
72          String rowEventType = readRowEventType(data);
73          PipelineSQLOperationType type;
74          try {
75              type = PipelineSQLOperationType.valueOf(rowEventType);
76          } catch (final IllegalArgumentException ex) {
77              throw new IngestException("Unknown rowEventType: " + rowEventType);
78          }
79          AbstractRowEvent result;
80          switch (type) {
81              case INSERT:
82                  result = readWriteRowEvent(data);
83                  break;
84              case UPDATE:
85                  result = readUpdateRowEvent(data);
86                  break;
87              case DELETE:
88                  result = readDeleteRowEvent(data);
89                  break;
90              default:
91                  throw new IngestException("Unknown rowEventType: " + rowEventType);
92          }
93          String[] tableMetaData = tableName.split("\\.");
94          result.setSchemaName(tableMetaData[0]);
95          result.setTableName(tableMetaData[1].substring(0, tableMetaData[1].length() - 1));
96          return result;
97      }
98      
99      private AbstractRowEvent readWriteRowEvent(final ByteBuffer data) {
100         WriteRowEvent result = new WriteRowEvent();
101         List<Object> afterColumns = new LinkedList<>();
102         while (data.hasRemaining()) {
103             afterColumns.add(readColumn(data));
104         }
105         result.setAfterRow(afterColumns);
106         return result;
107     }
108     
109     private AbstractRowEvent readUpdateRowEvent(final ByteBuffer data) {
110         UpdateRowEvent result = new UpdateRowEvent();
111         List<Object> afterColumns = new LinkedList<>();
112         while (data.hasRemaining()) {
113             afterColumns.add(readColumn(data));
114         }
115         result.setAfterRow(afterColumns);
116         return result;
117     }
118     
119     private AbstractRowEvent readDeleteRowEvent(final ByteBuffer data) {
120         DeleteRowEvent result = new DeleteRowEvent();
121         List<Object> afterColumns = new LinkedList<>();
122         while (data.hasRemaining()) {
123             afterColumns.add(readColumn(data));
124         }
125         result.setPrimaryKeys(afterColumns);
126         return result;
127     }
128     
129     private String readTableName(final ByteBuffer data) {
130         return readNextSegment(data);
131     }
132     
133     private String readRowEventType(final ByteBuffer data) {
134         String result = readNextSegment(data);
135         return result.substring(0, result.length() - 1);
136     }
137     
138     private Object readColumn(final ByteBuffer data) {
139         readColumnName(data);
140         String columnType = readColumnType(data);
141         data.get();
142         return readColumnData(data, columnType);
143     }
144     
145     private String readColumnName(final ByteBuffer data) {
146         StringBuilder eventType = new StringBuilder();
147         while (data.hasRemaining()) {
148             char c = (char) data.get();
149             if ('[' == c) {
150                 return eventType.toString();
151             }
152             eventType.append(c);
153         }
154         return eventType.toString();
155     }
156     
157     private String readColumnType(final ByteBuffer data) {
158         StringBuilder eventType = new StringBuilder();
159         while (data.hasRemaining()) {
160             char c = (char) data.get();
161             if (']' == c) {
162                 return eventType.toString();
163             }
164             eventType.append(c);
165         }
166         return eventType.toString();
167     }
168     
169     private Object readColumnData(final ByteBuffer data, final String columnType) {
170         data.mark();
171         if ('n' == data.get() && data.remaining() >= 3 && 'u' == data.get() && 'l' == data.get()) {
172             if (data.hasRemaining()) {
173                 data.get();
174             }
175             return null;
176         }
177         data.reset();
178         if (columnType.startsWith("numeric")) {
179             return new BigDecimal(readNextSegment(data));
180         }
181         if (columnType.startsWith("bit") || columnType.startsWith("bit varying")) {
182             return readNextSegment(data);
183         }
184         switch (columnType) {
185             case "smallint":
186                 return Short.parseShort(readNextSegment(data));
187             case "integer":
188                 return Integer.parseInt(readNextSegment(data));
189             case "bigint":
190                 return Long.parseLong(readNextSegment(data));
191             case "real":
192                 return Float.parseFloat(readNextSegment(data));
193             case "double precision":
194                 return Double.parseDouble(readNextSegment(data));
195             case "boolean":
196                 return Boolean.parseBoolean(readNextSegment(data));
197             case "time without time zone":
198                 try {
199                     return timestampUtils.toTime(null, readNextString(data));
200                 } catch (final SQLException ex) {
201                     throw new DecodingException(ex);
202                 }
203             case "date":
204                 return Date.valueOf(readNextString(data));
205             case "timestamp without time zone":
206                 try {
207                     return timestampUtils.toTimestamp(null, readNextString(data));
208                 } catch (final SQLException ex) {
209                     throw new DecodingException(ex);
210                 }
211             case "bytea":
212                 return decodeHex(readNextString(data).substring(2));
213             case "json":
214             case "jsonb":
215                 return readNextJson(data);
216             default:
217                 return readNextString(data);
218         }
219     }
220     
221     private String readNextSegment(final ByteBuffer data) {
222         StringBuilder eventType = new StringBuilder();
223         while (data.hasRemaining()) {
224             char c = (char) data.get();
225             if (' ' == c) {
226                 return eventType.toString();
227             }
228             eventType.append(c);
229         }
230         return eventType.toString();
231     }
232     
233     private String readNextJson(final ByteBuffer data) {
234         data.get();
235         int offset = 0;
236         int startPosition = data.position();
237         int level = 0;
238         while (data.hasRemaining()) {
239             offset++;
240             char c = (char) data.get();
241             if ('{' == c) {
242                 level++;
243             } else if ('}' == c) {
244                 level--;
245                 if (0 != level) {
246                     continue;
247                 }
248                 if ('\'' != data.get()) {
249                     throw new IngestException("Read json data unexpected exception");
250                 }
251                 if (data.hasRemaining()) {
252                     data.get();
253                 }
254                 return readStringSegment(data, startPosition, offset).replace("''", "'");
255             }
256         }
257         return null;
258     }
259     
260     private String readStringSegment(final ByteBuffer data, final int startPosition, final int offset) {
261         byte[] result = new byte[offset];
262         for (int i = 0; i < offset; i++) {
263             result[i] = data.get(startPosition + i);
264         }
265         return new String(result, StandardCharsets.UTF_8);
266     }
267     
268     private String readNextString(final ByteBuffer data) {
269         int offset = 0;
270         data.get();
271         int startPosition = data.position();
272         while (data.hasRemaining()) {
273             char c = (char) data.get();
274             offset++;
275             if ('\'' == c) {
276                 if (!data.hasRemaining()) {
277                     offset--;
278                     return readStringSegment(data, startPosition, offset).replace("''", "'");
279                 }
280                 char c2 = (char) data.get();
281                 if ('\'' == c2) {
282                     offset++;
283                     continue;
284                 }
285                 if (' ' == c2) {
286                     offset--;
287                     return readStringSegment(data, startPosition, offset).replace("''", "'");
288                 }
289             }
290         }
291         return readStringSegment(data, startPosition, offset);
292     }
293     
294     private byte[] decodeHex(final String hexString) {
295         int dataLength = hexString.length();
296         Preconditions.checkArgument(0 == (dataLength & 1), "Illegal hex data `%s`", hexString);
297         if (0 == dataLength) {
298             return new byte[0];
299         }
300         byte[] result = new byte[dataLength >>> 1];
301         for (int i = 0; i < dataLength; i += 2) {
302             result[i >>> 1] = decodeHexByte(hexString, i);
303         }
304         return result;
305     }
306     
307     private byte decodeHexByte(final String hexString, final int index) {
308         int firstHexChar = Character.digit(hexString.charAt(index), 16);
309         int secondHexChar = Character.digit(hexString.charAt(index + 1), 16);
310         Preconditions.checkArgument(-1 != firstHexChar && -1 != secondHexChar, "Illegal hex byte `%s` in index `%d`", hexString, index);
311         return (byte) ((firstHexChar << 4) + secondHexChar);
312     }
313 }