1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
19
20 import org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
21 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
22 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
23 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
24
25 import javax.sql.DataSource;
26 import java.sql.Connection;
27 import java.sql.ResultSet;
28 import java.sql.SQLException;
29 import java.sql.Statement;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.Optional;
33 import java.util.stream.Collectors;
34
35
36
37
38 public final class OpenGaussPipelineSQLBuilder implements DialectPipelineSQLBuilder {
39
40 @Override
41 public Optional<String> buildCreateSchemaSQL(final String schemaName) {
42 return Optional.of(String.format("CREATE SCHEMA %s", schemaName));
43 }
44
45 @Override
46 public Optional<String> buildInsertOnDuplicateClause(final DataRecord dataRecord) {
47 StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
48 PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
49 result.append(dataRecord.getColumns().stream()
50 .filter(each -> !each.isUniqueKey()).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + "=EXCLUDED." + sqlSegmentBuilder.getEscapedIdentifier(each.getName()))
51 .collect(Collectors.joining(",")));
52 return Optional.of(result.toString());
53 }
54
55 @Override
56 public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
57 return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
58 }
59
60 @Override
61 public Optional<String> buildEstimatedCountSQL(final String qualifiedTableName) {
62 return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
63 }
64
65 @Override
66 public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
67 try (
68 Connection connection = dataSource.getConnection();
69 Statement statement = connection.createStatement();
70 ResultSet resultSet = statement.executeQuery(String.format("SELECT * FROM pg_get_tabledef('%s.%s')", schemaName, tableName))) {
71 if (resultSet.next()) {
72
73 return Arrays.asList(resultSet.getString("pg_get_tabledef").split(";"));
74 }
75 }
76 throw new CreateTableSQLGenerateException(tableName);
77 }
78
79 @Override
80 public Optional<String> buildQueryCurrentPositionSQL() {
81 return Optional.of("SELECT * FROM pg_current_xlog_location()");
82 }
83
84 @Override
85 public String getDatabaseType() {
86 return "openGauss";
87 }
88 }