首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > JAVA > J2SE开发 >

java非阻塞通信的有关问题,高手请解答

2012-12-21 
java非阻塞通信的问题,高手请解答最近学习非阻塞通信的知识,有点一知半解,下面的例子是参考 孙卫琴 java

java非阻塞通信的问题,高手请解答
最近学习非阻塞通信的知识,有点一知半解,下面的例子是参考 孙卫琴 <<java网络编程精讲>>的
高手给看看

客户端代码:
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.nio.charset.*;

public class NonBlockingClient {

   private SocketChannel socketChannel=null;
   
   private ByteBuffer sendBuffer=ByteBuffer.allocate(1024);      // 发送缓冲区用于向服务器发送数据
   private ByteBuffer receiveBuffer=ByteBuffer.allocate(1024); //接收缓冲区用于从服务器端接收数据

   private Charset charset=Charset.forName("GBK");
   private Selector selector;
   
   public NonBlockingClient() throws IOException
   {
    socketChannel=SocketChannel.open();  //创建SocketChannel对象
    InetAddress ia=InetAddress.getLocalHost();
    InetSocketAddress isa=new InetSocketAddress(ia,10000);
    socketChannel.connect(isa);   //采用阻塞模式连接服务器
    socketChannel.configureBlocking(false); //采用非阻塞模式接收和发送数据
    System.out.println("与服务器连接成功");
    selector=Selector.open(); //创建监听器
   }
   
   public void receiveData() //从控制台接收数据到缓冲区
   {
       try{
       
     BufferedReader br=new BufferedReader(new InputStreamReader(System.in));
     String msg=null;
     while((msg=br.readLine())!=null)
     {
  if(msg.equals("exit"))
            break;
       synchronized(sendBuffer) //同步代码块,sendBuffer为共享资源
       {
       sendBuffer.put(encode(msg+"\r\n"));
       }
       
     }
       }
       catch(IOException e)
       {
        e.printStackTrace();
       }
   }
    
   public void communicate() throws IOException
   {
    socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
    while(selector.select()>0) //采用阻塞方式,返回相关事件已经发生的SelectionKey对象的数目
    {
    Set readyKeys=selector.selectedKeys(); //返回相关事件已经发生的SelectionKey对象的集合
    Iterator it=readyKeys.iterator();
    while(it.hasNext())
    {
       SelectionKey key=null;
       try
       {
        key=(SelectionKey)it.next();
        it.remove();
        
        if(key.isReadable())


        {
        receive(key);
        }
        if(key.isWritable())
        {
        send(key);
        }
       }
       catch(IOException e)
       {
        e.printStackTrace();
        try
        {
        if(key!=null)
        {
           key.cancel();
           key.channel().close();
        }
        }
        catch(Exception e1)
        {
        e1.printStackTrace();
        }
       }
    }
    }
   }
   
   public void send(SelectionKey key) throws IOException //将缓冲区中的数据通过channel发送出去
   {
     SocketChannel socketChannel=(SocketChannel)key.channel();//得到相关的SocketChannel
     synchronized(sendBuffer) //同步代码段
     {
      sendBuffer.flip();                  //把极限设为位置,把位置设为0
      socketChannel.write(sendBuffer);    //发送数据
      sendBuffer.compact();               //删除已经发送的数据
     }
   }
   
   public void receive(SelectionKey key) throws IOException
   {
     SocketChannel socketChannel=(SocketChannel)key.channel();
     socketChannel.read(receiveBuffer);
     receiveBuffer.flip();
     String receiveData=decode(receiveBuffer);   //从接收缓冲区中解码字节序列,转换为字符串
     
     if(receiveData.indexOf("\n")==-1) //不能凑成一行数据
     return;
     
     String outputData=receiveData.substring(0,receiveData.indexOf("\n")+1);//一行数据
     System.out.println(outputData);
     if(outputData.equals("echo:bye\r\n"));
     {
      key.cancel();
      socketChannel.close();
      System.out.println("关闭与服务器的连接");
      selector.close();
      System.exit(0); //结束程序
     }
     
     ByteBuffer temp=encode(outputData);


     receiveBuffer.position(temp.limit());
     receiveBuffer.compact();        //删除已经打印的数据
   }
   public String decode(ByteBuffer buffer) //解码
   {
     CharBuffer charBuffer=charset.decode(buffer);
     return charBuffer.toString();
   }
   
