View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.ingest.record.group;
19  
20  import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
21  import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
22  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
23  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
24  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
25  import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
26  
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.stream.Collectors;
34  
35  /**
36   * Data Record group engine.
37   */
38  public final class DataRecordGroupEngine {
39      
40      /**
41       * Merge data record.
42       * <pre>
43       * insert + insert -&gt; exception
44       * update + insert -&gt; exception
45       * delete + insert -&gt; insert
46       * insert + update -&gt; insert
47       * update + update -&gt; update
48       * delete + update -&gt; exception
49       * insert + delete -&gt; delete
50       * update + delete -&gt; delete
51       * delete + delete -&gt; exception
52       * </pre>
53       *
54       * @param dataRecords data records
55       * @return merged data records
56       */
57      public List<DataRecord> merge(final List<DataRecord> dataRecords) {
58          Map<DataRecord.Key, DataRecord> result = new HashMap<>();
59          dataRecords.forEach(each -> {
60              if (PipelineSQLOperationType.INSERT == each.getType()) {
61                  mergeInsert(each, result);
62              } else if (PipelineSQLOperationType.UPDATE == each.getType()) {
63                  mergeUpdate(each, result);
64              } else if (PipelineSQLOperationType.DELETE == each.getType()) {
65                  mergeDelete(each, result);
66              }
67          });
68          return new ArrayList<>(result.values());
69      }
70      
71      /**
72       * Group by table and type.
73       *
74       * @param dataRecords data records
75       * @return grouped data records
76       */
77      public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
78          List<GroupedDataRecord> result = new ArrayList<>(100);
79          List<DataRecord> mergedDataRecords = dataRecords.get(0).getUniqueKeyValue().isEmpty() ? dataRecords : merge(dataRecords);
80          Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
81          for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
82              Map<PipelineSQLOperationType, List<DataRecord>> typeGroup = entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
83              result.add(new GroupedDataRecord(entry.getKey(), typeGroup.getOrDefault(PipelineSQLOperationType.INSERT, Collections.emptyList()),
84                      typeGroup.getOrDefault(PipelineSQLOperationType.UPDATE, Collections.emptyList()), typeGroup.getOrDefault(PipelineSQLOperationType.DELETE, Collections.emptyList())));
85          }
86          return result;
87      }
88      
89      private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
90          DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
91          ShardingSpherePreconditions.checkState(null == beforeDataRecord || PipelineSQLOperationType.DELETE == beforeDataRecord.getType(),
92                  () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
93          dataRecords.put(dataRecord.getKey(), dataRecord);
94      }
95      
96      private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
97          DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
98          if (null == beforeDataRecord) {
99              dataRecords.put(dataRecord.getKey(), dataRecord);
100             return;
101         }
102         ShardingSpherePreconditions.checkState(PipelineSQLOperationType.DELETE != beforeDataRecord.getType(), () -> new UnsupportedSQLOperationException("Not Delete"));
103         if (isUniqueKeyUpdated(dataRecord)) {
104             dataRecords.remove(dataRecord.getOldKey());
105         }
106         if (PipelineSQLOperationType.INSERT == beforeDataRecord.getType()) {
107             DataRecord mergedDataRecord = mergeUpdateColumn(PipelineSQLOperationType.INSERT, dataRecord.getTableName(), beforeDataRecord, dataRecord);
108             dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
109             return;
110         }
111         if (PipelineSQLOperationType.UPDATE == beforeDataRecord.getType()) {
112             DataRecord mergedDataRecord = mergeUpdateColumn(PipelineSQLOperationType.UPDATE, dataRecord.getTableName(), beforeDataRecord, dataRecord);
113             dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
114         }
115     }
116     
117     private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
118         DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
119         ShardingSpherePreconditions.checkState(null == beforeDataRecord || PipelineSQLOperationType.DELETE != beforeDataRecord.getType(),
120                 () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
121         if (null != beforeDataRecord && PipelineSQLOperationType.UPDATE == beforeDataRecord.getType() && isUniqueKeyUpdated(beforeDataRecord)) {
122             DataRecord mergedDataRecord = new DataRecord(PipelineSQLOperationType.DELETE, dataRecord.getTableName(), dataRecord.getPosition(), dataRecord.getColumnCount());
123             mergeBaseFields(dataRecord, mergedDataRecord);
124             for (int i = 0; i < dataRecord.getColumnCount(); i++) {
125                 mergedDataRecord.addColumn(new Column(dataRecord.getColumn(i).getName(),
126                         dataRecord.getColumn(i).isUniqueKey() ? beforeDataRecord.getColumn(i).getOldValue() : beforeDataRecord.getColumn(i).getValue(),
127                         null, true, dataRecord.getColumn(i).isUniqueKey()));
128             }
129             dataRecords.remove(beforeDataRecord.getKey());
130             dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
131         } else {
132             dataRecords.put(dataRecord.getOldKey(), dataRecord);
133         }
134     }
135     
136     private void mergeBaseFields(final DataRecord sourceRecord, final DataRecord targetRecord) {
137         targetRecord.setActualTableName(sourceRecord.getActualTableName());
138         targetRecord.setCsn(sourceRecord.getCsn());
139         targetRecord.setCommitTime(sourceRecord.getCommitTime());
140     }
141     
142     private boolean isUniqueKeyUpdated(final DataRecord dataRecord) {
143         // TODO Compatible with multiple unique indexes
144         for (Column each : dataRecord.getColumns()) {
145             if (each.isUniqueKey() && each.isUpdated()) {
146                 return true;
147             }
148         }
149         return false;
150     }
151     
152     private DataRecord mergeUpdateColumn(final PipelineSQLOperationType type, final String tableName, final DataRecord preDataRecord, final DataRecord curDataRecord) {
153         DataRecord result = new DataRecord(type, tableName, curDataRecord.getPosition(), curDataRecord.getColumnCount());
154         mergeBaseFields(curDataRecord, result);
155         for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
156             result.addColumn(new Column(
157                     curDataRecord.getColumn(i).getName(),
158                     preDataRecord.getColumn(i).getOldValue(),
159                     curDataRecord.getColumn(i).getValue(),
160                     preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
161                     curDataRecord.getColumn(i).isUniqueKey()));
162         }
163         return result;
164     }
165 }