View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
19  
20  import org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
21  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
22  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
23  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
24  
25  import javax.sql.DataSource;
26  import java.sql.Connection;
27  import java.sql.ResultSet;
28  import java.sql.SQLException;
29  import java.sql.Statement;
30  import java.util.Arrays;
31  import java.util.Collection;
32  import java.util.Optional;
33  import java.util.stream.Collectors;
34  
35  /**
36   * Pipeline SQL builder of openGauss.
37   */
38  public final class OpenGaussPipelineSQLBuilder implements DialectPipelineSQLBuilder {
39      
40      @Override
41      public Optional<String> buildCreateSchemaSQL(final String schemaName) {
42          return Optional.of(String.format("CREATE SCHEMA %s", schemaName));
43      }
44      
45      @Override
46      public Optional<String> buildInsertOnDuplicateClause(final DataRecord dataRecord) {
47          StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
48          PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
49          result.append(dataRecord.getColumns().stream()
50                  .filter(each -> !each.isUniqueKey()).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + "=EXCLUDED." + sqlSegmentBuilder.getEscapedIdentifier(each.getName()))
51                  .collect(Collectors.joining(",")));
52          return Optional.of(result.toString());
53      }
54      
55      @Override
56      public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
57          return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
58      }
59      
60      @Override
61      public Optional<String> buildEstimatedCountSQL(final String catalogName, final String qualifiedTableName) {
62          return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
63      }
64      
65      @Override
66      public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerBound) {
67          return hasLowerBound
68                  ? String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey)
69                  : String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
70      }
71      
72      @Override
73      public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
74          try (
75                  Connection connection = dataSource.getConnection();
76                  Statement statement = connection.createStatement();
77                  ResultSet resultSet = statement.executeQuery(String.format("SELECT * FROM pg_get_tabledef('%s.%s')", schemaName, tableName))) {
78              if (resultSet.next()) {
79                  // TODO use ";" to split is not always correct if return value's comments contains ";"
80                  Collection<String> defSQLs = Arrays.asList(resultSet.getString("pg_get_tabledef").split(";"));
81                  return defSQLs.stream().map(sql -> {
82                      String targetPrefix = "ALTER TABLE " + tableName;
83                      if (sql.trim().startsWith(targetPrefix)) {
84                          return sql.replaceFirst(targetPrefix, "ALTER TABLE " + schemaName + "." + tableName);
85                      }
86                      return sql;
87                  }).collect(Collectors.toList());
88              }
89          }
90          throw new CreateTableSQLGenerateException(tableName);
91      }
92      
93      @Override
94      public Optional<String> buildQueryCurrentPositionSQL() {
95          return Optional.of("SELECT * FROM pg_current_xlog_location()");
96      }
97      
98      @Override
99      public String wrapWithPageQuery(final String sql) {
100         return sql + " LIMIT ?";
101     }
102     
103     @Override
104     public String getDatabaseType() {
105         return "openGauss";
106     }
107 }