首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

ServerSocket 用法详解(2)

2012-10-28 
ServerSocket 用法详解(二)在ThreadPool 类中定义了一个LinkedList 类型的 workQueue 成员变量, 它表示工

ServerSocket 用法详解(二)

在ThreadPool 类中定义了一个LinkedList 类型的 workQueue 成员变量, 它表示工作队列, 用来存放线程池要执行的任务, 每个任务都是 Runnable 实例. ThreadPool 类的客户程序(利用 ThreadPool 来执行任务的程序) 只要调用 ThreadPool?类的execute(Runnable task) 方法, 就能向线程池提交任务. 在 ThreadPool 类的 execute() 方法中, 先判断线程池是否已经关闭. 如果线程池已经关闭, 就不再接受任务, 负责就把任务加入到工作队列中, 并且呼醒正在等待任务的工作线程.

????? 在 ThreadPool 的构造方法中, 会创建并启动若干工作线程, 工作线程的数目由构造方法的参数 poolSize 决定. WorkThread 类表示工作线程, 它是 ThreadPool 类的内部类. 工作线程从工作队列中取出一个任务, 接着执行该任务, 然后再从工作队列中取出下一个任务并执行它, 如此反复.

?

????? 工作线程从工作队列中取任务的操作是由 ThreadPool 类的 getTask() 方法实现的, 它的处理逻辑如下:

如果队列为空并且线程池已关闭, 那就返回 null, 表示已经没有任务可以执行了;如果队列为空并且线程池没有关闭, 那就在此等待, 直到其他线程将其呼醒或者中断;如果队列中有任务, 就取出第一个任务并将其返回.

???? ?线程池的 join() 和 close() 方法都可用来关闭线程池. join() 方法确保在关闭线程之前, 工作线程把队列中的所有任务都执行完. 而 close() 方法则立即清空队列, 并且中断所有的工作线程.

?

????? ThreadPool 类是 ThreadGroup 类的子类, ThreadGroup 类表示线程组, 它提供了一些管理线程组中线程的方法. 例如, interrupt() 方法相当于调用线程组中所有活着的线程的 interrupt() 方法. 线程池中的所有工作线程都加入到当前 ThreadPool 对象表示的线程组中. ThreadPool 类在 close() 方法中调用了interrupt() 方法:

?/** 关闭线程池 */
?public synchronized void close(){
??if(!isClosed){
???isClosed = true;
???workQueue.clear();????//清空工作队列
???interrupt();??????????????????? //中断所有的的工作线程, 该方法继承自 ThreadGroup 类
??}
?}

?

???? 以上 interrupt() 方法用于中断所有的工作线程. interrupt() 方法会对工作线程造成以下影响:

如果此时一个工作线程正在ThreadPool 的 getTask() 方法中因为执行 wait() 方法而阻塞, 则会抛出 InterruptedException;如果此时一个工作线程正在执行任务, 并且这个任务不会被阻塞, 那么这个工作线程会正常执行完任务, 但是在执行下一轮 while(!isInterrupted()){.....} 循环时, 由于 isInterrupted() 方法返回 true, 因此退出 while 循环.

???? ThreadPoolTester 类用于测试 ThreadPool 的用法.

?

?????ThreadPoolTester 略..............

?

???? 利用线程池ThreadPool 来完成与客户的通信任务的 EchoServer 类

?

???? EchoServer.java(利用线程池 ThreadPool 类)

