1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
19
20 import com.google.common.base.Splitter;
21 import lombok.RequiredArgsConstructor;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
24 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
25 import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
26 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
27 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
28 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
29 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
30 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
31 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
32 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
33 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
34 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
35
36 import javax.sql.DataSource;
37 import java.sql.Connection;
38 import java.sql.SQLException;
39 import java.sql.Statement;
40 import java.util.Collection;
41 import java.util.HashSet;
42 import java.util.Optional;
43 import java.util.regex.Pattern;
44
45
46
47
48 @RequiredArgsConstructor
49 @Slf4j
50 public final class PipelineJobDataSourcePreparer {
51
52 private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE);
53
54 private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
55
56 private final DialectPipelineJobDataSourcePrepareOption option;
57
58
59
60
61
62
63
64 public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException {
65 DatabaseType targetDatabaseType = param.getTargetDatabaseType();
66 DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
67 if (!dialectDatabaseMetaData.isSchemaAvailable()) {
68 return;
69 }
70 String defaultSchema = dialectDatabaseMetaData.getDefaultSchema().orElse(null);
71 PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(targetDatabaseType);
72 Collection<String> createdSchemaNames = new HashSet<>(param.getCreateTableConfigurations().size(), 1F);
73 for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
74 String targetSchemaName = each.getTargetName().getSchemaName().toString();
75 if (null == targetSchemaName || targetSchemaName.equalsIgnoreCase(defaultSchema) || createdSchemaNames.contains(targetSchemaName)) {
76 continue;
77 }
78 Optional<String> sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
79 if (sql.isPresent()) {
80 executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get());
81 createdSchemaNames.add(targetSchemaName);
82 }
83 }
84 }
85
86 private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
87 log.info("Prepare target schemas SQL: {}", sql);
88 try (
89 Connection connection = dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
90 Statement statement = connection.createStatement()) {
91 statement.execute(sql);
92 } catch (final SQLException ex) {
93 if (option.isSupportIfNotExistsOnCreateSchema()) {
94 throw ex;
95 }
96 log.warn("Create schema failed", ex);
97 }
98 }
99
100
101
102
103
104
105
106 public void prepareTargetTables(final PrepareTargetTablesParameter param) throws SQLException {
107 final long startTimeMillis = System.currentTimeMillis();
108 PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
109 for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
110 String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine());
111 try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
112 for (String sql : Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL)) {
113 executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(sql));
114 }
115 }
116 }
117 log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis);
118 }
119
120 private void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
121 log.info("Execute target table SQL: {}", sql);
122 try (Statement statement = targetConnection.createStatement()) {
123 statement.execute(sql);
124 } catch (final SQLException ex) {
125 for (String each : option.getIgnoredExceptionMessages()) {
126 if (ex.getMessage().contains(each)) {
127 return;
128 }
129 }
130 throw ex;
131 }
132 }
133
134 private String addIfNotExistsForCreateTableSQL(final String createTableSQL) {
135 return PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find() ? createTableSQL : PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
136 }
137
138 private String getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig,
139 final PipelineDataSourceManager dataSourceManager, final SQLParserEngine sqlParserEngine) throws SQLException {
140 DatabaseType databaseType = createTableConfig.getSourceDataSourceConfig().getDatabaseType();
141 DataSource sourceDataSource = dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
142 String schemaName = createTableConfig.getSourceName().getSchemaName().toString();
143 String sourceTableName = createTableConfig.getSourceName().getTableName().toString();
144 String targetTableName = createTableConfig.getTargetName().getTableName().toString();
145 return new PipelineDDLGenerator().generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine);
146 }
147 }