1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
19
20 import com.github.benmanes.caffeine.cache.Cache;
21 import com.github.benmanes.caffeine.cache.Caffeine;
22 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
23 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
24 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
25 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
26 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
27 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
28
29 import java.util.Collection;
30 import java.util.Objects;
31 import java.util.Optional;
32 import java.util.stream.Collectors;
33
34
35
36
37 public final class PipelineImportSQLBuilder {
38
39 private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
40
41 private static final String UPDATE_SQL_CACHE_KEY_PREFIX = "UPDATE_";
42
43 private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
44
45 private final DialectPipelineSQLBuilder dialectSQLBuilder;
46
47 private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
48
49 private final Cache<String, String> sqlCache;
50
51 public PipelineImportSQLBuilder(final DatabaseType databaseType) {
52 dialectSQLBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType);
53 sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
54 sqlCache = Caffeine.newBuilder().initialCapacity(16).maximumSize(1024L).build();
55 }
56
57
58
59
60
61
62
63
64 public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
65 String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
66 if (null == sqlCache.getIfPresent(sqlCacheKey)) {
67 String insertMainClause = buildInsertMainClause(schemaName, dataRecord);
68 sqlCache.put(sqlCacheKey, dialectSQLBuilder.buildInsertOnDuplicateClause(dataRecord).map(optional -> insertMainClause + " " + optional).orElse(insertMainClause));
69 }
70 return sqlCache.getIfPresent(sqlCacheKey);
71 }
72
73 private String buildInsertMainClause(final String schemaName, final DataRecord dataRecord) {
74 String columnsLiteral = dataRecord.getColumns().stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(","));
75 String valuesLiteral = dataRecord.getColumns().stream().map(each -> "?").collect(Collectors.joining(","));
76 return String.format("INSERT INTO %s(%s) VALUES(%s)", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()), columnsLiteral, valuesLiteral);
77 }
78
79
80
81
82
83
84
85
86
87 public String buildUpdateSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
88 String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
89 if (null == sqlCache.getIfPresent(sqlCacheKey)) {
90 String updateMainClause = String.format("UPDATE %s SET %%s", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()));
91 sqlCache.put(sqlCacheKey, buildWhereClause(conditionColumns).map(optional -> updateMainClause + optional).orElse(updateMainClause));
92 }
93 Collection<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
94 String updateSetClause = setColumns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " = ?").collect(Collectors.joining(","));
95 return String.format(Objects.requireNonNull(sqlCache.getIfPresent(sqlCacheKey)), updateSetClause);
96 }
97
98
99
100
101
102
103
104
105
106 public String buildDeleteSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
107 String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
108 if (null == sqlCache.getIfPresent(sqlCacheKey)) {
109 String deleteMainClause = buildDeleteMainClause(schemaName, dataRecord);
110 sqlCache.put(sqlCacheKey, buildWhereClause(conditionColumns).map(optional -> deleteMainClause + optional).orElse(deleteMainClause));
111 }
112 return sqlCache.getIfPresent(sqlCacheKey);
113 }
114
115 private String buildDeleteMainClause(final String schemaName, final DataRecord dataRecord) {
116 return String.format("DELETE FROM %s", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()));
117 }
118
119 private Optional<String> buildWhereClause(final Collection<Column> conditionColumns) {
120 return conditionColumns.isEmpty()
121 ? Optional.empty()
122 : Optional.of(" WHERE " + conditionColumns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " = ?").collect(Collectors.joining(" AND ")));
123 }
124 }