package multithread2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class EchoServer {
?private int port = 8000;
?private ServerSocket serverSocket;
?private ThreadPool threadPool;?????????????? //线程池
?private final int? POOL_SIZE = 4;??????????? //单个CPU 时线程池中工作线程的数目
?
?public EchoServer() throws IOException{
??serverSocket = new ServerSocket(port);
??//创建线程池
??//Runtime 的 availablePocessors() 方法返回当前系统的CPU 的数目
??//系统的CPU 越多, 线程池中工作线程的数目也越多
??threadPool = new ThreadPool(Runtime.getRuntime().availableProcessors()* POOL_SIZE);
??System.out.println("服务器启动");???
?}
?
?public void service(){
??while(true){
???Socket socket = null;
???try{
????socket = serverSocket.accept();
????threadPool.execute(new Handler(socket));??? //把与可以通信的任务交给线程池???????
???}catch(IOException e){
????e.printStackTrace();
???}
??}
?}
?
?/**
? * @param args
? * @throws IOException
? */
?public static void main(String[] args) throws IOException {
??new EchoServer().service();
?}
?
?/** 负责与单个客户通信的任务, 代码与 6.1 的例子相同*/
?class Handler implements Runnable{....}?

?

?????在以上 EchoServer 的 service() 方法中, 每接收到一个客户连接, 就向线程池 ThreadPool 提交一个与客户通信的任务. ThreadPool 把任务加入到工作队列中, 工作线程会在适当的时候从队列中取出这个任务并执行它.

?

6.3 使用 JDK 类库提供的线程池

?

????? java.util.concurrent 包提供现成的线程池的实现, 它比 6.2 节介绍的线程池更加健壮, 而且功能也更强大. 如图3-4 所示是线程池的类框图.

???????ServerSocket 用法详解(2)

?

??????????????????????????????图3-4 JDK 类库中的线程池的类框图

?

????? Executor 接口表示线程池, 它的 execute(Runable task) 方法用来执行 Runable 类型的任务. Executor 的子接口 ExecutorService 中声明了管理线程池的一些方法, 比如用于关闭线程池的 shutdown() 方法等. Executors 类中包含一些静态方法, 他们负责生成各种类型的线程池 ExecutorService 实例, 入表 3-1 所示.

?

??????????????? 表3-1 Executors 类生成的 ExecutorService 实例的静态方法?

?

????? 下面是具有关闭服务器功能的 EchoServer 的源代码, 其中关闭服务器的任务是由 shutdown-thread 线程来负责的.

?

????? EchoServer.java (具有关闭服务器的功能)

package multithread4;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

???????

public class EchoServer {
?private int port = 8000;
?private ServerSocket serverSocket;
?private ExecutorService executorService;
?private final int POOL_SIZE = 4;

?private int portForShutdown = 8001;?????????????????? //用于监听关闭服务器命令的端口
?private ServerSocket serverSocketForShutdown;????????
?private boolean isShutdown = false;?????????????????? //服务器是否关闭标志

?????????

?private Thread shutdownThread = new Thread() {??????? //负责关闭服务器的进程
??public void start() {
???this.setDaemon(true);???????????????????????? //设置为守护进程(也称为后台进程)
???super.start();
??}

??public void run() {
???while (!isShutdown) {
????Socket socketForShutdown = null;
????try {
?????socketForShutdown = serverSocketForShutdown.accept();
?????BufferedReader br = new BufferedReader(
???????new InputStreamReader(socketForShutdown
?????????.getInputStream()));

?????String command = br.readLine();
?????if (command != null && command.equalsIgnoreCase("shutdown")) {
??????long beginTime = System.currentTimeMillis();
??????socketForShutdown.getOutputStream().write(
????????"服务器正在关闭/r/n".getBytes());
??????isShutdown = true;
??????//请求关闭线程池
??????//线程池不再接收新的任务, 但是会继续执行完工作队列中现有的任务
??????executorService.shutdown();
??????
??????//等待关闭线程池, 每次等待的超时时间为 30 秒
??????while (!executorService.isTerminated())
???????executorService.awaitTermination(30, TimeUnit.SECONDS);

????????

??????serverSocket.close();????????? //关闭与EchoClient 客户通信的 ServerSocket

?????????????

??????long endTime = System.currentTimeMillis();
??????socketForShutdown.getOutputStream().write(("服务器已经关闭,关闭服务器所用的时间:"
????????+ (endTime - beginTime) + "毫秒/r/n").getBytes());
????????
??????socketForShutdown.close();
??????serverSocketForShutdown.close();

?????}else{
??????//接到其他命令的处理
??????socketForShutdown.getOutputStream().write("错误的命令/r/n".getBytes());
??????socketForShutdown.close();
?????}
????} catch (Exception e) {
?????e.printStackTrace();
????}
???}
??}
?};
?
?public EchoServer() throws IOException{
??serverSocket = new ServerSocket(port);
??serverSocket.setSoTimeout(60000);????????????????? //设定等待客户连接的超时时间为 60 秒
??
??serverSocketForShutdown = new ServerSocket(portForShutdown);??? //启动关闭服务器的服务, 监听 8001 端口
??shutdownThread.start();???????????????????????????????????????? //启动负责关闭服务器的线程
??
??//创建线程池
??executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * POOL_SIZE);
??
??System.out.println("服务器启动");
??
?}
?
?public void service() {
??while (!isShutdown) {

???Socket socket = null;
???try {
????socket = serverSocket.accept();
????//可能会抛出 SocketTimeoutExcepiton 和 SocketException
????socket.setSoTimeout(60000);????????????? //把等待客户发送数据的超时时间设为 60 秒
????
????//如果线程池被标示为停止 或者 任务为null, 执行execute()方法会抛出 RejectedException,
????//有兴趣的可以看看ThreadPoolExecutor 的execute() 和 shutdown()方法
????executorService.execute(new Handler(socket));
????
???} catch (SocketTimeoutException e) {
????//不必处理等待客户连接时出现的超时异常
???} catch (RejectedExecutionException e) {
????//这个是线程池被标示为停止后, 执行executorService.execute() 抛出的异常
????try{
?????if(socket != null) socket.close();
????}catch(IOException xe){}
????return;
???}catch (SocketException e) {
????//serverSocket 被 ShutdownThread 线程关闭后,
????//在执行 serverSocket.accept() 方法时, 将会抛出SocketException,
????//如果确实是这个原因导致的异常, 退出 service() 方法
????//作者是写 socket closed, 其实应该是 Socket is closed
????if(e.getMessage().indexOf("Socket is closed") != -1) return;
???}catch (IOException e) {
????e.printStackTrace();
???}
??}

?}

?public static void main(String[] args) throws IOException {
??new EchoServer().service();

?}
}

