首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

NIO socket 的简略连接池

2012-10-24 
NIO socket 的简单连接池:在最近的项目中,需要写一个 socket 与底层服务器通信的模块。在设计中,请求……

NIO socket 的简单连接池

在最近的项目中,需要写一个 socket 与底层服务器通信的模块。在设计中,请求对象被封装为 xxxRequest,返回消息被封装为 xxxResponse。由于 socket 编程开发经验少,一开始我使用了短连接的方式,每个请求建立一个 socket 通信;由于每个 socket 只进行一次读写,这大大浪费了系统资源。

于是考虑使用长连接,系统公用一个 client socket 并对 send 操作进行加锁,结果在处理并发的时候,各种慢,各种等待。没有办法,考虑使用连接池:预先创建多个 client socket 放入连接池,需要发送请求时从连接池获取一个 socket,完成请求后再放回连接池中。下面是一个简单的实现。

???????

??????? private? static String IP=GlobalNames.industryIP;
?private? static int PORT =Integer.parseInt(GlobalNames.industryPort);
?
?private static? int CONNECTION_POOL_SIZE = 10;
?private static NIOConnectionPool self = null;
?private Hashtable<Integer, SocketChannel> socketPool = null; // 连接池
?private boolean[] socketStatusArray = null; // 连接的状态(true-被占用,false-空闲)
?private static Selector selector? = null;
?private static InetSocketAddress SERVER_ADDRESS = null;
?
?/**
? * 初始化连接池,最大TCP连接的数量为10
? *
? * @throws IOException
? */
?public static synchronized void init() throws Exception {
??self = new NIOConnectionPool();
??self.socketPool = new Hashtable<Integer, SocketChannel>();
??self.socketStatusArray = new boolean[CONNECTION_POOL_SIZE];
??buildConnectionPool();
?}

?/**
? * 建立连接池
? */
?public synchronized static void buildConnectionPool() throws Exception {
??if (self == null) {
???init();
??}
??for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
???SocketChannel client = allocateSocketChannel();
???self.socketPool.put(new Integer(i), client);
???self.socketStatusArray[i] = false;
??}
?}

?/**
? * 从连接池中获取一个空闲的Socket
? *
? * @return 获取的TCP连接
? */
?public static SocketChannel getConnection() throws Exception {
??if (self == null)
???init();
??int i = 0;
??for (i = 0; i < CONNECTION_POOL_SIZE; i++) {
???if (!self.socketStatusArray[i]) {
????self.socketStatusArray[i] = true;
????break;
???}
??}
??if (i < CONNECTION_POOL_SIZE) {
???return self.socketPool.get(new Integer(i));
???
??} else {

? //目前连接池无可用连接时只是简单的新建一个连接
???SocketChannel newClient = allocateSocketChannel();
???CONNECTION_POOL_SIZE++;
???self.socketPool.put(CONNECTION_POOL_SIZE, newClient);
???return newClient;
??}
?}

?/**
? * 当获得的socket不可用时,重新获得一个空闲的socket。
? *
? * @param socket
? *??????????? 不可用的socket
? * @return 新得到的socket
? * @throws Exception
? */
?public static SocketChannel rebuildConnection(SocketChannel socket)
???throws Exception {
??if (self == null) {
???init();
??}
??SocketChannel newSocket = null;
??try {
???for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
????if (self.socketPool.get(new Integer(i)) == socket) {
?????newSocket = allocateSocketChannel();
?????self.socketPool.put(new Integer(i), newSocket);
?????self.socketStatusArray[i] = true;
????}
???}

??} catch (Exception e) {
???System.out.println("重建连接失败!");
???throw new RuntimeException(e);
??}
??return newSocket;
?}


?/**
? * 将用完的socket放回池中,调整为空闲状态。此时连接并没有断开。
? *
? * @param socket
? *??????????? 使用完的socket
? * @throws Exception
? */
?public static void releaseConnection(SocketChannel socket) throws Exception {
??if (self == null) {
???init();
??}
??for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
???if (self.socketPool.get(new Integer(i)) == socket) {
????self.socketStatusArray[i] = false;
????break;
???}
??}
?}

