1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
19
20 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
21 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
22 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
23 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
24 import org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLColumnPropertiesAppender;
25 import org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLConstraintsPropertiesAppender;
26 import org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLIndexSQLGenerator;
27 import org.apache.shardingsphere.data.pipeline.postgresql.ddlgenerator.PostgreSQLTablePropertiesLoader;
28 import org.apache.shardingsphere.data.pipeline.postgresql.util.PostgreSQLPipelineFreemarkerManager;
29
30 import javax.sql.DataSource;
31 import java.sql.Connection;
32 import java.sql.SQLException;
33 import java.util.Arrays;
34 import java.util.Collection;
35 import java.util.Map;
36 import java.util.Optional;
37 import java.util.stream.Collectors;
38
39
40
41
42 public final class PostgreSQLPipelineSQLBuilder implements DialectPipelineSQLBuilder {
43
44 @Override
45 public Optional<String> buildCreateSchemaSQL(final String schemaName) {
46 return Optional.of(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
47 }
48
49 @Override
50 public Optional<String> buildInsertOnDuplicateClause(final DataRecord dataRecord) {
51
52 if (dataRecord.getUniqueKeyValue().isEmpty()) {
53 return Optional.empty();
54 }
55 StringBuilder result = new StringBuilder("ON CONFLICT (");
56 PipelineSQLSegmentBuilder sqlSegmentBuilder = new PipelineSQLSegmentBuilder(getType());
57 result.append(dataRecord.getColumns().stream().filter(Column::isUniqueKey).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName())).collect(Collectors.joining(",")));
58 result.append(") DO UPDATE SET ");
59 result.append(dataRecord.getColumns().stream()
60 .filter(each -> !each.isUniqueKey()).map(each -> sqlSegmentBuilder.getEscapedIdentifier(each.getName()) + "=EXCLUDED." + sqlSegmentBuilder.getEscapedIdentifier(each.getName()))
61 .collect(Collectors.joining(",")));
62 return Optional.of(result.toString());
63 }
64
65 @Override
66 public String buildCheckEmptyTableSQL(final String qualifiedTableName) {
67 return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
68 }
69
70 @Override
71 public Optional<String> buildEstimatedCountSQL(final String qualifiedTableName) {
72 return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
73 }
74
75
76 @Override
77 public Collection<String> buildCreateTableSQLs(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
78 try (Connection connection = dataSource.getConnection()) {
79 int majorVersion = connection.getMetaData().getDatabaseMajorVersion();
80 int minorVersion = connection.getMetaData().getDatabaseMinorVersion();
81 Map<String, Object> materials = loadMaterials(tableName, schemaName, connection, majorVersion, minorVersion);
82 String tableSQL = generateCreateTableSQL(majorVersion, minorVersion, materials);
83 String indexSQL = generateCreateIndexSQL(connection, majorVersion, minorVersion, materials);
84
85 return Arrays.asList((tableSQL + System.lineSeparator() + indexSQL).trim().split(";"));
86 }
87 }
88
89 private Map<String, Object> loadMaterials(final String tableName, final String schemaName, final Connection connection, final int majorVersion, final int minorVersion) throws SQLException {
90 Map<String, Object> result = new PostgreSQLTablePropertiesLoader(connection, tableName, schemaName, majorVersion, minorVersion).load();
91 new PostgreSQLColumnPropertiesAppender(connection, majorVersion, minorVersion).append(result);
92 new PostgreSQLConstraintsPropertiesAppender(connection, majorVersion, minorVersion).append(result);
93 formatColumns(result);
94 return result;
95 }
96
97 private String generateCreateTableSQL(final int majorVersion, final int minorVersion, final Map<String, Object> materials) {
98 return PostgreSQLPipelineFreemarkerManager.getSQLByVersion(materials, "component/table/%s/create.ftl", majorVersion, minorVersion).trim();
99 }
100
101 private String generateCreateIndexSQL(final Connection connection, final int majorVersion, final int minorVersion, final Map<String, Object> materials) throws SQLException {
102 return new PostgreSQLIndexSQLGenerator(connection, majorVersion, minorVersion).generate(materials);
103 }
104
105 @SuppressWarnings("unchecked")
106 private void formatColumns(final Map<String, Object> context) {
107 Collection<Map<String, Object>> columns = (Collection<Map<String, Object>>) context.get("columns");
108 for (Map<String, Object> each : columns) {
109 if (each.containsKey("cltype")) {
110 typeFormatter(each, (String) each.get("cltype"));
111 }
112 }
113 }
114
115 private void typeFormatter(final Map<String, Object> column, final String columnType) {
116 if (columnType.contains("[]")) {
117 column.put("cltype", columnType.substring(0, columnType.length() - 2));
118 column.put("hasSqrBracket", true);
119 } else {
120 column.put("hasSqrBracket", false);
121 }
122 }
123
124 @Override
125 public Optional<String> buildQueryCurrentPositionSQL() {
126 return Optional.of("SELECT * FROM pg_current_wal_lsn()");
127 }
128
129 @Override
130 public String getDatabaseType() {
131 return "PostgreSQL";
132 }
133 }