1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty;
19
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufUtil;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.handler.codec.ByteToMessageDecoder;
24 import lombok.extern.slf4j.Slf4j;
25 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
26 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext;
27 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
28 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
29 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
30 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
31 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.QueryEvent;
32 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
33 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
34 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.XidEvent;
35 import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
36 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
37 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
38 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.management.MySQLBinlogFormatDescriptionEventPacket;
39 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.management.MySQLBinlogRotateEventPacket;
40 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogRowsEventPacket;
41 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
42 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
43
44 import java.util.LinkedList;
45 import java.util.List;
46 import java.util.Map;
47 import java.util.Optional;
48
49
50
51
52 @Slf4j
53 public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
54
55 private static final String TX_BEGIN_SQL = "BEGIN";
56
57 private final BinlogContext binlogContext;
58
59 private final boolean decodeWithTX;
60
61 private List<AbstractBinlogEvent> records = new LinkedList<>();
62
63 public MySQLBinlogEventPacketDecoder(final int checksumLength, final Map<Long, MySQLBinlogTableMapEventPacket> tableMap, final boolean decodeWithTX) {
64 this.decodeWithTX = decodeWithTX;
65 binlogContext = new BinlogContext(checksumLength, tableMap);
66 }
67
68 @Override
69 protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
70 while (in.readableBytes() >= 1 + MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
71 in.markReaderIndex();
72 MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
73 checkPayload(payload);
74 MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload, binlogContext.getChecksumLength());
75 if (!checkEventIntegrity(in, binlogEventHeader)) {
76 return;
77 }
78 Optional<AbstractBinlogEvent> binlogEvent = decodeEvent(binlogEventHeader, payload);
79 if (!binlogEvent.isPresent()) {
80 skipChecksum(binlogEventHeader.getEventType(), in);
81 return;
82 }
83 if (binlogEvent.get() instanceof PlaceholderEvent) {
84 out.add(binlogEvent.get());
85 skipChecksum(binlogEventHeader.getEventType(), in);
86 return;
87 }
88 if (decodeWithTX) {
89 processEventWithTX(binlogEvent.get(), out);
90 } else {
91 processEventIgnoreTX(binlogEvent.get(), out);
92 }
93 skipChecksum(binlogEventHeader.getEventType(), in);
94 }
95 }
96
97 private void checkPayload(final MySQLPacketPayload payload) {
98 int statusCode = payload.readInt1();
99 if (255 == statusCode) {
100 int errorNo = payload.readInt2();
101 payload.skipReserved(1);
102 String sqlState = payload.readStringFix(5);
103 throw new PipelineInternalException("Decode binlog event failed, errorCode: %d, sqlState: %s, errorMessage: %s", errorNo, sqlState, payload.readStringEOF());
104 }
105 if (0 != statusCode) {
106 log.debug("Illegal binlog status code {}, remaining packet \n{}", statusCode, readRemainPacket(payload));
107 }
108 }
109
110 private String readRemainPacket(final MySQLPacketPayload payload) {
111 return ByteBufUtil.hexDump(payload.readStringFixByBytes(payload.getByteBuf().readableBytes()));
112 }
113
114 private boolean checkEventIntegrity(final ByteBuf in, final MySQLBinlogEventHeader binlogEventHeader) {
115 if (in.readableBytes() < binlogEventHeader.getEventSize() - MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
116 log.debug("the event body is not complete, event size={}, readable bytes={}", binlogEventHeader.getEventSize(), in.readableBytes());
117 in.resetReaderIndex();
118 return false;
119 }
120 return true;
121 }
122
123 private void processEventWithTX(final AbstractBinlogEvent binlogEvent, final List<Object> out) {
124 if (binlogEvent instanceof QueryEvent) {
125 QueryEvent queryEvent = (QueryEvent) binlogEvent;
126 if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
127 records = new LinkedList<>();
128 } else {
129 out.add(binlogEvent);
130 }
131 } else if (binlogEvent instanceof XidEvent) {
132 records.add(binlogEvent);
133 out.add(records);
134 } else {
135 records.add(binlogEvent);
136 }
137 }
138
139 private void processEventIgnoreTX(final AbstractBinlogEvent binlogEvent, final List<Object> out) {
140 if (binlogEvent instanceof QueryEvent) {
141 QueryEvent queryEvent = (QueryEvent) binlogEvent;
142 if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
143 return;
144 }
145 }
146 out.add(binlogEvent);
147 }
148
149 private Optional<AbstractBinlogEvent> decodeEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
150 switch (MySQLBinlogEventType.valueOf(binlogEventHeader.getEventType()).orElse(MySQLBinlogEventType.UNKNOWN_EVENT)) {
151 case ROTATE_EVENT:
152 decodeRotateEvent(binlogEventHeader, payload);
153 return Optional.empty();
154 case FORMAT_DESCRIPTION_EVENT:
155 decodeFormatDescriptionEvent(binlogEventHeader, payload);
156 return Optional.empty();
157 case TABLE_MAP_EVENT:
158 decodeTableMapEvent(binlogEventHeader, payload);
159 return Optional.empty();
160 case WRITE_ROWS_EVENT_V1:
161 case WRITE_ROWS_EVENT_V2:
162 return Optional.of(decodeWriteRowsEventV2(binlogEventHeader, payload));
163 case UPDATE_ROWS_EVENT_V1:
164 case UPDATE_ROWS_EVENT_V2:
165 return Optional.of(decodeUpdateRowsEventV2(binlogEventHeader, payload));
166 case DELETE_ROWS_EVENT_V1:
167 case DELETE_ROWS_EVENT_V2:
168 return Optional.of(decodeDeleteRowsEventV2(binlogEventHeader, payload));
169 case QUERY_EVENT:
170 return Optional.of(decodeQueryEvent(binlogEventHeader, payload));
171 case XID_EVENT:
172 return Optional.of(decodeXidEvent(binlogEventHeader, payload));
173 default:
174 return Optional.of(decodePlaceholderEvent(binlogEventHeader, payload));
175 }
176 }
177
178 private void decodeRotateEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
179 MySQLBinlogRotateEventPacket packet = new MySQLBinlogRotateEventPacket(binlogEventHeader, payload);
180 binlogContext.setFileName(packet.getNextBinlogName());
181 }
182
183 private void decodeFormatDescriptionEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
184 MySQLBinlogFormatDescriptionEventPacket packet = new MySQLBinlogFormatDescriptionEventPacket(binlogEventHeader, payload);
185
186 int readableBytes = payload.getByteBuf().readableBytes();
187 if (binlogEventHeader.getChecksumLength() <= 0 && readableBytes > 0) {
188 if (readableBytes != 4) {
189 log.warn("the format description event has extra bytes, readable bytes length={}, binlogEventHeader={}, formatDescriptionEvent={}", readableBytes, binlogEventHeader, packet);
190 }
191 payload.getByteBuf().skipBytes(readableBytes);
192 }
193 }
194
195 private void decodeTableMapEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
196 MySQLBinlogTableMapEventPacket packet = new MySQLBinlogTableMapEventPacket(binlogEventHeader, payload);
197 binlogContext.putTableMapEvent(packet.getTableId(), packet);
198 }
199
200 private WriteRowsEvent decodeWriteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
201 MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
202 packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload);
203 WriteRowsEvent result = new WriteRowsEvent();
204 initRowsEvent(result, binlogEventHeader, packet.getTableId());
205 result.setAfterRows(packet.getRows());
206 return result;
207 }
208
209 private UpdateRowsEvent decodeUpdateRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
210 MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
211 packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload);
212 UpdateRowsEvent result = new UpdateRowsEvent();
213 initRowsEvent(result, binlogEventHeader, packet.getTableId());
214 result.setBeforeRows(packet.getRows());
215 result.setAfterRows(packet.getRows2());
216 return result;
217 }
218
219 private DeleteRowsEvent decodeDeleteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
220 MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
221 packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload);
222 DeleteRowsEvent result = new DeleteRowsEvent();
223 initRowsEvent(result, binlogEventHeader, packet.getTableId());
224 result.setBeforeRows(packet.getRows());
225 return result;
226 }
227
228 private void initRowsEvent(final AbstractRowsEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader, final long tableId) {
229 rowsEvent.setDatabaseName(binlogContext.getDatabaseName(tableId));
230 rowsEvent.setTableName(binlogContext.getTableName(tableId));
231 rowsEvent.setFileName(binlogContext.getFileName());
232 rowsEvent.setPosition(binlogEventHeader.getLogPos());
233 rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
234 rowsEvent.setServerId(binlogEventHeader.getServerId());
235 }
236
237 private PlaceholderEvent decodePlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
238 PlaceholderEvent result = createPlaceholderEvent(binlogEventHeader);
239 int remainDataLength = binlogEventHeader.getEventSize() + 1 - binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex();
240 if (remainDataLength > 0) {
241 payload.skipReserved(remainDataLength);
242 }
243 return result;
244 }
245
246 private QueryEvent decodeQueryEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
247 int threadId = payload.readInt4();
248 int executionTime = payload.readInt4();
249 payload.skipReserved(1);
250 int errorCode = payload.readInt2();
251 payload.skipReserved(payload.readInt2());
252 String databaseName = payload.readStringNul();
253 String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - binlogEventHeader.getChecksumLength());
254 QueryEvent result = new QueryEvent(threadId, executionTime, errorCode, databaseName, sql);
255 result.setFileName(binlogContext.getFileName());
256 result.setPosition(binlogEventHeader.getLogPos());
257 result.setTimestamp(binlogEventHeader.getTimestamp());
258 result.setServerId(binlogEventHeader.getServerId());
259 return result;
260 }
261
262 private XidEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
263 XidEvent result = new XidEvent(payload.readInt8());
264 result.setFileName(binlogContext.getFileName());
265 result.setPosition(binlogEventHeader.getLogPos());
266 result.setTimestamp(binlogEventHeader.getTimestamp());
267 result.setServerId(binlogEventHeader.getServerId());
268 return result;
269 }
270
271
272 private PlaceholderEvent createPlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader) {
273 PlaceholderEvent result = new PlaceholderEvent();
274 result.setFileName(binlogContext.getFileName());
275 result.setPosition(binlogEventHeader.getLogPos());
276 result.setTimestamp(binlogEventHeader.getTimestamp());
277 return result;
278 }
279
280 private void skipChecksum(final int eventType, final ByteBuf in) {
281 if (0 < binlogContext.getChecksumLength() && MySQLBinlogEventType.FORMAT_DESCRIPTION_EVENT.getValue() != eventType) {
282 in.skipBytes(binlogContext.getChecksumLength());
283 }
284 }
285 }