基于Apache Mina实现的TCP长连接和短连接实例
Apache MINA是Apache组织的一个优秀的项目。MINA是Multipurpose Infrastructure for NetworkApplications的缩写。它是一个网络应用程序框架,用来帮助用户非常方便地开发高性能和高可靠性的网络应用程序。在本文中介绍了如何通过Apache Mina2.0来实现TCP协议长连接和短连接应用。
整个系统由两个服务端程序和两个客户端程序组成。分别实现TCP长连接和短连接通信。
系统业务逻辑是一个客户端与服务端建立长连接,一个客户端与服务端建立短连接。数据从短连接客户端经过服务端发送到长连接客户端,并从长连接客户端接收响应数据。当收到响应数据后断开连接。
系统架构图如下:
系统处理流程如下:
1)???????启动服务端程序,监听8001和8002端口。
2)???????长连接客户端向服务端8002端口建立连接,服务端将连接对象保存到共享内存中。由于采用长连接方式,连接对象是唯一的。
3)???????短连接客户端向服务端8001端口建立连接。建立连接后创建一个连接对象。
4)???????短连接客户端连接成功后发送数据。服务端接收到数据后从共享内存中得到长连接方式的连接对象,使用此对象向长连接客户端发送数据。发送前将短连接对象设为长连接对象的属性值。
5)???????长连接客户端接收到数据后返回响应数据。服务端从长连接对象的属性中取得短连接对象,通过此对象将响应数据发送给短连接客户端。
6)???????短连接客户端收到响应数据后,关闭连接。
服务启动
public class?MinaLongConnServer {
private static final int?PORT?= 8002;
?
????public void?start()throws?IOException{
?????? IoAcceptor acceptor =?new?NioSocketAcceptor();
?
?????? acceptor.getFilterChain().addLast("logger",?new?LoggingFilter());
?????? acceptor.getFilterChain().addLast("codec",?newProtocolCodecFilter(newTextLineCodecFactory(Charset.forName("UTF-8"))));
?
?????? acceptor.setHandler(new?MinaLongConnServerHandler());
?????? acceptor.getSessionConfig().setReadBufferSize(2048);
?????? acceptor.bind(new?InetSocketAddress(PORT));
?????? System.out.println("Listeningon port " +?PORT);
??? }
}
消息处理
public class?MinaLongConnServerHandler?extends?IoHandlerAdapter {
????private final?Logger logger = (Logger) LoggerFactory.getLogger(getClass());
?
??? @Override
????public void?sessionOpened(IoSession session) {
?????? InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();
?????? String clientIp = remoteAddress.getAddress().getHostAddress();
?????? logger.info("LongConnect Server opened Session ID ="+String.valueOf(session.getId()));
?????? logger.info("接收来自客户端 :" + clientIp + "的连接.");
?????? Initialization init = Initialization.getInstance();
?????? HashMap<String, IoSession> clientMap =init.getClientMap();
?????? clientMap.put(clientIp, session);
??? }
?
??? @Override
????public void?messageReceived(IoSession session, Object message) {
?????? logger.info("Messagereceived in the long connect server..");
?????? String expression = message.toString();
?????? logger.info("Message is:" + expression);
?????? IoSession shortConnSession =(IoSession) session.getAttribute("shortConnSession");
?????? logger.info("ShortConnect Server Session ID ="+String.valueOf(shortConnSession.getId()));
?????? shortConnSession.write(expression);
??? }
?
??? @Override
????public void?sessionIdle(IoSession session, IdleStatus status) {
?????? logger.info("Disconnectingthe idle.");
?????? // disconnect an idle client
?????? session.close(true);
??? }
?
??? @Override
????public void?exceptionCaught(IoSession session, Throwable cause) {
?????? // close the connection onexceptional situation
?????? logger.warn(cause.getMessage(), cause);
?????? session.close(true);
??? }
}
服务启动
public class?MinaShortConnServer {
????private static final int?PORT?= 8001;
?
????public void?start()throws?IOException{
?????? IoAcceptor acceptor =?new?NioSocketAcceptor();
?
?????? acceptor.getFilterChain().addLast("logger",?new?LoggingFilter());
?????? acceptor.getFilterChain().addLast("codec",?newProtocolCodecFilter(newTextLineCodecFactory(Charset.forName("UTF-8"))));
?
?????? acceptor.setHandler(new?MinaShortConnServerHandler());
?????? acceptor.getSessionConfig().setReadBufferSize(2048);
?????? acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 3);
?????? acceptor.bind(new?InetSocketAddress(PORT));
?????? System.out.println("Listeningon port " +?PORT);
??? }
}
消息处理
public class?MinaShortConnServerHandler?extends?IoHandlerAdapter {
????private final?Logger logger = (Logger) LoggerFactory.getLogger(getClass());
?
??? @Override
????public void?sessionOpened(IoSession session) {
?????? InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();
?????? logger.info(remoteAddress.getAddress().getHostAddress());
?????? logger.info(String.valueOf(session.getId()));
??? }
?
??? @Override
????public void?messageReceived(IoSession session, Object message) {
?????? logger.info("Messagereceived in the short connect server...");
?????? String expression = message.toString();
?????? Initialization init = Initialization.getInstance();
?????? HashMap<String, IoSession> clientMap =init.getClientMap();
???????if?(clientMap ==?null?|| clientMap.size() == 0) {
?????????? session.write("error");
?????? }?else?{
?????????? IoSession longConnSession =?null;
?????????? Iterator<String> iterator =clientMap.keySet().iterator();
?????????? String key = "";
???????????while?(iterator.hasNext()) {
????????????? key = iterator.next();
????????????? longConnSession = clientMap.get(key);
?????????? }
?????????? logger.info("ShortConnect Server Session ID :"+String.valueOf(session.getId()));
?????????? logger.info("LongConnect Server Session ID :"+String.valueOf(longConnSession.getId()));
?????????? longConnSession.setAttribute("shortConnSession",session);
?????????? longConnSession.write(expression);
?????? }
??? }
?
??? @Override
????public void?sessionIdle(IoSession session, IdleStatus status) {
?????? logger.info("Disconnectingthe idle.");
?????? // disconnect an idle client
?????? session.close(true);
??? }
?
??? @Override
????public void?exceptionCaught(IoSession session, Throwable cause) {
?????? // close the connection onexceptional situation
?????? logger.warn(cause.getMessage(), cause);
?????? session.close(true);
??? }
}
?
使用java.net.Socket来实现向服务端建立连接。Socket建立后一直保持连接,从服务端接收到数据包后直接将原文返回。
public class?TcpKeepAliveClient {
????private?String ip;
????private int?port;
????private static?Socket?socket?=?null;
????private static int?timeout?= 50 * 1000;
?
????public?TcpKeepAliveClient(String ip,?int?port) {
???????this.ip = ip;
???????this.port = port;
??? }
?
????public void?receiveAndSend()?throws?IOException {
?????? InputStream input =?null;
?????? OutputStream output =?null;
?
???????try?{
???????????if?(socket?==?null?||?socket.isClosed() || !socket.isConnected()) {
??????????????socket?=?new?Socket();
????????????? InetSocketAddress addr =?new?InetSocketAddress(ip, port);
??????????????socket.connect(addr,?timeout);
??????????????socket.setSoTimeout(timeout);
????????????? System.out.println("TcpKeepAliveClientnew ");
?????????? }
?
?????????? input =?socket.getInputStream();
?????????? output =?socket.getOutputStream();
?
?????????? // read body
???????????byte[] receiveBytes = {};// 收到的包字节数组
???????????while?(true) {
??????????????if?(input.available() > 0) {
????????????????? receiveBytes =?new byte[input.available()];
????????????????? input.read(receiveBytes);
?
????????????????? // send
????????????????? System.out.println("TcpKeepAliveClientsend date :" +?new?String(receiveBytes));
????????????????? output.write(receiveBytes, 0, receiveBytes.length);
????????????????? output.flush();
????????????? }
?????????? }
?
?????? }?catch?(Exception e) {
?????????? e.printStackTrace();
?????????? System.out.println("TcpClientnew socket error");
?????? }
??? }
?
????public static void?main(String[] args)?throws?Exception {
?????? TcpKeepAliveClient client =?new?TcpKeepAliveClient("127.0.0.1", 8002);
?????? client.receiveAndSend();
??? }
?
}
服务启动
public class?MinaShortClient {
????private static final int?PORT?= 8001;
?
????public static void?main(String[] args)?throws?IOException,InterruptedException {
?????? IoConnector connector =?new?NioSocketConnector();
?????? connector.getSessionConfig().setReadBufferSize(2048);
?
?????? connector.getFilterChain().addLast("logger",?new?LoggingFilter());
?????? connector.getFilterChain().addLast("codec",?newProtocolCodecFilter(newTextLineCodecFactory(Charset.forName("UTF-8"))));
?
?????? connector.setHandler(new?MinaShortClientHandler());
???????for?(int?i = 1; i <= 10; i++) {
?????????? ConnectFuture future = connector.connect(new?InetSocketAddress("127.0.0.1",?PORT));
?????????? future.awaitUninterruptibly();
?????? ??? IoSession session =future.getSession();
?????????? session.write(i);
?????????? session.getCloseFuture().awaitUninterruptibly();
?
?????????? System.out.println("result=" + session.getAttribute("result"));
?????? }
?????? connector.dispose();
?
??? }
}
消息处理
public class?MinaShortClientHandler?extends?IoHandlerAdapter{
????private final?Logger logger = (Logger) LoggerFactory.getLogger(getClass());
?
????public?MinaShortClientHandler() {
?
??? }
?
??? @Override
????public void?sessionOpened(IoSession session) {
??? }
?
??? @Override
????public void?messageReceived(IoSession session, Object message) {
?????? logger.info("Messagereceived in the client..");
?????? logger.info("Message is:" + message.toString());
?????? session.setAttribute("result", message.toString());
?????? session.close(true);
??? }
?
??? @Override
????public void?exceptionCaught(IoSession session, Throwable cause) {
?????? session.close(true);
??? }
}
通过本文中的例子,Apache Mina在服务端可实现TCP协议长连接和短连接。在客户端只实现了短连接模式,长连接模式也是可以实现的(在本文中还是采用传统的java Socket方式)。两个服务端之间通过共享内存的方式来传递连接对象也许有更好的实现方式。