1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.importer;
19
20 import lombok.AccessLevel;
21 import lombok.Getter;
22 import lombok.RequiredArgsConstructor;
23 import lombok.ToString;
24 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
25 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
26 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
27 import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData;
28 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
29 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
30 import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
31
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.Map;
35 import java.util.Optional;
36 import java.util.Set;
37 import java.util.stream.Collectors;
38
39
40
41
42 @RequiredArgsConstructor
43 @Getter
44 @ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
45 public final class ImporterConfiguration {
46
47 private final PipelineDataSourceConfiguration dataSourceConfig;
48
49 @Getter(AccessLevel.NONE)
50 private final Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap;
51
52 private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
53
54 private final int batchSize;
55
56 private final JobRateLimitAlgorithm rateLimitAlgorithm;
57
58 private final int retryTimes;
59
60 private final int concurrency;
61
62
63
64
65
66
67
68 public Set<String> getShardingColumns(final String logicTableName) {
69 return shardingColumnsMap.getOrDefault(new ShardingSphereIdentifier(logicTableName), Collections.emptySet());
70 }
71
72
73
74
75
76
77
78 public Optional<String> findSchemaName(final String logicTableName) {
79 DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
80 return dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable() ? Optional.ofNullable(tableAndSchemaNameMapper.getSchemaName(logicTableName)) : Optional.empty();
81 }
82
83
84
85
86
87
88 public Collection<QualifiedTable> getQualifiedTables() {
89 return shardingColumnsMap.keySet().stream()
90 .map(ShardingSphereIdentifier::getValue).map(each -> new QualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), each)).collect(Collectors.toList());
91 }
92 }