/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.proxy.frontend.netty;

import com.google.common.hash.Hashing;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginFailedException;
import org.apache.shardingsphere.data.pipeline.cdc.exception.EmptyCDCLoginRequestBodyException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.infra.exception.dialect.exception.connection.AccessDeniedException;
import org.apache.shardingsphere.infra.exception.dialect.exception.syntax.database.UnknownDatabaseException;
import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.MissingRequiredRuleException;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.frontend.protocol.FrontDatabaseProtocolTypeFactory;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.Objects;

64  /**
65   * CDC channel inbound handler.
66   */
67  @Slf4j
68  public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter {
69      
70      private static final AttributeKey<CDCConnectionContext> CONNECTION_CONTEXT_KEY = AttributeKey.valueOf("connection.context");
71      
72      private final CDCBackendHandler backendHandler = new CDCBackendHandler();
73      
74      @Override
75      public void channelActive(final ChannelHandlerContext ctx) {
76          CDCResponse response = CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1").build())
77                  .setStatus(Status.SUCCEED).build();
78          ctx.writeAndFlush(response);
79      }
80      
81      @Override
82      public void channelInactive(final ChannelHandlerContext ctx) {
83          CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
84          if (null != connectionContext && null != connectionContext.getJobId()) {
85              backendHandler.stopStreaming(connectionContext.getJobId(), ctx.channel().id());
86          }
87          ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
88      }
89      
90      @Override
91      public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
92          log.error("caught CDC resolution error", cause);
93          // TODO add CDC exception to wrapper this exception, and add the parameters requestId and whether to close connect
94          ChannelFuture channelFuture;
95          if (cause instanceof CDCExceptionWrapper) {
96              CDCExceptionWrapper wrapper = (CDCExceptionWrapper) cause;
97              SQLException sqlException = SQLExceptionTransformEngine.toSQLException(wrapper.getCause(), FrontDatabaseProtocolTypeFactory.getDatabaseType());
98              channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed(wrapper.getRequestId(), sqlException.getSQLState(), sqlException.getMessage()));
99          } else {
100             channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
101         }
102         CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
103         if (null == connectionContext) {
104             channelFuture.addListener(ChannelFutureListener.CLOSE);
105         }
106     }
107     
108     @Override
109     public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
110         CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
111         CDCRequest request = (CDCRequest) msg;
112         if (null == connectionContext || request.hasLoginRequestBody()) {
113             processLogin(ctx, request);
114             return;
115         }
116         switch (request.getType()) {
117             case STREAM_DATA:
118                 processStreamDataRequest(ctx, request, connectionContext);
119                 break;
120             case ACK_STREAMING:
121                 processAckStreamingRequest(request);
122                 break;
123             case STOP_STREAMING:
124                 processStopStreamingRequest(ctx, request, connectionContext);
125                 break;
126             case START_STREAMING:
127                 processStartStreamingRequest(ctx, request, connectionContext);
128                 break;
129             case DROP_STREAMING:
130                 processDropStreamingRequest(ctx, request, connectionContext);
131                 break;
132             default:
133                 log.warn("can't handle this type of request {}", request);
134                 break;
135         }
136     }
137     
138     private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request) {
139         ShardingSpherePreconditions.checkState(request.hasLoginRequestBody() && request.getLoginRequestBody().hasBasicBody(),
140                 () -> new CDCExceptionWrapper(request.getRequestId(), new EmptyCDCLoginRequestBodyException()));
141         BasicBody body = request.getLoginRequestBody().getBasicBody();
142         AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
143         ShardingSphereUser user = authorityRule.findUser(new Grantee(body.getUsername(), getHostAddress(ctx)))
144                 .orElseThrow(() -> new CDCExceptionWrapper(request.getRequestId(), new CDCLoginFailedException()));
145         ShardingSpherePreconditions.checkState(Objects.equals(Hashing.sha256().hashBytes(user.getPassword().getBytes()).toString().toUpperCase(), body.getPassword()),
146                 () -> new CDCExceptionWrapper(request.getRequestId(), new CDCLoginFailedException()));
147         ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new CDCConnectionContext(user));
148         ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
149     }
150     
151     private void checkPrivileges(final String requestId, final Grantee grantee, final String currentDatabase) {
152         AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(AuthorityRule.class)
153                 .orElseThrow(() -> new CDCExceptionWrapper(requestId, new MissingRequiredRuleException("authority")));
154         ShardingSpherePrivileges privileges = authorityRule.findPrivileges(grantee)
155                 .orElseThrow(() -> new CDCExceptionWrapper(requestId, new AccessDeniedException(grantee.getUsername(), grantee.getHostname(), false)));
156         ShardingSpherePreconditions.checkState(privileges.hasPrivileges(currentDatabase),
157                 () -> new CDCExceptionWrapper(requestId, new UnknownDatabaseException(currentDatabase)));
158     }
159     
160     private String getHostAddress(final ChannelHandlerContext context) {
161         SocketAddress socketAddress = context.channel().remoteAddress();
162         return socketAddress instanceof InetSocketAddress ? ((InetSocketAddress) socketAddress).getAddress().getHostAddress() : socketAddress.toString();
163     }
164     
165     private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
166         if (!request.hasStreamDataRequestBody()) {
167             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Stream data request body is empty"));
168         }
169         StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
170         if (requestBody.getDatabase().isEmpty()) {
171             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Database is empty"));
172         }
173         if (requestBody.getSourceSchemaTableList().isEmpty()) {
174             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Source schema table is empty"));
175         }
176         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase());
177         try {
178             CDCResponse response = backendHandler.streamData(request.getRequestId(), requestBody, connectionContext, ctx.channel());
179             ctx.writeAndFlush(response);
180         } catch (final PipelineSQLException ex) {
181             throw new CDCExceptionWrapper(request.getRequestId(), ex);
182         }
183     }
184     
185     private void processAckStreamingRequest(final CDCRequest request) {
186         if (!request.hasAckStreamingRequestBody()) {
187             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request body is empty"));
188         }
189         AckStreamingRequestBody requestBody = request.getAckStreamingRequestBody();
190         if (requestBody.getAckId().isEmpty()) {
191             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request is empty"));
192         }
193         backendHandler.processAck(requestBody);
194     }
195     
196     private void processStartStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
197         if (!request.hasStartStreamingRequestBody()) {
198             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Start streaming request body is empty"));
199         }
200         StartStreamingRequestBody requestBody = request.getStartStreamingRequestBody();
201         if (requestBody.getStreamingId().isEmpty()) {
202             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Streaming id is empty"));
203         }
204         String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
205         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
206         backendHandler.startStreaming(requestBody.getStreamingId(), connectionContext, ctx.channel());
207         ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
208     }
209     
210     private void processStopStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
211         StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
212         String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
213         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
214         backendHandler.stopStreaming(requestBody.getStreamingId(), ctx.channel().id());
215         connectionContext.setJobId(null);
216         ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
217     }
218     
219     private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
220         DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
221         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
222         backendHandler.dropStreaming(requestBody.getStreamingId());
223         connectionContext.setJobId(null);
224         ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
225     }
226 }