/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;

import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
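
/**
 * Data record group engine.
 */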
public final class DataRecordGroupEngine {
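
    /**
     * Merge data records which share the same key into one record.
     *
     * <p>Supported combinations on the same key are: delete then insert keeps the insert,
     * insert then update collapses to an insert, update then update collapses to one update,
     * and insert or update followed by delete collapses to a delete;
     * any other order fails with an exception.</p>
     *
     * @param dataRecords data records to be merged
     * @return merged data records
     */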
    public List<DataRecord> merge(final List<DataRecord> dataRecords) {
        Map<DataRecord.Key, DataRecord> result = new HashMap<>();
        dataRecords.forEach(each -> {
            if (PipelineSQLOperationType.INSERT == each.getType()) {
                mergeInsert(each, result);
            } else if (PipelineSQLOperationType.UPDATE == each.getType()) {
                mergeUpdate(each, result);
            } else if (PipelineSQLOperationType.DELETE == each.getType()) {
                mergeDelete(each, result);
            }
        });
        return new ArrayList<>(result.values());
    }
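
    /**
     * Merge data records and group them by table name and operation type.
     *
     * <p>The records are expected to carry a primary key or unique key value; the first record is checked as a precondition.</p>
     *
     * @param dataRecords data records to be grouped
     * @return grouped data records
     */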
    public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
        DataRecord firstDataRecord = dataRecords.get(0);
        ShardingSpherePreconditions.checkState(!firstDataRecord.getUniqueKeyValue().isEmpty(),
                () -> new IllegalArgumentException(firstDataRecord.getTableName() + " must have primary key or unique key"));
        List<GroupedDataRecord> result = new ArrayList<>(100);
        Map<String, List<DataRecord>> tableGroup = merge(dataRecords).stream().collect(Collectors.groupingBy(DataRecord::getTableName));
        for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
            Map<PipelineSQLOperationType, List<DataRecord>> typeGroup = entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
            result.add(new GroupedDataRecord(entry.getKey(), typeGroup.getOrDefault(PipelineSQLOperationType.INSERT, Collections.emptyList()),
                    typeGroup.getOrDefault(PipelineSQLOperationType.UPDATE, Collections.emptyList()), typeGroup.getOrDefault(PipelineSQLOperationType.DELETE, Collections.emptyList())));
        }
        return result;
    }

    private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
        ShardingSpherePreconditions.checkState(null == beforeDataRecord || PipelineSQLOperationType.DELETE == beforeDataRecord.getType(),
                () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
        dataRecords.put(dataRecord.getKey(), dataRecord);
    }

    private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
        if (null == beforeDataRecord) {
            dataRecords.put(dataRecord.getKey(), dataRecord);
            return;
        }
        ShardingSpherePreconditions.checkState(PipelineSQLOperationType.DELETE != beforeDataRecord.getType(), () -> new UnsupportedSQLOperationException("Not Delete"));
        if (isUniqueKeyUpdated(dataRecord)) {
            // The unique key changed, so drop the stale entry kept under the old key; the merged record is re-registered under its new key below.
            dataRecords.remove(dataRecord.getOldKey());
        }
        if (PipelineSQLOperationType.INSERT == beforeDataRecord.getType()) {
            DataRecord mergedDataRecord = mergeUpdateColumn(PipelineSQLOperationType.INSERT, dataRecord.getTableName(), beforeDataRecord, dataRecord);
            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
            return;
        }
        if (PipelineSQLOperationType.UPDATE == beforeDataRecord.getType()) {
            DataRecord mergedDataRecord = mergeUpdateColumn(PipelineSQLOperationType.UPDATE, dataRecord.getTableName(), beforeDataRecord, dataRecord);
            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
        }
    }

    private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
        ShardingSpherePreconditions.checkState(null == beforeDataRecord || PipelineSQLOperationType.DELETE != beforeDataRecord.getType(),
                () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
        if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE == beforeDataRecord.getType() && isUniqueKeyUpdated(beforeDataRecord)) {
            // The pending UPDATE changed the unique key, so rebuild the DELETE with the unique key values from before that UPDATE to target the original row.
            DataRecord mergedDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(), dataRecord.getPosition(), dataRecord.getColumnCount());
            mergeBaseFields(dataRecord, mergedDataRecord);
            for (int i = 0; i < dataRecord.getColumnCount(); i++) {
                mergedDataRecord.addColumn(new NormalColumn(dataRecord.getColumn(i).getName(),
                        dataRecord.getColumn(i).isUniqueKey() ? beforeDataRecord.getColumn(i).getOldValue() : beforeDataRecord.getColumn(i).getValue(),
                        null, true, dataRecord.getColumn(i).isUniqueKey()));
            }
            dataRecords.remove(beforeDataRecord.getKey());
            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
        } else {
            dataRecords.put(dataRecord.getOldKey(), dataRecord);
        }
    }

    private void mergeBaseFields(final DataRecord sourceRecord, final DataRecord targetRecord) {
        targetRecord.setActualTableName(sourceRecord.getActualTableName());
        targetRecord.setCsn(sourceRecord.getCsn());
        targetRecord.setCommitTime(sourceRecord.getCommitTime());
    }

    private boolean isUniqueKeyUpdated(final DataRecord dataRecord) {
        for (Column each : dataRecord.getColumns()) {
            if (each.isUniqueKey() && each.isUpdated()) {
                return true;
            }
        }
        return false;
    }

    private DataRecord mergeUpdateColumn(final PipelineSQLOperationType type, final String tableName, final DataRecord preDataRecord, final DataRecord curDataRecord) {
        DataRecord result = new DataRecord(type, tableName, curDataRecord.getPosition(), curDataRecord.getColumnCount());
        mergeBaseFields(curDataRecord, result);
        // Keep each column's earliest old value and latest new value; a column counts as updated if either record updated it.
        for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
            result.addColumn(new NormalColumn(
                    curDataRecord.getColumn(i).getName(),
                    preDataRecord.getColumn(i).getOldValue(),
                    curDataRecord.getColumn(i).getValue(),
                    preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
                    curDataRecord.getColumn(i).isUniqueKey()));
        }
        return result;
    }
}