1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
19
20 import lombok.RequiredArgsConstructor;
21 import lombok.extern.slf4j.Slf4j;
22 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
23 import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
24 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
25 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
26 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
27 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
28 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
29 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
30
31 import java.sql.Connection;
32 import java.sql.ResultSet;
33 import java.sql.SQLException;
34 import java.util.Collection;
35 import java.util.LinkedHashMap;
36 import java.util.LinkedHashSet;
37 import java.util.LinkedList;
38 import java.util.Map;
39 import java.util.Map.Entry;
40 import java.util.Set;
41 import java.util.SortedMap;
42 import java.util.TreeMap;
43 import java.util.concurrent.ConcurrentHashMap;
44 import java.util.stream.Collectors;
45
46
47
48
49 @RequiredArgsConstructor
50 @Slf4j
51 public final class StandardPipelineTableMetaDataLoader implements PipelineTableMetaDataLoader {
52
53
54 private final PipelineDataSourceWrapper dataSource;
55
56 private final Map<CaseInsensitiveIdentifier, PipelineTableMetaData> tableMetaDataMap = new ConcurrentHashMap<>();
57
58 @Override
59 public PipelineTableMetaData getTableMetaData(final String schemaName, final String tableName) {
60 PipelineTableMetaData result = tableMetaDataMap.get(new CaseInsensitiveIdentifier(tableName));
61 if (null != result) {
62 return result;
63 }
64 try {
65 loadTableMetaData(schemaName, tableName);
66 } catch (final SQLException ex) {
67 throw new PipelineInternalException(String.format("Load meta data for schema '%s' and table '%s' failed", schemaName, tableName), ex);
68 }
69 result = tableMetaDataMap.get(new CaseInsensitiveIdentifier(tableName));
70 if (null == result) {
71 log.warn("Can not load meta data for table '{}'", tableName);
72 }
73 return result;
74 }
75
76 private void loadTableMetaData(final String schemaName, final String tableNamePattern) throws SQLException {
77 try (Connection connection = dataSource.getConnection()) {
78 DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSource.getDatabaseType()).getDialectDatabaseMetaData();
79 Map<CaseInsensitiveIdentifier, PipelineTableMetaData> tableMetaDataMap = loadTableMetaData0(connection, dialectDatabaseMetaData.isSchemaAvailable() ? schemaName : null, tableNamePattern);
80 this.tableMetaDataMap.putAll(tableMetaDataMap);
81 }
82 }
83
84 private Map<CaseInsensitiveIdentifier, PipelineTableMetaData> loadTableMetaData0(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException {
85 Collection<String> tableNames = new LinkedList<>();
86 try (ResultSet resultSet = connection.getMetaData().getTables(connection.getCatalog(), schemaName, tableNamePattern, null)) {
87 while (resultSet.next()) {
88 String tableName = resultSet.getString("TABLE_NAME");
89 tableNames.add(tableName);
90 }
91 }
92 Map<CaseInsensitiveIdentifier, PipelineTableMetaData> result = new LinkedHashMap<>();
93 for (String each : tableNames) {
94 Set<String> primaryKeys = loadPrimaryKeys(connection, schemaName, each);
95 Map<String, Collection<String>> uniqueKeys = loadUniqueIndexesOfTable(connection, schemaName, each);
96 Map<String, PipelineColumnMetaData> columnMetaDataMap = new LinkedHashMap<>();
97 try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, each, "%")) {
98 while (resultSet.next()) {
99 int ordinalPosition = resultSet.getInt("ORDINAL_POSITION");
100 String columnName = resultSet.getString("COLUMN_NAME");
101 if (columnMetaDataMap.containsKey(columnName)) {
102 continue;
103 }
104 int dataType = resultSet.getInt("DATA_TYPE");
105 String dataTypeName = resultSet.getString("TYPE_NAME");
106 boolean primaryKey = primaryKeys.contains(columnName);
107 boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
108 boolean isUniqueKey = uniqueKeys.values().stream().anyMatch(names -> names.contains(columnName));
109 PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName, dataType, dataTypeName, isNullable, primaryKey, isUniqueKey);
110 columnMetaDataMap.put(columnName, columnMetaData);
111 }
112 }
113 Collection<PipelineIndexMetaData> uniqueIndexMetaData = uniqueKeys.entrySet().stream()
114 .map(entry -> new PipelineIndexMetaData(entry.getKey(), entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()))).collect(Collectors.toList());
115 result.put(new CaseInsensitiveIdentifier(each), new PipelineTableMetaData(each,
116 columnMetaDataMap.entrySet().stream().collect(Collectors.toMap(entry -> new CaseInsensitiveIdentifier(entry.getKey()), Entry::getValue)), uniqueIndexMetaData));
117 }
118 return result;
119 }
120
121 private Map<String, Collection<String>> loadUniqueIndexesOfTable(final Connection connection, final String schemaName, final String tableName) throws SQLException {
122 Map<String, SortedMap<Short, String>> orderedColumnsOfIndexes = new LinkedHashMap<>();
123 try (ResultSet resultSet = connection.getMetaData().getIndexInfo(connection.getCatalog(), schemaName, tableName, true, false)) {
124 while (resultSet.next()) {
125 String indexName = resultSet.getString("INDEX_NAME");
126 if (null == indexName) {
127 continue;
128 }
129 orderedColumnsOfIndexes.computeIfAbsent(indexName, unused -> new TreeMap<>()).put(resultSet.getShort("ORDINAL_POSITION"), resultSet.getString("COLUMN_NAME"));
130 }
131 }
132 Map<String, Collection<String>> result = new LinkedHashMap<>();
133 for (Entry<String, SortedMap<Short, String>> entry : orderedColumnsOfIndexes.entrySet()) {
134 Collection<String> columnNames = result.computeIfAbsent(entry.getKey(), unused -> new LinkedList<>());
135 columnNames.addAll(entry.getValue().values());
136 }
137 return result;
138 }
139
140 private Set<String> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
141 Set<String> result = new LinkedHashSet<>();
142
143 try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), schemaName, tableName)) {
144 while (resultSet.next()) {
145 result.add(resultSet.getString("COLUMN_NAME"));
146 }
147 }
148 return result;
149 }
150 }