【Socket / Netty】Netty TCP Server & Client
Simple TCP Server
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class SimpleTcpServer {
/** CPU 线程数 */
protected static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
/** 积压量 */
protected volatile int backlog = ~(Byte.MIN_VALUE << 1);
/** 监听端口 */
protected volatile int port = Short.MAX_VALUE;
/** 关闭标识 */
protected volatile boolean closed = false;
/** {@link io.netty.channel.EventLoopGroup} bossGroup */
protected volatile EventLoopGroup bossGroup;
/** {@link io.netty.channel.EventLoopGroup} workerGroup */
protected volatile EventLoopGroup workerGroup;
/** {@link io.netty.bootstrap.ServerBootstrap} */
protected volatile ServerBootstrap bootstrap;
/**
* Create a tcp server.
*
* @param port 监听端口
*/
public SimpleTcpServer(int port) {
this.port = port;
}
/**
* Create a tcp server.
*
* @param port 监听端口
* @param backlog 积压量
*/
public SimpleTcpServer(int port, int backlog) {
this.port = port;
this.backlog = backlog;
}
/**
* Stop the tcp server.
*/
public void stop() {
this.closed = true;
Optional.ofNullable(this.bossGroup).ifPresent(x -> x.shutdownGracefully());
Optional.ofNullable(this.workerGroup).ifPresent(x -> x.shutdownGracefully());
}
/**
* Start the tcp server.
*
* @param consumer
* @throws Exception
*/
public void start(Consumer<SocketChannel> consumer) throws Exception {
this.closed = false;
this.bossGroup = new NioEventLoopGroup(CPU_COUNT + 1);
this.workerGroup = new NioEventLoopGroup();
this.bootstrap = new ServerBootstrap();
this.bootstrap.group(this.bossGroup, this.workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, this.backlog)
.option(ChannelOption.SO_REUSEADDR, true)
.option(EpollChannelOption.SO_REUSEPORT, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_LINGER, 0)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// ch.config().setAllocator(PooledByteBufAllocator.DEFAULT);
// 断线处理
ch.pipeline().addFirst(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
});
// 心跳处理
ch.pipeline().addFirst(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
// 业务处理
Optional.ofNullable(consumer).ifPresent(x -> x.accept(ch));
};
});
doBind();
}
/**
*
*/
protected void doBind() {
if (this.closed) {
return;
}
this.bootstrap.bind(this.port).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
future.channel().eventLoop().schedule(() -> doBind(), 1, TimeUnit.SECONDS);
}
}
});
}
}
Simple TCP Client
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
public class SimpleTcpClient {
/** 连接地址 */
protected volatile String addr = "127.0.0.1";
/** 连接端口 */
protected volatile int port = Short.MAX_VALUE;
/** 关闭标识 */
protected volatile boolean closed = false;
/** {@link io.netty.channel.EventLoopGroup} workerGroup */
protected volatile EventLoopGroup workerGroup;
/** {@link io.netty.bootstrap.Bootstrap} */
protected volatile Bootstrap bootstrap;
/** {@link io.netty.channel.Channel} */
protected volatile Channel channel;
/**
* Create a tcp client.
*
* @param addr IP 地址
* @param port 端口号
*/
public SimpleTcpClient(String addr, int port) {
this.addr = addr;
this.port = port;
}
/**
* Close the tcp client.
*/
public void stop() {
this.closed = true;
Optional.ofNullable(this.channel).ifPresent(x -> x.close());
Optional.ofNullable(this.workerGroup).ifPresent(x -> x.shutdownGracefully());
}
/**
* Connect to tcp server.
*
* @param consumer
* @throws Exception
*/
public void start(Consumer<SocketChannel> consumer) throws Exception {
this.closed = false;
this.workerGroup = new NioEventLoopGroup();
this.bootstrap = new Bootstrap();
this.bootstrap.group(this.workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 断线重连
ch.pipeline().addFirst(new ChannelInboundHandlerAdapter() {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
ctx.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
});
// 心跳处理
ch.pipeline().addFirst(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
// 业务处理
Optional.ofNullable(consumer).ifPresent(x -> x.accept(ch));
}
});
doConnect();
}
/**
*
*/
protected void doConnect() {
if (this.closed) {
return;
}
this.bootstrap.connect(this.addr, this.port).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
future.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
} else {
channel = future.channel();
}
}
});
}
/**
*
* @param data
*/
public void sent(Object data) {
Optional.ofNullable(this.channel)
.filter(x -> x.isActive() && x.isWritable())
.ifPresent(x -> x.writeAndFlush(data));
}
}
赞 (0)