View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
19  
20  import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
21  import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
22  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
23  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
24  import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
25  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
26  import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
27  import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
28  import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
29  import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
30  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
31  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
32  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
33  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
34  import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
35  import org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
36  
37  import java.util.List;
38  
39  /**
40   * WAL event converter.
41   */
42  public final class WALEventConverter {
43      
44      private final IncrementalDumperContext dumperContext;
45      
46      private final PipelineTableMetaDataLoader metaDataLoader;
47      
48      public WALEventConverter(final IncrementalDumperContext dumperContext, final PipelineTableMetaDataLoader metaDataLoader) {
49          this.dumperContext = dumperContext;
50          this.metaDataLoader = metaDataLoader;
51      }
52      
53      /**
54       * Convert WAL event to {@code Record}.
55       *
56       * @param event WAL event
57       * @return record
58       * @throws UnsupportedSQLOperationException unsupported SQL operation exception
59       */
60      public Record convert(final AbstractWALEvent event) {
61          if (filter(event)) {
62              return createPlaceholderRecord(event);
63          }
64          if (!(event instanceof AbstractRowEvent)) {
65              return createPlaceholderRecord(event);
66          }
67          PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((AbstractRowEvent) event).getTableName());
68          if (event instanceof WriteRowEvent) {
69              return handleWriteRowEvent((WriteRowEvent) event, tableMetaData);
70          }
71          if (event instanceof UpdateRowEvent) {
72              return handleUpdateRowEvent((UpdateRowEvent) event, tableMetaData);
73          }
74          if (event instanceof DeleteRowEvent) {
75              return handleDeleteRowEvent((DeleteRowEvent) event, tableMetaData);
76          }
77          throw new UnsupportedSQLOperationException("");
78      }
79      
80      private boolean filter(final AbstractWALEvent event) {
81          if (event instanceof AbstractRowEvent) {
82              AbstractRowEvent rowEvent = (AbstractRowEvent) event;
83              return !dumperContext.getCommonContext().getTableNameMapper().containsTable(rowEvent.getTableName());
84          }
85          return false;
86      }
87      
88      private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) {
89          return new PlaceholderRecord(new WALPosition(event.getLogSequenceNumber()));
90      }
91      
92      private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
93          CaseInsensitiveIdentifier logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
94          return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
95      }
96      
97      private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
98          DataRecord result = createDataRecord(PipelineSQLOperationType.INSERT, writeRowEvent, writeRowEvent.getAfterRow().size());
99          putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getAfterRow());
100         return result;
101     }
102     
103     private DataRecord handleUpdateRowEvent(final UpdateRowEvent updateRowEvent, final PipelineTableMetaData tableMetaData) {
104         DataRecord result = createDataRecord(PipelineSQLOperationType.UPDATE, updateRowEvent, updateRowEvent.getAfterRow().size());
105         putColumnsIntoDataRecord(result, tableMetaData, updateRowEvent.getAfterRow());
106         return result;
107     }
108     
109     private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final PipelineTableMetaData tableMetaData) {
110         // TODO completion columns
111         DataRecord result = createDataRecord(PipelineSQLOperationType.DELETE, event, event.getPrimaryKeys().size());
112         // TODO Unique key may be a column within unique index
113         List<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
114         for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
115             result.addColumn(new Column(primaryKeyColumns.get(i), event.getPrimaryKeys().get(i), null, true, true));
116         }
117         return result;
118     }
119     
120     private DataRecord createDataRecord(final PipelineSQLOperationType type, final AbstractRowEvent rowsEvent, final int columnCount) {
121         String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
122         DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
123         result.setActualTableName(rowsEvent.getTableName());
124         result.setCsn(rowsEvent.getCsn());
125         return result;
126     }
127     
128     private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final List<Object> values) {
129         for (int i = 0, count = values.size(); i < count; i++) {
130             PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
131             boolean isUniqueKey = columnMetaData.isUniqueKey();
132             Object uniqueKeyOldValue = isUniqueKey && PipelineSQLOperationType.UPDATE == dataRecord.getType() ? values.get(i) : null;
133             Column column = new Column(columnMetaData.getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
134             dataRecord.addColumn(column);
135         }
136     }
137 }