1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend.netty;
19
import com.google.common.hash.Hashing;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginFailedException;
import org.apache.shardingsphere.data.pipeline.cdc.exception.EmptyCDCLoginRequestBodyException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.infra.exception.dialect.exception.connection.AccessDeniedException;
import org.apache.shardingsphere.infra.exception.dialect.exception.syntax.database.UnknownDatabaseException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.MissingRequiredRuleException;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.frontend.protocol.FrontDatabaseProtocolTypeFactory;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.sql.SQLException;
import java.util.Locale;
import java.util.Objects;
63
64
65
66
67 @Slf4j
68 public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter {
69
70 private static final AttributeKey<CDCConnectionContext> CONNECTION_CONTEXT_KEY = AttributeKey.valueOf("connection.context");
71
72 private final CDCBackendHandler backendHandler = new CDCBackendHandler();
73
74 @Override
75 public void channelActive(final ChannelHandlerContext ctx) {
76 CDCResponse response = CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1").build())
77 .setStatus(Status.SUCCEED).build();
78 ctx.writeAndFlush(response);
79 }
80
81 @Override
82 public void channelInactive(final ChannelHandlerContext ctx) {
83 CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
84 if (null != connectionContext && null != connectionContext.getJobId()) {
85 backendHandler.stopStreaming(connectionContext.getJobId(), ctx.channel().id());
86 }
87 ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
88 }
89
90 @Override
91 public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
92 log.error("caught CDC resolution error", cause);
93
94 ChannelFuture channelFuture;
95 if (cause instanceof CDCExceptionWrapper) {
96 CDCExceptionWrapper wrapper = (CDCExceptionWrapper) cause;
97 SQLException sqlException = SQLExceptionTransformEngine.toSQLException(wrapper.getCause(), FrontDatabaseProtocolTypeFactory.getDatabaseType());
98 channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed(wrapper.getRequestId(), sqlException.getSQLState(), sqlException.getMessage()));
99 } else {
100 channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
101 }
102 CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
103 if (null == connectionContext) {
104 channelFuture.addListener(ChannelFutureListener.CLOSE);
105 }
106 }
107
108 @Override
109 public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
110 CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
111 CDCRequest request = (CDCRequest) msg;
112 if (null == connectionContext || request.hasLoginRequestBody()) {
113 processLogin(ctx, request);
114 return;
115 }
116 switch (request.getType()) {
117 case STREAM_DATA:
118 processStreamDataRequest(ctx, request, connectionContext);
119 break;
120 case ACK_STREAMING:
121 processAckStreamingRequest(request);
122 break;
123 case STOP_STREAMING:
124 processStopStreamingRequest(ctx, request, connectionContext);
125 break;
126 case START_STREAMING:
127 processStartStreamingRequest(ctx, request, connectionContext);
128 break;
129 case DROP_STREAMING:
130 processDropStreamingRequest(ctx, request, connectionContext);
131 break;
132 default:
133 log.warn("can't handle this type of request {}", request);
134 break;
135 }
136 }
137
138 private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request) {
139 ShardingSpherePreconditions.checkState(request.hasLoginRequestBody() && request.getLoginRequestBody().hasBasicBody(),
140 () -> new CDCExceptionWrapper(request.getRequestId(), new EmptyCDCLoginRequestBodyException()));
141 BasicBody body = request.getLoginRequestBody().getBasicBody();
142 AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
143 ShardingSphereUser user = authorityRule.findUser(new Grantee(body.getUsername(), getHostAddress(ctx)))
144 .orElseThrow(() -> new CDCExceptionWrapper(request.getRequestId(), new CDCLoginFailedException()));
145 ShardingSpherePreconditions.checkState(Objects.equals(Hashing.sha256().hashBytes(user.getPassword().getBytes()).toString().toUpperCase(), body.getPassword()),
146 () -> new CDCExceptionWrapper(request.getRequestId(), new CDCLoginFailedException()));
147 ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new CDCConnectionContext(user));
148 ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
149 }
150
151 private void checkPrivileges(final String requestId, final Grantee grantee, final String currentDatabase) {
152 AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(AuthorityRule.class)
153 .orElseThrow(() -> new CDCExceptionWrapper(requestId, new MissingRequiredRuleException("authority")));
154 ShardingSpherePrivileges privileges = authorityRule.findPrivileges(grantee)
155 .orElseThrow(() -> new CDCExceptionWrapper(requestId, new AccessDeniedException(grantee.getUsername(), grantee.getHostname(), false)));
156 ShardingSpherePreconditions.checkState(privileges.hasPrivileges(currentDatabase),
157 () -> new CDCExceptionWrapper(requestId, new UnknownDatabaseException(currentDatabase)));
158 }
159
160 private String getHostAddress(final ChannelHandlerContext context) {
161 SocketAddress socketAddress = context.channel().remoteAddress();
162 return socketAddress instanceof InetSocketAddress ? ((InetSocketAddress) socketAddress).getAddress().getHostAddress() : socketAddress.toString();
163 }
164
165 private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
166 if (!request.hasStreamDataRequestBody()) {
167 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Stream data request body is empty"));
168 }
169 StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
170 if (requestBody.getDatabase().isEmpty()) {
171 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Database is empty"));
172 }
173 if (requestBody.getSourceSchemaTableList().isEmpty()) {
174 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Source schema table is empty"));
175 }
176 checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase());
177 try {
178 CDCResponse response = backendHandler.streamData(request.getRequestId(), requestBody, connectionContext, ctx.channel());
179 ctx.writeAndFlush(response);
180 } catch (final PipelineSQLException ex) {
181 throw new CDCExceptionWrapper(request.getRequestId(), ex);
182 }
183 }
184
185 private void processAckStreamingRequest(final CDCRequest request) {
186 if (!request.hasAckStreamingRequestBody()) {
187 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request body is empty"));
188 }
189 AckStreamingRequestBody requestBody = request.getAckStreamingRequestBody();
190 if (requestBody.getAckId().isEmpty()) {
191 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request is empty"));
192 }
193 backendHandler.processAck(requestBody);
194 }
195
196 private void processStartStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
197 if (!request.hasStartStreamingRequestBody()) {
198 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Start streaming request body is empty"));
199 }
200 StartStreamingRequestBody requestBody = request.getStartStreamingRequestBody();
201 if (requestBody.getStreamingId().isEmpty()) {
202 throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Streaming id is empty"));
203 }
204 String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
205 checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
206 backendHandler.startStreaming(requestBody.getStreamingId(), connectionContext, ctx.channel());
207 ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
208 }
209
210 private void processStopStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
211 StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
212 String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
213 checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
214 backendHandler.stopStreaming(requestBody.getStreamingId(), ctx.channel().id());
215 connectionContext.setJobId(null);
216 ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
217 }
218
219 private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
220 DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
221 checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
222 backendHandler.dropStreaming(requestBody.getStreamingId());
223 connectionContext.setJobId(null);
224 ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
225 }
226 }