/** 负责与单个客户通信的任务, 代码与 6.1 的例子相同 */
class Handler implements Runnable {....}

??????????????????????????????????????????????????????????????????????????????????????????????????

?

?

??????shutdownThread 线程负责关闭服务器. 它一直监听 8001 端口,? 如果接收到了 Adminclient 发送的"shutdown" 命令, 就把 isShutdown 变量设为 true. shutdownThread 线程接着执行 executorService.shutdown() 方法, 该方法请求关闭线程池 线程池将不再接收新任务, 但是会继续执行完工作队列中现有的任务. shutdownThread 线程接着等待线程池关闭:
?????? while (!executorService.isTerminated())
???????executorService.awaitTermination(30,TimeUnit.SECONDS);?????

?

????? 当线程池的工作队列中的所有任务执行完毕, executorService.isTerminated() 方法就会返回 true.

?

????? shutdownThread 线程接着关闭监听 8000 端口的 ServerSocket, 最后再关闭监听 8001 端口的 ServerSocket.

?

????? shutdownThread 线程在执行上述代码时, 主线程正在执行 EchoServer 的 service() 方法. shutdownThread 线程一系列操作会对主线程造成以下影响:

如果 shutdownThread 线程已经把 isShutdown 变量设为 true, 而主线程正准备执行 service() 方法的下一轮 while(!isShutdown){...} 循环时, 由于 isShutdwon 变量为 true, 就会退出循环.如果 shutdownThread 线程已经执行了监听 8000 端口的 serverSocket 的 close() 方法, 而主线程正在执行该 ServerSocket 的 accept() 方法, 那么该方法会抛出 SocketException. EchoServer 的 service() 方法捕获了该异常, 在异常处理代码块中退出了 service() 方法.如果 shutdownThread 线程已经执行了 executorService.shutdown() 方法, 而主线程正在执行 executorService.execute() 方法, 那么该方法会抛出 RejectedExecutionException. EchoServer 的 service() 方法捕获了该异常, 在异常处理代码块中退出 service() 方法.如果 shutdownThread 线程已经把 isShutdown 变量设为 true, 但还没有调用监听 8000 端口的 serverSocket 的 close() 方法, 而主线程正在执行 serverSocket 的 accept() 方法, 主线程阻塞 60 秒后会抛出 SocketTimeoutException. 在准备执行 service() 方法的下一轮 while(!isShutdown){...} 循环时, 由于 isShutdown 变量为 true, 就会退出循环.由此可见, 当 shutdownThread 线程开始执行关闭服务器的操作时, 主线程尽管不会立即终止, 但是迟早会结束运行.

????? 下面是 AdminClient 的源代码, 它负责向 EchoServer 发送 "shutdown" 命令, 从而关闭 EchoServer.

?

?????? AdminClient.java

package multithread4;

???????????????????????????

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;

?????????????????????????

public class AdminClient {

?public static void main(String[] args) {
??Socket socket = null;
??try {
???socket = new Socket("localhost", 8001);
???//发送关闭命令
???OutputStream socketOut = socket.getOutputStream();
???socketOut.write("shutdown/r/n".getBytes());

???//接收服务器的反馈
???BufferedReader br = new BufferedReader(new InputStreamReader(socket
?????.getInputStream()));
???String msg = null;

???while ((msg = br.readLine()) != null) {
????System.out.println(msg);
???}
??} catch (IOException e) {
???e.printStackTrace();
??} finally {
???try {
????if (socket != null)
?????socket.close(); // 断开连接
???} catch (IOException e) {
???}
??}
?}

}

?

???? ?下面按照以下方式运行 EchoServer, EchoClient 和 AdminClient, 以观察 EchoServer 服务器的关闭过程.

?

????? ?⑴ 先运行 EchoServer , 然后运行 AdminClient. EchoServer 与 AdminClient 进程都结束运行, 并且在 AdminClient 的控制台打印如下结果:

?????服务器正在关闭?????????????????????????????????????? ?? ?
???? 服务器已经关闭,关闭服务器所用的时间:0毫秒? ?

?

????? ?⑵ 先运行 EchoServer, 再运行 EchoClient, 然后再运行 AdminClient. EchoServer 程序不会立即结束, 因为它与 EchoClient 的通信任务还没有结束. 在 EchoClient 的控制台中输入 "bye" , EchoServer, EchoClient 和 AdminClient 进程都会结束运行.

?

????? ?⑶ 先运行 EchoServer, 再运行 EchoClient , 然后再运行 AdminClient. EchoServe 程序不会立即结束, 因为它与 EchoClient 的通信任务还没有结束. 不要在 EchoClient 的控制台输入任何字符串, 过 60 秒后, EchoServer 等待 EchoClient 的发送数据超时, 结束与 EchoClient 的通信任务, EchoServer 和 AdminClient 进程结束运行. 如果在 EchoClient 的控制台再输入字符串, 则会抛出 "连接已断开" 的 SocketException.(最后一句有问题, EchoClient 是不会抛出 SocketException 异常的)

?

热点排行