View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
19  
20  import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
21  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
22  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
23  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
24  import org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLColumnPropertiesAppender;
25  import org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLConstraintsPropertiesAppender;
26  import org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLIndexSQLGenerator;
27  import org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLTablePropertiesLoader;
28  import org.apache.shardingsphere.data.pipeline.postgresql.util.PostgreSQLPipelineFreemarkerManager;
29  
30  import javax.sql.DataSource;
31  import java.sql.Connection;
32  import java.sql.SQLException;
33  import java.util.Arrays;
34  import java.util.Collection;
35  import java.util.Map;
36  import java.util.Optional;
37  import java.util.stream.Collectors;
38  
39  /**
40   * PostgreSQL pipeline SQL builder.
41   */
42  public final class PostgreSQLPipelineSQLBuilder implements DialectPipelineSQLBuilder {
43      
44      @Override
45      public Optional<String> buildCreateSchemaSQL(final String schemaName) {
46          return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
47      }
48      
49      @Override
50      public Optional<String> buildInsertOnDuplicateClause(final DataRecord dataRecord) {
51          // TODO without unique key, job has been interrupted, which may lead to data duplication
52          if (dataRecord.getUniqueKeyValue().isEmpty()) {
53              return Optional.empty();
54          }
55          StringBuilder result = new StringBuilder("ON CONFLICT (");
56          PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
57          result.append(dataRecord.getColumns().stream().filter(Column::isUniqueKey).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(",")));
58          result.append(") DO UPDATE SET ");
59          result.append(dataRecord.getColumns().stream()
60                  .filter(each -> !each.isUniqueKey()).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + "=EXCLUDED." + sqlSegmentBuilder.getEscapedIdentifier(each.getName()))
61                  .collect(Collectors.joining(",")));
62          return Optional.of(result.toString());
63      }
64      
65      @Override
66      public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
67          return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
68      }
69      
70      @Override
71      public Optional<String> buildEstimatedCountSQL(final String qualifiedTableName) {
72          return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
73      }
74      
75      // TODO support partitions etc.
76      @Override
77      public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
78          try (Connection connection = dataSource.getConnection()) {
79              int majorVersion = connection.getMetaData().getDatabaseMajorVersion();
80              int minorVersion = connection.getMetaData().getDatabaseMinorVersion();
81              Map<String, Object> materials = loadMaterials(tableName, schemaName, connection, majorVersion, minorVersion);
82              String tableSQL = generateCreateTableSQL(majorVersion, minorVersion, materials);
83              String indexSQL = generateCreateIndexSQL(connection, majorVersion, minorVersion, materials);
84              // TODO use ";" to split is not always correct
85              return Arrays.asList((tableSQL + System.lineSeparator() + indexSQL).trim().split(";"));
86          }
87      }
88      
89      private Map<String, Object> loadMaterials(final String tableName, final String schemaName, final Connection connection, final int majorVersion, final int minorVersion) throws SQLException {
90          Map<String, Object> result = new PostgreSQLTablePropertiesLoader(connection, tableName, schemaName, majorVersion, minorVersion).load();
91          new PostgreSQLColumnPropertiesAppender(connection, majorVersion, minorVersion).append(result);
92          new PostgreSQLConstraintsPropertiesAppender(connection, majorVersion, minorVersion).append(result);
93          formatColumns(result);
94          return result;
95      }
96      
97      private String generateCreateTableSQL(final int majorVersion, final int minorVersion, final Map<String, Object> materials) {
98          return PostgreSQLPipelineFreemarkerManager.getSQLByVersion(materials, "component/table/%s/create.ftl", majorVersion, minorVersion).trim();
99      }
100     
101     private String generateCreateIndexSQL(final Connection connection, final int majorVersion, final int minorVersion, final Map<String, Object> materials) throws SQLException {
102         return new PostgreSQLIndexSQLGenerator(connection, majorVersion, minorVersion).generate(materials);
103     }
104     
105     @SuppressWarnings("unchecked")
106     private void formatColumns(final Map<String, Object> context) {
107         Collection<Map<String, Object>> columns = (Collection<Map<String, Object>>) context.get("columns");
108         for (Map<String, Object> each : columns) {
109             if (each.containsKey("cltype")) {
110                 typeFormatter(each, (String) each.get("cltype"));
111             }
112         }
113     }
114     
115     private void typeFormatter(final Map<String, Object> column, final String columnType) {
116         if (columnType.contains("[]")) {
117             column.put("cltype", columnType.substring(0, columnType.length() - 2));
118             column.put("hasSqrBracket", true);
119         } else {
120             column.put("hasSqrBracket", false);
121         }
122     }
123     
124     @Override
125     public Optional<String> buildQueryCurrentPositionSQL() {
126         return Optional.of("SELECT * FROM pg_current_wal_lsn()");
127     }
128     
129     @Override
130     public String getDatabaseType() {
131         return "PostgreSQL";
132     }
133 }