View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
19  
20  import com.github.benmanes.caffeine.cache.Cache;
21  import com.github.benmanes.caffeine.cache.Caffeine;
22  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
23  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
24  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
25  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
26  import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
27  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
28  
29  import java.util.Collection;
30  import java.util.Objects;
31  import java.util.Optional;
32  import java.util.stream.Collectors;
33  
34  /**
35   * Pipeline import SQL builder engine.
36   */
37  public final class PipelineImportSQLBuilder {
38      
39      private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
40      
41      private static final String UPDATE_SQL_CACHE_KEY_PREFIX = "UPDATE_";
42      
43      private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
44      
45      private final DialectPipelineSQLBuilder dialectSQLBuilder;
46      
47      private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
48      
49      private final Cache<String, String> sqlCache;
50      
51      public PipelineImportSQLBuilder(final DatabaseType databaseType) {
52          dialectSQLBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType);
53          sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
54          sqlCache = Caffeine.newBuilder().initialCapacity(16).maximumSize(1024L).build();
55      }
56      
57      /**
58       * Build insert SQL.
59       *
60       * @param schemaName schema name
61       * @param dataRecord data record
62       * @return insert SQL
63       */
64      public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
65          String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
66          if (null == sqlCache.getIfPresent(sqlCacheKey)) {
67              sqlCache.put(sqlCacheKey, buildInsertSQL0(schemaName, dataRecord));
68          }
69          return sqlCache.getIfPresent(sqlCacheKey);
70      }
71      
72      private String buildInsertSQL0(final String schemaName, final DataRecord dataRecord) {
73          String insertMainClause = buildInsertMainClause(schemaName, dataRecord);
74          return dialectSQLBuilder.buildInsertOnDuplicateClause(dataRecord).map(optional -> insertMainClause + " " + optional).orElse(insertMainClause);
75      }
76      
77      private String buildInsertMainClause(final String schemaName, final DataRecord dataRecord) {
78          String columnsLiteral = dataRecord.getColumns().stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(","));
79          String valuesLiteral = dataRecord.getColumns().stream().map(each -> "?").collect(Collectors.joining(","));
80          return String.format("INSERT INTO %s(%s) VALUES(%s)", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()), columnsLiteral, valuesLiteral);
81      }
82      
83      /**
84       * Build update SQL.
85       *
86       * @param schemaName schema name
87       * @param dataRecord data record
88       * @param conditionColumns condition columns
89       * @return update SQL
90       */
91      public String buildUpdateSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
92          String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
93          if (null == sqlCache.getIfPresent(sqlCacheKey)) {
94              sqlCache.put(sqlCacheKey, buildUpdateSQL0(schemaName, dataRecord, conditionColumns));
95          }
96          Collection<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
97          String updateSetClause = setColumns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " = ?").collect(Collectors.joining(","));
98          return String.format(Objects.requireNonNull(sqlCache.getIfPresent(sqlCacheKey)), updateSetClause);
99      }
100     
101     private String buildUpdateSQL0(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
102         String updateMainClause = String.format("UPDATE %s SET %%s", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()));
103         return buildWhereClause(conditionColumns).map(optional -> updateMainClause + optional).orElse(updateMainClause);
104     }
105     
106     /**
107      * Build delete SQL.
108      *
109      * @param schemaName schema name
110      * @param dataRecord data record
111      * @param conditionColumns condition columns
112      * @return delete SQL
113      */
114     public String buildDeleteSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
115         String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
116         if (null == sqlCache.getIfPresent(sqlCacheKey)) {
117             sqlCache.put(sqlCacheKey, buildDeleteSQL0(schemaName, dataRecord, conditionColumns));
118         }
119         return sqlCache.getIfPresent(sqlCacheKey);
120     }
121     
122     private String buildDeleteSQL0(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
123         String deleteMainClause = buildDeleteMainClause(schemaName, dataRecord);
124         return buildWhereClause(conditionColumns).map(optional -> deleteMainClause + optional).orElse(deleteMainClause);
125     }
126     
127     private String buildDeleteMainClause(final String schemaName, final DataRecord dataRecord) {
128         return String.format("DELETE FROM %s", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()));
129     }
130     
131     private Optional<String> buildWhereClause(final Collection<Column> conditionColumns) {
132         return conditionColumns.isEmpty()
133                 ? Optional.empty()
134                 : Optional.of(" WHERE " + conditionColumns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " = ?").collect(Collectors.joining(" AND ")));
135     }
136 }