首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

深入显出Netty之三 Server请求处理

2012-12-14 
深入浅出Netty之三Server请求处理?Server bind之后,就可以对外提供服务了。Netty使用了reactor模式来提升服

深入浅出Netty之三 Server请求处理

?

Server bind之后,就可以对外提供服务了。Netty使用了reactor模式来提升服务的并发处理能力。boss线程负责监听新的连接请求,当有新的连接进来时,将对应的channel指派一个worker线程来处理。Worker线程负责对该Channel的读写操作。

一.Boss线程

?

1.阻塞Select

 for (;;) {                    try {                        // Boss线程专门负责监听新入连接,所以阻塞select                        selector.select();                        // 如果有新连接,先把key清掉                        selector.selectedKeys().clear();                        // 循环请求队列,处理连接                        for (;;) {                            SocketChannel acceptedSocket = channel.socket.accept();                            if (acceptedSocket == null) {                                break;                            }                            registerAcceptedChannel(acceptedSocket, currentThread);                        }......}
2.注册新连接
    private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {            ......//根据用户自定义的的PipelineFactory创建pipeline                ChannelPipeline pipeline =                    channel.getConfig().getPipelineFactory().getPipeline();//hash分配worker线程,默认使用递增循环worker数组方式                NioWorker worker = nextWorker();//将新的连接注册到worker线程,让worker线程负责后续读写//新的channel是主channel的子channel,而PipelineSink和主channel是同一个                worker.register(new NioAcceptedSocketChannel(                        channel.getFactory(), pipeline, channel,                        NioServerSocketPipelineSink.this, acceptedSocket,                        worker, currentThread), null);            ......        }
void register(AbstractNioChannel<?> channel, ChannelFuture future) {        synchronized (startStopLock) {            ......    //创建注册通道的任务            Runnable registerTask = createRegisterTask(channel, future);    //提交任务到阻塞队列            boolean offered = registerTaskQueue.offer(registerTask);    //唤醒selector            if (wakenUp.compareAndSet(false, true)) {                selector.wakeup();            }        }    }

3.创建注册任务

?

 protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {        boolean server = !(channel instanceof NioClientSocketChannel);        return new RegisterTask((NioSocketChannel) channel, future, server);    }

?二.worker线程

worker线程负责对应channel的读写操作,一个worker对应一个selector,会同时处理多个channel的读写。

1.主循环

for (;;) {            wakenUp.set(false);......                               if (wakenUp.get()) {                    wakenupFromLoop = true;                    selector.wakeup();                } else {                    wakenupFromLoop = false;                }                cancelledKeys = 0;//处理注册通道的任务                processRegisterTaskQueue();//处理异步事件,比如writeComplete事件                processEventQueue();//处理写数据任务,如果业务线程有异步写的时候,会有WriteTask放入队列                processWriteTaskQueue();//处理IO准备好的那些channel                processSelectedKeys(selector.selectedKeys());......        }

2.RegisterTask执行

?

public void run() {......           //如果是server,则使用异步模式                if (server) {                    channel.channel.configureBlocking(false);                }//将新的channel注册到worker线程的selector上,默认监听READ事件                synchronized (channel.interestOpsLock) {                    channel.channel.register(                            selector, channel.getRawInterestOps(), channel);                }                ......//触发BOUND的upstream事件,该事件将在用户自定义的pipeline中运行,在这里EchoServerHandler默认不处理该事件                if (server || !((NioClientSocketChannel) channel).boundManually) {                    fireChannelBound(channel, localAddress);                }//触发CONNECTED的upsteam事件,该事件将在用户自定义的pipeline中运行,在这里EchoServerHandler默认不处理该事件                fireChannelConnected(channel, remoteAddress);......        }

?3.处理读写准备好的那些channel

?

for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {            SelectionKey k = i.next();            i.remove();            try {                int readyOps = k.readyOps();//如果某个channel写就位,则读数据                if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {                    if (!read(k)) {                        // Connection already closed - no need to handle write.                        continue;                    }                }//如果写就位,则写数据                if ((readyOps & SelectionKey.OP_WRITE) != 0) {                    writeFromSelectorLoop(k);                }            } catch (CancelledKeyException e) {                close(k);            }......        }

4. 读取

?

 //从channel中读取数据到内部的buffer,转换成内部的ChannelBuffer,触发messageReceived事件    protected boolean read(SelectionKey k) {        final SocketChannel ch = (SocketChannel) k.channel();        final NioSocketChannel channel = (NioSocketChannel) k.attachment();//预测下次读将读取的buffer大小,默认使用自适应的预测算法,如果上次读取把buffer读满,则增大该值,如果连续2次都没读满,则减小该值//如果以上都不满足,则保持不变,默认长度1024        final ReceiveBufferSizePredictor predictor =            channel.getConfig().getReceiveBufferSizePredictor();        final int predictedRecvBufSize = predictor.nextReceiveBufferSize();//默认BufferFactory为HeapChannelBufferFactory,默认使用Big Endian字节序        final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();        int ret = 0;        int readBytes = 0;        boolean failure = true;//从共享pool中拿配额,从channel中读取对应数据        ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());        try {            while ((ret = ch.read(bb)) > 0) {                readBytes += ret;                if (!bb.hasRemaining()) {                    break;                }            }            failure = false;        } ......//有数据读入,则转换成自己的ChannelBuffer,并触发messageReceived事件,该事件将在用户自定义的Pipeline中执行        if (readBytes > 0) {            bb.flip();    //构造一个ChannelBuffer,默认使用堆内的数组实现            final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);    //复制数据到channelBuffer            buffer.setBytes(0, bb);    //写游标            buffer.writerIndex(readBytes);            // 修改预测器的下次读取buffer大小            predictor.previousReceiveBufferSize(readBytes);            // 触发messageReceived事件            fireMessageReceived(channel, buffer);        }......    }

? 5.EchoServerHandler接受消息

?

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {//通过channel将数据写回e.getChannel().write(e.getMessage());}

?6.数据写回,write方法其实是触发一个Downsteam事件

?

public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {        ChannelFuture future = future(channel);        channel.getPipeline().sendDownstream(                new DownstreamMessageEvent(channel, future, message, remoteAddress));        return future;    }

?7.ChannelPipeline中的处理

?

public void sendDownstream(ChannelEvent e) {        DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);//如果handler已经处理完了,则转发到ChannelSink处理,对于nioserver来说就是NioServerSocketPipelineSink        if (tail == null) {            try {                getSink().eventSunk(this, e);                return;            } catch (Throwable t) {                notifyHandlerException(e, t);                return;            }        }//否则,继续调用其他handler        sendDownstream(tail, e);    }

?8.NioServerSocketPipelineSink中处理channel事件

?

else if (e instanceof MessageEvent) {            MessageEvent event = (MessageEvent) e;            NioSocketChannel channel = (NioSocketChannel) event.getChannel();    //先放入写任务队列            boolean offered = channel.writeBufferQueue.offer(event);            assert offered;    //最后还是要通过work来写回数据            channel.worker.writeFromUserCode(channel);        }

?9.worker线程的处理

?

void writeFromUserCode(final AbstractNioChannel<?> channel) {        ......//如果业务方使用了业务线程异步写,则直接往worker线程的写队列扔一个WriteTask任务        if (scheduleWriteIfNecessary(channel)) {            return;        }......//如果业务方没有使用业务线程异步写,说明现在还在netty的Worker线程中,直接写        write0(channel);    }

?10.Worker线程直接写

?

protected void write0(AbstractNioChannel<?> channel) {        boolean open = true;        boolean addOpWrite = false;        boolean removeOpWrite = false;    //循环写入,如果都写成功了,则将去掉该channel在selector中注册的WRITE事件监听            for (;;) {                MessageEvent evt = channel.currentWriteEvent;                SendBuffer buf = null;                ChannelFuture future = null;                try {                    if (evt == null) {//从队列中拿需要写回的数据内容,如果没有了,则认为写成功了                        if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {                            removeOpWrite = true;                            channel.writeSuspended = false;                            break;                        }                        future = evt.getFuture();//将ChannelBuffer转换成ByteBuffer,此处使用PooledSendBuffer                        channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());                    } else {                        future = evt.getFuture();                        buf = channel.currentWriteBuffer;                    }                    long localWrittenBytes = 0;                    for (int i = writeSpinCount; i > 0; i --) {//将Buffer里的数据写出,因为是异步channel,如果socket的write队列满,会导致写处返回0,则重试                        localWrittenBytes = buf.transferTo(ch);//有数据写出就返回,不管是否全部写出                        if (localWrittenBytes != 0) {                            writtenBytes += localWrittenBytes;                            break;                        }                        if (buf.finished()) {                            break;                        }                    }    //如果全部写出,则通知调用方                    if (buf.finished()) {                        // Successful write - proceed to the next message.                        buf.release();                        channel.currentWriteEvent = null;                        channel.currentWriteBuffer = null;                        evt = null;                        buf = null;                        future.setSuccess();                    }     //如果还没写完,则需要让selector也关心这个channel的write事件,让write就位时,继续写    else {                        // Not written fully - perhaps the kernel buffer is full.                        addOpWrite = true;                        channel.writeSuspended = true;......                                           }                } ......            }            channel.inWriteNowLoop = false;//让selector监听write事件                            if (addOpWrite) {                    setOpWrite(channel);                } //写成功后,把write监听去掉else if (removeOpWrite) {                    clearOpWrite(channel);                }        }//如果worker线程直接写,直接触发writeComplete upstream事件,让handler处理        if (iothread) {            fireWriteComplete(channel, writtenBytes);        } //如果是业务线程异步写,将通过worker线程的eventQueue实现异步延时触发writeComplete事件else {            fireWriteCompleteLater(channel, writtenBytes);        }    }

11.Worker线程异步写,当业务方使用多线程处理时,写回的动作对worker来说是异步的

12.业务线程放入写任务队列

?

 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {                boolean offered = writeTaskQueue.offer(channel.writeTask);                assert offered;            }

?13.worker线程执行写任务

?

 private void processWriteTaskQueue() throws IOException {        for (;;) {            final Runnable task = writeTaskQueue.poll();            if (task == null) {                break;            }            task.run();            cleanUpCancelledKeys();        }    }

?14.WriteTask执行

?

private final class WriteTask implements Runnable {        WriteTask() {        }        public void run() {            writeTaskInTaskQueue.set(false);            worker.writeFromTaskLoop(AbstractNioChannel.this);        }    }

?15.worker线程执行数据写入

?

    void writeFromSelectorLoop(final SelectionKey k) {        AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();        ch.writeSuspended = false;        write0(ch);    }
?

?

热点排行