1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.broadcast.route;
19
20 import org.apache.shardingsphere.broadcast.constant.BroadcastOrder;
21 import org.apache.shardingsphere.broadcast.route.engine.BroadcastRouteEngineFactory;
22 import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
23 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
24 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
25 import org.apache.shardingsphere.infra.binder.context.statement.ddl.CloseStatementContext;
26 import org.apache.shardingsphere.infra.binder.context.type.CursorAvailable;
27 import org.apache.shardingsphere.infra.binder.context.type.IndexAvailable;
28 import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
29 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
30 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
31 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
32 import org.apache.shardingsphere.infra.route.SQLRouter;
33 import org.apache.shardingsphere.infra.route.context.RouteContext;
34 import org.apache.shardingsphere.infra.route.context.RouteMapper;
35 import org.apache.shardingsphere.infra.route.context.RouteUnit;
36 import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
37 import org.apache.shardingsphere.infra.session.query.QueryContext;
38 import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
39 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
40 import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
41 import org.apache.shardingsphere.sql.parser.sql.common.statement.dcl.DCLStatement;
42 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.AlterFunctionStatement;
43 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.AlterProcedureStatement;
44 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.AlterTablespaceStatement;
45 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.CreateFunctionStatement;
46 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.CreateProcedureStatement;
47 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.CreateTablespaceStatement;
48 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
49 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DropFunctionStatement;
50 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DropProcedureStatement;
51 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DropTablespaceStatement;
52 import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
53 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLCreateResourceGroupStatement;
54 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dal.MySQLSetResourceGroupStatement;
55
56 import java.util.Collection;
57 import java.util.Collections;
58 import java.util.LinkedHashSet;
59
60
61
62
63 @HighFrequencyInvocation
64 public final class BroadcastSQLRouter implements SQLRouter<BroadcastRule> {
65
66 @Override
67 public RouteContext createRouteContext(final QueryContext queryContext, final RuleMetaData globalRuleMetaData, final ShardingSphereDatabase database,
68 final BroadcastRule rule, final ConfigurationProperties props, final ConnectionContext connectionContext) {
69 RouteContext result = new RouteContext();
70 BroadcastRouteEngineFactory.newInstance(rule, database, queryContext, connectionContext).route(result, rule);
71 return result;
72 }
73
74 @Override
75 public void decorateRouteContext(final RouteContext routeContext, final QueryContext queryContext, final ShardingSphereDatabase database, final BroadcastRule broadcastRule,
76 final ConfigurationProperties props, final ConnectionContext connectionContext) {
77 SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext();
78 SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
79 if (sqlStatement instanceof TCLStatement) {
80 routeToAllDatabase(routeContext, broadcastRule);
81 }
82 if (sqlStatement instanceof DDLStatement) {
83 decorateRouteContextWhenDDLStatement(routeContext, queryContext, database, broadcastRule);
84 }
85 if (sqlStatement instanceof DALStatement && isResourceGroupStatement(sqlStatement)) {
86 routeToAllDatabaseInstance(routeContext, database, broadcastRule);
87 }
88 if (sqlStatement instanceof DCLStatement && !isDCLForSingleTable(queryContext.getSqlStatementContext())) {
89 routeToAllDatabaseInstance(routeContext, database, broadcastRule);
90 }
91 }
92
93 private void decorateRouteContextWhenDDLStatement(final RouteContext routeContext, final QueryContext queryContext, final ShardingSphereDatabase database, final BroadcastRule broadcastRule) {
94 SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext();
95 if (sqlStatementContext instanceof CursorAvailable) {
96 if (sqlStatementContext instanceof CloseStatementContext && ((CloseStatementContext) sqlStatementContext).getSqlStatement().isCloseAll()) {
97 routeToAllDatabase(routeContext, broadcastRule);
98 }
99 return;
100 }
101 if (sqlStatementContext instanceof IndexAvailable && !routeContext.getRouteUnits().isEmpty()) {
102 putAllBroadcastTables(routeContext, broadcastRule, sqlStatementContext);
103 }
104 SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
105 boolean functionStatement = sqlStatement instanceof CreateFunctionStatement || sqlStatement instanceof AlterFunctionStatement || sqlStatement instanceof DropFunctionStatement;
106 boolean procedureStatement = sqlStatement instanceof CreateProcedureStatement || sqlStatement instanceof AlterProcedureStatement || sqlStatement instanceof DropProcedureStatement;
107 if (functionStatement || procedureStatement) {
108 routeToAllDatabase(routeContext, broadcastRule);
109 return;
110 }
111 if (sqlStatement instanceof CreateTablespaceStatement || sqlStatement instanceof AlterTablespaceStatement || sqlStatement instanceof DropTablespaceStatement) {
112 if (broadcastRule.isAllBroadcastTables(sqlStatementContext.getTablesContext().getTableNames())) {
113 routeToAllDatabaseInstance(routeContext, database, broadcastRule);
114 }
115 return;
116 }
117 Collection<String> tableNames = sqlStatementContext instanceof TableAvailable ? getTableNames((TableAvailable) sqlStatementContext) : sqlStatementContext.getTablesContext().getTableNames();
118 if (broadcastRule.isAllBroadcastTables(tableNames)) {
119 routeToAllDatabaseInstance(routeContext, database, broadcastRule);
120 }
121 }
122
123 private Collection<String> getTableNames(final TableAvailable sqlStatementContext) {
124 Collection<String> result = new LinkedHashSet<>();
125 for (SimpleTableSegment each : sqlStatementContext.getAllTables()) {
126 result.add(each.getTableName().getIdentifier().getValue());
127 }
128 return result;
129 }
130
131 private void putAllBroadcastTables(final RouteContext routeContext, final BroadcastRule broadcastRule, final SQLStatementContext sqlStatementContext) {
132 Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
133 for (String each : broadcastRule.getBroadcastRuleTableNames(tableNames)) {
134 for (RouteUnit routeUnit : routeContext.getRouteUnits()) {
135 routeUnit.getTableMappers().add(new RouteMapper(each, each));
136 }
137 }
138 }
139
140 private boolean isResourceGroupStatement(final SQLStatement sqlStatement) {
141
142 return sqlStatement instanceof MySQLCreateResourceGroupStatement || sqlStatement instanceof MySQLSetResourceGroupStatement;
143 }
144
145 private boolean isDCLForSingleTable(final SQLStatementContext sqlStatementContext) {
146 if (sqlStatementContext instanceof TableAvailable) {
147 TableAvailable tableSegmentsAvailable = (TableAvailable) sqlStatementContext;
148 return 1 == tableSegmentsAvailable.getAllTables().size() && !"*".equals(tableSegmentsAvailable.getAllTables().iterator().next().getTableName().getIdentifier().getValue());
149 }
150 return false;
151 }
152
153 private void routeToAllDatabaseInstance(final RouteContext routeContext, final ShardingSphereDatabase database, final BroadcastRule broadcastRule) {
154 routeContext.getRouteUnits().clear();
155 for (String each : broadcastRule.getDataSourceNames()) {
156 if (database.getResourceMetaData().getAllInstanceDataSourceNames().contains(each)) {
157 routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.emptyList()));
158 }
159 }
160 }
161
162 private void routeToAllDatabase(final RouteContext routeContext, final BroadcastRule broadcastRule) {
163 routeContext.getRouteUnits().clear();
164 for (String each : broadcastRule.getDataSourceNames()) {
165 routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.emptyList()));
166 }
167 }
168
169 @Override
170 public int getOrder() {
171 return BroadcastOrder.ORDER;
172 }
173
174 @Override
175 public Class<BroadcastRule> getTypeClass() {
176 return BroadcastRule.class;
177 }
178 }