View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
19  
20  import com.google.common.base.Splitter;
21  import lombok.RequiredArgsConstructor;
22  import lombok.extern.slf4j.Slf4j;
23  import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
24  import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
25  import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
26  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
27  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
28  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
29  import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
30  import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
31  import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
32  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
33  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
34  import org.apache.shardingsphere.infra.parser.SQLParserEngine;
35  
36  import javax.sql.DataSource;
37  import java.sql.Connection;
38  import java.sql.SQLException;
39  import java.sql.Statement;
40  import java.util.Collection;
41  import java.util.HashSet;
42  import java.util.Optional;
43  import java.util.regex.Pattern;
44  
45  /**
46   * Pipeline job data source preparer.
47   */
48  @RequiredArgsConstructor
49  @Slf4j
50  public final class PipelineJobDataSourcePreparer {
51      
52      private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", Pattern.CASE_INSENSITIVE);
53      
54      private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
55      
56      private final DialectPipelineJobDataSourcePrepareOption option;
57      
58      /**
59       * Prepare target schemas.
60       *
61       * @param param prepare target schemas parameter
62       * @throws SQLException if prepare target schema fail
63       */
64      public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException {
65          DatabaseType targetDatabaseType = param.getTargetDatabaseType();
66          DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
67          if (!dialectDatabaseMetaData.isSchemaAvailable()) {
68              return;
69          }
70          String defaultSchema = dialectDatabaseMetaData.getDefaultSchema().orElse(null);
71          PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(targetDatabaseType);
72          Collection<String> createdSchemaNames = new HashSet<>(param.getCreateTableConfigurations().size(), 1F);
73          for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
74              String targetSchemaName = each.getTargetName().getSchemaName().toString();
75              if (null == targetSchemaName || targetSchemaName.equalsIgnoreCase(defaultSchema) || createdSchemaNames.contains(targetSchemaName)) {
76                  continue;
77              }
78              Optional<String> sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
79              if (sql.isPresent()) {
80                  executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get());
81                  createdSchemaNames.add(targetSchemaName);
82              }
83          }
84      }
85      
86      private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
87          log.info("Prepare target schemas SQL: {}", sql);
88          try (
89                  Connection connection = dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
90                  Statement statement = connection.createStatement()) {
91              statement.execute(sql);
92          } catch (final SQLException ex) {
93              if (option.isSupportIfNotExistsOnCreateSchema()) {
94                  throw ex;
95              }
96              log.warn("Create schema failed", ex);
97          }
98      }
99      
100     /**
101      * Prepare target tables.
102      *
103      * @param param prepare target tables parameter
104      * @throws SQLException SQL exception
105      */
106     public void prepareTargetTables(final PrepareTargetTablesParameter param) throws SQLException {
107         final long startTimeMillis = System.currentTimeMillis();
108         PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
109         for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
110             String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine());
111             try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
112                 for (String sql : Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL)) {
113                     executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(sql));
114                 }
115             }
116         }
117         log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis);
118     }
119     
120     private void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
121         log.info("Execute target table SQL: {}", sql);
122         try (Statement statement = targetConnection.createStatement()) {
123             statement.execute(sql);
124         } catch (final SQLException ex) {
125             for (String each : option.getIgnoredExceptionMessages()) {
126                 if (ex.getMessage().contains(each)) {
127                     return;
128                 }
129             }
130             throw ex;
131         }
132     }
133     
134     private String addIfNotExistsForCreateTableSQL(final String createTableSQL) {
135         return PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find() ? createTableSQL : PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
136     }
137     
138     private String getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig,
139                                            final PipelineDataSourceManager dataSourceManager, final SQLParserEngine sqlParserEngine) throws SQLException {
140         DatabaseType databaseType = createTableConfig.getSourceDataSourceConfig().getDatabaseType();
141         DataSource sourceDataSource = dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
142         String schemaName = createTableConfig.getSourceName().getSchemaName().toString();
143         String sourceTableName = createTableConfig.getSourceName().getTableName().toString();
144         String targetTableName = createTableConfig.getTargetName().getTableName().toString();
145         return new PipelineDDLGenerator().generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine);
146     }
147 }