View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode;
19  
20  import com.google.common.base.Preconditions;
21  import lombok.RequiredArgsConstructor;
22  import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
23  import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
24  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
25  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseTimestampUtils;
26  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingException;
27  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
28  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
29  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
30  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
31  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
32  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
33  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
34  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
35  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
36  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
37  import org.apache.shardingsphere.infra.util.json.JsonUtils;
38  import org.opengauss.util.PGInterval;
39  import org.opengauss.util.PGobject;
40  
41  import java.math.BigDecimal;
42  import java.nio.ByteBuffer;
43  import java.nio.charset.StandardCharsets;
44  import java.sql.Date;
45  import java.sql.SQLException;
46  import java.util.ArrayList;
47  import java.util.List;
48  
49  /**
50   * Mppdb decoding plugin in openGauss.
51   */
52  @HighFrequencyInvocation
53  @RequiredArgsConstructor
54  public final class MppdbDecodingPlugin implements DecodingPlugin {
55      
56      private final BaseTimestampUtils timestampUtils;
57      
58      private final boolean decodeWithTX;
59      
60      private final boolean decodeParallelly;
61      
62      @Override
63      public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
64          AbstractWALEvent result;
65          byte[] bytes = new byte[data.remaining()];
66          data.get(bytes);
67          String dataText = new String(bytes, StandardCharsets.UTF_8);
68          if (decodeWithTX) {
69              result = decodeDataWithTX(dataText);
70          } else {
71              result = decodeDataIgnoreTX(dataText);
72          }
73          result.setLogSequenceNumber(logSequenceNumber);
74          return result;
75      }
76      
77      private AbstractWALEvent decodeDataWithTX(final String dataText) {
78          if (decodeParallelly) {
79              return decodeParallelly(dataText);
80          } else {
81              return decodeSerially(dataText);
82          }
83      }
84      
85      private AbstractWALEvent decodeSerially(final String dataText) {
86          AbstractWALEvent result = new PlaceholderEvent();
87          if (dataText.startsWith("BEGIN")) {
88              int beginIndex = dataText.indexOf("BEGIN") + "BEGIN".length() + 1;
89              result = new BeginTXEvent(Long.parseLong(dataText.substring(beginIndex)), null);
90          } else if (dataText.startsWith("COMMIT")) {
91              int commitBeginIndex = dataText.indexOf("COMMIT") + "COMMIT".length() + 1;
92              int csnBeginIndex = dataText.indexOf("CSN") + "CSN".length() + 1;
93              result = new CommitTXEvent(Long.parseLong(dataText.substring(commitBeginIndex, dataText.indexOf(' ', commitBeginIndex))), Long.parseLong(dataText.substring(csnBeginIndex)));
94          } else if (dataText.startsWith("{")) {
95              result = readTableEvent(dataText);
96          }
97          return result;
98      }
99      
100     private AbstractWALEvent decodeParallelly(final String dataText) {
101         AbstractWALEvent result = new PlaceholderEvent();
102         if (dataText.startsWith("BEGIN")) {
103             int beginIndex = dataText.indexOf("CSN:") + "CSN:".length() + 1;
104             int firstLsnIndex = dataText.indexOf("first_lsn");
105             long csn = firstLsnIndex > 0 ? Long.parseLong(dataText.substring(beginIndex, firstLsnIndex - 1)) : 0L;
106             result = new BeginTXEvent(null, csn);
107         } else if (dataText.startsWith("commit") || dataText.startsWith("COMMIT")) {
108             int beginIndex = dataText.indexOf("xid:") + "xid:".length() + 1;
109             result = new CommitTXEvent(Long.parseLong(dataText.substring(beginIndex)), null);
110         } else if (dataText.startsWith("{")) {
111             result = readTableEvent(dataText);
112         }
113         return result;
114     }
115     
116     private AbstractWALEvent decodeDataIgnoreTX(final String dataText) {
117         return dataText.startsWith("{") ? readTableEvent(dataText) : new PlaceholderEvent();
118     }
119     
120     private AbstractRowEvent readTableEvent(final String mppData) {
121         MppTableData mppTableData;
122         mppTableData = JsonUtils.fromJsonString(mppData, MppTableData.class);
123         String rowEventType = mppTableData.getOpType();
124         PipelineSQLOperationType type;
125         try {
126             type = PipelineSQLOperationType.valueOf(rowEventType);
127         } catch (final IllegalArgumentException ex) {
128             throw new IngestException("Unknown rowEventType: " + rowEventType);
129         }
130         AbstractRowEvent result;
131         switch (type) {
132             case INSERT:
133                 result = readWriteRowEvent(mppTableData);
134                 break;
135             case UPDATE:
136                 result = readUpdateRowEvent(mppTableData);
137                 break;
138             case DELETE:
139                 result = readDeleteRowEvent(mppTableData);
140                 break;
141             default:
142                 throw new IngestException("Unknown rowEventType: " + rowEventType);
143         }
144         String[] tableMetaData = mppTableData.getTableName().split("\\.");
145         result.setSchemaName(tableMetaData[0]);
146         result.setTableName(tableMetaData[1]);
147         return result;
148     }
149     
150     private AbstractRowEvent readWriteRowEvent(final MppTableData data) {
151         WriteRowEvent result = new WriteRowEvent();
152         result.setAfterRow(getColumnDataFromMppDataEvent(data));
153         return result;
154     }
155     
156     private AbstractRowEvent readUpdateRowEvent(final MppTableData data) {
157         UpdateRowEvent result = new UpdateRowEvent();
158         result.setAfterRow(getColumnDataFromMppDataEvent(data));
159         return result;
160     }
161     
162     private AbstractRowEvent readDeleteRowEvent(final MppTableData data) {
163         DeleteRowEvent result = new DeleteRowEvent();
164         result.setPrimaryKeys(getDeleteColumnDataFromMppDataEvent(data));
165         return result;
166     }
167     
168     private List<Object> getColumnDataFromMppDataEvent(final MppTableData data) {
169         List<Object> result = new ArrayList<>(data.getColumnsType().length);
170         for (int i = 0; i < data.getColumnsType().length; i++) {
171             result.add(readColumnData(data.getColumnsVal()[i], data.getColumnsType()[i]));
172         }
173         return result;
174     }
175     
176     private List<Object> getDeleteColumnDataFromMppDataEvent(final MppTableData data) {
177         List<Object> result = new ArrayList<>(data.getOldKeysType().length);
178         for (int i = 0; i < data.getOldKeysType().length; i++) {
179             result.add(readColumnData(data.getOldKeysVal()[i], data.getOldKeysType()[i]));
180         }
181         return result;
182     }
183     
184     private Object readColumnData(final String data, final String columnType) {
185         if ("null".equals(data)) {
186             return null;
187         }
188         if (columnType.startsWith("numeric")) {
189             return new BigDecimal(data);
190         }
191         if (columnType.startsWith("bit")) {
192             return decodeString(data.substring(1));
193         }
194         switch (columnType) {
195             case "tinyint":
196             case "smallint":
197             case "integer":
198                 return Integer.parseInt(data);
199             case "bigint":
200                 return Long.parseLong(data);
201             case "real":
202                 return Float.parseFloat(data);
203             case "double precision":
204                 return Double.parseDouble(data);
205             case "boolean":
206                 return Boolean.parseBoolean(data);
207             case "time without time zone":
208             case "time with time zone":
209                 try {
210                     return timestampUtils.toTime(null, decodeString(data));
211                 } catch (final SQLException ex) {
212                     throw new DecodingException(ex);
213                 }
214             case "date":
215                 return Date.valueOf(decodeString(data));
216             case "timestamp without time zone":
217             case "timestamp with time zone":
218             case "smalldatetime":
219                 try {
220                     return timestampUtils.toTimestamp(null, decodeString(data));
221                 } catch (final SQLException ex) {
222                     throw new DecodingException(ex);
223                 }
224             case "bytea":
225             case "blob":
226                 return decodeBytea(data);
227             case "raw":
228             case "reltime":
229             case "int4range":
230             case "int8range":
231             case "numrange":
232             case "tsrange":
233             case "tstzrange":
234             case "daterange":
235                 return decodePgObject(data, columnType);
236             case "money":
237                 return decodeMoney(data);
238             case "interval":
239                 return decodeInterval(data);
240             case "character varying":
241             case "text":
242             case "character":
243             case "nvarchar2":
244             case "tsquery":
245             default:
246                 return decodeString(data).replace("''", "'");
247         }
248     }
249     
250     private PGobject decodeInterval(final String data) {
251         try {
252             return new PGInterval(decodeString(data));
253         } catch (final SQLException ignored) {
254             return null;
255         }
256     }
257     
258     private PGobject decodePgObject(final String data, final String type) {
259         try {
260             PGobject result = new PGobject();
261             result.setType(type);
262             result.setValue(decodeString(data));
263             return result;
264         } catch (final SQLException ignored) {
265             return null;
266         }
267     }
268     
269     private Object decodeBytea(final String data) {
270         return decodeHex(decodeString(data).substring(2));
271     }
272     
273     private String decodeMoney(final String data) {
274         String result = decodeString(data);
275         return '$' == result.charAt(0) ? result.substring(1) : result;
276     }
277     
278     private String decodeString(final String data) {
279         if (data.length() > 1) {
280             int begin = '\'' == data.charAt(0) ? 1 : 0;
281             int end = data.length() + (data.charAt(data.length() - 1) == '\'' ? -1 : 0);
282             return data.substring(begin, end);
283         }
284         return data;
285     }
286     
287     private byte[] decodeHex(final String hexString) {
288         int dataLength = hexString.length();
289         Preconditions.checkArgument(0 == (dataLength & 1), "Illegal hex data `%s`", hexString);
290         if (0 == dataLength) {
291             return new byte[0];
292         }
293         byte[] result = new byte[dataLength >>> 1];
294         for (int i = 0; i < dataLength; i += 2) {
295             result[i >>> 1] = decodeHexByte(hexString, i);
296         }
297         return result;
298     }
299     
300     private byte decodeHexByte(final String hexString, final int index) {
301         int firstHexChar = Character.digit(hexString.charAt(index), 16);
302         int secondHexChar = Character.digit(hexString.charAt(index + 1), 16);
303         Preconditions.checkArgument(-1 != firstHexChar && -1 != secondHexChar, "Illegal hex byte `%s` in index `%d`", hexString, index);
304         return (byte) ((firstHexChar << 4) + secondHexChar);
305     }
306 }