1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
19
20 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
21 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
22 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
23 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
24 import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.column.PostgreSQLColumnPropertiesAppender;
25 import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.constraints.PostgreSQLConstraintsPropertiesAppender;
26 import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.index.PostgreSQLIndexSQLGenerator;
27 import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.ddl.table.PostgreSQLTablePropertiesLoader;
28 import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.template.PostgreSQLPipelineFreemarkerManager;
29
30 import javax.sql.DataSource;
31 import java.sql.Connection;
32 import java.sql.SQLException;
33 import java.util.Arrays;
34 import java.util.Collection;
35 import java.util.Map;
36 import java.util.Optional;
37 import java.util.stream.Collectors;
38
39
40
41
42 public final class PostgreSQLPipelineSQLBuilder implements DialectPipelineSQLBuilder {
43
44 @Override
45 public Optional<String> buildCreateSchemaSQL(final String schemaName) {
46 return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
47 }
48
49 @Override
50 public Optional<String> buildInsertOnDuplicateClause(final DataRecord dataRecord) {
51
52 if (dataRecord.getUniqueKeyValue().isEmpty()) {
53 return Optional.empty();
54 }
55 StringBuilder result = new StringBuilder("ON CONFLICT (");
56 PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
57 result.append(dataRecord.getColumns().stream().filter(Column::isUniqueKey).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(",")));
58 result.append(") DO UPDATE SET ");
59 result.append(dataRecord.getColumns().stream()
60 .filter(each -> !each.isUniqueKey()).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + "=EXCLUDED." + sqlSegmentBuilder.getEscapedIdentifier(each.getName()))
61 .collect(Collectors.joining(",")));
62 return Optional.of(result.toString());
63 }
64
65 @Override
66 public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
67 return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
68 }
69
70 @Override
71 public Optional<String> buildEstimatedCountSQL(final String catalogName, final String qualifiedTableName) {
72 return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
73 }
74
75
76 @Override
77 public Optional<String> buildCRC32SQL(final String qualifiedTableName, final String columnName) {
78 return Optional.of(String.format("SELECT pg_catalog.pg_checksum_table('%s', true)", qualifiedTableName));
79 }
80
81 @Override
82 public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerBound) {
83 return hasLowerBound
84 ? String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey)
85 : String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
86 }
87
88 @Override
89 public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
90 try (Connection connection = dataSource.getConnection()) {
91 int majorVersion = connection.getMetaData().getDatabaseMajorVersion();
92 int minorVersion = connection.getMetaData().getDatabaseMinorVersion();
93 Map<String, Object> materials = loadMaterials(tableName, schemaName, connection, majorVersion, minorVersion);
94 String tableSQL = generateCreateTableSQL(majorVersion, minorVersion, materials);
95 String indexSQL = generateCreateIndexSQL(connection, majorVersion, minorVersion, materials);
96
97 return Arrays.asList((tableSQL + System.lineSeparator() + indexSQL).trim().split(";"));
98 }
99 }
100
101 private Map<String, Object> loadMaterials(final String tableName, final String schemaName, final Connection connection, final int majorVersion, final int minorVersion) throws SQLException {
102 Map<String, Object> result = new PostgreSQLTablePropertiesLoader(connection, tableName, schemaName, majorVersion, minorVersion).load();
103 new PostgreSQLColumnPropertiesAppender(connection, majorVersion, minorVersion).append(result);
104 new PostgreSQLConstraintsPropertiesAppender(connection, majorVersion, minorVersion).append(result);
105 formatColumns(result);
106 return result;
107 }
108
109 private String generateCreateTableSQL(final int majorVersion, final int minorVersion, final Map<String, Object> materials) {
110 return PostgreSQLPipelineFreemarkerManager.getSQLByVersion(materials, "component/table/%s/create.ftl", majorVersion, minorVersion).trim();
111 }
112
113 private String generateCreateIndexSQL(final Connection connection, final int majorVersion, final int minorVersion, final Map<String, Object> materials) throws SQLException {
114 return new PostgreSQLIndexSQLGenerator(connection, majorVersion, minorVersion).generate(materials);
115 }
116
117 @SuppressWarnings("unchecked")
118 private void formatColumns(final Map<String, Object> context) {
119 Collection<Map<String, Object>> columns = (Collection<Map<String, Object>>) context.get("columns");
120 for (Map<String, Object> each : columns) {
121 if (each.containsKey("cltype")) {
122 typeFormatter(each, (String) each.get("cltype"));
123 }
124 }
125 }
126
127 private void typeFormatter(final Map<String, Object> column, final String columnType) {
128 if (columnType.contains("[]")) {
129 column.put("cltype", columnType.substring(0, columnType.length() - 2));
130 column.put("hasSqrBracket", true);
131 } else {
132 column.put("hasSqrBracket", false);
133 }
134 }
135
136 @Override
137 public Optional<String> buildQueryCurrentPositionSQL() {
138 return Optional.of("SELECT * FROM pg_current_wal_lsn()");
139 }
140
141 @Override
142 public String wrapWithPageQuery(final String sql) {
143 return sql + " LIMIT ?";
144 }
145
146 @Override
147 public String getDatabaseType() {
148 return "PostgreSQL";
149 }
150 }