/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.core.metadata.loader;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Standard pipeline table meta data loader.
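 *
 * <p>Usage sketch (how the {@code PipelineDataSource} is obtained depends on the pipeline context; the schema and table names below are only examples):</p>
 * <pre>
 * StandardPipelineTableMetaDataLoader loader = new StandardPipelineTableMetaDataLoader(dataSource);
 * PipelineTableMetaData tableMetaData = loader.getTableMetaData("public", "t_order");
 * </pre>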
 */
@RequiredArgsConstructor
@Slf4j
public final class StandardPipelineTableMetaDataLoader implements PipelineTableMetaDataLoader {
    
    private final PipelineDataSource dataSource;
    
    private final Map<ShardingSphereIdentifier, PipelineTableMetaData> tableMetaDataMap = new ConcurrentHashMap<>();
    
    @Override
    public PipelineTableMetaData getTableMetaData(final String schemaName, final String tableName) {
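        // The cache is keyed by table name only; the schema name is used only when loading meta data from the database.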
        PipelineTableMetaData result = tableMetaDataMap.get(new ShardingSphereIdentifier(tableName));
        if (null != result) {
            return result;
        }
        try {
            loadTableMetaData(schemaName, tableName);
        } catch (final SQLException ex) {
            throw new PipelineInternalException(String.format("Load meta data for schema '%s' and table '%s' failed", schemaName, tableName), ex);
        }
        result = tableMetaDataMap.get(new ShardingSphereIdentifier(tableName));
        if (null == result) {
            log.warn("Can not load meta data for table '{}'", tableName);
        }
        return result;
    }
    
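    /**
     * Load and cache meta data of the specified table, passing the schema name only if the dialect supports schemas.
     */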
    private void loadTableMetaData(final String schemaName, final String tableName) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            DatabaseTypeRegistry databaseTypeRegistry = new DatabaseTypeRegistry(dataSource.getDatabaseType());
            tableMetaDataMap.putAll(loadTableMetaData(connection, databaseTypeRegistry.getDialectDatabaseMetaData().getSchemaOption().isSchemaAvailable() ? schemaName : null, tableName));
        }
    }
    
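    /**
     * Load meta data of all tables matching the name pattern, including columns, primary keys and unique indexes.
     */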
    private Map<ShardingSphereIdentifier, PipelineTableMetaData> loadTableMetaData(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException {
        Collection<String> tableNames = new LinkedList<>();
        try (ResultSet resultSet = connection.getMetaData().getTables(connection.getCatalog(), schemaName, tableNamePattern, null)) {
            while (resultSet.next()) {
                tableNames.add(resultSet.getString("TABLE_NAME"));
            }
        }
        Map<ShardingSphereIdentifier, PipelineTableMetaData> result = new LinkedHashMap<>(tableNames.size(), 1F);
        for (String each : tableNames) {
            Collection<ShardingSphereIdentifier> primaryKeys = loadPrimaryKeys(connection, schemaName, each);
            Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> uniqueKeys = loadUniqueKeys(connection, schemaName, each);
            Map<ShardingSphereIdentifier, PipelineColumnMetaData> columnMetaDataMap = new LinkedHashMap<>();
            try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, each, "%")) {
                while (resultSet.next()) {
                    int ordinalPosition = resultSet.getInt("ORDINAL_POSITION");
                    ShardingSphereIdentifier columnName = new ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME"));
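                    // Keep only the first occurrence if the driver returns the same column more than once.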
                    if (columnMetaDataMap.containsKey(columnName)) {
                        continue;
                    }
                    int dataType = resultSet.getInt("DATA_TYPE");
                    String dataTypeName = resultSet.getString("TYPE_NAME");
                    boolean primaryKey = primaryKeys.contains(columnName);
                    boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
                    boolean isUniqueKey = uniqueKeys.values().stream().anyMatch(names -> names.contains(columnName));
                    columnMetaDataMap.put(columnName, new PipelineColumnMetaData(ordinalPosition, columnName.toString(), dataType, dataTypeName, isNullable, primaryKey, isUniqueKey));
                }
            }
            Collection<PipelineIndexMetaData> uniqueIndexMetaData = uniqueKeys.entrySet().stream()
                    .map(entry -> new PipelineIndexMetaData(entry.getKey(), entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()),
                            DataConsistencyCheckUtils.compareLists(primaryKeys, entry.getValue())))
                    .collect(Collectors.toList());
            result.put(new ShardingSphereIdentifier(each), new PipelineTableMetaData(each, columnMetaDataMap, uniqueIndexMetaData));
        }
        return result;
    }
    
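    /**
     * Load primary key column names of the table, ordered by key sequence.
     */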
    private Collection<ShardingSphereIdentifier> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
        SortedMap<Short, ShardingSphereIdentifier> result = new TreeMap<>();
        try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), schemaName, tableName)) {
            while (resultSet.next()) {
                result.put(resultSet.getShort("KEY_SEQ"), new ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME")));
            }
        }
        return result.values();
    }
    
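    /**
     * Load unique indexes of the table as a map from index name to its ordered column names.
     */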
    private Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> loadUniqueKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
        Map<String, SortedMap<Short, ShardingSphereIdentifier>> orderedColumnsOfIndexes = new LinkedHashMap<>();
        // Set approximate=true to prevent Oracle driver 19 from running `analyze table`
        try (ResultSet resultSet = connection.getMetaData().getIndexInfo(connection.getCatalog(), schemaName, tableName, true, true)) {
            while (resultSet.next()) {
                String indexName = resultSet.getString("INDEX_NAME");
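                // Skip rows without an index name, for example table statistics rows returned by getIndexInfo.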
                if (null == indexName) {
                    continue;
                }
                orderedColumnsOfIndexes.computeIfAbsent(indexName,
                        unused -> new TreeMap<>()).put(resultSet.getShort("ORDINAL_POSITION"), new ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME")));
            }
        }
        Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> result = new LinkedHashMap<>();
        for (Entry<String, SortedMap<Short, ShardingSphereIdentifier>> entry : orderedColumnsOfIndexes.entrySet()) {
            Collection<ShardingSphereIdentifier> columnNames = result.computeIfAbsent(new ShardingSphereIdentifier(entry.getKey()), unused -> new LinkedList<>());
            columnNames.addAll(entry.getValue().values());
        }
        return result;
    }
}