首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

java.util.concurrent包API学习札记

2012-11-07 
java.util.concurrent包API学习笔记newFixedThreadPool创建一个固定大小的线程池。shutdown():用于关闭启动

java.util.concurrent包API学习笔记
newFixedThreadPool

创建一个固定大小的线程池。

shutdown():用于关闭启动线程,如果不调用该语句,jvm不会关闭。

awaitTermination():用于等待子线程结束,再继续执行下面的代码。该例中我设置一直等着子线程结束。

?

?

public class Test {public static void main(String[] args) throws IOException, InterruptedException {ExecutorService service = Executors.newFixedThreadPool(2);for (int i = 0; i < 4; i++) {Runnable run = new Runnable() {@Overridepublic void run() {System.out.println("thread start");}};service.execute(run);}service.shutdown();service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);System.out.println("all thread complete");}}

?

?

?

import java.io.IOException;import java.util.Random;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;class Runner implements Runnable {private CyclicBarrier barrier;private String name;public Runner(CyclicBarrier barrier, String name) {super();this.barrier = barrier;this.name = name;}@Overridepublic void run() {try {Thread.sleep(1000 * (new Random()).nextInt(8));System.out.println(name + " 准备OK.");barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println(name + " Go!!");}}public class Race {public static void main(String[] args) throws IOException, InterruptedException {CyclicBarrier barrier = new CyclicBarrier(3);ExecutorService executor = Executors.newFixedThreadPool(3);executor.submit(new Thread(new Runner(barrier, "zhangsan")));executor.submit(new Thread(new Runner(barrier, "lisi")));executor.submit(new Thread(new Runner(barrier, "wangwu")));executor.shutdown();}}

?

?

public class Test {public static void main(String[] args) {BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.DAYS, queue);for (int i = 0; i < 20; i++) {final int index = i;executor.execute(new Runnable() {public void run() {try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(String.format("thread %d finished", index));}});}executor.shutdown();}}

?

原子变量(Atomic )

并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列。该类主要提供了两个方法put()和take(),前者将一个对象放到队列中,如果队列已经满了,就等待直到有空闲节点;后者从head取一个对象,如果没有对象,就等待直到有可取的对象。

?

下面的例子比较简单,一个读线程,用于将要处理的文件对象添加到阻塞队列中,另外四个写线程用于取出文件对象,为了模拟写操作耗时长的特点,特让线程睡眠一段随机长度的时间。另外,该Demo也使用到了线程池和原子整型(AtomicInteger),AtomicInteger可以在并发情况下达到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞队列的put和take操作会阻塞,为了使线程退出,在队列中添加了一个“标识”,算法中也叫“哨兵”,当发现这个哨兵后,写线程就退出。

?


