1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
19
20 import com.github.benmanes.caffeine.cache.Cache;
21 import com.github.benmanes.caffeine.cache.Caffeine;
22 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
23 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
24 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
25 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
26 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
27 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
28
29 import java.util.Collection;
30 import java.util.Objects;
31 import java.util.Optional;
32 import java.util.stream.Collectors;
33
34
35
36
37 public final class PipelineImportSQLBuilder {
38
39 private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
40
41 private static final String UPDATE_SQL_CACHE_KEY_PREFIX = "UPDATE_";
42
43 private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
44
45 private final DialectPipelineSQLBuilder dialectSQLBuilder;
46
47 private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
48
49 private final Cache<String, String> sqlCache;
50
51 public PipelineImportSQLBuilder(final DatabaseType databaseType) {
52 dialectSQLBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType);
53 sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
54 sqlCache = Caffeine.newBuilder().initialCapacity(16).maximumSize(1024L).build();
55 }
56
57
58
59
60
61
62
63
64 public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
65 String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
66 if (null == sqlCache.getIfPresent(sqlCacheKey)) {
67 sqlCache.put(sqlCacheKey, buildInsertSQL0(schemaName, dataRecord));
68 }
69 return sqlCache.getIfPresent(sqlCacheKey);
70 }
71
72 private String buildInsertSQL0(final String schemaName, final DataRecord dataRecord) {
73 String insertMainClause = buildInsertMainClause(schemaName, dataRecord);
74 return dialectSQLBuilder.buildInsertOnDuplicateClause(dataRecord).map(optional -> insertMainClause + " " + optional).orElse(insertMainClause);
75 }
76
77 private String buildInsertMainClause(final String schemaName, final DataRecord dataRecord) {
78 String columnsLiteral = dataRecord.getColumns().stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(","));
79 String valuesLiteral = dataRecord.getColumns().stream().map(each -> "?").collect(Collectors.joining(","));
80 return String.format("INSERT INTO %s(%s) VALUES(%s)", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()), columnsLiteral, valuesLiteral);
81 }
82
83
84
85
86
87
88
89
90
91 public String buildUpdateSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
92 String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
93 if (null == sqlCache.getIfPresent(sqlCacheKey)) {
94 sqlCache.put(sqlCacheKey, buildUpdateSQL0(schemaName, dataRecord, conditionColumns));
95 }
96 Collection<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
97 String updateSetClause = setColumns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " = ?").collect(Collectors.joining(","));
98 return String.format(Objects.requireNonNull(sqlCache.getIfPresent(sqlCacheKey)), updateSetClause);
99 }
100
101 private String buildUpdateSQL0(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
102 String updateMainClause = String.format("UPDATE %s SET %%s", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()));
103 return buildWhereClause(conditionColumns).map(optional -> updateMainClause + optional).orElse(updateMainClause);
104 }
105
106
107
108
109
110
111
112
113
114 public String buildDeleteSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
115 String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
116 if (null == sqlCache.getIfPresent(sqlCacheKey)) {
117 sqlCache.put(sqlCacheKey, buildDeleteSQL0(schemaName, dataRecord, conditionColumns));
118 }
119 return sqlCache.getIfPresent(sqlCacheKey);
120 }
121
122 private String buildDeleteSQL0(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
123 String deleteMainClause = buildDeleteMainClause(schemaName, dataRecord);
124 return buildWhereClause(conditionColumns).map(optional -> deleteMainClause + optional).orElse(deleteMainClause);
125 }
126
127 private String buildDeleteMainClause(final String schemaName, final DataRecord dataRecord) {
128 return String.format("DELETE FROM %s", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()));
129 }
130
131 private Optional<String> buildWhereClause(final Collection<Column> conditionColumns) {
132 return conditionColumns.isEmpty()
133 ? Optional.empty()
134 : Optional.of(" WHERE " + conditionColumns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " = ?").collect(Collectors.joining(" AND ")));
135 }
136 }