Thread_大批量数据的分页处理(生产者-消费者-阻塞队列)
java应用中通常会有处理大批量数据的场景,这里介绍一种分页处理的方式,仅供参考。
大批量数据通常不能一次读取或者一次写入,我的思路是参照分页查询的模式缩小每次操作的数据集,循环执行,直到处理完毕。
?
实现方式有两种:
1种是采用单线程模式顺序执行,1种是采用多线程模式,1个线程读取,另1个线程负责处理数据,这里介绍的是后一种。
?
具体说明:
1. 采用固定大小的队列来存储分页数据;
2. 队列满了,则开始执行操作(入库等)操作;
3. 队列空了,则开始向队列中插入数据;
?
?
/** * 批量数据分页处理 * * 场景:大批量数据查询、入库 * 解决:拆分大批量为小数据集合 * * 关键点:指定长度的queue/是否可读取read/存、取数据线程间的通讯 * * @author charles * */public class PageDataHandler {boolean parseComplete = false;DataBuffer dataBuffer = new DataBuffer();private static String[] data = null;private static int rowCount = 0;private static int pageCount = 0;// 准备数据static {data = new String[]{"0-0","1-1","2-2","3-3","4-4","5-5","6-6","7-7","8-8"};rowCount = data.length;pageCount = rowCount / DataBuffer.ROWS_PER_PAGE + ((rowCount % DataBuffer.ROWS_PER_PAGE) == 0 ? 0 : 1);}/** * 分页查询数据 * @param page * @return */private void findByPage() throws Exception {Thread t = new Thread() {public void run() {try {for(int i = 0; i < pageCount; i++){int begin = i * DataBuffer.ROWS_PER_PAGE;//装满queue为止for(int j = 0; j < DataBuffer.ROWS_PER_PAGE; j++){if(begin+j < data.length)dataBuffer.put(data[begin+j]);}}parseComplete = true;dataBuffer.triggerRead(true);} catch (Exception e) {e.printStackTrace();}}};t.start();}/** * 消费queue中的数据 */private void bizz(){Thread t = new Thread() {public void run() {try {int bufferSize = 0;while (true) {bufferSize = dataBuffer.queue.size();if(bufferSize > 0)dataBuffer.oper();if(parseComplete == true)break;}} catch (Exception e) {e.printStackTrace();}}};t.start();}public static void main(String args[]) throws Exception {PageDataHandler pageHandler = new PageDataHandler();pageHandler.findByPage();//1.分页获取数据pageHandler.bizz();//2.操作分页数据}}?
?
import java.util.Iterator;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public class DataBuffer {public static final int ROWS_PER_PAGE = 3;// 指定长度的queuepublic static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(ROWS_PER_PAGE);// 是否可读取private boolean read = false;public synchronized void put(String data) throws Exception {queue.put(data);System.out.println("** put : "+ data);if(queue.size() == ROWS_PER_PAGE)triggerRead(true);while(read){try {this.wait();} catch (InterruptedException e) {}}}public synchronized void triggerRead(boolean read) {this.read = read;this.notifyAll();}public synchronized void oper() throws Exception {while(!read){try {this.wait();} catch (InterruptedException e) {}}System.out.println("** queue size : "+ queue.size());for(Iterator it = queue.iterator(); it.hasNext();){String data = (String)it.next();//操作数据System.out.println("** get : "+ queue.take());}triggerRead(false);}}?