netty源码解析1
文章目录
- DefaultChannelPipeline
- AbstractChannelHandlerContext
- HeadContext
- TailContext
- ByteToMessageDecoder
- MessageToByteEncoder
- ChannelFuture
- DefaultPromise
- AbstractNioChannel
DefaultChannelPipeline
implements ChannelPipeline
- 构造方法,维护头尾节点,头尾节点组成双向链表。ChannelHandler封装成ChannelHandlerContext,再由ChannelHandlerContext组成链表的元素。
/**
 * Creates the pipeline and wires up its two fixed end nodes.
 * Part of the constructor body is omitted in this excerpt.
 *
 * @param channel the channel this pipeline belongs to
 */
protected DefaultChannelPipeline(Channel channel) {
    // ... (omitted in this excerpt)

    // Create the tail and head nodes.
    tail = new TailContext(this);
    head = new HeadContext(this);

    // Link them to each other so they form a doubly linked list: head <-> tail.
    head.next = tail;
    tail.prev = head;
}
- 往尾部添加handler,头尾节点固定,新增的handler的下一个节点为尾节点
private void addLast0(AbstractChannelHandlerContext newCtx) { // 在链表添加一个handler,即在head和tail之间构建双向链表 AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx;}
- 写操作,很明显,从尾节点开始往前面的节点写数据
/**
 * Starts a write of {@code msg} by delegating to the tail node.
 *
 * @param msg the message to write
 * @return the future returned by the tail node's write
 */
public final ChannelFuture write(Object msg) {
    final ChannelFuture writeFuture = tail.write(msg);
    return writeFuture;
}
AbstractChannelHandlerContext
implements ChannelHandlerContext
- 传递读事件(入站):在目标节点所属的executor线程上调用该节点的invokeChannelRead方法
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // 如果是同一个线程,那么执行下一个节点的invokeChannelRead方法 next.invokeChannelRead(m); } else { // 如果不是同一线程,那么投放到队列去执行 executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}
- 找到下一个InboundHandler(从head到tail的顺序找)
/**
 * Walks forward (via {@code next}) from this context and returns the first
 * context after it whose {@code inbound} flag is set.
 */
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext candidate = next;
    while (!candidate.inbound) {
        candidate = candidate.next;
    }
    return candidate;
}
- 找到下一个OutboundHandler(从tail到head的顺序)
/**
 * Walks backward (via {@code prev}) from this context and returns the first
 * context before it whose {@code outbound} flag is set.
 */
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext candidate = prev;
    while (!candidate.outbound) {
        candidate = candidate.prev;
    }
    return candidate;
}
HeadContext
extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler,它是一个context,同时是一个出入站handler,说明不管出入站,都会流经。
- HeadContext是入站的第一个handler,负责传递入站消息到下一个ChannelInboundHandler
/**
 * Forwards the channel-inactive event by calling
 * {@code ctx.fireChannelInactive()}.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelInactive();
}
- HeadContext是出站的最后一个handler,直接往网络发送
/**
 * Handles the outbound close operation by delegating to {@code unsafe.close}.
 */
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    unsafe.close(promise);
}

