1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row;
19
20 import lombok.Getter;
21 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
22 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.AbstractMySQLBinlogEventPacket;
23 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
24 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
25 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.MySQLBinlogProtocolValueFactory;
26 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLNullBitmap;
27 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
28
29 import java.io.Serializable;
30 import java.util.LinkedList;
31 import java.util.List;
32
33
34
35
36
37
38
39 @Getter
40 public final class MySQLBinlogRowsEventPacket extends AbstractMySQLBinlogEventPacket {
41
42 private final long tableId;
43
44 private final int flags;
45
46 private final int columnNumber;
47
48 private final MySQLNullBitmap columnsPresentBitmap;
49
50 private final MySQLNullBitmap columnsPresentBitmap2;
51
52 private final List<Serializable[]> rows = new LinkedList<>();
53
54 private final List<Serializable[]> rows2 = new LinkedList<>();
55
56 public MySQLBinlogRowsEventPacket(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
57 super(binlogEventHeader);
58 tableId = payload.readInt6();
59 flags = payload.readInt2();
60 skipExtraData(payload);
61 columnNumber = (int) payload.readIntLenenc();
62 columnsPresentBitmap = new MySQLNullBitmap(columnNumber, payload);
63 columnsPresentBitmap2 = readUpdateColumnsPresentBitmap(payload);
64 }
65
66 private void skipExtraData(final MySQLPacketPayload payload) {
67 if (isRowsEventVersion2(getBinlogEventHeader().getEventType())) {
68 int extraDataLength = payload.readInt2() - 2;
69 payload.skipReserved(extraDataLength);
70 }
71 }
72
73 private boolean isRowsEventVersion2(final int eventType) {
74 return MySQLBinlogEventType.WRITE_ROWS_EVENT_V2.getValue() == eventType || MySQLBinlogEventType.UPDATE_ROWS_EVENT_V2.getValue() == eventType
75 || MySQLBinlogEventType.DELETE_ROWS_EVENT_V2.getValue() == eventType;
76 }
77
78 private MySQLNullBitmap readUpdateColumnsPresentBitmap(final MySQLPacketPayload payload) {
79 return isUpdateRowsEvent(getBinlogEventHeader().getEventType()) ? new MySQLNullBitmap(columnNumber, payload) : null;
80 }
81
82 private boolean isUpdateRowsEvent(final int eventType) {
83 return MySQLBinlogEventType.UPDATE_ROWS_EVENT_V2.getValue() == eventType || MySQLBinlogEventType.UPDATE_ROWS_EVENT_V1.getValue() == eventType;
84 }
85
86
87
88
89
90
91
92 public void readRows(final MySQLBinlogTableMapEventPacket tableMapEventPacket, final MySQLPacketPayload payload) {
93 List<MySQLBinlogColumnDef> columnDefs = tableMapEventPacket.getColumnDefs();
94 while (getRemainBytesLength(payload) > 0) {
95 rows.add(readRow(columnDefs, payload));
96 if (isUpdateRowsEvent(getBinlogEventHeader().getEventType())) {
97 rows2.add(readRow(columnDefs, payload));
98 }
99 }
100 }
101
102 private Serializable[] readRow(final List<MySQLBinlogColumnDef> columnDefs, final MySQLPacketPayload payload) {
103 MySQLNullBitmap nullBitmap = new MySQLNullBitmap(columnNumber, payload);
104 Serializable[] result = new Serializable[columnNumber];
105 for (int i = 0; i < columnNumber; i++) {
106 MySQLBinlogColumnDef columnDef = columnDefs.get(i);
107 result[i] = nullBitmap.isNullParameter(i) ? null : MySQLBinlogProtocolValueFactory.getBinlogProtocolValue(columnDef.getColumnType()).read(columnDef, payload);
108 }
109 return result;
110 }
111
112 @Override
113 protected void writeEvent(final MySQLPacketPayload payload) {
114
115 }
116 }