1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.frontend;
19
20 import io.netty.bootstrap.ServerBootstrap;
21 import io.netty.buffer.PooledByteBufAllocator;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.WriteBufferWaterMark;
26 import io.netty.channel.epoll.Epoll;
27 import io.netty.channel.epoll.EpollEventLoopGroup;
28 import io.netty.channel.epoll.EpollServerSocketChannel;
29 import io.netty.channel.nio.NioEventLoopGroup;
30 import io.netty.channel.socket.nio.NioServerSocketChannel;
31 import io.netty.handler.logging.LogLevel;
32 import io.netty.handler.logging.LoggingHandler;
33 import lombok.RequiredArgsConstructor;
34 import org.apache.shardingsphere.proxy.frontend.netty.CDCServerHandlerInitializer;
35
36 import java.util.ArrayList;
37 import java.util.List;
38
39
40
41
42 @RequiredArgsConstructor
43 public final class CDCServer extends Thread {
44
45 private final List<String> addressed;
46
47 private final int port;
48
49 private EventLoopGroup bossGroup;
50
51 private EventLoopGroup workerGroup;
52
53 @Override
54 public void run() {
55 try {
56 List<ChannelFuture> futures = startInternal(addressed, port);
57 for (ChannelFuture each : futures) {
58 each.channel().closeFuture().sync();
59 }
60 } catch (final InterruptedException ignored) {
61 Thread.currentThread().interrupt();
62 } finally {
63 close();
64 }
65 }
66
67 private List<ChannelFuture> startInternal(final List<String> addresses, final int port) throws InterruptedException {
68 createEventLoopGroup();
69 ServerBootstrap bootstrap = new ServerBootstrap();
70 bootstrap.channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
71 .group(bossGroup, workerGroup)
72 .option(ChannelOption.SO_REUSEADDR, true)
73 .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
74 .childOption(ChannelOption.SO_KEEPALIVE, true)
75 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
76 .childOption(ChannelOption.TCP_NODELAY, true)
77 .handler(new LoggingHandler(LogLevel.INFO))
78 .childHandler(new CDCServerHandlerInitializer());
79 List<ChannelFuture> result = new ArrayList<>();
80 for (String each : addresses) {
81 result.add(bootstrap.bind(each, port).sync());
82 }
83 return result;
84 }
85
86 private void createEventLoopGroup() {
87 bossGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);
88 workerGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
89 }
90
91 private void close() {
92 if (null != bossGroup) {
93 bossGroup.shutdownGracefully();
94 }
95 if (null != workerGroup) {
96 workerGroup.shutdownGracefully();
97 }
98 }
99 }