1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
19
20 import com.google.common.base.Strings;
21 import lombok.RequiredArgsConstructor;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
24 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
25 import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLDecorator;
26 import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
27 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
28 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
29 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
30 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
31 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
32 import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
33 import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData;
34 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
35 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
36 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
37 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
38 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
39
40 import javax.sql.DataSource;
41 import java.sql.Connection;
42 import java.sql.SQLException;
43 import java.sql.Statement;
44 import java.util.Collection;
45 import java.util.Collections;
46 import java.util.HashSet;
47 import java.util.List;
48 import java.util.Optional;
49 import java.util.regex.Pattern;
50
51
52
53
54 @RequiredArgsConstructor
55 @Slf4j
56 public final class PipelineJobDataSourcePreparer {
57
58 private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE);
59
60 private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
61
62 private final DatabaseType databaseType;
63
64
65
66
67
68
69
70 public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException {
71 DatabaseType targetDatabaseType = param.getTargetDatabaseType();
72 DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
73 if (!dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()) {
74 return;
75 }
76 String defaultSchema = dialectDatabaseMetaData.getSchemaOption().getDefaultSchema().orElse(null);
77 PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(targetDatabaseType);
78 Collection<String> createdSchemaNames = new HashSet<>(param.getCreateTableConfigurations().size(), 1F);
79 for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
80 String targetSchemaName = each.getTargetName().getSchemaName();
81 if (null == targetSchemaName || targetSchemaName.equalsIgnoreCase(defaultSchema) || createdSchemaNames.contains(targetSchemaName)) {
82 continue;
83 }
84 Optional<String> sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
85 if (sql.isPresent()) {
86 executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get());
87 createdSchemaNames.add(targetSchemaName);
88 }
89 }
90 }
91
92 private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager,
93 final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
94 log.info("Prepare target schemas SQL: {}", sql);
95 try (
96 Connection connection = dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
97 Statement statement = connection.createStatement()) {
98 statement.execute(sql);
99 } catch (final SQLException ex) {
100 if (DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType)
101 .map(DialectPipelineJobDataSourcePrepareOption::isSupportIfNotExistsOnCreateSchema).orElse(true)) {
102 throw ex;
103 }
104 log.warn("Create schema failed", ex);
105 }
106 }
107
108
109
110
111
112
113
114 public void prepareTargetTables(final PrepareTargetTablesParameter param) throws SQLException {
115 final long startTimeMillis = System.currentTimeMillis();
116 PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
117 for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
118 try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
119 List<String> createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager);
120 for (String sql : createTargetTableSQL) {
121 ShardingSphereMetaData metaData = ((ShardingSphereConnection) targetConnection).getContextManager().getMetaDataContexts().getMetaData();
122 Optional<String> decoratedSQL = decorateTargetTableSQL(each, param.getSqlParserEngine(), metaData, param.getTargetDatabaseName(), sql);
123 if (decoratedSQL.isPresent()) {
124 executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(decoratedSQL.get()));
125 }
126 }
127 }
128 }
129 log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis);
130 }
131
132 private List<String> getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig, final PipelineDataSourceManager dataSourceManager) throws SQLException {
133 DatabaseType databaseType = createTableConfig.getSourceDataSourceConfig().getDatabaseType();
134 DataSource sourceDataSource = dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
135 String schemaName = createTableConfig.getSourceName().getSchemaName();
136 String sourceTableName = createTableConfig.getSourceName().getTableName();
137 String targetTableName = createTableConfig.getTargetName().getTableName();
138 return PipelineDDLGenerator.generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName);
139 }
140
141 private Optional<String> decorateTargetTableSQL(final CreateTableConfiguration createTableConfig, final SQLParserEngine sqlParserEngine,
142 final ShardingSphereMetaData metaData, final String targetDatabaseName, final String sql) {
143 String schemaName = createTableConfig.getSourceName().getSchemaName();
144 String targetTableName = createTableConfig.getTargetName().getTableName();
145 Optional<String> decoratedSQL = new PipelineDDLDecorator(metaData).decorate(databaseType, targetDatabaseName, schemaName, targetTableName, sqlParserEngine, sql);
146 return decoratedSQL.map(String::trim).filter(trimmedSql -> !Strings.isNullOrEmpty(trimmedSql));
147 }
148
149 private void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
150 log.info("Execute target table SQL: {}", sql);
151 try (Statement statement = targetConnection.createStatement()) {
152 statement.execute(sql);
153 } catch (final SQLException ex) {
154 for (String each : DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType)
155 .map(DialectPipelineJobDataSourcePrepareOption::getIgnoredExceptionMessages).orElse(Collections.emptyList())) {
156 if (ex.getMessage().contains(each)) {
157 return;
158 }
159 }
160 throw ex;
161 }
162 }
163
164 private String addIfNotExistsForCreateTableSQL(final String createTableSQL) {
165 return PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find() ? createTableSQL : PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
166 }
167 }