1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
19
20 import lombok.RequiredArgsConstructor;
21 import lombok.extern.slf4j.Slf4j;
22 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
23 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
24 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
25 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
26 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
27 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
28 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
29 import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
30
31 import java.sql.Connection;
32 import java.sql.ResultSet;
33 import java.sql.SQLException;
34 import java.util.Collection;
35 import java.util.LinkedHashMap;
36 import java.util.LinkedList;
37 import java.util.Map;
38 import java.util.Map.Entry;
39 import java.util.SortedMap;
40 import java.util.TreeMap;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.stream.Collectors;
43
44
45
46
47 @RequiredArgsConstructor
48 @Slf4j
49 public final class StandardPipelineTableMetaDataLoader implements PipelineTableMetaDataLoader {
50
51 private final PipelineDataSource dataSource;
52
53 private final Map<ShardingSphereIdentifier, PipelineTableMetaData> tableMetaDataMap = new ConcurrentHashMap<>();
54
55 @Override
56 public PipelineTableMetaData getTableMetaData(final String schemaName, final String tableName) {
57 PipelineTableMetaData result = tableMetaDataMap.get(new ShardingSphereIdentifier(tableName));
58 if (null != result) {
59 return result;
60 }
61 try {
62 loadTableMetaData(schemaName, tableName);
63 } catch (final SQLException ex) {
64 throw new PipelineInternalException(String.format("Load meta data for schema '%s' and table '%s' failed", schemaName, tableName), ex);
65 }
66 result = tableMetaDataMap.get(new ShardingSphereIdentifier(tableName));
67 if (null == result) {
68 log.warn("Can not load meta data for table '{}'", tableName);
69 }
70 return result;
71 }
72
73 private void loadTableMetaData(final String schemaName, final String tableName) throws SQLException {
74 try (Connection connection = dataSource.getConnection()) {
75 DatabaseTypeRegistry databaseTypeRegistry = new DatabaseTypeRegistry(dataSource.getDatabaseType());
76 tableMetaDataMap.putAll(loadTableMetaData(connection, databaseTypeRegistry.getDialectDatabaseMetaData().getSchemaOption().isSchemaAvailable() ? schemaName : null, tableName));
77 }
78 }
79
80 private Map<ShardingSphereIdentifier, PipelineTableMetaData> loadTableMetaData(final Connection connection, final String schemaName, final String tableNamePattern) throws SQLException {
81 Collection<String> tableNames = new LinkedList<>();
82 try (ResultSet resultSet = connection.getMetaData().getTables(connection.getCatalog(), schemaName, tableNamePattern, null)) {
83 while (resultSet.next()) {
84 tableNames.add(resultSet.getString("TABLE_NAME"));
85 }
86 }
87 Map<ShardingSphereIdentifier, PipelineTableMetaData> result = new LinkedHashMap<>(tableNames.size(), 1F);
88 for (String each : tableNames) {
89 Collection<ShardingSphereIdentifier> primaryKeys = loadPrimaryKeys(connection, schemaName, each);
90 Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> uniqueKeys = loadUniqueKeys(connection, schemaName, each);
91 Map<ShardingSphereIdentifier, PipelineColumnMetaData> columnMetaDataMap = new LinkedHashMap<>();
92 try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), schemaName, each, "%")) {
93 while (resultSet.next()) {
94 int ordinalPosition = resultSet.getInt("ORDINAL_POSITION");
95 ShardingSphereIdentifier columnName = new ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME"));
96 if (columnMetaDataMap.containsKey(columnName)) {
97 continue;
98 }
99 int dataType = resultSet.getInt("DATA_TYPE");
100 String dataTypeName = resultSet.getString("TYPE_NAME");
101 boolean primaryKey = primaryKeys.contains(columnName);
102 boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
103 boolean isUniqueKey = uniqueKeys.values().stream().anyMatch(names -> names.contains(columnName));
104 columnMetaDataMap.put(columnName, new PipelineColumnMetaData(ordinalPosition, columnName.toString(), dataType, dataTypeName, isNullable, primaryKey, isUniqueKey));
105 }
106 }
107 Collection<PipelineIndexMetaData> uniqueIndexMetaData = uniqueKeys.entrySet().stream()
108 .map(entry -> new PipelineIndexMetaData(entry.getKey(), entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()),
109 DataConsistencyCheckUtils.compareLists(primaryKeys, entry.getValue())))
110 .collect(Collectors.toList());
111 result.put(new ShardingSphereIdentifier(each), new PipelineTableMetaData(each, columnMetaDataMap, uniqueIndexMetaData));
112 }
113 return result;
114 }
115
116 private Collection<ShardingSphereIdentifier> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
117 SortedMap<Short, ShardingSphereIdentifier> result = new TreeMap<>();
118 try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), schemaName, tableName)) {
119 while (resultSet.next()) {
120 result.put(resultSet.getShort("KEY_SEQ"), new ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME")));
121 }
122 }
123 return result.values();
124 }
125
126 private Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> loadUniqueKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
127 Map<String, SortedMap<Short, ShardingSphereIdentifier>> orderedColumnsOfIndexes = new LinkedHashMap<>();
128
129 try (ResultSet resultSet = connection.getMetaData().getIndexInfo(connection.getCatalog(), schemaName, tableName, true, true)) {
130 while (resultSet.next()) {
131 String indexName = resultSet.getString("INDEX_NAME");
132 if (null == indexName) {
133 continue;
134 }
135 orderedColumnsOfIndexes.computeIfAbsent(indexName,
136 unused -> new TreeMap<>()).put(resultSet.getShort("ORDINAL_POSITION"), new ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME")));
137 }
138 }
139 Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> result = new LinkedHashMap<>();
140 for (Entry<String, SortedMap<Short, ShardingSphereIdentifier>> entry : orderedColumnsOfIndexes.entrySet()) {
141 Collection<ShardingSphereIdentifier> columnNames = result.computeIfAbsent(new ShardingSphereIdentifier(entry.getKey()), unused -> new LinkedList<>());
142 columnNames.addAll(entry.getValue().values());
143 }
144 return result;
145 }
146 }