View Javadoc
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17  
18  package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
19  
20  import lombok.RequiredArgsConstructor;
21  import lombok.extern.slf4j.Slf4j;
22  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
23  import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
24  import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
25  import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
26  import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
27  import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
28  import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
29  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
30  
31  import java.sql.Connection;
32  import java.sql.ResultSet;
33  import java.sql.SQLException;
34  import java.util.Collection;
35  import java.util.LinkedHashMap;
36  import java.util.LinkedHashSet;
37  import java.util.LinkedList;
38  import java.util.Map;
39  import java.util.Map.Entry;
40  import java.util.Set;
41  import java.util.SortedMap;
42  import java.util.TreeMap;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.stream.Collectors;
45  
46  /**
47   * Standard pipeline table meta data loader.
48   */
49  @RequiredArgsConstructor
50  @Slf4j
51  public final class StandardPipelineTableMetaDataLoader implements PipelineTableMetaDataLoader {
52      
53      // It doesn't support ShardingSphereDataSource
54      private final PipelineDataSourceWrapper dataSource;
55      
56      private final Map<CaseInsensitiveIdentifier, PipelineTableMetaData> tableMetaDataMap = new ConcurrentHashMap<>();
57      
58      @Override
59      public PipelineTableMetaData getTableMetaData(final String schemaName, final String tableName) {
60          PipelineTableMetaData result = tableMetaDataMap.get(new CaseInsensitiveIdentifier(tableName));
61          if (null != result) {
62              return result;
63          }
64          try {
65              loadTableMetaData(schemaName, tableName);
66          } catch (final SQLException ex) {
67              throw new PipelineInternalException(String.format("Load meta data for schema '%s' and table '%s' failed", schemaName, tableName), ex);
68          }
69          result = tableMetaDataMap.get(new CaseInsensitiveIdentifier(tableName));
70          if (null == result) {
71              log.warn("Can not load meta data for table '{}'", tableName);
72          }
73          return result;
74      }
75      
76      private void loadTableMetaData(final String schemaName, final String tableNamePattern) throws SQLException {
77          try (Connection connection = dataSource.getConnection()) {
78              DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSource.getDatabaseType()).getDialectDatabaseMetaData();
79              Map<CaseInsensitiveIdentifier, PipelineTableMetaData> tableMetaDataMap = loadTableMetaData0(connection, dialectDatabaseMetaData.isSchemaAvailable() ? schemaName : null, tableNamePattern);
80              this.tableMetaDataMap.putAll(tableMetaDataMap);
81          }
82      }
83      
84      private Map<CaseInsensitiveIdentifier, PipelineTableMetaData> loadTableMetaData0(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException {
85          Collection<String> tableNames = new LinkedList<>();
86          try (ResultSet resultSet = connection.getMetaData().getTables(connection.getCatalog(), schemaName, tableNamePattern, null)) {
87              while (resultSet.next()) {
88                  String tableName = resultSet.getString("TABLE_NAME");
89                  tableNames.add(tableName);
90              }
91          }
92          Map<CaseInsensitiveIdentifier, PipelineTableMetaData> result = new LinkedHashMap<>();
93          for (String each : tableNames) {
94              Set<String> primaryKeys = loadPrimaryKeys(connection, schemaName, each);
95              Map<String, Collection<String>> uniqueKeys = loadUniqueIndexesOfTable(connection, schemaName, each);
96              Map<String, PipelineColumnMetaData> columnMetaDataMap = new LinkedHashMap<>();
97              try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, each, "%")) {
98                  while (resultSet.next()) {
99                      int ordinalPosition = resultSet.getInt("ORDINAL_POSITION");
100                     String columnName = resultSet.getString("COLUMN_NAME");
101                     if (columnMetaDataMap.containsKey(columnName)) {
102                         continue;
103                     }
104                     int dataType = resultSet.getInt("DATA_TYPE");
105                     String dataTypeName = resultSet.getString("TYPE_NAME");
106                     boolean primaryKey = primaryKeys.contains(columnName);
107                     boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
108                     boolean isUniqueKey = uniqueKeys.values().stream().anyMatch(names -> names.contains(columnName));
109                     PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName, dataType, dataTypeName, isNullable, primaryKey, isUniqueKey);
110                     columnMetaDataMap.put(columnName, columnMetaData);
111                 }
112             }
113             Collection<PipelineIndexMetaData> uniqueIndexMetaData = uniqueKeys.entrySet().stream()
114                     .map(entry -> new PipelineIndexMetaData(entry.getKey(), entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()))).collect(Collectors.toList());
115             result.put(new CaseInsensitiveIdentifier(each), new PipelineTableMetaData(each,
116                     columnMetaDataMap.entrySet().stream().collect(Collectors.toMap(entry -> new CaseInsensitiveIdentifier(entry.getKey()), Entry::getValue)), uniqueIndexMetaData));
117         }
118         return result;
119     }
120     
121     private Map<String, Collection<String>> loadUniqueIndexesOfTable(final Connection connection, final String schemaName, final String tableName) throws SQLException {
122         Map<String, SortedMap<Short, String>> orderedColumnsOfIndexes = new LinkedHashMap<>();
123         try (ResultSet resultSet = connection.getMetaData().getIndexInfo(connection.getCatalog(), schemaName, tableName, true, false)) {
124             while (resultSet.next()) {
125                 String indexName = resultSet.getString("INDEX_NAME");
126                 if (null == indexName) {
127                     continue;
128                 }
129                 orderedColumnsOfIndexes.computeIfAbsent(indexName, unused -> new TreeMap<>()).put(resultSet.getShort("ORDINAL_POSITION"), resultSet.getString("COLUMN_NAME"));
130             }
131         }
132         Map<String, Collection<String>> result = new LinkedHashMap<>();
133         for (Entry<String, SortedMap<Short, String>> entry : orderedColumnsOfIndexes.entrySet()) {
134             Collection<String> columnNames = result.computeIfAbsent(entry.getKey(), unused -> new LinkedList<>());
135             columnNames.addAll(entry.getValue().values());
136         }
137         return result;
138     }
139     
140     private Set<String> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
141         Set<String> result = new LinkedHashSet<>();
142         // TODO order primary keys
143         try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), schemaName, tableName)) {
144             while (resultSet.next()) {
145                 result.add(resultSet.getString("COLUMN_NAME"));
146             }
147         }
148         return result;
149     }
150 }