View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
19  
20  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
21  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
22  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
23  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
24  import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.column.PostgreSQLColumnPropertiesAppender;
25  import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.constraints.PostgreSQLConstraintsPropertiesAppender;
26  import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.index.PostgreSQLIndexSQLGenerator;
27  import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.table.PostgreSQLTablePropertiesLoader;
28  import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.template.PostgreSQLPipelineFreemarkerManager;
29  
30  import javax.sql.DataSource;
31  import java.sql.Connection;
32  import java.sql.SQLException;
33  import java.util.Arrays;
34  import java.util.Collection;
35  import java.util.Map;
36  import java.util.Optional;
37  import java.util.stream.Collectors;
38  
39  /**
40   * PostgreSQL pipeline SQL builder.
41   */
42  public final class PostgreSQLPipelineSQLBuilder implements DialectPipelineSQLBuilder {
43      
44      @Override
45      public Optional<String> buildCreateSchemaSQL(final String schemaName) {
46          return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
47      }
48      
49      @Override
50      public Optional<String> buildInsertOnDuplicateClause(final DataRecord dataRecord) {
51          // TODO without unique key, job has been interrupted, which may lead to data duplication
52          if (dataRecord.getUniqueKeyValue().isEmpty()) {
53              return Optional.empty();
54          }
55          StringBuilder result = new StringBuilder("ON CONFLICT (");
56          PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
57          result.append(dataRecord.getColumns().stream().filter(Column::isUniqueKey).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(",")));
58          result.append(") DO UPDATE SET ");
59          result.append(dataRecord.getColumns().stream()
60                  .filter(each -> !each.isUniqueKey()).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + "=EXCLUDED." + sqlSegmentBuilder.getEscapedIdentifier(each.getName()))
61                  .collect(Collectors.joining(",")));
62          return Optional.of(result.toString());
63      }
64      
65      @Override
66      public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
67          return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
68      }
69      
70      @Override
71      public Optional<String> buildEstimatedCountSQL(final String catalogName, final String qualifiedTableName) {
72          return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
73      }
74      
75      // TODO support partitions etc. If user use partition table, after sharding, the partition definition will not be needed. So we need to remove it after supported.
76      @Override
77      public Optional<String> buildCRC32SQL(final String qualifiedTableName, final String columnName) {
78          return Optional.of(String.format("SELECT pg_catalog.pg_checksum_table('%s', true)", qualifiedTableName));
79      }
80      
81      @Override
82      public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerBound) {
83          return hasLowerBound
84                  ? String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey)
85                  : String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
86      }
87      
88      @Override
89      public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
90          try (Connection connection = dataSource.getConnection()) {
91              int majorVersion = connection.getMetaData().getDatabaseMajorVersion();
92              int minorVersion = connection.getMetaData().getDatabaseMinorVersion();
93              Map<String, Object> materials = loadMaterials(tableName, schemaName, connection, majorVersion, minorVersion);
94              String tableSQL = generateCreateTableSQL(majorVersion, minorVersion, materials);
95              String indexSQL = generateCreateIndexSQL(connection, majorVersion, minorVersion, materials);
96              // TODO use ";" to split is not always correct if return value's comments contains ";"
97              return Arrays.asList((tableSQL + System.lineSeparator() + indexSQL).trim().split(";"));
98          }
99      }
100     
101     private Map<String, Object> loadMaterials(final String tableName, final String schemaName, final Connection connection, final int majorVersion, final int minorVersion) throws SQLException {
102         Map<String, Object> result = new PostgreSQLTablePropertiesLoader(connection, tableName, schemaName, majorVersion, minorVersion).load();
103         new PostgreSQLColumnPropertiesAppender(connection, majorVersion, minorVersion).append(result);
104         new PostgreSQLConstraintsPropertiesAppender(connection, majorVersion, minorVersion).append(result);
105         formatColumns(result);
106         return result;
107     }
108     
109     private String generateCreateTableSQL(final int majorVersion, final int minorVersion, final Map<String, Object> materials) {
110         return PostgreSQLPipelineFreemarkerManager.getSQLByVersion(materials, "component/table/%s/create.ftl", majorVersion, minorVersion).trim();
111     }
112     
113     private String generateCreateIndexSQL(final Connection connection, final int majorVersion, final int minorVersion, final Map<String, Object> materials) throws SQLException {
114         return new PostgreSQLIndexSQLGenerator(connection, majorVersion, minorVersion).generate(materials);
115     }
116     
117     @SuppressWarnings("unchecked")
118     private void formatColumns(final Map<String, Object> context) {
119         Collection<Map<String, Object>> columns = (Collection<Map<String, Object>>) context.get("columns");
120         for (Map<String, Object> each : columns) {
121             if (each.containsKey("cltype")) {
122                 typeFormatter(each, (String) each.get("cltype"));
123             }
124         }
125     }
126     
127     private void typeFormatter(final Map<String, Object> column, final String columnType) {
128         if (columnType.contains("[]")) {
129             column.put("cltype", columnType.substring(0, columnType.length() - 2));
130             column.put("hasSqrBracket", true);
131         } else {
132             column.put("hasSqrBracket", false);
133         }
134     }
135     
136     @Override
137     public Optional<String> buildQueryCurrentPositionSQL() {
138         return Optional.of("SELECT * FROM pg_current_wal_lsn()");
139     }
140     
141     @Override
142     public String wrapWithPageQuery(final String sql) {
143         return sql + " LIMIT ?";
144     }
145     
146     @Override
147     public String getDatabaseType() {
148         return "PostgreSQL";
149     }
150 }