消息写
处理流程
正如JDK规范中指出一个Channel任意时刻只能执行单个线程的写操作。单个Nioworker可以顺序处理多个socketChannel的写操作,单个SocketChannel上的多次写操作会事先放入到写请求队列;结果由Nioworker调度执行。当Channel被调度时正常情况下消息队列会被出列处理之至为空;如果中途Channel内核缓冲区已满,未写入数据将保留在内存中则下次Channel被调度时优先处理上次剩余数据之后出列可能的消息。
而且一旦内核缓存区满并尝试n次写之后失败,将挂起写操作之后该channel的写入全部保存在消息队里中,等待下一次调度。其实挂起写操作时,selectKey可能修改为支持OP_WRITE,下次select()被唤醒时解挂。在此之前客户端发起的写全部被保存到消息队列中;以下是单个channel的写操作流程。
写消息逻辑大部分在如下方法中
//AbstractNioWorker protected void write0(AbstractNioChannel<?> channel)
SendBuffer acquire(Object message) { if (message instanceof ChannelBuffer) { return acquire((ChannelBuffer) message); } if (message instanceof FileRegion) { return acquire((FileRegion) message); } throw new IllegalArgumentException( "unsupported message type: " + message.getClass()); } //SendBuffer interface SendBuffer { boolean finished(); long writtenBytes(); long totalBytes(); long transferTo(WritableByteChannel ch) throws IOException; long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException; void release(); } //PooledSendBuffer final class PooledSendBuffer extends UnpooledSendBuffer { private final Preallocation parent; PooledSendBuffer(Preallocation parent, ByteBuffer buffer) { super(buffer); this.parent = parent; } @Override public void release() { final Preallocation parent = this.parent; if (-- parent.refCnt == 0) { parent.buffer.clear(); if (parent != current) { poolHead = new PreallocationRef(parent, poolHead); } } } }