View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
19  
20  import com.google.common.base.Strings;
21  import lombok.RequiredArgsConstructor;
22  import lombok.extern.slf4j.Slf4j;
23  import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
24  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
25  import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLDecorator;
26  import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
27  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
28  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
29  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
30  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
31  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
32  import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
33  import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData;
34  import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
35  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
36  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
37  import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
38  import org.apache.shardingsphere.infra.parser.SQLParserEngine;
39  
40  import javax.sql.DataSource;
41  import java.sql.Connection;
42  import java.sql.SQLException;
43  import java.sql.Statement;
44  import java.util.Collection;
45  import java.util.Collections;
46  import java.util.HashSet;
47  import java.util.List;
48  import java.util.Optional;
49  import java.util.regex.Pattern;
50  
51  /**
52   * Pipeline job data source preparer.
53   */
54  @RequiredArgsConstructor
55  @Slf4j
56  public final class PipelineJobDataSourcePreparer {
57      
58      private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE);
59      
60      private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
61      
62      private final DatabaseType databaseType;
63      
64      /**
65       * Prepare target schemas.
66       *
67       * @param param prepare target schemas parameter
68       * @throws SQLException if prepare target schema fail
69       */
70      public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException {
71          DatabaseType targetDatabaseType = param.getTargetDatabaseType();
72          DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
73          if (!dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()) {
74              return;
75          }
76          String defaultSchema = dialectDatabaseMetaData.getSchemaOption().getDefaultSchema().orElse(null);
77          PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(targetDatabaseType);
78          Collection<String> createdSchemaNames = new HashSet<>(param.getCreateTableConfigurations().size(), 1F);
79          for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
80              String targetSchemaName = each.getTargetName().getSchemaName();
81              if (null == targetSchemaName || targetSchemaName.equalsIgnoreCase(defaultSchema) || createdSchemaNames.contains(targetSchemaName)) {
82                  continue;
83              }
84              Optional<String> sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
85              if (sql.isPresent()) {
86                  executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get());
87                  createdSchemaNames.add(targetSchemaName);
88              }
89          }
90      }
91      
92      private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager,
93                                       final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
94          log.info("Prepare target schemas SQL: {}", sql);
95          try (
96                  Connection connection = dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
97                  Statement statement = connection.createStatement()) {
98              statement.execute(sql);
99          } catch (final SQLException ex) {
100             if (DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType)
101                     .map(DialectPipelineJobDataSourcePrepareOption::isSupportIfNotExistsOnCreateSchema).orElse(true)) {
102                 throw ex;
103             }
104             log.warn("Create schema failed", ex);
105         }
106     }
107     
108     /**
109      * Prepare target tables.
110      *
111      * @param param prepare target tables parameter
112      * @throws SQLException SQL exception
113      */
114     public void prepareTargetTables(final PrepareTargetTablesParameter param) throws SQLException {
115         final long startTimeMillis = System.currentTimeMillis();
116         PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
117         for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
118             try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
119                 List<String> createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager);
120                 for (String sql : createTargetTableSQL) {
121                     ShardingSphereMetaData metaData = ((ShardingSphereConnection) targetConnection).getContextManager().getMetaDataContexts().getMetaData();
122                     Optional<String> decoratedSQL = decorateTargetTableSQL(each, param.getSqlParserEngine(), metaData, param.getTargetDatabaseName(), sql);
123                     if (decoratedSQL.isPresent()) {
124                         executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(decoratedSQL.get()));
125                     }
126                 }
127             }
128         }
129         log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis);
130     }
131     
132     private List<String> getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig, final PipelineDataSourceManager dataSourceManager) throws SQLException {
133         DatabaseType databaseType = createTableConfig.getSourceDataSourceConfig().getDatabaseType();
134         DataSource sourceDataSource = dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
135         String schemaName = createTableConfig.getSourceName().getSchemaName();
136         String sourceTableName = createTableConfig.getSourceName().getTableName();
137         String targetTableName = createTableConfig.getTargetName().getTableName();
138         return PipelineDDLGenerator.generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName);
139     }
140     
141     private Optional<String> decorateTargetTableSQL(final CreateTableConfiguration createTableConfig, final SQLParserEngine sqlParserEngine,
142                                                     final ShardingSphereMetaData metaData, final String targetDatabaseName, final String sql) {
143         String schemaName = createTableConfig.getSourceName().getSchemaName();
144         String targetTableName = createTableConfig.getTargetName().getTableName();
145         Optional<String> decoratedSQL = new PipelineDDLDecorator(metaData).decorate(databaseType, targetDatabaseName, schemaName, targetTableName, sqlParserEngine, sql);
146         return decoratedSQL.map(String::trim).filter(trimmedSql -> !Strings.isNullOrEmpty(trimmedSql));
147     }
148     
149     private void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
150         log.info("Execute target table SQL: {}", sql);
151         try (Statement statement = targetConnection.createStatement()) {
152             statement.execute(sql);
153         } catch (final SQLException ex) {
154             for (String each : DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType)
155                     .map(DialectPipelineJobDataSourcePrepareOption::getIgnoredExceptionMessages).orElse(Collections.emptyList())) {
156                 if (ex.getMessage().contains(each)) {
157                     return;
158                 }
159             }
160             throw ex;
161         }
162     }
163     
164     private String addIfNotExistsForCreateTableSQL(final String createTableSQL) {
165         return PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find() ? createTableSQL : PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
166     }
167 }