View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
19  
20  import com.github.benmanes.caffeine.cache.Cache;
21  import com.github.benmanes.caffeine.cache.Caffeine;
22  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
23  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
24  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
25  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
26  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
27  import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
28  
29  import java.util.Collection;
30  import java.util.Objects;
31  import java.util.Optional;
32  import java.util.stream.Collectors;
33  
34  /**
35   * Pipeline import SQL builder engine.
36   */
37  public final class PipelineImportSQLBuilder {
38      
39      private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
40      
41      private static final String UPDATE_SQL_CACHE_KEY_PREFIX = "UPDATE_";
42      
43      private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
44      
45      private final DialectPipelineSQLBuilder dialectSQLBuilder;
46      
47      private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
48      
49      private final Cache<String, String> sqlCache;
50      
51      public PipelineImportSQLBuilder(final DatabaseType databaseType) {
52          dialectSQLBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType);
53          sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
54          sqlCache = Caffeine.newBuilder().initialCapacity(16).maximumSize(1024L).build();
55      }
56      
57      /**
58       * Build insert SQL.
59       *
60       * @param schemaName schema name
61       * @param dataRecord data record
62       * @return insert SQL
63       */
64      public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
65          String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
66          if (null == sqlCache.getIfPresent(sqlCacheKey)) {
67              String insertMainClause = buildInsertMainClause(schemaName, dataRecord);
68              sqlCache.put(sqlCacheKey, dialectSQLBuilder.buildInsertOnDuplicateClause(dataRecord).map(optional -> insertMainClause + " " + optional).orElse(insertMainClause));
69          }
70          return sqlCache.getIfPresent(sqlCacheKey);
71      }
72      
73      private String buildInsertMainClause(final String schemaName, final DataRecord dataRecord) {
74          String columnsLiteral = dataRecord.getColumns().stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(","));
75          String valuesLiteral = dataRecord.getColumns().stream().map(each -> "?").collect(Collectors.joining(","));
76          return String.format("INSERT INTO %s(%s) VALUES(%s)", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()), columnsLiteral, valuesLiteral);
77      }
78      
79      /**
80       * Build update SQL.
81       *
82       * @param schemaName schema name
83       * @param dataRecord data record
84       * @param conditionColumns condition columns
85       * @return update SQL
86       */
87      public String buildUpdateSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
88          String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
89          if (null == sqlCache.getIfPresent(sqlCacheKey)) {
90              String updateMainClause = String.format("UPDATE %s SET %%s", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()));
91              sqlCache.put(sqlCacheKey, buildWhereClause(conditionColumns).map(optional -> updateMainClause + optional).orElse(updateMainClause));
92          }
93          Collection<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
94          String updateSetClause = setColumns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " = ?").collect(Collectors.joining(","));
95          return String.format(Objects.requireNonNull(sqlCache.getIfPresent(sqlCacheKey)), updateSetClause);
96      }
97      
98      /**
99       * Build delete SQL.
100      *
101      * @param schemaName schema name
102      * @param dataRecord data record
103      * @param conditionColumns condition columns
104      * @return delete SQL
105      */
106     public String buildDeleteSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
107         String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
108         if (null == sqlCache.getIfPresent(sqlCacheKey)) {
109             String deleteMainClause = buildDeleteMainClause(schemaName, dataRecord);
110             sqlCache.put(sqlCacheKey, buildWhereClause(conditionColumns).map(optional -> deleteMainClause + optional).orElse(deleteMainClause));
111         }
112         return sqlCache.getIfPresent(sqlCacheKey);
113     }
114     
115     private String buildDeleteMainClause(final String schemaName, final DataRecord dataRecord) {
116         return String.format("DELETE FROM %s", sqlSegmentBuilder.getQualifiedTableName(schemaName, dataRecord.getTableName()));
117     }
118     
119     private Optional<String> buildWhereClause(final Collection<Column> conditionColumns) {
120         return conditionColumns.isEmpty()
121                 ? Optional.empty()
122                 : Optional.of(" WHERE " + conditionColumns.stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + " = ?").collect(Collectors.joining(" AND ")));
123     }
124 }