session.write类型引发的思考---Mina Session.write流程探索.doc
基于Mina开发网络通信程序,在传感器数据接入领域应用的很广泛,今天我无意中发现一个问题,那就是我在前端session.write(msg)数据出去之后,却没有经过Filter的Encoder方法,同样能够写入远程服务器。因为我所发送的数据不需要很复杂的编码,所以encoder方法也一直没有去看,今天发现无法被自己写的过滤器所编码,针对这个问题,我打开以前的代码以及以前的项目中的相关代码,有些同事也是session.write(IoBuffer)之后,在encoder方法里面还加上了一句out.write(message);通过跟踪Mina源码发现,session写出去的数据类型是IoBuffer格式的,就不经过自定义的过滤器了。所以下面的代码压根是多余的
@Overridepublic void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {out.write(message);//IoBuffer格式写出去之后,跳过了encoder.}public class MyFilter implements ProtocolCodecFactory {private ProtocolEncoder encoder = new MyEncoder();private ProtocolDecoder decoder = new MyDecoder();@Overridepublic ProtocolEncoder getEncoder(IoSession session) throws Exception {return encoder;}@Overridepublic ProtocolDecoder getDecoder(IoSession session) throws Exception {return decoder;}}public WriteFuture write(Object message) { return write(message, null);}// Now, we can write the message. First, create a future WriteFuture writeFuture = new DefaultWriteFuture(this); WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress); // Then, get the chain and inject the WriteRequest into it IoFilterChain filterChain = getFilterChain(); filterChain.fireFilterWrite(writeRequest);
public void fireFilterWrite(WriteRequest writeRequest) { Entry tail = this.tail; callPreviousFilterWrite(tail, session, writeRequest);}private final Map<String, Entry> name2entry = new ConcurrentHashMap<String, Entry>(); /** The chain head */ private final EntryImpl head; /** The chain tail */ private final EntryImpl tail;
try { IoFilter filter = entry.getFilter(); NextFilter nextFilter = entry.getNextFilter(); filter.filterWrite(nextFilter, session, writeRequest); } catch (Throwable e) { writeRequest.getFuture().setException(e); fireExceptionCaught(e); }Entry nextEntry = EntryImpl.this.prevEntry;callPreviousFilterWrite(nextEntry, session, writeRequest);
if ((message instanceof IoBuffer) || (message instanceof FileRegion)) { nextFilter.filterWrite(session, writeRequest); return; }if (writeRequest.getMessage() instanceof IoBuffer) { IoBuffer buffer = (IoBuffer) writeRequest.getMessage(); // I/O processor implementation will call buffer.reset() // it after the write operation is finished, because // the buffer will be specified with messageSent event. buffer.mark(); int remaining = buffer.remaining(); if (remaining == 0) { // Zero-sized buffer means the internal message // delimiter. s.increaseScheduledWriteMessages(); } else { s.increaseScheduledWriteBytes(remaining); } } else { s.increaseScheduledWriteMessages(); } s.getWriteRequestQueue().offer(s, writeRequest); if (!s.isWriteSuspended()) { s.getProcessor().flush(s); }