View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
19  
20  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
21  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
22  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
23  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
24  import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.column.PostgreSQLColumnPropertiesAppender;
25  import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.constraints.PostgreSQLConstraintsPropertiesAppender;
26  import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.index.PostgreSQLIndexSQLGenerator;
27  import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.table.PostgreSQLTablePropertiesLoader;
28  import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.template.PostgreSQLPipelineFreemarkerManager;
29  
30  import javax.sql.DataSource;
31  import java.sql.Connection;
32  import java.sql.SQLException;
33  import java.util.Arrays;
34  import java.util.Collection;
35  import java.util.Map;
36  import java.util.Optional;
37  import java.util.stream.Collectors;
38  
39  /**
40   * PostgreSQL pipeline SQL builder.
41   */
42  public final class PostgreSQLPipelineSQLBuilder implements DialectPipelineSQLBuilder {
43      
44      @Override
45      public Optional<String> buildCreateSchemaSQL(final String schemaName) {
46          return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
47      }
48      
49      @Override
50      public Optional<String> buildInsertOnDuplicateClause(final DataRecord dataRecord) {
51          // TODO without unique key, job has been interrupted, which may lead to data duplication
52          if (dataRecord.getUniqueKeyValue().isEmpty()) {
53              return Optional.empty();
54          }
55          StringBuilder result = new StringBuilder("ON CONFLICT (");
56          PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
57          result.append(dataRecord.getColumns().stream().filter(Column::isUniqueKey).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(",")));
58          result.append(") DO UPDATE SET ");
59          result.append(dataRecord.getColumns().stream()
60                  .filter(each -> !each.isUniqueKey()).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + "=EXCLUDED." + sqlSegmentBuilder.getEscapedIdentifier(each.getName()))
61                  .collect(Collectors.joining(",")));
62          return Optional.of(result.toString());
63      }
64      
65      @Override
66      public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
67          return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
68      }
69      
70      @Override
71      public Optional<String> buildEstimatedCountSQL(final String catalogName, final String qualifiedTableName) {
72          return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
73      }
74      
75      // TODO support partitions etc. If user use partition table, after sharding, the partition definition will not be needed. So we need to remove it after supported.
76      @Override
77      public Optional<String> buildCRC32SQL(final String qualifiedTableName, final String columnName) {
78          return Optional.of(String.format("SELECT pg_catalog.pg_checksum_table('%s', true)", qualifiedTableName));
79      }
80      
81      @Override
82      public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
83          try (Connection connection = dataSource.getConnection()) {
84              int majorVersion = connection.getMetaData().getDatabaseMajorVersion();
85              int minorVersion = connection.getMetaData().getDatabaseMinorVersion();
86              Map<String, Object> materials = loadMaterials(tableName, schemaName, connection, majorVersion, minorVersion);
87              String tableSQL = generateCreateTableSQL(majorVersion, minorVersion, materials);
88              String indexSQL = generateCreateIndexSQL(connection, majorVersion, minorVersion, materials);
89              // TODO use ";" to split is not always correct if return value's comments contains ";"
90              return Arrays.asList((tableSQL + System.lineSeparator() + indexSQL).trim().split(";"));
91          }
92      }
93      
94      private Map<String, Object> loadMaterials(final String tableName, final String schemaName, final Connection connection, final int majorVersion, final int minorVersion) throws SQLException {
95          Map<String, Object> result = new PostgreSQLTablePropertiesLoader(connection, tableName, schemaName, majorVersion, minorVersion).load();
96          new PostgreSQLColumnPropertiesAppender(connection, majorVersion, minorVersion).append(result);
97          new PostgreSQLConstraintsPropertiesAppender(connection, majorVersion, minorVersion).append(result);
98          formatColumns(result);
99          return result;
100     }
101     
102     private String generateCreateTableSQL(final int majorVersion, final int minorVersion, final Map<String, Object> materials) {
103         return PostgreSQLPipelineFreemarkerManager.getSQLByVersion(materials, "component/table/%s/create.ftl", majorVersion, minorVersion).trim();
104     }
105     
106     private String generateCreateIndexSQL(final Connection connection, final int majorVersion, final int minorVersion, final Map<String, Object> materials) throws SQLException {
107         return new PostgreSQLIndexSQLGenerator(connection, majorVersion, minorVersion).generate(materials);
108     }
109     
110     @SuppressWarnings("unchecked")
111     private void formatColumns(final Map<String, Object> context) {
112         Collection<Map<String, Object>> columns = (Collection<Map<String, Object>>) context.get("columns");
113         for (Map<String, Object> each : columns) {
114             if (each.containsKey("cltype")) {
115                 typeFormatter(each, (String) each.get("cltype"));
116             }
117         }
118     }
119     
120     private void typeFormatter(final Map<String, Object> column, final String columnType) {
121         if (columnType.contains("[]")) {
122             column.put("cltype", columnType.substring(0, columnType.length() - 2));
123             column.put("hasSqrBracket", true);
124         } else {
125             column.put("hasSqrBracket", false);
126         }
127     }
128     
129     @Override
130     public Optional<String> buildQueryCurrentPositionSQL() {
131         return Optional.of("SELECT * FROM pg_current_wal_lsn()");
132     }
133     
134     @Override
135     public String wrapWithPageQuery(final String sql) {
136         return sql + " LIMIT ?";
137     }
138     
139     @Override
140     public String getDatabaseType() {
141         return "PostgreSQL";
142     }
143 }