/**
 * Handles the outbound deregister operation by delegating to
 * {@code unsafe.deregister}.
 */
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    unsafe.deregister(promise);
}
TailContext
extends AbstractChannelHandlerContext implements ChannelInboundHandler,它是一个context,同时是一个入站handler,也是最后一个入站handler
- TailContext可用来做资源的释放,假如前面的handler不释放资源,那么它必须把释放资源的操作交给后面的handler,最后由TailContext处理
protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); }}
ByteToMessageDecoder
extends ChannelInboundHandlerAdapter
- 读操作
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { // 如果消息是一个ByteBuf,那么执行解码操作 // 创建一个存放解码数据的list集合 CodecOutputList out = CodecOutputList.newInstance(); try { // cumulation也是一个ByteBuf,表示累计的消息 first = cumulation == null; // 如果cumulation为空,则无需累加,初始化为一个空的buffer。否则,说明cumulation有数据,需要累加数据 cumulation = cumulator.cumulate(ctx.alloc(), first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg); callDecode(ctx, cumulation, out); // 解码操作 } // ... } else { // 如果消息不是ByteBuf,则传递给下一个handler ctx.fireChannelRead(msg); }}
- 解码逻辑
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { // 读取ByteBuf直到没数据 int outSize = out.size(); if (outSize > 0) { // 如果有解码结果(List<Object>不为空),那么传递给下一个Handler fireChannelRead(ctx, out, outSize); // 清空List<Object> out.clear(); if (ctx.isRemoved()) { break; } outSize = 0; } int oldInputLength = in.readableBytes(); // 调用该方法进行解码 decodeRemovalReentryProtection(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); }}
- 调用子类实现的decode方法;如果解码过程中该handler被标记为待移除,则先把已解码的结果传递下去,再执行handlerRemoved
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE; try { decode(ctx, in, out); // 调用decode方法,这个方法需要我们实现 } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; if (removePending) { fireChannelRead(ctx, out, out.size()); out.clear(); handlerRemoved(ctx); } }}
MessageToByteEncoder
extends ChannelOutboundHandlerAdapter
- 写操作
/**
 * Encodes accepted messages into a {@link ByteBuf} and writes the buffer;
 * messages that are not accepted are written through unchanged.
 * The remainder of the try statement is omitted in this excerpt.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            // The type matches, so the cast is safe.
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                // encode(...) is the method that subclasses implement.
                encode(ctx, cast, buf);
            } finally {
                // The message has been converted into buf and is no longer needed, so release it.
                ReferenceCountUtil.release(cast);
            }

            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                // Nothing was encoded: release the buffer and write an empty one instead.
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    }
    // ... (rest of the try statement omitted in this excerpt)
}
- 判断是否是我们想要的类型
/**
 * Reports whether {@code msg} is a type this encoder handles, as decided by
 * {@code matcher}.
 *
 * @param msg the outbound message to test
 * @return the result of {@code matcher.match(msg)}
 */
public boolean acceptOutboundMessage(Object msg) throws Exception {
    final boolean accepted = matcher.match(msg);
    return accepted;
}
ChannelFuture
extends Future
状态:完成(可能成功、可能失败、也可能被取消),未完成
- 添加监听器
/**
 * Adds a listener to this future.
 *
 * @param listener the listener to add
 * @return a {@link ChannelFuture}
 */
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
- 监听器
/**
 * Callback interface for being told that a {@link Future} has completed.
 *
 * @param <F> the type of future this listener receives
 */
public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future the source {@link Future} which called this callback
     * @throws Exception if the listener fails while handling the completion
     */
    void operationComplete(F future) throws Exception;
}
一般不推荐直接使用future.get,建议使用监听器,完成后通知执行operationComplete(F future)方法,在这个方法里面去获取值
DefaultPromise
extends AbstractFuture implements Promise
public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { return this; } throw new IllegalStateException("complete already: " this);}
/**
 * Stores the success value, substituting the {@code SUCCESS} marker when
 * {@code result} is {@code null}.
 *
 * @param result the result value, possibly {@code null}
 * @return whatever {@code setValue0} returns
 */
private boolean setSuccess0(V result) {
    final Object stored = (result == null) ? SUCCESS : result;
    return setValue0(stored);
}
private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || // 使用原子类保障安全 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { // 执行成功后,唤醒监听器 notifyListeners(); } return true; } return false;}
private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } });}
- 调用监听器的完成方法
private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " l.getClass().getName() ".operationComplete()", t); } }}
AbstractNioChannel
extends AbstractChannel
- 封装了NIO相关的成员:底层的SelectableChannel、读事件对应的interestOp、SelectionKey,以及连接相关的状态
// The underlying java.nio channel.
private final SelectableChannel ch;
// The interest op that doBeginRead adds to the selection key.
protected final int readInterestOp;
// The NIO selection key of this channel.
volatile SelectionKey selectionKey;
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
    @Override
    public void run() {
        clearReadPending0();
    }
};

/**
 * The future of the current connection attempt.  If not null, subsequent
 * connection attempts will fail.
 */
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
- 构造方法
/**
 * Stores the underlying channel and its read interest op, then switches the
 * channel to non-blocking mode.
 *
 * @param parent         the parent channel, passed to the superclass
 * @param ch             the underlying java.nio channel
 * @param readInterestOp the interest op to use for reading
 * @throws ChannelException if the channel cannot be switched to non-blocking mode
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    // Remember which interest op means "read" for this channel.
    this.readInterestOp = readInterestOp;
    try {
        // Put the channel into non-blocking mode.
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            // The channel is unusable: close it before reporting the failure.
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                    "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
- 注册逻辑
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // 0表示所有事件都不感兴趣 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; // 注册成功 } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } }}
- 开始读
protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; // 大爷表示我对读事件很感兴趣 AbstractNioByteChannel()构造方法可以看下,会调用本类的构造方法,把read事件传递过去 final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { // 如果当前不是读事件,那么设置成读事件,位运算 selectionKey.interestOps(interestOps | readInterestOp); }}
AbstractNioByteChannel
- 写方法。为什么需要对写的循环次数做限制?因为一个EventLoop线程通常要服务多个Channel,限制次数可以避免单个Channel长时间占用该线程,导致其他Channel得不到处理
protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = config().getWriteSpinCount(); // 循环写的次数,缺省16次 do { Object msg = in.current(); if (msg == null) { // 为空,说明写完了,那么本大爷就对写事件不感兴趣了 // Wrote all messages. clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } writeSpinCount -= doWriteInternal(in, msg); // 每次写后,次数-1 } while (writeSpinCount > 0); // 最多循环16次 incompleteWrite(writeSpinCount < 0); // 超过16次没有写完的数据,再来写}
- 取消写事件
protected final void clearOpWrite() { final SelectionKey key = selectionKey(); // Check first if the key is still valid as it may be canceled as part of the deregistration // from the EventLoop // See https://github.com/netty/netty/issues/2104 if (!key.isValid()) { return; } final int interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); }}
- 写逻辑
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception { if (msg instanceof ByteBuf) { // 如果是bytebuf类型 ByteBuf buf = (ByteBuf) msg; if (!buf.isReadable()) { in.remove(); return 0; // 返回0,说明不扣减, } final int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount > 0) { in.progress(localFlushedAmount); if (!buf.isReadable()) { in.remove(); } return 1; } } else if (msg instanceof FileRegion) { // 如果是文件传输类型 FileRegion region = (FileRegion) msg; if (region.transferred() >= region.count()) { in.remove(); return 0; } long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount > 0) { in.progress(localFlushedAmount); if (region.transferred() >= region.count()) { in.remove(); } return 1; } } else { // Should not reach here. throw new Error(); } return WRITE_STATUS_SNDBUF_FULL;}
赞 (0)