首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

session.write门类引发的思考-Mina Session.write流程探索.doc

2012-12-25 
session.write类型引发的思考---Mina Session.write流程探索.doc基于Mina开发网络通信程序,在传感器数据接

session.write类型引发的思考---Mina Session.write流程探索.doc
    基于Mina开发网络通信程序,在传感器数据接入领域应用的很广泛,今天我无意中发现一个问题,那就是我在前端session.write(msg)数据出去之后,却没有经过Filter的Encoder方法,同样能够写入远程服务器。因为我所发送的数据不需要很复杂的编码,所以encoder方法也一直没有去看,今天发现无法被自己写的过滤器所编码,针对这个问题,我打开以前的代码以及以前的项目中的相关代码,有些同事也是session.write(IoBuffer)之后,在encoder方法里面还加上了一句out.write(message);通过跟踪Mina源码发现,session写出去的数据类型是IoBuffer格式的,就不经过自定义的过滤器了。所以下面的代码压根是多余的

@Overridepublic void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {out.write(message);//IoBuffer格式写出去之后,跳过了encoder.}

下面我把自己跟踪调试Mina的过程记录下来.
一、场景
客户端需要每隔Time时间向服务端发送心跳包,代码如下:
session.write(IoBuffer.wrap("心跳包XXX".getBytes()));
二、现象
MyFilter中的Encoder方法encoder不执行
public class MyFilter implements ProtocolCodecFactory {private ProtocolEncoder encoder = new MyEncoder();private ProtocolDecoder decoder = new MyDecoder();@Overridepublic ProtocolEncoder getEncoder(IoSession session) throws Exception {return encoder;}@Overridepublic ProtocolDecoder getDecoder(IoSession session) throws Exception {return decoder;}}

三、分析
进入session.write方法,实现IoSession.write方法的是AbstractIoSession。直接调用的是
public WriteFuture write(Object message) {        return write(message, null);}

而AbstractIoSession.write(Object message, SocketAddress address)
该方法的工作流程是:
创建WriteFeature对象,用于返回值(session.write本身就是返回writeFeature)将session.write(message)中的Object类型的message封装成writeRequest.启动write动作,这个主要是IoFilterChain来完成的。
具体的核心代码如下:
// Now, we can write the message. First, create a future        WriteFuture writeFuture = new DefaultWriteFuture(this);        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);        // Then, get the chain and inject the WriteRequest into it        IoFilterChain filterChain = getFilterChain();        filterChain.fireFilterWrite(writeRequest);

继续跟踪到fireFilterWrite里面去,可知IoFilterChain的默认实现类DefaultIoFilterChain中的关键方法:
public void fireFilterWrite(WriteRequest writeRequest) {        Entry tail = this.tail;        callPreviousFilterWrite(tail, session, writeRequest);}

在这里先要介绍一下DefaultIoFiterChain的数据格式,主要的属性如下:
private final Map<String, Entry> name2entry = new ConcurrentHashMap<String, Entry>();    /** The chain head */    private final EntryImpl head;    /** The chain tail */    private final EntryImpl tail;

其中 head与tail都是DefaultIoFilterChain固有的属性,name2entity是我们为FilterChain添加的过滤器。因而IoFilterChain是用一个链表来保存过滤器的(('tail', prev: 'myFilter:ProtocolCodecFilter', next: 'null')),其中表头和表位都是固定的head和tail,他们对应的Filter也是专有的,HeadFilter和TailFilter.
关键方法是callPreviousFilterWrite(tail, session, writeRequest);
try {            IoFilter filter = entry.getFilter();            NextFilter nextFilter = entry.getNextFilter();            filter.filterWrite(nextFilter, session, writeRequest);        } catch (Throwable e) {            writeRequest.getFuture().setException(e);            fireExceptionCaught(e);        }

从上面两个代码片段中,可以看出,IoFilterChain首先从列表中找到tail,从tail开始查找filter,顺序调用每个filter的filterWrite()方法。这里的‘顺序调用’,指的是从tail->head调用,也就是逆向调用Filter。但是看到filter.filterWrite(nextFilter, session, writeRequest);这行代码中的参数可以发现,nextFilter,表面的意思是下一个过滤器,有点误解,感觉tail下一个过滤器不就是null吗,其实不然,进入filterWriter可知。
Entry nextEntry = EntryImpl.this.prevEntry;callPreviousFilterWrite(nextEntry, session, writeRequest);

对于除head和tail过滤器外,其他的过滤器是如何工作的呢?我们看看ProtocolCodecFilter中的fireFilter方法,做了这样的处理:

if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {            nextFilter.filterWrite(session, writeRequest);            return;        }

到这里,就明白了为什么session.write(IoBuffer.wrap())这样写出去,无法经过自己定义的过滤器了,原来在fireFilter中,对message做了判断,如果已经是IoBuffer类型的,就直接return了。
最后执行的是HeadFilter的fireFilter方法,直接看内容:
if (writeRequest.getMessage() instanceof IoBuffer) {                IoBuffer buffer = (IoBuffer) writeRequest.getMessage();                // I/O processor implementation will call buffer.reset()                // it after the write operation is finished, because                // the buffer will be specified with messageSent event.                buffer.mark();                int remaining = buffer.remaining();                if (remaining == 0) {                    // Zero-sized buffer means the internal message                    // delimiter.                    s.increaseScheduledWriteMessages();                } else {                    s.increaseScheduledWriteBytes(remaining);                }            } else {                s.increaseScheduledWriteMessages();            }            s.getWriteRequestQueue().offer(s, writeRequest);            if (!s.isWriteSuspended()) {                s.getProcessor().flush(s);            }

WriteRequestQueue的默认实现就是java.util.concurrent.ConcurrentLinkedQueue,舍去传入的session对象。


热点排行