1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend.netty;
19
20 import com.google.common.hash.Hashing;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.util.AttributeKey;
26 import lombok.extern.slf4j.Slf4j;
27 import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
28 import org.apache.shardingsphere.authority.rule.AuthorityRule;
29 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
30 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
31 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginFailedException;
32 import org.apache.shardingsphere.data.pipeline.cdc.exception.EmptyCDCLoginRequestBodyException;
33 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
34 import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
35 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
36 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
37 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
38 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
39 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
40 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
41 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
42 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
43 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
44 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
45 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
46 import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
47 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
48 import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
49 import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
50 import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
51 import org.apache.shardingsphere.infra.exception.dialect.exception.syntax.database.UnknownDatabaseException;
52 import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.MissingRequiredRuleException;
53 import org.apache.shardingsphere.infra.exception.mysql.exception.AccessDeniedException;
54 import org.apache.shardingsphere.infra.metadata.user.Grantee;
55 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
56 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
57 import org.apache.shardingsphere.proxy.frontend.protocol.FrontDatabaseProtocolTypeFactory;
58
59 import java.net.InetSocketAddress;
60 import java.net.SocketAddress;
61 import java.sql.SQLException;
62 import java.util.Objects;
63 import java.util.Optional;
64
65
66
67
68 @Slf4j
69 public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter {
70
71 private static final AttributeKey<CDCConnectionContext> CONNECTION_CONTEXT_KEY = AttributeKey.valueOf("connection.context");
72
73 private final CDCBackendHandler backendHandler = new CDCBackendHandler();
74
75 @Override
76 public void channelActive(final ChannelHandlerContext ctx) {
77 CDCResponse response = CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1").build())
78 .setStatus(Status.SUCCEED).build();
79 ctx.writeAndFlush(response);
80 }
81
82 @Override
83 public void channelInactive(final ChannelHandlerContext ctx) {
84 CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
85 if (null != connectionContext && null != connectionContext.getJobId()) {
86 backendHandler.stopStreaming(connectionContext.getJobId(), ctx.channel().id());
87 }
88 ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
89 }
90
91 @Override
92 public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
93 log.error("caught CDC resolution error", cause);
94
95 ChannelFuture channelFuture;
96 if (cause instanceof CDCExceptionWrapper) {
97 CDCExceptionWrapper wrapper = (CDCExceptionWrapper) cause;
98 SQLException sqlException = SQLExceptionTransformEngine.toSQLException(wrapper.getCause(), FrontDatabaseProtocolTypeFactory.getDatabaseType());
99 channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed(wrapper.getRequestId(), sqlException.getSQLState(), sqlException.getMessage()));
100 } else {
101 channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
102 }
103 CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
104 if (null == connectionContext) {
105 channelFuture.addListener(ChannelFutureListener.CLOSE);
106 }
107 }
108
109 @Override
110 public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
111 CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
112 CDCRequest request = (CDCRequest) msg;
113 if (null == connectionContext || request.hasLoginRequestBody()) {
114 processLogin(ctx, request);
115 return;
116 }
117 switch (request.getType()) {
118 case STREAM_DATA:
119 processStreamDataRequest(ctx, request, connectionContext);
120 break;
121 case ACK_STREAMING:
122 processAckStreamingRequest(request);
123 break;
124 case STOP_STREAMING:
125 processStopStreamingRequest(ctx, request, connectionContext);
126 break;
127 case START_STREAMING:
128 processStartStreamingRequest(ctx, request, connectionContext);
129 break;
130 case DROP_STREAMING:
131 processDropStreamingRequest(ctx, request, connectionContext);
132 break;
133 default:
134 log.warn("can't handle this type of request {}", request);
135 break;
136 }
137 }
138
139 private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request) {
140 ShardingSpherePreconditions.checkState(request.hasLoginRequestBody() && request.getLoginRequestBody().hasBasicBody(),
141 () -> new CDCExceptionWrapper(request.getRequestId(), new EmptyCDCLoginRequestBodyException()));
142 BasicBody body = request.getLoginRequestBody().getBasicBody();
143 AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
144 Optional<ShardingSphereUser> user = authorityRule.findUser(new Grantee(body.getUsername(), getHostAddress(ctx)));
145 if (user.isPresent() && Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(), body.getPassword())) {
146 ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new CDCConnectionContext(user.get()));
147 ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
148 } else {
149 throw new CDCExceptionWrapper(request.getRequestId(), new CDCLoginFailedException());
150 }
151 }
152
153 private void checkPrivileges(final String requestId, final Grantee grantee, final String currentDatabase) {
154 AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(AuthorityRule.class)
155 .orElseThrow(() -> new CDCExceptionWrapper(requestId, new MissingRequiredRuleException("authority")));
156 ShardingSpherePrivileges privileges = authorityRule.findPrivileges(grantee)
157 .orElseThrow(() -> new CDCExceptionWrapper(requestId, new AccessDeniedException(grantee.getUsername(), grantee.getHostname(), false)));
158 ShardingSpherePreconditions.checkState(privileges.hasPrivileges(currentDatabase),
159 () -> new CDCExceptionWrapper(requestId, new UnknownDatabaseException(currentDatabase)));
160 }
161
162 private String getHostAddress(final ChannelHandlerContext context) {
163 SocketAddress socketAddress = context.channel().remoteAddress();
164 return socketAddress instanceof InetSocketAddress ? ((InetSocketAddress) socketAddress).getAddress().getHostAddress() : socketAddress.toString();
165 }
166
167 private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
168 if (!request.hasStreamDataRequestBody()) {
169 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Stream data request body is empty"));
170 }
171 StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
172 if (requestBody.getDatabase().isEmpty()) {
173 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Database is empty"));
174 }
175 if (requestBody.getSourceSchemaTableList().isEmpty()) {
176 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Source schema table is empty"));
177 }
178 checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase());
179 try {
180 CDCResponse response = backendHandler.streamData(request.getRequestId(), requestBody, connectionContext, ctx.channel());
181 ctx.writeAndFlush(response);
182 } catch (final PipelineSQLException ex) {
183 throw new CDCExceptionWrapper(request.getRequestId(), ex);
184 }
185 }
186
187 private void processAckStreamingRequest(final CDCRequest request) {
188 if (!request.hasAckStreamingRequestBody()) {
189 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request body is empty"));
190 }
191 AckStreamingRequestBody requestBody = request.getAckStreamingRequestBody();
192 if (requestBody.getAckId().isEmpty()) {
193 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request is empty"));
194 }
195 backendHandler.processAck(requestBody);
196 }
197
198 private void processStartStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
199 if (!request.hasStartStreamingRequestBody()) {
200 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Start streaming request body is empty"));
201 }
202 StartStreamingRequestBody requestBody = request.getStartStreamingRequestBody();
203 if (requestBody.getStreamingId().isEmpty()) {
204 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Streaming id is empty"));
205 }
206 String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
207 checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
208 backendHandler.startStreaming(requestBody.getStreamingId(), connectionContext, ctx.channel());
209 ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
210 }
211
212 private void processStopStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
213 StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
214 String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
215 checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
216 backendHandler.stopStreaming(requestBody.getStreamingId(), ctx.channel().id());
217 connectionContext.setJobId(null);
218 ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
219 }
220
221 private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
222 DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
223 checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
224 backendHandler.dropStreaming(requestBody.getStreamingId());
225 connectionContext.setJobId(null);
226 ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
227 }
228 }