java并发学习之二:线程池(三)
没找到什么好工具,也没得到好建议,好技巧,只能自己摸索了
一步步优化把
先是优化测试代码
加的内容不多,但挺关键的
测试要模拟的是一个平稳的任务下发过程(如同现实生活中的),所以一下子将所有任务丢到队列中是很不对的,而且数量一大,堆就满了
经过多次测试,决定通过让主线程每发送n个,就休息一段时间来降低主线程的下发频率
以达到平稳的目的
这样设置之后,再适当调一下,就可以让cpu利用率达到一个相对稳定的值了,从而可以更好地通过工具进行观察
public class TestThreadPoolTest1 {public static void main(String[] args) throws InterruptedException{//testEasyRunnableThreadPool(new ThreadPoolTest1(10), 1000000, 10);//22553768552//testEasyRunnableThreadPool(Executors.newFixedThreadPool(10), 1000000, 10);//2946477652testEasyRunnableThreadPool(new ThreadPoolTest2(8), 10000000, 8);}/** * 一个产生随机数的方法,防止jvm优化 * @param seed * @return */static int getRandomNum(int seed){seed ^= (seed << 6);seed ^= (seed >>> 21);seed ^= (seed << 7);return seed;}/** * 任务是执行一个简单的计算,只占用cpu,没有io和其他阻塞的方法 */static void testEasyRunnableThreadPool(Executor pool,int tryTime,int threadNum) throws InterruptedException{final AtomicInteger count = new AtomicInteger(0);//construct runnableRunnable command = new Runnable() {public void run() {final int addTime = 10000;long sum = 0;int temp = this.hashCode() ^ (int)System.currentTimeMillis();for(int i = 0;i<addTime;i++){sum += (temp = getRandomNum(temp));}}};testThreadPool(tryTime, pool, command);}static void testThreadPool(int tryNum,Executor pool,final Runnable command) throws InterruptedException{final CountDownLatch latch = new CountDownLatch(tryNum);final Random r = new Random();Runnable wrapper = new Runnable() {public void run() {command.run();//想测试并发,在并发中加入适当的同步操作是无法避免的,只能减少//,在这,只是做了一个简单的countdown,影响不大latch.countDown();}};long startTime = System.nanoTime();//加一层循环for(int j = 0;j<100000;j++){for(int i = 0;i<tryNum/100000;i++){pool.execute(wrapper);}//主线程休息Thread.sleep(0, r.nextInt(50));}long dispatchTime = System.nanoTime();//加一个全部任务全部下发完的时间,与最终时间做对比System.out.println(dispatchTime - startTime);latch.await();long endTime = System.nanoTime();System.out.println(endTime-startTime);}}
public class ThreadPoolTest2 implements Executor {//等待队列Queue<Runnable> waitingQueue = null;ConcurrentLinkedQueue<ThreadNode> freeThread;//相当于一个freeThread的状态,根据状态决定行为,原则上将freeThread.size()+busyThreadsNum=MAXTHREADNUMprivate AtomicInteger busyThreadsNum = new AtomicInteger(0);//最大线程数final int MAXTHREADNUM;public ThreadPoolTest2 (int threadNum){this.MAXTHREADNUM = threadNum;init(MAXTHREADNUM,new ConcurrentLinkedQueue<Runnable>());}private void init(int threadNum,ConcurrentLinkedQueue<Runnable> queue){freeThread = new ConcurrentLinkedQueue<ThreadNode>();waitingQueue = queue;}//去掉了synchronizedprivate void threadExecute(Runnable command){for(;;){//得到开始的值int expect = busyThreadsNum.get();//由于else中先执行了busyThreadsNum.incrementAndGet();才添加,所以会出现//busyThreadsNum为MAXTHREADNUM,但实际的运行数量少于busyThreadsNum,少1,但这种//现象是本应如此的if(expect == MAXTHREADNUM){waitingQueue.add(command);return;}else{busyThreadsNum.incrementAndGet();ThreadNode t = freeThread.poll();if(t == null){t = new ThreadNode();t.setCommand(command);t.start();return;}t.setCommand(command);LockSupport.unpark(t);}}}private class ThreadNode extends Thread{//加了线程保证可见性,因为根据处理逻辑,只会有一个线程进行修改,所以只需要保证可见性就可以了//同时,由于之前用的不是LockSupport,所以wait或者await还有notify或者signal都是需要加锁的//之前没加volatile,是由于使用了额外的同步(在wait,notify之前用了synchronized),所以//相当于加入了一个synchronized with关系,所以command变为可见了(setCommand操作发生在同步之前)volatile Runnable command = null;Exception e = null;public ThreadNode() {}Exception getException(){return e;}void setCommand(Runnable c){command = c;}@Overridepublic void run() {try {for(;;){if(command == null){ThreadPoolTest2.this.waitThread(this);continue;}command.run();command = ThreadPoolTest2.this.getCommand();}}catch (InterruptedException e) {}}}Runnable getCommand() throws InterruptedException{return waitingQueue.poll();}void waitThread(ThreadNode t) throws InterruptedException{//先往队列中放,再减少busyThreadsNum//因为即使busyThreadsNum已经为MAXTHREADNUM了,但其实队列中有空闲的线程//这也是允许的,造成的问题最严重不过往等待任务队列里面添加了不该等待,而是马上执行的任务//而且只是瞬间的freeThread.offer(t);busyThreadsNum.getAndDecrement();LockSupport.park(t);}protected void beforeExecute(){}public void execute(Runnable command) {beforeExecute();threadExecute(command);afterExecute();}protected void afterExecute(){}}