1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
19
20 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
21 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
22 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
23 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
24 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
25 import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
26
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.stream.Collectors;
34
35
36
37
38 public final class DataRecordGroupEngine {
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 public List<DataRecord> merge(final List<DataRecord> dataRecords) {
58 Map<DataRecord.Key, DataRecord> result = new HashMap<>();
59 dataRecords.forEach(each -> {
60 if (PipelineSQLOperationType.INSERT == each.getType()) {
61 mergeInsert(each, result);
62 } else if (PipelineSQLOperationType.UPDATE == each.getType()) {
63 mergeUpdate(each, result);
64 } else if (PipelineSQLOperationType.DELETE == each.getType()) {
65 mergeDelete(each, result);
66 }
67 });
68 return new ArrayList<>(result.values());
69 }
70
71
72
73
74
75
76
77 public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
78 List<GroupedDataRecord> result = new ArrayList<>(100);
79 List<DataRecord> mergedDataRecords = dataRecords.get(0).getUniqueKeyValue().isEmpty() ? dataRecords : merge(dataRecords);
80 Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
81 for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
82 Map<PipelineSQLOperationType, List<DataRecord>> typeGroup = entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
83 result.add(new GroupedDataRecord(entry.getKey(), typeGroup.getOrDefault(PipelineSQLOperationType.INSERT, Collections.emptyList()),
84 typeGroup.getOrDefault(PipelineSQLOperationType.UPDATE, Collections.emptyList()), typeGroup.getOrDefault(PipelineSQLOperationType.DELETE, Collections.emptyList())));
85 }
86 return result;
87 }
88
89 private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
90 DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
91 ShardingSpherePreconditions.checkState(null == beforeDataRecord || PipelineSQLOperationType.DELETE == beforeDataRecord.getType(),
92 () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
93 dataRecords.put(dataRecord.getKey(), dataRecord);
94 }
95
96 private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
97 DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
98 if (null == beforeDataRecord) {
99 dataRecords.put(dataRecord.getKey(), dataRecord);
100 return;
101 }
102 ShardingSpherePreconditions.checkState(PipelineSQLOperationType.DELETE != beforeDataRecord.getType(), () -> new UnsupportedSQLOperationException("Not Delete"));
103 if (isUniqueKeyUpdated(dataRecord)) {
104 dataRecords.remove(dataRecord.getOldKey());
105 }
106 if (PipelineSQLOperationType.INSERT == beforeDataRecord.getType()) {
107 DataRecord mergedDataRecord = mergeUpdateColumn(PipelineSQLOperationType.INSERT, dataRecord.getTableName(), beforeDataRecord, dataRecord);
108 dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
109 return;
110 }
111 if (PipelineSQLOperationType.UPDATE == beforeDataRecord.getType()) {
112 DataRecord mergedDataRecord = mergeUpdateColumn(PipelineSQLOperationType.UPDATE, dataRecord.getTableName(), beforeDataRecord, dataRecord);
113 dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
114 }
115 }
116
117 private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
118 DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
119 ShardingSpherePreconditions.checkState(null == beforeDataRecord || PipelineSQLOperationType.DELETE != beforeDataRecord.getType(),
120 () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
121 if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE == beforeDataRecord.getType() && isUniqueKeyUpdated(beforeDataRecord)) {
122 DataRecord mergedDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(), dataRecord.getPosition(), dataRecord.getColumnCount());
123 mergeBaseFields(dataRecord, mergedDataRecord);
124 for (int i = 0; i < dataRecord.getColumnCount(); i++) {
125 mergedDataRecord.addColumn(new Column(dataRecord.getColumn(i).getName(),
126 dataRecord.getColumn(i).isUniqueKey() ? beforeDataRecord.getColumn(i).getOldValue() : beforeDataRecord.getColumn(i).getValue(),
127 null, true, dataRecord.getColumn(i).isUniqueKey()));
128 }
129 dataRecords.remove(beforeDataRecord.getKey());
130 dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
131 } else {
132 dataRecords.put(dataRecord.getOldKey(), dataRecord);
133 }
134 }
135
136 private void mergeBaseFields(final DataRecord sourceRecord, final DataRecord targetRecord) {
137 targetRecord.setActualTableName(sourceRecord.getActualTableName());
138 targetRecord.setCsn(sourceRecord.getCsn());
139 targetRecord.setCommitTime(sourceRecord.getCommitTime());
140 }
141
142 private boolean isUniqueKeyUpdated(final DataRecord dataRecord) {
143
144 for (Column each : dataRecord.getColumns()) {
145 if (each.isUniqueKey() && each.isUpdated()) {
146 return true;
147 }
148 }
149 return false;
150 }
151
152 private DataRecord mergeUpdateColumn(final PipelineSQLOperationType type, final String tableName, final DataRecord preDataRecord, final DataRecord curDataRecord) {
153 DataRecord result = new DataRecord(type, tableName, curDataRecord.getPosition(), curDataRecord.getColumnCount());
154 mergeBaseFields(curDataRecord, result);
155 for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
156 result.addColumn(new Column(
157 curDataRecord.getColumn(i).getName(),
158 preDataRecord.getColumn(i).getOldValue(),
159 curDataRecord.getColumn(i).getValue(),
160 preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
161 curDataRecord.getColumn(i).isUniqueKey()));
162 }
163 return result;
164 }
165 }