   public ByteBuffer encode(String str)   //编码
   {
     return charset.encode(str);
   }
   
   public static void main(String args[]) throws IOException
   {
      final NonBlockingClient nbc=new NonBlockingClient();
      
      Thread t=new Thread(new Runnable()
       {
       public void run()   //匿名内部类
        {
        nbc.receiveData();
        }
        });
      t.start();
      nbc.communicate();
   }  
}
服务器端代码:
import java.util.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.io.*;

public class NonBlockingServer 
{
    private Selector selector=null;
    private ServerSocketChannel serverSocketChannel=null;
    private int port=10000;
    private Charset charset=Charset.forName("GBK");
    
    public NonBlockingServer() throws IOException
    {
       selector=Selector.open();   //创建一个selector对象
       serverSocketChannel=ServerSocketChannel.open();   //创建一个ServerSocketChannel对象
       serverSocketChannel.socket().setReuseAddress(true);   //设置服务器端口可以重用
       serverSocketChannel.configureBlocking(false); //设置为非阻塞模式
       serverSocketChannel.socket().bind(new InetSocketAddress(port));
       
       System.out.println("服务器启动");
    
    }
    
    public void service() throws IOException
    {
       //向selector注册监听客户连接请求就绪事件
       serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
       while(selector.select()>0)
       {
        Set readyKeys=selector.selectedKeys(); //得到selector中的事件集合
        Iterator it=readyKeys.iterator();   //得到事件集合的迭代器
          while(it.hasNext())
          {


           SelectionKey key=null;
           try
           {
              key=(SelectionKey)it.next();
           it.remove();      //取出一个事件就从集合中将其删除
           
           if(key.isAcceptable()) //接收客户端的连接请求
           {
             //得到与SelectionKey关联的ServerSocketChannel
             ServerSocketChannel ssc=(ServerSocketChannel)key.channel();
              //得到客户端SocketChannel
              SocketChannel socketChannel=(SocketChannel)ssc.accept();
               System.out.println("接收到客户端连接,来自:"+socketChannel.socket().getInetAddress()
                              +":"+socketChannel.socket().getPort());
               socketChannel.configureBlocking(false);
               ByteBuffer buffer=ByteBuffer.allocate(1024);

               //向selector注册与客户端socketChannel的读、写就绪事件,buffer为关联的对象
               socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);
               //socketChannel.register(selector,SelectionKey.OP_WRITE,buffer);
                             
           }
           //读就绪事件发生
           if(key.isReadable())
           {
           receive(key);
           }
           //写就绪事件发生
           if(key.isWritable())
           {
              send(key);
           }
           }
           catch(Exception e)
           {
           e.printStackTrace();
           try
           {


           if(key!=null)
           {
           key.cancel();
           key.channel().close();
           }
           }
           catch(Exception e1)
           {
           e1.printStackTrace();
           }
           }
          } //end while
       } // end while 
       
    }
    public void send(SelectionKey key) throws IOException 
    {
     
 ByteBuffer buffer=(ByteBuffer)key.attachment();   //得到与该SelectionKey关联的ByteBuffer
     SocketChannel socketChannel=(SocketChannel)key.channel();    //得到与该SelectionKey关联的SocketChannel
         
 buffer.flip();    //把极限设为位置,把位置设为0;
         
         String data=decode(buffer);  //按照GBK编码进行解码,将字节缓冲区中的数据转换为字符串
         
 if(data.indexOf("\r\n")==-1)  //如果还没有读到一行数据,就返回
         return;
         
         String outputData=data.substring(0,data.indexOf("\n")+1);   //截取一行数据,把\n包含进去
        
 System.out.print(outputData);   //在服务器端的控制台将读到的数据输出
         
 //String strToClient=new StringBuffer(outputData).reverse().toString();
         //System.out.println(strToClient);
 ByteBuffer outputBuffer=encode("服务器端来的数据: "+outputData);
         
 while(outputBuffer.hasRemaining())   //当前位置与极限之间是否还有数据
              socketChannel.write(outputBuffer);
         
         ByteBuffer temp=encode(outputData);  //把outputData转换为字节,放入ByteBuffer中
          buffer.position(temp.limit());               //把buffer的位置设置为temp的极限
          buffer.compact();                                   //删除buffer中已经处理过的数据
          
          if(outputData.equals("byte\r\n"))
          {


          key.cancel();
          socketChannel.close();
          System.out.println("关闭与客户的连接");
          }
          
    }
    public void receive(SelectionKey key) throws IOException
    {
    ByteBuffer buffer=(ByteBuffer)key.attachment();     //获得与SelectionKey关联的缓冲区
    
    SocketChannel socketChannel=(SocketChannel)key.channel();   //获得与SelectionKey关联的SocketChannel
    //创建一个ByteBuffer,用于存放读到的数据
    ByteBuffer readBuffer=ByteBuffer.allocate(32);
    
    socketChannel.read(readBuffer);  //读入数据放入到readBuffer中
    
    readBuffer.flip();     //把极限设为当前位置,再把当前位置设为0
    
    buffer.limit(buffer.capacity());    //把buffer的极限设为为容量

    buffer.put(readBuffer);      //将readBuffer中的数据复制到buffer中,假定buffer的容量足够大,不会出现溢出的情况
    }
    
    private String decode(ByteBuffer buffer)    //解码
    {
    CharBuffer charBuffer=charset.decode(buffer);   //将字节转换为字符
    return charBuffer.toString();               //将字符缓冲区转换为字符串
    }
    private ByteBuffer encode(String str)        //编码
    { 
    return charset.encode(str);                   //将字符串转换为字节序列
    }
    
    public static void main(String args[]) throws Exception
    {
    NonBlockingServer server=new NonBlockingServer();
    server.service();
    }
}
问题1:
 先运行服务端 再运行客户端 客户端从键盘读一些数据 按回车 此时数据传递给服务器端 客户端打印出数据 就结束了 为什么只发送一次 客户端就结束了呢??
