View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row;
19  
20  import lombok.Getter;
21  import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
22  import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.AbstractMySQLBinlogEventPacket;
23  import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
24  import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
25  import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.MySQLBinlogProtocolValueFactory;
26  import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLNullBitmap;
27  import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
28  
29  import java.io.Serializable;
30  import java.util.LinkedList;
31  import java.util.List;
32  
33  /**
34   * MySQL binlog rows event packet.
35   *
36   * @see <a href="https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Rows__event.html">ROWS_EVENT</a>
37   * @see <a href="https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/">ROWS_EVENT</a>
38   */
39  @Getter
40  public final class MySQLBinlogRowsEventPacket extends AbstractMySQLBinlogEventPacket {
41      
42      private final long tableId;
43      
44      private final int flags;
45      
46      private final int columnNumber;
47      
48      private final MySQLNullBitmap columnsPresentBitmap;
49      
50      private final MySQLNullBitmap columnsPresentBitmap2;
51      
52      private final List<Serializable[]> rows = new LinkedList<>();
53      
54      private final List<Serializable[]> rows2 = new LinkedList<>();
55      
56      public MySQLBinlogRowsEventPacket(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
57          super(binlogEventHeader);
58          tableId = payload.readInt6();
59          flags = payload.readInt2();
60          skipExtraData(payload);
61          columnNumber = (int) payload.readIntLenenc();
62          columnsPresentBitmap = new MySQLNullBitmap(columnNumber, payload);
63          columnsPresentBitmap2 = readUpdateColumnsPresentBitmap(payload);
64      }
65      
66      private void skipExtraData(final MySQLPacketPayload payload) {
67          if (isRowsEventVersion2(getBinlogEventHeader().getEventType())) {
68              int extraDataLength = payload.readInt2() - 2;
69              payload.skipReserved(extraDataLength);
70          }
71      }
72      
73      private boolean isRowsEventVersion2(final int eventType) {
74          return MySQLBinlogEventType.WRITE_ROWS_EVENT_V2.getValue() == eventType || MySQLBinlogEventType.UPDATE_ROWS_EVENT_V2.getValue() == eventType
75                  || MySQLBinlogEventType.DELETE_ROWS_EVENT_V2.getValue() == eventType;
76      }
77      
78      private MySQLNullBitmap readUpdateColumnsPresentBitmap(final MySQLPacketPayload payload) {
79          return isUpdateRowsEvent(getBinlogEventHeader().getEventType()) ? new MySQLNullBitmap(columnNumber, payload) : null;
80      }
81      
82      private boolean isUpdateRowsEvent(final int eventType) {
83          return MySQLBinlogEventType.UPDATE_ROWS_EVENT_V2.getValue() == eventType || MySQLBinlogEventType.UPDATE_ROWS_EVENT_V1.getValue() == eventType;
84      }
85      
86      /**
87       * Read rows in binlog.
88       *
89       * @param tableMapEventPacket TABLE_MAP_EVENT packet before this ROWS_EVENT
90       * @param payload ROWS_EVENT packet payload
91       */
92      public void readRows(final MySQLBinlogTableMapEventPacket tableMapEventPacket, final MySQLPacketPayload payload) {
93          List<MySQLBinlogColumnDef> columnDefs = tableMapEventPacket.getColumnDefs();
94          while (getRemainBytesLength(payload) > 0) {
95              rows.add(readRow(columnDefs, payload));
96              if (isUpdateRowsEvent(getBinlogEventHeader().getEventType())) {
97                  rows2.add(readRow(columnDefs, payload));
98              }
99          }
100     }
101     
102     private Serializable[] readRow(final List<MySQLBinlogColumnDef> columnDefs, final MySQLPacketPayload payload) {
103         MySQLNullBitmap nullBitmap = new MySQLNullBitmap(columnNumber, payload);
104         Serializable[] result = new Serializable[columnNumber];
105         for (int i = 0; i < columnNumber; i++) {
106             MySQLBinlogColumnDef columnDef = columnDefs.get(i);
107             result[i] = nullBitmap.isNullParameter(i) ? null : MySQLBinlogProtocolValueFactory.getBinlogProtocolValue(columnDef.getColumnType()).read(columnDef, payload);
108         }
109         return result;
110     }
111     
112     @Override
113     protected void writeEvent(final MySQLPacketPayload payload) {
114         // TODO
115     }
116 }