import java.io.File;import java.io.FileFilter;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.atomic.AtomicInteger;public class Test {static long randomTime() {return (long) (Math.random() * 1000);}public static void main(String[] args) {// 能容纳100个文件final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);// 线程池final ExecutorService exec = Executors.newFixedThreadPool(5);final File root = new File("D:\\dist\\blank");// 完成标志final File exitFile = new File("");// 读个数final AtomicInteger rc = new AtomicInteger();// 写个数final AtomicInteger wc = new AtomicInteger();// 读线程Runnable read = new Runnable() {public void run() {scanFile(root);scanFile(exitFile);}public void scanFile(File file) {if (file.isDirectory()) {File[] files = file.listFiles(new FileFilter() {public boolean accept(File pathname) {return pathname.isDirectory() || pathname.getPath().endsWith(".log");}});for (File one : files)scanFile(one);} else {try {int index = rc.incrementAndGet();System.out.println("Read0: " + index + " " + file.getPath());queue.put(file);} catch (InterruptedException e) {}}}};exec.submit(read);// 四个写线程for (int index = 0; index < 4; index++) {// write threadfinal int num = index;Runnable write = new Runnable() {String threadName = "Write" + num;public void run() {while (true) {try {Thread.sleep(randomTime());int index = wc.incrementAndGet();File file = queue.take();// 队列已经无对象if (file == exitFile) {// 再次添加"标志",以让其他线程正常退出queue.put(exitFile);break;}System.out.println(threadName + ": " + index + " " + file.getPath());} catch (InterruptedException e) {}}}};exec.submit(write);}exec.shutdown();}}
?CountDownLatch

?

从名字可以看出,CountDownLatch是一个倒数计数的锁,当倒数到0时触发事件,也就是开锁,其他人就可以进入了。在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。?

CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。

一个CountDouwnLatch实例是不能重复使用的,也就是说它是一次性的,锁一经被打开就不能再关闭使用了,如果想重复使用,请考虑使用CyclicBarrier。

下面的例子简单的说明了CountDownLatch的使用方法,模拟了100米赛跑,10名选手已经准备就绪,只等裁判一声令下。当所有人都到达终点时,比赛结束。

?


import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Test {public static void main(String[] args) throws InterruptedException {// 开始的倒数锁final CountDownLatch begin = new CountDownLatch(1);// 结束的倒数锁final CountDownLatch end = new CountDownLatch(10);// 十名选手final ExecutorService exec = Executors.newFixedThreadPool(10);for (int index = 0; index < 10; index++) {final int NO = index + 1;Runnable run = new Runnable() {public void run() {try {begin.await();Thread.sleep((long) (Math.random() * 10000));System.out.println("No." + NO + " arrived");} catch (InterruptedException e) {} finally {end.countDown();}}};exec.submit(run);}System.out.println("Game Start");begin.countDown();end.await();System.out.println("Game Over");exec.shutdown();}}

使用Callable和Future实现线程等待
?假设在main线程启动一个线程,然后main线程需要等待子线程结束后,再继续下面的操作,我们会通过join方法阻塞main线程,代码如下:
    Runnable runnable = ...;    Thread t = new Thread(runnable);    t.start();    t.join();    ......
?通过JDK1.5线程池管理的线程可以使用Callable和Future实现(join()方法无法应用到在线程池线程)
import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class Test {public static void main(String[] args) throws InterruptedException, ExecutionException {System.out.println("start main thread");final ExecutorService exec = Executors.newFixedThreadPool(5);Callable<String> call = new Callable<String>() {public String call() throws Exception {System.out.println("  start new thread.");Thread.sleep(1000 * 5);System.out.println("  end new thread.");return "some value.";}};Future<String> task = exec.submit(call);Thread.sleep(1000 * 2);task.get(); // 阻塞,并待子线程结束,exec.shutdown();System.out.println("end main thread");}}
?CompletionService

这个东西的使用上很类似上面的example,不同的是,它会首先取完成任务的线程。下面的参考文章里,专门提到这个,大家有兴趣可以看下,例子:

?

?

import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class Test {public static void main(String[] args) throws InterruptedException,ExecutionException {ExecutorService exec = Executors.newFixedThreadPool(10);CompletionService<String> serv =new ExecutorCompletionService<String>(exec);for (int index = 0; index < 5; index++) {final int NO = index;Callable<String> downImg = new Callable<String>() {public String call() throws Exception {Thread.sleep((long) (Math.random() * 10000));return "Downloaded Image " + NO;}};serv.submit(downImg);}Thread.sleep(1000 * 2);System.out.println("Show web content");for (int index = 0; index < 5; index++) {Future<String> task = serv.take();String img = task.get();System.out.println(img);}System.out.println("End");// 关闭线程池exec.shutdown();}}

?

?

Semaphore信号量

?

拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。下面的例子只允许5个线程同时进入执行acquire()和release()之间的代码

?

?

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class Test {public static void main(String[] args) {// 线程池ExecutorService exec = Executors.newCachedThreadPool();// 只能5个线程同时访问final Semaphore semp = new Semaphore(5);// 模拟20个客户端访问for (int index = 0; index < 20; index++) {final int NO = index;Runnable run = new Runnable() {public void run() {try {// 获取许可semp.acquire();System.out.println("Accessing: " + NO);Thread.sleep((long) (Math.random() * 10000));// 访问完后,释放semp.release();} catch (InterruptedException e) {}}};exec.execute(run);}// 退出线程池exec.shutdown();}}
?

?


?

参考:

jdk1.5中的线程池使用简介

http://www.java3z.com/cwbwebhome/article/article2/2875.html

CAS原理

http://www.blogjava.net/syniii/archive/2010/11/18/338387.html?opt=admin

jdk1.5中java.util.concurrent包编写多线程

http://hi.baidu.com/luotoo/blog/item/b895c3c2d650591e0ef47731.html

ExecutorSerive vs CompletionService

http://www.coderanch.com/t/491704/threads/java/ExecutorSerive-vs-CompletionService

?

?

-- end --?

?

?

热点排行