问题2:
 客户端输入”exit“,为什么客户端进程没有结束,而是阻塞了呢?
问题3:
 服务器端在什么情况下 执行此局代码: System.out.println("关闭与客户的连接");

代码有点长,谢谢大家了
[最优解释]
你的第二个问题,你客户端main里面有两个线程,
你输入exit之后,只是把main线程结束了,
nbc.receiveData();
这个线程没结束吧?

从你的代码上下文来看,
应该是你输入exit
客户端会发送一个"exit"这样的字符串到服务器,
然后break,退出掉监听控制台输入的线程
然后服务器回发一个"echo:bye\r\n"
这样客户端收到了会channel.close()
退出监听网络连接的线程

因为我看到你客户端代码有"echo:bye\r\n"这个处理
服务器并没有地方发送
[其他解释]


public void receiveData() // 从控制台接收数据到缓冲区
{
try {

BufferedReader br = new BufferedReader(new InputStreamReader(
System.in));
String msg = null;
boolean flag = false; //修改点while ((msg = br.readLine()) != null) {


if (msg.equals("exit")){
msg = "byte"; //修改点flag = true; //修改点}
synchronized (sendBuffer) // 同步代码块,sendBuffer为共享资源
{
sendBuffer.put(encode(msg + "\r\n"));
}
if(flag) break; //修改点
}
} catch (IOException e) {
e.printStackTrace();
}
}




public void receive(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.read(receiveBuffer);
receiveBuffer.flip();
String receiveData = decode(receiveBuffer); // 从接收缓冲区中解码字节序列,转换为字符串

if (receiveData.indexOf("\n") == -1) // 不能凑成一行数据
return;

String outputData = receiveData.substring(0,
receiveData.indexOf("\n") + 1);// 一行数据
System.out.println(outputData);
if (outputData.equals("服务器端来的数据: byte\r\n")) //修改点{
key.cancel();
socketChannel.close();
System.out.println("关闭与服务器的连接");
selector.close();
System.exit(0); // 结束程序
}

ByteBuffer temp = encode(outputData);
receiveBuffer.position(temp.limit());
receiveBuffer.compact(); // 删除已经打印的数据
}


简单修改了下 NonBlockingClient
[其他解释]
问题1和问题3已解决 自己太粗心 if分支后面多了个;号

热点排行