View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
19  
20  import org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
21  import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
22  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
23  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
24  
25  import javax.sql.DataSource;
26  import java.sql.Connection;
27  import java.sql.ResultSet;
28  import java.sql.SQLException;
29  import java.sql.Statement;
30  import java.util.Arrays;
31  import java.util.Collection;
32  import java.util.Optional;
33  import java.util.stream.Collectors;
34  
35  /**
36   * Pipeline SQL builder of openGauss.
37   */
38  public final class OpenGaussPipelineSQLBuilder implements DialectPipelineSQLBuilder {
39      
40      @Override
41      public Optional<String> buildCreateSchemaSQL(final String schemaName) {
42          return Optional.of(String.format("CREATE SCHEMA %s", schemaName));
43      }
44      
45      @Override
46      public Optional<String> buildInsertOnDuplicateClause(final DataRecord dataRecord) {
47          StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
48          PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
49          result.append(dataRecord.getColumns().stream()
50                  .filter(each -> !each.isUniqueKey()).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + "=EXCLUDED." + sqlSegmentBuilder.getEscapedIdentifier(each.getName()))
51                  .collect(Collectors.joining(",")));
52          return Optional.of(result.toString());
53      }
54      
55      @Override
56      public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
57          return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
58      }
59      
60      @Override
61      public Optional<String> buildEstimatedCountSQL(final String qualifiedTableName) {
62          return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
63      }
64      
65      @Override
66      public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
67          try (
68                  Connection connection = dataSource.getConnection();
69                  Statement statement = connection.createStatement();
70                  ResultSet resultSet = statement.executeQuery(String.format("SELECT * FROM pg_get_tabledef('%s.%s')", schemaName, tableName))) {
71              if (resultSet.next()) {
72                  // TODO use ";" to split is not always correct
73                  return Arrays.asList(resultSet.getString("pg_get_tabledef").split(";"));
74              }
75          }
76          throw new CreateTableSQLGenerateException(tableName);
77      }
78      
79      @Override
80      public Optional<String> buildQueryCurrentPositionSQL() {
81          return Optional.of("SELECT * FROM pg_current_xlog_location()");
82      }
83      
84      @Override
85      public String getDatabaseType() {
86          return "openGauss";
87      }
88  }