一种简单无锁队列的实现
Disruptor是内存无锁并发框架,基于一个环数组作为缓冲,详见Disruptor-1.0。
下面是自己设计的一个简易版,目前没有发现存在冲突或错误的测试用例。大家可以一起测试下。
package tianshui.lockfree.queue;import java.util.concurrent.atomic.AtomicInteger;/** * @author 天水 * @date 2013-4-9 下午04:13:29 */public class TestBuffer {public static AtomicInteger index = new AtomicInteger(0);public static void main(String[] args){int tCount = 10; // thread countint length = 0; // buffer length -> 2^16final RingBuffer<Integer> buffer = new RingBuffer<Integer>(Integer.class, length);// providerRunnable pr = new Runnable(){@Overridepublic void run() {while(true){try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}int tindex = index.getAndIncrement();buffer.enQueue(tindex);System.out.println("buffer enQueue: " + tindex);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}};// consumerRunnable cr = new Runnable(){@Overridepublic void run() {while(true){try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}Integer cindex = buffer.deQueue();System.out.println("buffer deQueue: " + cindex);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}};Thread[] pt = new Thread[tCount];Thread[] ct = new Thread[tCount];for(int i=0; i<tCount; i++){ct[i] = new Thread(cr);ct[i].start();}for(int i=0; i<tCount; i++){pt[i] = new Thread(pr);pt[i].start();}}}