netty源码解析1

文章目录

  • DefaultChannelPipeline
  • AbstractChannelHandlerContext
  • HeadContext
  • TailContext
  • ByteToMessageDecoder
  • MessageToByteEncoder
  • ChannelFuture
  • DefaultPromise
  • AbstractNioChannel

DefaultChannelPipeline

implements ChannelPipeline

  • 构造方法,维护头尾节点,头尾节点组成双向链表。ChannelHandler封装成ChannelHandlerContext,再有ChannelHandlerContext组成链表的元素。
protected DefaultChannelPipeline(Channel channel) {    // ...    // 头尾节点    tail = new TailContext(this);    head = new HeadContext(this);    // 双向链表    head.next = tail;    tail.prev = head;}
  • 往尾部添加handler,头尾节点固定,新增的handler的下一个节点为尾节点
private void addLast0(AbstractChannelHandlerContext newCtx) {    // 在链表添加一个handler,即在head和tail之间构建双向链表    AbstractChannelHandlerContext prev = tail.prev;    newCtx.prev = prev;    newCtx.next = tail;    prev.next = newCtx;    tail.prev = newCtx;}
  • 写操作,很明显,从尾节点开始往前面的节点写数据
public final ChannelFuture write(Object msg) {    return tail.write(msg);}

AbstractChannelHandlerContext

implements ChannelHandlerContext

  • 向通道写数据
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        // 如果是同一个线程,那么执行下一个节点的invokeChannelRead方法        next.invokeChannelRead(m);    } else {        // 如果不是同一线程,那么投放到队列去执行        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}
  • 找到下一个InboundHandler(从head到tail的顺序找)
private AbstractChannelHandlerContext findContextInbound() {    AbstractChannelHandlerContext ctx = this;    do {        ctx = ctx.next;    } while (!ctx.inbound);    return ctx;}
  • 找到下一个OutboundHandler(从tail到head的顺序)
private AbstractChannelHandlerContext findContextOutbound() {    AbstractChannelHandlerContext ctx = this;    do {        ctx = ctx.prev;    } while (!ctx.outbound);    return ctx;}

HeadContext

extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler,它是一个context,同时是一个出入站handler,说明不管出入站,都会流经。

  • HeadContext是入站的第一个handler,负责传递入站消息到下一个ChannelInboundHandler
@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {    ctx.fireChannelInactive();}
  • HeadContext是出站的最后一个handler,直接往网络发送
@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {    unsafe.close(promise);}@Overridepublic void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {    unsafe.deregister(promise);}

TailContext

extends AbstractChannelHandlerContext implements ChannelInboundHandler,它是一个context,同时是一个入站handler,也是最后一个入站handler

  • TailContext可用来做资源的释放,假如前面的handler不释放资源,那么它必须把释放资源的操作交给后面的handler,最后由TailContext处理
protected void onUnhandledInboundMessage(Object msg) {    try {        logger.debug(                "Discarded inbound message {} that reached at the tail of the pipeline. "                          "Please check your pipeline configuration.", msg);    } finally {        ReferenceCountUtil.release(msg);    }}

ByteToMessageDecoder

extends ChannelInboundHandlerAdapter

  • 读操作
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    if (msg instanceof ByteBuf) {        // 如果消息是一个ByteBuf,那么执行解码操作        // 创建一个存放解码数据的list集合        CodecOutputList out = CodecOutputList.newInstance();        try {            // cumulation也是一个ByteBuf,表示累计的消息            first = cumulation == null;            // 如果cumulation为空,则无需累加,初始化为一个空的buffer。否则,说明cumulation有数据,需要累加数据            cumulation = cumulator.cumulate(ctx.alloc(),                    first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);            callDecode(ctx, cumulation, out); // 解码操作        }        // ...    } else {        // 如果消息不是ByteBuf,则传递给下一个handler        ctx.fireChannelRead(msg);    }}
  • 解码逻辑
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {    try {        while (in.isReadable()) {            // 读取ByteBuf直到没数据            int outSize = out.size();            if (outSize > 0) {                // 如果有解码结果(List<Object>不为空),那么传递给下一个Handler                fireChannelRead(ctx, out, outSize);                 // 清空List<Object>                out.clear();                if (ctx.isRemoved()) {                    break;                }                outSize = 0;            }            int oldInputLength = in.readableBytes();            // 调用该方法进行解码            decodeRemovalReentryProtection(ctx, in, out);            // Check if this handler was removed before continuing the loop.            // If it was removed, it is not safe to continue to operate on the buffer.            //            // See https://github.com/netty/netty/issues/1664            if (ctx.isRemoved()) {                break;            }            if (outSize == out.size()) {                if (oldInputLength == in.readableBytes()) {                    break;                } else {                    continue;                }            }            if (oldInputLength == in.readableBytes()) {                throw new DecoderException(                        StringUtil.simpleClassName(getClass())                                  ".decode() did not read anything but decoded a message.");            }            if (isSingleDecode()) {                break;            }        }    } catch (DecoderException e) {        throw e;    } catch (Exception cause) {        throw new DecoderException(cause);    }}
  • 解码逻辑
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)        throws Exception {    decodeState = STATE_CALLING_CHILD_DECODE;    try {        decode(ctx, in, out); // 调用decode方法,这个方法需要我们实现    } finally {        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;        decodeState = STATE_INIT;        if (removePending) {            fireChannelRead(ctx, out, out.size());            out.clear();            handlerRemoved(ctx);        }    }}

MessageToByteEncoder

extends ChannelOutboundHandlerAdapter

  • 写操作
@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {    ByteBuf buf = null;    try {        if (acceptOutboundMessage(msg)) {            @SuppressWarnings("unchecked")            // 如果类型正确,强转            I cast = (I) msg;            buf = allocateBuffer(ctx, cast, preferDirect);            try {                encode(ctx, cast, buf); // 调用encode方法,需要我们实现            } finally {                // 结束,需要释放,为什么需要释放?因为Message转到ByteBuf了,说明已经是出站了,msg没用了,所以需要释放msg了。                ReferenceCountUtil.release(cast);            }            if (buf.isReadable()) {                ctx.write(buf, promise);            } else {                buf.release();                ctx.write(Unpooled.EMPTY_BUFFER, promise);            }            buf = null;        } else {            ctx.write(msg, promise);        }    }    // ...}
  • 判断是否是我们想要的类型
public boolean acceptOutboundMessage(Object msg) throws Exception {    return matcher.match(msg);}

ChannelFuture

extends Future

状态:完成(可能成功、可能失败、也可能被取消),未完成

  • 添加监听器
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
  • 监听器
public interface GenericFutureListener<F extends Future<?>> extends EventListener {    /**     * Invoked when the operation associated with the {@link Future} has been completed.     *     * @param future  the source {@link Future} which called this callback     */    void operationComplete(F future) throws Exception;}

一般不推荐直接使用future.get,建议使用监听器,完成后通知执行operationComplete(F future)方法,在这个方法里面去获取值

DefaultPromise

extends AbstractFuture implements Promise

public Promise<V> setSuccess(V result) {    if (setSuccess0(result)) {        return this;    }    throw new IllegalStateException("complete already: "   this);}
private boolean setSuccess0(V result) {    return setValue0(result == null ? SUCCESS : result);}
private boolean setValue0(Object objResult) {    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||        // 使用原子类保障安全        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {        if (checkNotifyWaiters()) {            // 执行成功后,唤醒监听器            notifyListeners();        }        return true;    }    return false;}
private void notifyListeners() {    EventExecutor executor = executor();    if (executor.inEventLoop()) {        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();        final int stackDepth = threadLocals.futureListenerStackDepth();        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {            threadLocals.setFutureListenerStackDepth(stackDepth   1);            try {                notifyListenersNow();            } finally {                threadLocals.setFutureListenerStackDepth(stackDepth);            }            return;        }    }    safeExecute(executor, new Runnable() {        @Override        public void run() {            notifyListenersNow();        }    });}
  • 调用监听器的完成方法
private static void notifyListener0(Future future, GenericFutureListener l) {    try {        l.operationComplete(future);    } catch (Throwable t) {        if (logger.isWarnEnabled()) {            logger.warn("An exception was thrown by "   l.getClass().getName()   ".operationComplete()", t);        }    }}

AbstractNioChannel

extends AbstractChannel

  • 封装了nio的一些东东
private final SelectableChannel ch;protected final int readInterestOp;volatile SelectionKey selectionKey; // nio的selectionkeyboolean readPending;private final Runnable clearReadPendingRunnable = new Runnable() {    @Override    public void run() {        clearReadPending0();    }};/** * The future of the current connection attempt.  If not null, subsequent * connection attempts will fail. */private ChannelPromise connectPromise;private ScheduledFuture<?> connectTimeoutFuture;private SocketAddress requestedRemoteAddress;
  • 构造方法
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {    super(parent);    this.ch = ch;    this.readInterestOp = readInterestOp; // 设置通道关注事件类型为读    try {        ch.configureBlocking(false); // 设置为非阻塞    } catch (IOException e) {        try {            ch.close();        } catch (IOException e2) {            logger.warn(                        "Failed to close a partially initialized socket.", e2);        }        throw new ChannelException("Failed to enter non-blocking mode.", e);    }}
  • 注册逻辑
protected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            // 0表示所有事件都不感兴趣            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            if (!selected) {                // Force the Selector to select now as the "canceled" SelectionKey may still be                // cached and not removed because no Select.select(..) operation was called yet.                eventLoop().selectNow();                selected = true; // 注册成功            } else {                // We forced a select operation on the selector before but the SelectionKey is still cached                // for whatever reason. JDK bug ?                throw e;            }        }    }}
  • 开始读
protected void doBeginRead() throws Exception {    // Channel.read() or ChannelHandlerContext.read() was called    final SelectionKey selectionKey = this.selectionKey;    if (!selectionKey.isValid()) {        return;    }    readPending = true;    // 大爷表示我对读事件很感兴趣 AbstractNioByteChannel()构造方法可以看下,会调用本类的构造方法,把read事件传递过去    final int interestOps = selectionKey.interestOps();    if ((interestOps & readInterestOp) == 0) {        // 如果当前不是读事件,那么设置成读事件,位运算        selectionKey.interestOps(interestOps | readInterestOp);    }}

AbstractNioByteChannel

  • 写方法,为什么需要对写的循环次数做限制,因为担心写线程假死
protected void doWrite(ChannelOutboundBuffer in) throws Exception {    int writeSpinCount = config().getWriteSpinCount(); // 循环写的次数,缺省16次    do {        Object msg = in.current();        if (msg == null) {            // 为空,说明写完了,那么本大爷就对写事件不感兴趣了            // Wrote all messages.            clearOpWrite();            // Directly return here so incompleteWrite(...) is not called.            return;        }        writeSpinCount -= doWriteInternal(in, msg); // 每次写后,次数-1    } while (writeSpinCount > 0); // 最多循环16次    incompleteWrite(writeSpinCount < 0); // 超过16次没有写完的数据,再来写}
  • 取消写事件
protected final void clearOpWrite() {    final SelectionKey key = selectionKey();    // Check first if the key is still valid as it may be canceled as part of the deregistration    // from the EventLoop    // See https://github.com/netty/netty/issues/2104    if (!key.isValid()) {        return;    }    final int interestOps = key.interestOps();    if ((interestOps & SelectionKey.OP_WRITE) != 0) {        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);    }}
  • 写逻辑
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {    if (msg instanceof ByteBuf) {        // 如果是bytebuf类型        ByteBuf buf = (ByteBuf) msg;        if (!buf.isReadable()) {            in.remove();            return 0; // 返回0,说明不扣减,        }        final int localFlushedAmount = doWriteBytes(buf);        if (localFlushedAmount > 0) {            in.progress(localFlushedAmount);            if (!buf.isReadable()) {                in.remove();            }            return 1;        }    } else if (msg instanceof FileRegion) {        // 如果是文件传输类型        FileRegion region = (FileRegion) msg;        if (region.transferred() >= region.count()) {            in.remove();            return 0;        }        long localFlushedAmount = doWriteFileRegion(region);        if (localFlushedAmount > 0) {            in.progress(localFlushedAmount);            if (region.transferred() >= region.count()) {                in.remove();            }            return 1;        }    } else {        // Should not reach here.        throw new Error();    }    return WRITE_STATUS_SNDBUF_FULL;}

来源:https://www.icode9.com/content-1-769201.html

(0)

相关推荐