构建高性能工作线程 一 扩展 Spring 线程池
扩展 Spring 线程池
/**扩展SPRING 2.0的threadPooltaskExecutor提供返回theadPoolexuecutor的引用,用于实时调整CorePoolSize、MaximumPoolSize、KeepAliveTime三个参数。*/public class MyThreadPoolTaskExecutor implements SchedulingTaskExecutor,Executor, InitializingBean, DisposableBean { //核心线程池大小 private int corePoolSize; //最大线程池大小 private int maxPoolSize; //线程保持活动时间 private int keepAliveSeconds; //缓冲队列大小 private int queueCapacity; //线程池名称 private String threadPoolName; //thread 工厂 private ThreadFactory threadFactory; //未执行任务的接口 private RejectedExecutionHandler rejectedExecutionHandler; //jdk 线程池执行器 private ThreadPoolExecutor executorService; //用于线程同步对象 private final Object poolSizeMonitor = new Object(); //MAP 用于管理所有线程池 private static ConcurrentHashMap threadPoolMap = new ConcurrentHashMap <String,ThreadPoolExecutor>(); //默认构造函数 public MyThreadPoolTaskExecutor() { corePoolSize = 1; maxPoolSize = 2147483647; keepAliveSeconds = 60; queueCapacity = 2147483647; threadFactory = Executors.defaultThreadFactory(); rejectedExecutionHandler = new java.util.concurrent.ThreadPoolExecutor.AbortPolicy(); threadPoolName =""; } //创建缓冲队列 protected BlockingQueue createQueue(int queueCapacity) { if (this.queueCapacity > 0) return new LinkedBlockingQueue(this.queueCapacity); else return new SynchronousQueue(); } //线程池执行方法 public void execute(Runnable task) { Assert.notNull(executorService,"ThreadPoolTaskExecutor not initialized"); executorService.execute(task); } //SPRING 线程池方法扩展,返回线程池对象用于实时调整CorePoolSize、 //MaximumPoolSize、KeepAliveTime三个参数。 public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException { Assert.state(this.executorService != null, "ThreadPoolTaskExecutor not initialized"); return this.executorService; } public void setCorePoolSize(int corePoolSize) { synchronized (this.poolSizeMonitor) { this.corePoolSize = corePoolSize; if (this.executorService != null) { this.executorService.setCorePoolSize(corePoolSize); } } } public int getCorePoolSize() { synchronized (this.poolSizeMonitor) { return this.executorService.getCorePoolSize(); } } public void setMaxPoolSize(int maxPoolSize) { synchronized (this.poolSizeMonitor) { this.maxPoolSize = maxPoolSize; if (this.executorService != null) { this.executorService.setMaximumPoolSize(maxPoolSize); } } } public int getMaxPoolSize() { synchronized (this.poolSizeMonitor) { return this.executorService.getMaximumPoolSize(); } } public void setKeepAliveSeconds(int keepAliveSeconds) { synchronized (this.poolSizeMonitor) { this.keepAliveSeconds = keepAliveSeconds; if (this.executorService != null) { this.executorService.setKeepAliveTime(keepAliveSeconds,TimeUnit.SECONDS); } } } public int getKeepAliveSeconds() { synchronized (this.poolSizeMonitor) { return this.keepAliveSeconds; } } public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } public void setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory == null ? Executors.defaultThreadFactory() : threadFactory; } public void setRejectedExecutionHandler( RejectedExecutionHandler rejectedExecutionHandler) { this.rejectedExecutionHandler = ((RejectedExecutionHandler)(rejectedExecutionHandler == null ? ((RejectedExecutionHandler) (new java.util.concurrent.ThreadPoolExecutor.AbortPolicy())): rejectedExecutionHandler)); }}?