使用mina传输大字节数组
使用mina传输超过2k以上的数据时(采用tcp方式,如果是UDP方式,好像一次传输的数据不能超过256字节,如果超过mina不会分批次发送,而tcp方式会分批次发送),mina会自动将这些数据分成多次发送。由于是分批次发送数据,所有客服端在接受数据时,需要等所有的数据接受完之后才能解码,否则无法解码,或者只能读取到部分文件。
以下是一个发送、接受大字节数组的主要代码
服务端向客服端发送字节数组
服务端代码:
编码器:
public?class?ImageDataEncoder?extends?ProtocolEncoderAdapter?{
?@Override
?public?void?encode(IoSession?session,?Object?message,
???ProtocolEncoderOutput?out)?throws?Exception?{
??CharsetEncoder?charset?=?Charset.forName("UTF-8").newEncoder();
??ImageData?image?=?(ImageData)?message;
??IoBuffer?buffer?=?IoBuffer.allocate(2048).setAutoExpand(true);
??buffer.putString(image.getYh(),?charset);//?发送数据类型
??buffer.putInt(image.getLength());//?发送字节数组的总长度,共解码时使用
??buffer.put(image.getBimage());//?发送字节数据
??buffer.flip();
??out.write(buffer);
??buffer.free();
?}
}
ImageData.java
public?class?ImageData?{
?private?static?final?long?serialVersionUID?=?1L;
?private?String?yh?=?YHConstants.YH_IMG;//?数据类型
?public?int?length?=?0;//?字节数组长度
?private?byte[]?bimage;//?待发送的字节数组
?private?BufferedImage?image;//将字节数组转换成图片文件
?public?ImageData()?{
?}
?public?ImageData(byte[]?bimage)?{
??this.bimage?=?bimage;
?}
?public?byte[]?getBimage()?{
??return?bimage;
?}
?public?BufferedImage?getImage()?{
??try?{
???if?(bimage.length?>?0)?{
????ByteArrayInputStream?in?=?new?ByteArrayInputStream(bimage);
????this.image?=?ImageIO.read(in);
????in.close();
???}
??}?catch?(RemoteException?e)?{
???e.printStackTrace();
??}?catch?(IOException?e)?{
???e.printStackTrace();
??}
??return?this.image;
?}
?public?int?getLength()?{
??return?bimage.length;
?}
?public?String?getYh()?{
??return?yh;
?}
?public?void?setBimage(byte[]?bimage)?{
??this.bimage?=?bimage;
?}
?public?void?setYh(String?yh)?{
??this.yh?=?yh;
?}
}
YHConstants.java
public?class?YHConstants?{
?public?static?final?int?LENGTH?=?7;//?命令数据类型
?public?static?final?String?YH_CMD?=?"YH?CMD?";//?命令数据类型
?public?static?final?String?YH_IMG?=?"YH?IMG?";//?图片数据类型
}
客服端:
解码器:分段发送的解码器一定要继承CumulativeProtocolDecoder?,这个是专门用来实现这种解码的
package?com.seara.socket.codec;
import?java.nio.charset.Charset;
import?java.nio.charset.CharsetDecoder;
import?org.apache.mina.core.buffer.IoBuffer;
import?org.apache.mina.core.session.AttributeKey;
import?org.apache.mina.core.session.IoSession;
import?org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import?org.apache.mina.filter.codec.ProtocolDecoderOutput;
import?com.seara.socket.message.ImageData;
import?com.seara.socket.message.YHConstants;
/**
?*?接收图片数据,由于图片数据比较大,tcp是采用分段式发送,所有需要等所有数据接收完之后才能解码
?*?
?*?解码原理:首先读取服务器端发送数据的总长度length,然后与当前的buff中的数据长度matchLength比较,如果matchLength>=
?*?length则认为数据发送完毕,?否則将当前的buff保存起来,在下次发送buff之时合并为一个buff,然后在按照以上条件判断
?*?
?*?@author?seara
?*?
?*/
public?class?ImageDataDecoder?extends?CumulativeProtocolDecoder?{
?private?final?AttributeKey?CONTEXT?=?new?AttributeKey(this.getClass(),
???"context");
?@Override
?protected?boolean?doDecode(IoSession?session,?IoBuffer?buff,
???ProtocolDecoderOutput?out)?throws?Exception?{
??CharsetDecoder?charset?=?Charset.forName("UTF-8").newDecoder();
??System.out.println("继续解码......."?+?buff.remaining());
??//?取出context
??Context?ctx?=?this.getContext(session);//?将contex从session中取出
??int?length?=?ctx.getLength();//?数据总长度
??IoBuffer?buffer?=?ctx.getBuffer();//?保存数据的buffer
??int?matchLength?=?ctx.getMatchLength();//?目前已经发送的数据的总长度
??if?(0?==?length)?{//?第一次取值
???String?yh?=?buff.getString(YHConstants.LENGTH,?charset);
???length?=?buff.getInt();
???matchLength?=?buff.remaining();
???ctx.setYh(yh);
???ctx.setLength(length);
??}?else?{
???matchLength?+=?buff.remaining();
??}
??ctx.setMatchLength(matchLength);
??if?(buff.hasRemaining())?{//?如果buff中还有数据
???buffer.put(buff);//?添加到保存数据的buffer中
???if?(matchLength?>=?length)?{//?如果已经发送的数据的长度>=目标数据的长度,则进行解码
????byte[]?b?=?new?byte[length];
????//?一定要添加以下这一段,否则不会有任何数据,因为,在执行buffer.put(buff)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始
????buffer.flip();
????buffer.get(b);
????ImageData?image?=?new?ImageData(b);
????out.write(image);
????System.out.println("解码完成.......");
????return?true;
???}?else?{
????ctx.setBuffer(buffer);
???}
??}
??return?false;//?返回false时,解码器就不会执行解码,返回true是在解码完成
?}
?/**
??*?定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于大数据解析
??*?
??*?@author?seara
??*?
??*/
?public?class?Context?{
??public?IoBuffer?buffer;
??public?int?length?=?0;
??public?int?matchLength?=?0;
??public?String?yh?=?"";
??public?Context()?{
???this.buffer?=?IoBuffer.allocate(1024).setAutoExpand(true);
??}
??public?int?getMatchLength()?{
???return?matchLength;
??}
??public?void?setMatchLength(int?matchLength)?{
???this.matchLength?=?matchLength;
??}
??public?IoBuffer?getBuffer()?{
???return?buffer;
??}
??public?void?setBuffer(IoBuffer?buffer)?{
???this.buffer?=?buffer;
??}
??public?int?getLength()?{
???return?length;
??}
??public?void?setLength(int?length)?{
???this.length?=?length;
??}
??public?String?getYh()?{
???return?yh;
??}
??public?void?setYh(String?yh)?{
???this.yh?=?yh;
??}
?}
?public?Context?getContext(IoSession?session)?{
??Context?ctx?=?(Context)?session.getAttribute(CONTEXT);
??if?(ctx?==?null)?{
???ctx?=?new?Context();
???session.setAttribute(CONTEXT,?ctx);
??}
??return?ctx;
?}
}