?/**
? * 断开池中所有连接
? *
? * @throws Exception
? */
?public synchronized static void releaseAllConnection() throws Exception {
??if (self == null)
???init();

??// 关闭所有连接
??SocketChannel socket = null;
??for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
???socket = self.socketPool.get(new Integer(i));
???try {
????socket.close();
????self.socketStatusArray[i] = false;
???} catch (Exception e) {
????e.printStackTrace();
???}
??}
?}
???
?
?public static SocketChannel allocateSocketChannel(){
??
???SERVER_ADDRESS = new InetSocketAddress(??
??????????????? IP, PORT);??
??SocketChannel socketChannel = null;
???? SocketChannel client = null;
???? try{
???? socketChannel = SocketChannel.open();??
??????? socketChannel.configureBlocking(false);??
??????? selector = Selector.open();??
??????? socketChannel.register(selector, SelectionKey.OP_CONNECT);??
??????? socketChannel.connect(SERVER_ADDRESS);
??????? Set<SelectionKey> selectionKeys;??
??????? Iterator<SelectionKey> iterator;??
??????? SelectionKey selectionKey;
??????? selector.select();??
??????? selectionKeys = selector.selectedKeys();??
??????? iterator = selectionKeys.iterator();??
??????? while (iterator.hasNext()) {??
??????????? selectionKey = iterator.next();??
??????????? if (selectionKey.isConnectable()) {??
??????????????? client = (SocketChannel) selectionKey.channel();??
??????????????? if (client.isConnectionPending()) {??
??????????????????? client.finishConnect();
??????????????????? client.register(selector, SelectionKey.OP_WRITE);??
??????????????????? break;
??????????????? }
??????????? }
??????? }
?}catch(Exception e){
??e.printStackTrace();
?}
?return client;
? }

?public static Selector getSelector() {
??return selector;
?}

?

使用连接池进行通信:

?/*缓冲区大小*/?
???? private static int BLOCK = 8*4096;??
???? /*发送数据缓冲区*/?
???? private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);??
???
???? /*接受数据缓冲区*/
???? private static ByteBuffer protocalNum = ByteBuffer.allocate(4);
???? private static ByteBuffer functionNum = ByteBuffer.allocate(4);
???? private static ByteBuffer messageLen = ByteBuffer.allocate(4);
???? private static ByteBuffer receivebuffer = null;
???? private? SocketChannel client = null;
???? private Selector selector = null;
????
???? private boolean readable = true;
???? private boolean writable = true;
????
????
/**
 * Reserves a connection from NIOConnectionPool and stores it together with
 * the pool's selector for use by send().
 *
 * @throws Exception if the pool cannot provide a connection
 */
public NIOSocketBackUp() throws Exception {
    client = NIOConnectionPool.getConnection();
    selector = NIOConnectionPool.getSelector();
}

???? public String send(ServiceRequest request) throws Exception {???
?????????????
???????? Set<SelectionKey> selectionKeys;??
???????? Iterator<SelectionKey> iterator;??
???????? SelectionKey selectionKey;??
???????? int count=0;??
???????? boolean flag = true;
???????? String receiveText="";??
????????? while (flag) {??
???????????? selector.select();??
???????????? //返回此选择器的已选择键集。??
???????????? selectionKeys = selector.selectedKeys();??
???????????? iterator = selectionKeys.iterator();??
???????????? while (iterator.hasNext()) {??
???????????????? selectionKey = iterator.next();??
???????????????? if (selectionKey.isWritable() && (writable)) {??
???????????????????????? sendbuffer.clear();??
???????????????????????? sendbuffer.put(request.getProtocalNum());
???????????????????????? sendbuffer.put(request.getFunctionNum());
???????????????????????? sendbuffer.put(request.getMessageLen());
???????????????????????? sendbuffer.put(request.getXmlbytes());
???????????????????????? sendbuffer.flip();??
???????????????????????? client.write(sendbuffer);??
???????????????????????? client.register(selector, SelectionKey.OP_READ);??
???????????????????????? writable = false;
???????????????? } else if (selectionKey.isReadable() && (readable) ) {??
???????????????????? protocalNum.clear();
???????????????????? functionNum.clear();
???????????????????? messageLen.clear();
????????????????????
????????????????????
???????????????????? count=client.read(protocalNum);??
???????????????????? count=client.read(functionNum);
???????????????????? count=client.read(messageLen);
???????????????????? messageLen.rewind();
???????????????????? int length = messageLen.asIntBuffer().get(0);
???????????????????? receivebuffer = ByteBuffer.allocate(length-12);
???????????????????? receivebuffer.clear();
????????????????????
???????????????????? //read方式竟然不阻塞
???????????????????? int total=0;
???????????????????? while(total!=(length-12)){
?????????????????????? count=client.read(receivebuffer);
?????????????????????? total+=count;
???????????????????? }
???????????????????? client.register(selector, SelectionKey.OP_WRITE);??
???????????????????? receiveText = new String(receivebuffer.array(),"GBK");
???????????????????? flag = false;
???????????????????? readable = false;
???????????????????? break;
???????????????? }
???????????? }??
???????? }

??????
??????NIOConnectionPool.releaseConnection(client);
???????? return receiveText.trim();
???? }??

?

?

?

?

热点排行