/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
19
20 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
21 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
22 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
23 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
24 import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
25 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
26 import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
27 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
28 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
29 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
30 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
31 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
32 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
33 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
34 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
35 import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
36
37 import java.util.List;
38
39
40
41
42 public final class WALEventConverter {
43
44 private final IncrementalDumperContext dumperContext;
45
46 private final PipelineTableMetaDataLoader metaDataLoader;
47
48 public WALEventConverter(final IncrementalDumperContext dumperContext, final PipelineTableMetaDataLoader metaDataLoader) {
49 this.dumperContext = dumperContext;
50 this.metaDataLoader = metaDataLoader;
51 }
52
53
54
55
56
57
58
59
60 public Record convert(final AbstractWALEvent event) {
61 if (filter(event)) {
62 return createPlaceholderRecord(event);
63 }
64 if (!(event instanceof AbstractRowEvent)) {
65 return createPlaceholderRecord(event);
66 }
67 PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((AbstractRowEvent) event).getTableName());
68 if (event instanceof WriteRowEvent) {
69 return handleWriteRowEvent((WriteRowEvent) event, tableMetaData);
70 }
71 if (event instanceof UpdateRowEvent) {
72 return handleUpdateRowEvent((UpdateRowEvent) event, tableMetaData);
73 }
74 if (event instanceof DeleteRowEvent) {
75 return handleDeleteRowEvent((DeleteRowEvent) event, tableMetaData);
76 }
77 throw new UnsupportedSQLOperationException("");
78 }
79
80 private boolean filter(final AbstractWALEvent event) {
81 if (event instanceof AbstractRowEvent) {
82 AbstractRowEvent rowEvent = (AbstractRowEvent) event;
83 return !dumperContext.getCommonContext().getTableNameMapper().containsTable(rowEvent.getTableName());
84 }
85 return false;
86 }
87
88 private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) {
89 return new PlaceholderRecord(new WALPosition(event.getLogSequenceNumber()));
90 }
91
92 private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
93 CaseInsensitiveIdentifier logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
94 return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
95 }
96
97 private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
98 DataRecord result = createDataRecord(PipelineSQLOperationType.INSERT, writeRowEvent, writeRowEvent.getAfterRow().size());
99 putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getAfterRow());
100 return result;
101 }
102
103 private DataRecord handleUpdateRowEvent(final UpdateRowEvent updateRowEvent, final PipelineTableMetaData tableMetaData) {
104 DataRecord result = createDataRecord(PipelineSQLOperationType.UPDATE, updateRowEvent, updateRowEvent.getAfterRow().size());
105 putColumnsIntoDataRecord(result, tableMetaData, updateRowEvent.getAfterRow());
106 return result;
107 }
108
109 private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final PipelineTableMetaData tableMetaData) {
110
111 DataRecord result = createDataRecord(PipelineSQLOperationType.DELETE, event, event.getPrimaryKeys().size());
112
113 List<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
114 for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
115 result.addColumn(new Column(primaryKeyColumns.get(i), event.getPrimaryKeys().get(i), null, true, true));
116 }
117 return result;
118 }
119
120 private DataRecord createDataRecord(final PipelineSQLOperationType type, final AbstractRowEvent rowsEvent, final int columnCount) {
121 String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
122 DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
123 result.setActualTableName(rowsEvent.getTableName());
124 result.setCsn(rowsEvent.getCsn());
125 return result;
126 }
127
128 private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final List<Object> values) {
129 for (int i = 0, count = values.size(); i < count; i++) {
130 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
131 boolean isUniqueKey = columnMetaData.isUniqueKey();
132 Object uniqueKeyOldValue = isUniqueKey && PipelineSQLOperationType.UPDATE == dataRecord.getType() ? values.get(i) : null;
133 Column column = new Column(columnMetaData.getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
134 dataRecord.addColumn(column);
135 }
136 }
137 }