View Javadoc
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17  
18  package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
19  
20  import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
21  import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
22  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
23  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
24  import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
25  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
26  import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
27  
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.HashMap;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Map.Entry;
34  import java.util.stream.Collectors;
35  
36  /**
37   * Data Record group engine.
38   */
39  public final class DataRecordGroupEngine {
40      
41      /**
42       * Merge data record.
43       * <pre>
44       * insert + insert -&gt; exception
45       * update + insert -&gt; exception
46       * delete + insert -&gt; insert
47       * insert + update -&gt; insert
48       * update + update -&gt; update
49       * delete + update -&gt; exception
50       * insert + delete -&gt; delete
51       * update + delete -&gt; delete
52       * delete + delete -&gt; exception
53       * </pre>
54       *
55       * @param dataRecords data records
56       * @return merged data records
57       */
58      public List<DataRecord> merge(final List<DataRecord> dataRecords) {
59          Map<DataRecord.Key, DataRecord> result = new HashMap<>();
60          dataRecords.forEach(each -> {
61              if (PipelineSQLOperationType.INSERT == each.getType()) {
62                  mergeInsert(each, result);
63              } else if (PipelineSQLOperationType.UPDATE == each.getType()) {
64                  mergeUpdate(each, result);
65              } else if (PipelineSQLOperationType.DELETE == each.getType()) {
66                  mergeDelete(each, result);
67              }
68          });
69          return new ArrayList<>(result.values());
70      }
71      
72      /**
73       * Group by table and type.
74       *
75       * @param dataRecords data records, related table must have primary key or unique key
76       * @return grouped data records
77       * @throws IllegalArgumentException if related table has no primary key or unique key
78       */
79      public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
80          DataRecord firstDataRecord = dataRecords.get(0);
81          ShardingSpherePreconditions.checkState(!firstDataRecord.getUniqueKeyValue().isEmpty(),
82                  () -> new IllegalArgumentException(firstDataRecord.getTableName() + " must have primary key or unique key"));
83          List<GroupedDataRecord> result = new ArrayList<>(100);
84          Map<String, List<DataRecord>> tableGroup = merge(dataRecords).stream().collect(Collectors.groupingBy(DataRecord::getTableName));
85          for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
86              Map<PipelineSQLOperationType, List<DataRecord>> typeGroup = entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
87              result.add(new GroupedDataRecord(entry.getKey(), typeGroup.getOrDefault(PipelineSQLOperationType.INSERT, Collections.emptyList()),
88                      typeGroup.getOrDefault(PipelineSQLOperationType.UPDATE, Collections.emptyList()), typeGroup.getOrDefault(PipelineSQLOperationType.DELETE, Collections.emptyList())));
89          }
90          return result;
91      }
92      
93      private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
94          DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
95          ShardingSpherePreconditions.checkState(null == beforeDataRecord || PipelineSQLOperationType.DELETE == beforeDataRecord.getType(),
96                  () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
97          dataRecords.put(dataRecord.getKey(), dataRecord);
98      }
99      
100     private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
101         DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
102         if (null == beforeDataRecord) {
103             dataRecords.put(dataRecord.getKey(), dataRecord);
104             return;
105         }
106         ShardingSpherePreconditions.checkState(PipelineSQLOperationType.DELETE != beforeDataRecord.getType(), () -> new UnsupportedSQLOperationException("Not Delete"));
107         if (isUniqueKeyUpdated(dataRecord)) {
108             dataRecords.remove(dataRecord.getOldKey());
109         }
110         if (PipelineSQLOperationType.INSERT == beforeDataRecord.getType()) {
111             DataRecord mergedDataRecord = mergeUpdateColumn(PipelineSQLOperationType.INSERT, dataRecord.getTableName(), beforeDataRecord, dataRecord);
112             dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
113             return;
114         }
115         if (PipelineSQLOperationType.UPDATE == beforeDataRecord.getType()) {
116             DataRecord mergedDataRecord = mergeUpdateColumn(PipelineSQLOperationType.UPDATE, dataRecord.getTableName(), beforeDataRecord, dataRecord);
117             dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
118         }
119     }
120     
121     private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
122         DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
123         ShardingSpherePreconditions.checkState(null == beforeDataRecord || PipelineSQLOperationType.DELETE != beforeDataRecord.getType(),
124                 () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
125         if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE == beforeDataRecord.getType() && isUniqueKeyUpdated(beforeDataRecord)) {
126             DataRecord mergedDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(), dataRecord.getPosition(), dataRecord.getColumnCount());
127             mergeBaseFields(dataRecord, mergedDataRecord);
128             for (int i = 0; i < dataRecord.getColumnCount(); i++) {
129                 mergedDataRecord.addColumn(new NormalColumn(dataRecord.getColumn(i).getName(),
130                         dataRecord.getColumn(i).isUniqueKey() ? beforeDataRecord.getColumn(i).getOldValue() : beforeDataRecord.getColumn(i).getValue(),
131                         null, true, dataRecord.getColumn(i).isUniqueKey()));
132             }
133             dataRecords.remove(beforeDataRecord.getKey());
134             dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
135         } else {
136             dataRecords.put(dataRecord.getOldKey(), dataRecord);
137         }
138     }
139     
140     private void mergeBaseFields(final DataRecord sourceRecord, final DataRecord targetRecord) {
141         targetRecord.setActualTableName(sourceRecord.getActualTableName());
142         targetRecord.setCsn(sourceRecord.getCsn());
143         targetRecord.setCommitTime(sourceRecord.getCommitTime());
144     }
145     
146     private boolean isUniqueKeyUpdated(final DataRecord dataRecord) {
147         // TODO Compatible with multiple unique indexes
148         for (Column each : dataRecord.getColumns()) {
149             if (each.isUniqueKey() && each.isUpdated()) {
150                 return true;
151             }
152         }
153         return false;
154     }
155     
156     private DataRecord mergeUpdateColumn(final PipelineSQLOperationType type, final String tableName, final DataRecord preDataRecord, final DataRecord curDataRecord) {
157         DataRecord result = new DataRecord(type, tableName, curDataRecord.getPosition(), curDataRecord.getColumnCount());
158         mergeBaseFields(curDataRecord, result);
159         for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
160             result.addColumn(new NormalColumn(
161                     curDataRecord.getColumn(i).getName(),
162                     preDataRecord.getColumn(i).getOldValue(),
163                     curDataRecord.getColumn(i).getValue(),
164                     preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
165                     curDataRecord.getColumn(i).isUniqueKey()));
166         }
167         return result;
168     }
169 }