1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.checker;
19
20 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
21 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
22 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
23 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
24 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
25 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
26 import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
27 import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
28
29 import javax.sql.DataSource;
30 import java.sql.Connection;
31 import java.sql.PreparedStatement;
32 import java.sql.ResultSet;
33 import java.sql.SQLException;
34 import java.util.Collection;
35
36
37
38
39 public final class DataSourceCheckEngine {
40
41 private final DialectDataSourceChecker checker;
42
43 private final PipelinePrepareSQLBuilder sqlBuilder;
44
45 public DataSourceCheckEngine(final DatabaseType databaseType) {
46 checker = DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class, databaseType).orElse(null);
47 sqlBuilder = new PipelinePrepareSQLBuilder(databaseType);
48 }
49
50
51
52
53
54
55
56 public void checkConnection(final Collection<DataSource> dataSources) {
57 try {
58 for (DataSource each : dataSources) {
59 each.getConnection().close();
60 }
61 } catch (final SQLException ex) {
62 throw new SQLWrapperException(ex);
63 }
64 }
65
66
67
68
69
70
71 public void checkSourceDataSources(final Collection<DataSource> dataSources) {
72 checkConnection(dataSources);
73 if (null == checker) {
74 return;
75 }
76 dataSources.forEach(checker::checkPrivilege);
77 dataSources.forEach(checker::checkVariable);
78 }
79
80
81
82
83
84
85
86 public void checkTargetDataSources(final Collection<DataSource> dataSources, final ImporterConfiguration importerConfig) {
87 checkConnection(dataSources);
88 checkEmptyTable(dataSources, importerConfig);
89 }
90
91 private void checkEmptyTable(final Collection<DataSource> dataSources, final ImporterConfiguration importerConfig) {
92 try {
93 for (DataSource each : dataSources) {
94 for (CaseInsensitiveQualifiedTable qualifiedTable : importerConfig.getQualifiedTables()) {
95 ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable), () -> new PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName().toString()));
96 }
97 }
98 } catch (final SQLException ex) {
99 throw new SQLWrapperException(ex);
100 }
101 }
102
103
104
105
106
107
108
109
110
111 public boolean checkEmptyTable(final DataSource dataSource, final CaseInsensitiveQualifiedTable qualifiedTable) throws SQLException {
112 String sql = sqlBuilder.buildCheckEmptyTableSQL(qualifiedTable.getSchemaName().toString(), qualifiedTable.getTableName().toString());
113 try (
114 Connection connection = dataSource.getConnection();
115 PreparedStatement preparedStatement = connection.prepareStatement(sql);
116 ResultSet resultSet = preparedStatement.executeQuery()) {
117 return !resultSet.next();
118 }
119 }
120 }