/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17  
18  package org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty;
19  
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufUtil;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.handler.codec.ByteToMessageDecoder;
24  import lombok.extern.slf4j.Slf4j;
25  import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
26  import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext;
27  import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
28  import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
29  import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
30  import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
31  import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.QueryEvent;
32  import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
33  import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
34  import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.XidEvent;
35  import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
36  import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
37  import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
38  import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.management.MySQLBinlogFormatDescriptionEventPacket;
39  import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.management.MySQLBinlogRotateEventPacket;
40  import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogRowsEventPacket;
41  import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
42  import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
43  
44  import java.util.LinkedList;
45  import java.util.List;
46  import java.util.Map;
47  import java.util.Optional;
48  
49  /**
50   * MySQL binlog event packet decoder.
51   */
52  @Slf4j
53  public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
54      
55      private static final String TX_BEGIN_SQL = "BEGIN";
56      
57      private final BinlogContext binlogContext;
58      
59      private final boolean decodeWithTX;
60      
61      private List<AbstractBinlogEvent> records = new LinkedList<>();
62      
63      public MySQLBinlogEventPacketDecoder(final int checksumLength, final Map<Long, MySQLBinlogTableMapEventPacket> tableMap, final boolean decodeWithTX) {
64          this.decodeWithTX = decodeWithTX;
65          binlogContext = new BinlogContext(checksumLength, tableMap);
66      }
67      
68      @Override
69      protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
70          while (in.readableBytes() >= 1 + MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
71              in.markReaderIndex();
72              MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
73              checkPayload(payload);
74              MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload, binlogContext.getChecksumLength());
75              if (!checkEventIntegrity(in, binlogEventHeader)) {
76                  return;
77              }
78              Optional<AbstractBinlogEvent> binlogEvent = decodeEvent(binlogEventHeader, payload);
79              if (!binlogEvent.isPresent()) {
80                  skipChecksum(binlogEventHeader.getEventType(), in);
81                  return;
82              }
83              if (binlogEvent.get() instanceof PlaceholderEvent) {
84                  out.add(binlogEvent.get());
85                  skipChecksum(binlogEventHeader.getEventType(), in);
86                  return;
87              }
88              if (decodeWithTX) {
89                  processEventWithTX(binlogEvent.get(), out);
90              } else {
91                  processEventIgnoreTX(binlogEvent.get(), out);
92              }
93              skipChecksum(binlogEventHeader.getEventType(), in);
94          }
95      }
96      
97      private void checkPayload(final MySQLPacketPayload payload) {
98          int statusCode = payload.readInt1();
99          if (255 == statusCode) {
100             int errorNo = payload.readInt2();
101             payload.skipReserved(1);
102             String sqlState = payload.readStringFix(5);
103             throw new PipelineInternalException("Decode binlog event failed, errorCode: %d, sqlState: %s, errorMessage: %s", errorNo, sqlState, payload.readStringEOF());
104         }
105         if (0 != statusCode) {
106             log.debug("Illegal binlog status code {}, remaining packet \n{}", statusCode, readRemainPacket(payload));
107         }
108     }
109     
110     private String readRemainPacket(final MySQLPacketPayload payload) {
111         return ByteBufUtil.hexDump(payload.readStringFixByBytes(payload.getByteBuf().readableBytes()));
112     }
113     
114     private boolean checkEventIntegrity(final ByteBuf in, final MySQLBinlogEventHeader binlogEventHeader) {
115         if (in.readableBytes() < binlogEventHeader.getEventSize() - MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
116             log.debug("the event body is not complete, event size={}, readable bytes={}", binlogEventHeader.getEventSize(), in.readableBytes());
117             in.resetReaderIndex();
118             return false;
119         }
120         return true;
121     }
122     
123     private void processEventWithTX(final AbstractBinlogEvent binlogEvent, final List<Object> out) {
124         if (binlogEvent instanceof QueryEvent) {
125             QueryEvent queryEvent = (QueryEvent) binlogEvent;
126             if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
127                 records = new LinkedList<>();
128             } else {
129                 out.add(binlogEvent);
130             }
131         } else if (binlogEvent instanceof XidEvent) {
132             records.add(binlogEvent);
133             out.add(records);
134         } else {
135             records.add(binlogEvent);
136         }
137     }
138     
139     private void processEventIgnoreTX(final AbstractBinlogEvent binlogEvent, final List<Object> out) {
140         if (binlogEvent instanceof QueryEvent) {
141             QueryEvent queryEvent = (QueryEvent) binlogEvent;
142             if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
143                 return;
144             }
145         }
146         out.add(binlogEvent);
147     }
148     
149     private Optional<AbstractBinlogEvent> decodeEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
150         switch (MySQLBinlogEventType.valueOf(binlogEventHeader.getEventType()).orElse(MySQLBinlogEventType.UNKNOWN_EVENT)) {
151             case ROTATE_EVENT:
152                 decodeRotateEvent(binlogEventHeader, payload);
153                 return Optional.empty();
154             case FORMAT_DESCRIPTION_EVENT:
155                 decodeFormatDescriptionEvent(binlogEventHeader, payload);
156                 return Optional.empty();
157             case TABLE_MAP_EVENT:
158                 decodeTableMapEvent(binlogEventHeader, payload);
159                 return Optional.empty();
160             case WRITE_ROWS_EVENT_V1:
161             case WRITE_ROWS_EVENT_V2:
162                 return Optional.of(decodeWriteRowsEventV2(binlogEventHeader, payload));
163             case UPDATE_ROWS_EVENT_V1:
164             case UPDATE_ROWS_EVENT_V2:
165                 return Optional.of(decodeUpdateRowsEventV2(binlogEventHeader, payload));
166             case DELETE_ROWS_EVENT_V1:
167             case DELETE_ROWS_EVENT_V2:
168                 return Optional.of(decodeDeleteRowsEventV2(binlogEventHeader, payload));
169             case QUERY_EVENT:
170                 return Optional.of(decodeQueryEvent(binlogEventHeader, payload));
171             case XID_EVENT:
172                 return Optional.of(decodeXidEvent(binlogEventHeader, payload));
173             default:
174                 return Optional.of(decodePlaceholderEvent(binlogEventHeader, payload));
175         }
176     }
177     
178     private void decodeRotateEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
179         MySQLBinlogRotateEventPacket packet = new MySQLBinlogRotateEventPacket(binlogEventHeader, payload);
180         binlogContext.setFileName(packet.getNextBinlogName());
181     }
182     
183     private void decodeFormatDescriptionEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
184         MySQLBinlogFormatDescriptionEventPacket packet = new MySQLBinlogFormatDescriptionEventPacket(binlogEventHeader, payload);
185         // MySQL MGR checksum length is 0, but the event ends up with 4 extra bytes, need to skip them.
186         int readableBytes = payload.getByteBuf().readableBytes();
187         if (binlogEventHeader.getChecksumLength() <= 0 && readableBytes > 0) {
188             if (readableBytes != 4) {
189                 log.warn("the format description event has extra bytes, readable bytes length={}, binlogEventHeader={}, formatDescriptionEvent={}", readableBytes, binlogEventHeader, packet);
190             }
191             payload.getByteBuf().skipBytes(readableBytes);
192         }
193     }
194     
195     private void decodeTableMapEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
196         MySQLBinlogTableMapEventPacket packet = new MySQLBinlogTableMapEventPacket(binlogEventHeader, payload);
197         binlogContext.putTableMapEvent(packet.getTableId(), packet);
198     }
199     
200     private WriteRowsEvent decodeWriteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
201         MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
202         packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload);
203         WriteRowsEvent result = new WriteRowsEvent();
204         initRowsEvent(result, binlogEventHeader, packet.getTableId());
205         result.setAfterRows(packet.getRows());
206         return result;
207     }
208     
209     private UpdateRowsEvent decodeUpdateRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
210         MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
211         packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload);
212         UpdateRowsEvent result = new UpdateRowsEvent();
213         initRowsEvent(result, binlogEventHeader, packet.getTableId());
214         result.setBeforeRows(packet.getRows());
215         result.setAfterRows(packet.getRows2());
216         return result;
217     }
218     
219     private DeleteRowsEvent decodeDeleteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
220         MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
221         packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload);
222         DeleteRowsEvent result = new DeleteRowsEvent();
223         initRowsEvent(result, binlogEventHeader, packet.getTableId());
224         result.setBeforeRows(packet.getRows());
225         return result;
226     }
227     
228     private void initRowsEvent(final AbstractRowsEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader, final long tableId) {
229         rowsEvent.setDatabaseName(binlogContext.getDatabaseName(tableId));
230         rowsEvent.setTableName(binlogContext.getTableName(tableId));
231         rowsEvent.setFileName(binlogContext.getFileName());
232         rowsEvent.setPosition(binlogEventHeader.getLogPos());
233         rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
234         rowsEvent.setServerId(binlogEventHeader.getServerId());
235     }
236     
237     private PlaceholderEvent decodePlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
238         PlaceholderEvent result = createPlaceholderEvent(binlogEventHeader);
239         int remainDataLength = binlogEventHeader.getEventSize() + 1 - binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex();
240         if (remainDataLength > 0) {
241             payload.skipReserved(remainDataLength);
242         }
243         return result;
244     }
245     
246     private QueryEvent decodeQueryEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
247         int threadId = payload.readInt4();
248         int executionTime = payload.readInt4();
249         payload.skipReserved(1);
250         int errorCode = payload.readInt2();
251         payload.skipReserved(payload.readInt2());
252         String databaseName = payload.readStringNul();
253         String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - binlogEventHeader.getChecksumLength());
254         QueryEvent result = new QueryEvent(threadId, executionTime, errorCode, databaseName, sql);
255         result.setFileName(binlogContext.getFileName());
256         result.setPosition(binlogEventHeader.getLogPos());
257         result.setTimestamp(binlogEventHeader.getTimestamp());
258         result.setServerId(binlogEventHeader.getServerId());
259         return result;
260     }
261     
262     private XidEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
263         XidEvent result = new XidEvent(payload.readInt8());
264         result.setFileName(binlogContext.getFileName());
265         result.setPosition(binlogEventHeader.getLogPos());
266         result.setTimestamp(binlogEventHeader.getTimestamp());
267         result.setServerId(binlogEventHeader.getServerId());
268         return result;
269     }
270     
271     // TODO May be used again later, keep this method first.
272     private PlaceholderEvent createPlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader) {
273         PlaceholderEvent result = new PlaceholderEvent();
274         result.setFileName(binlogContext.getFileName());
275         result.setPosition(binlogEventHeader.getLogPos());
276         result.setTimestamp(binlogEventHeader.getTimestamp());
277         return result;
278     }
279     
280     private void skipChecksum(final int eventType, final ByteBuf in) {
281         if (0 < binlogContext.getChecksumLength() && MySQLBinlogEventType.FORMAT_DESCRIPTION_EVENT.getValue() != eventType) {
282             in.skipBytes(binlogContext.getChecksumLength());
283         }
284     }
285 }