1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.importer;
19
20 import lombok.Getter;
21 import lombok.RequiredArgsConstructor;
22 import lombok.ToString;
23 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
24 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
25 import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
26 import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
27 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
28 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
29 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
30
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.Map;
34 import java.util.Optional;
35 import java.util.Set;
36 import java.util.stream.Collectors;
37
38
39
40
41 @RequiredArgsConstructor
42 @Getter
43 @ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
44 public final class ImporterConfiguration {
45
46 private final PipelineDataSourceConfiguration dataSourceConfig;
47
48
49 private final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap;
50
51 private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
52
53 private final int batchSize;
54
55 private final JobRateLimitAlgorithm rateLimitAlgorithm;
56
57 private final int retryTimes;
58
59
60 private final int concurrency;
61
62
63
64
65
66
67
68 public Set<String> getShardingColumns(final String logicTableName) {
69 return shardingColumnsMap.getOrDefault(new CaseInsensitiveIdentifier(logicTableName), Collections.emptySet());
70 }
71
72
73
74
75
76
77
78 public Optional<String> findSchemaName(final String logicTableName) {
79 DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
80 return dialectDatabaseMetaData.isSchemaAvailable() ? Optional.of(tableAndSchemaNameMapper.getSchemaName(logicTableName)) : Optional.empty();
81 }
82
83
84
85
86
87
88 public Collection<CaseInsensitiveQualifiedTable> getQualifiedTables() {
89 return shardingColumnsMap.keySet().stream()
90 .map(CaseInsensitiveIdentifier::toString).map(each -> new CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), each)).collect(Collectors.toList());
91 }
92 }