1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
19
20 import org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
21 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
22 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
23 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
24
25 import javax.sql.DataSource;
26 import java.sql.Connection;
27 import java.sql.ResultSet;
28 import java.sql.SQLException;
29 import java.sql.Statement;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.Optional;
33 import java.util.stream.Collectors;
34
35
36
37
38 public final class OpenGaussPipelineSQLBuilder implements DialectPipelineSQLBuilder {
39
40 @Override
41 public Optional<String> buildCreateSchemaSQL(final String schemaName) {
42 return Optional.of(String.format("CREATE SCHEMA %s", schemaName));
43 }
44
45 @Override
46 public Optional<String> buildInsertOnDuplicateClause(final DataRecord dataRecord) {
47 StringBuilder result = new StringBuilder("ON DUPLICATE KEY UPDATE ");
48 PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
49 result.append(dataRecord.getColumns().stream()
50 .filter(each -> !each.isUniqueKey()).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + "=EXCLUDED." + sqlSegmentBuilder.getEscapedIdentifier(each.getName()))
51 .collect(Collectors.joining(",")));
52 return Optional.of(result.toString());
53 }
54
55 @Override
56 public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
57 return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
58 }
59
60 @Override
61 public Optional<String> buildEstimatedCountSQL(final String catalogName, final String qualifiedTableName) {
62 return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
63 }
64
65 @Override
66 public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerBound) {
67 return hasLowerBound
68 ? String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey)
69 : String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
70 }
71
72 @Override
73 public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
74 try (
75 Connection connection = dataSource.getConnection();
76 Statement statement = connection.createStatement();
77 ResultSet resultSet = statement.executeQuery(String.format("SELECT * FROM pg_get_tabledef('%s.%s')", schemaName, tableName))) {
78 if (resultSet.next()) {
79
80 Collection<String> defSQLs = Arrays.asList(resultSet.getString("pg_get_tabledef").split(";"));
81 return defSQLs.stream().map(sql -> {
82 String targetPrefix = "ALTER TABLE " + tableName;
83 if (sql.trim().startsWith(targetPrefix)) {
84 return sql.replaceFirst(targetPrefix, "ALTER TABLE " + schemaName + "." + tableName);
85 }
86 return sql;
87 }).collect(Collectors.toList());
88 }
89 }
90 throw new CreateTableSQLGenerateException(tableName);
91 }
92
93 @Override
94 public Optional<String> buildQueryCurrentPositionSQL() {
95 return Optional.of("SELECT * FROM pg_current_xlog_location()");
96 }
97
98 @Override
99 public String wrapWithPageQuery(final String sql) {
100 return sql + " LIMIT ?";
101 }
102
103 @Override
104 public String getDatabaseType() {
105 return "openGauss";
106 }
107 }