【Socket / Netty】Netty TCP Server & Client

Simple TCP Server

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SimpleTcpServer {

    /** CPU 线程数 */
    protected static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    /** 积压量 */
    protected volatile int backlog = ~(Byte.MIN_VALUE << 1);
    /** 监听端口 */
    protected volatile int port = Short.MAX_VALUE;
    /** 关闭标识 */
    protected volatile boolean closed = false;
    /** {@link io.netty.channel.EventLoopGroup} bossGroup */
    protected volatile EventLoopGroup bossGroup;
    /** {@link io.netty.channel.EventLoopGroup} workerGroup */
    protected volatile EventLoopGroup workerGroup;
    /** {@link io.netty.bootstrap.ServerBootstrap} */
    protected volatile ServerBootstrap bootstrap;

    /**
     * Create a tcp server.
     *
     * @param port      监听端口
     */
    public SimpleTcpServer(int port) {
        this.port = port;
    }

    /**
     * Create a tcp server.
     *
     * @param port          监听端口
     * @param backlog       积压量
     */
    public SimpleTcpServer(int port, int backlog) {
        this.port = port;
        this.backlog = backlog;
    }

    /**
     * Stop the tcp server.
     */
    public void stop() {
        this.closed = true;
        Optional.ofNullable(this.bossGroup).ifPresent(x -> x.shutdownGracefully());
        Optional.ofNullable(this.workerGroup).ifPresent(x -> x.shutdownGracefully());
    }

    /**
     * Start the tcp server.
     *
     * @param consumer
     * @throws Exception
     */
    public void start(Consumer<SocketChannel> consumer) throws Exception {
        this.closed = false;
        this.bossGroup = new NioEventLoopGroup(CPU_COUNT + 1);
        this.workerGroup = new NioEventLoopGroup();
        this.bootstrap = new ServerBootstrap();
        this.bootstrap.group(this.bossGroup, this.workerGroup)
                      .channel(NioServerSocketChannel.class)
                      .option(ChannelOption.SO_BACKLOG, this.backlog)
                      .option(ChannelOption.SO_REUSEADDR, true)
                      .option(EpollChannelOption.SO_REUSEPORT, true)
                      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                      .childOption(ChannelOption.SO_KEEPALIVE, true)
                      .childOption(ChannelOption.TCP_NODELAY, true)
                      .childOption(ChannelOption.SO_LINGER, 0)
                      .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                      .childHandler(new ChannelInitializer<SocketChannel>() {
                           @Override
                           public void initChannel(SocketChannel ch) throws Exception {
                               // ch.config().setAllocator(PooledByteBufAllocator.DEFAULT);
                               // 断线处理
                               ch.pipeline().addFirst(new ChannelInboundHandlerAdapter() {
                                   @Override
                                   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                       if (evt instanceof IdleStateEvent) {
                                           IdleStateEvent event = (IdleStateEvent) evt;
                                           if (event.state() == IdleState.READER_IDLE) {
                                               ctx.channel().close();
                                           }
                                       } else {
                                           super.userEventTriggered(ctx, evt);
                                       }
                                   }
                               });
                               // 心跳处理
                               ch.pipeline().addFirst(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                               // 业务处理
                               Optional.ofNullable(consumer).ifPresent(x -> x.accept(ch));
                           };
                      });
        doBind();
    }

    /**
     *
     */
    protected void doBind() {
        if (this.closed) {
            return;
        }
        this.bootstrap.bind(this.port).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    future.channel().eventLoop().schedule(() -> doBind(), 1, TimeUnit.SECONDS);
                }
            }
        });
    }

}

Simple TCP Client

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

public class SimpleTcpClient {

    /** 连接地址 */
    protected volatile String addr = "127.0.0.1";
    /** 连接端口 */
    protected volatile int port = Short.MAX_VALUE;
    /** 关闭标识 */
    protected volatile boolean closed = false;
    /** {@link io.netty.channel.EventLoopGroup} workerGroup */
    protected volatile EventLoopGroup workerGroup;
    /** {@link io.netty.bootstrap.Bootstrap} */
    protected volatile Bootstrap bootstrap;
    /** {@link io.netty.channel.Channel} */
    protected volatile Channel channel;

    /**
     * Create a tcp client.
     *
     * @param addr      IP 地址
     * @param port      端口号
     */
    public SimpleTcpClient(String addr, int port) {
        this.addr = addr;
        this.port = port;
    }

    /**
     * Close the tcp client.
     */
    public void stop() {
        this.closed = true;
        Optional.ofNullable(this.channel).ifPresent(x -> x.close());
        Optional.ofNullable(this.workerGroup).ifPresent(x -> x.shutdownGracefully());
    }

    /**
     * Connect to tcp server.
     *
     * @param consumer
     * @throws Exception
     */
    public void start(Consumer<SocketChannel> consumer) throws Exception {
        this.closed = false;
        this.workerGroup = new NioEventLoopGroup();
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(this.workerGroup)
                      .channel(NioSocketChannel.class)
                      .option(ChannelOption.TCP_NODELAY, true)
                      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                      .handler(new ChannelInitializer<SocketChannel>() {
                           @Override
                           protected void initChannel(SocketChannel ch) throws Exception {
                               // 断线重连
                               ch.pipeline().addFirst(new ChannelInboundHandlerAdapter() {
                                   @Override
                                   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                                       super.channelInactive(ctx);
                                       ctx.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                                   }
                                   @Override
                                   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                       if (evt instanceof IdleStateEvent) {
                                           IdleStateEvent event = (IdleStateEvent) evt;
                                           if (event.state() == IdleState.READER_IDLE) {
                                               ctx.channel().close();
                                           }
                                       } else {
                                           super.userEventTriggered(ctx, evt);
                                       }
                                   }
                               });
                               // 心跳处理
                               ch.pipeline().addFirst(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                               // 业务处理
                               Optional.ofNullable(consumer).ifPresent(x -> x.accept(ch));
                           }
                      });
        doConnect();
    }

    /**
     *
     */
    protected void doConnect() {
        if (this.closed) {
            return;
        }
        this.bootstrap.connect(this.addr, this.port).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    future.channel().eventLoop().schedule(() -> doConnect(), 1, TimeUnit.SECONDS);
                } else {
                    channel = future.channel();
                }
            }
        });
    }

    /**
     *
     * @param data
     */
    public void sent(Object data) {
        Optional.ofNullable(this.channel)
                .filter(x -> x.isActive() && x.isWritable())
                .ifPresent(x -> x.writeAndFlush(data));
    }

}
(0)

相关推荐