首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 操作系统 > windows >

Windows下一个并发阻塞队列(BlockingQueue)

2012-10-21 
Windows下一个并发阻塞队列(BlockingQueue):Windows下一个带有大小限制的并发阻塞队列,实现得比较简单。

Windows下一个并发阻塞队列(BlockingQueue)

Windows下一个带有大小限制的并发阻塞队列,实现得比较简单。

#ifndef BLOCKINGQUEUE_H_#define BLOCKINGQUEUE_H_#include <queue>#include <windows.h>using namespace std;template <typename T>class BoundedBlockingQueue { public:     BoundedBlockingQueue(int size) : maxSize(size)     {        _lock = CreateMutex(NULL,false,NULL);        _rsem = CreateSemaphore(NULL,0,size,NULL);        _wsem = CreateSemaphore(NULL,size,size,NULL);    }     ~BoundedBlockingQueue()     {         CloseHandle(_lock);        CloseHandle(_rsem);        CloseHandle(_wsem);    }     void push(const T& data);    T pop();    bool empty()    {        WaitForSingleObject(_lock,INFINITE);        bool is_empty = _array.empty();        ReleaseMutex(_lock);        return is_empty;    }private:     deque<T> _array;    int maxSize;    HANDLE _lock;    HANDLE _rsem, _wsem;};template <typename T>void BoundedBlockingQueue <T>::push(const T& value ) {     WaitForSingleObject(_wsem,INFINITE);    WaitForSingleObject(_lock,INFINITE);    _array.push_back(value);    ReleaseMutex(_lock);    ReleaseSemaphore(_rsem,1,NULL);}template <typename T>T BoundedBlockingQueue<T>::pop() {     WaitForSingleObject(_rsem,INFINITE);    WaitForSingleObject(_lock,INFINITE);    T _temp = _array.front();    _array.pop_front();    ReleaseMutex(_lock);    ReleaseSemaphore(_wsem,1,NULL);    return _temp;}#endif

主函数调用测试:一个生产者、两个消费者使用这个队列进行测试。

#include "BlockingQueue.h"
#include <windows.h>
#include <cstdio>
#include <iostream>
using namespace std;

// Number of payload values the producer emits (1..kItemCount).
static const int kItemCount = 50;
// Number of consumer threads; the producer enqueues one stop signal for each.
static const int kConsumerCount = 2;
// Value that tells a consumer to stop. Payloads are 1..kItemCount, so it can
// never be mistaken for real data.
static const int kStopSignal = -1;

/**
 * Producer thread: pushes 1..kItemCount, then one stop signal per consumer.
 *
 * Shutdown is signalled through the queue itself instead of a shared flag.
 * A flag checked only after a successful pop() leaves a consumer blocked
 * forever once the queue is drained, and an unsynchronized bool shared
 * between threads is a data race.
 *
 * @param lppara  pointer to the BoundedBlockingQueue<int> shared with consumers.
 * @return always 0.
 */
DWORD WINAPI produce(LPVOID lppara)
{
    BoundedBlockingQueue<int> *queue = static_cast<BoundedBlockingQueue<int> *>(lppara);

    for (int i = 1; i <= kItemCount; ++i)
    {
        queue->push(i);
        cout << GetCurrentThreadId() << " put a data: " << i << endl;
        Sleep(10); // pace the producer
    }

    // Every consumer must receive its own stop signal to leave its loop.
    for (int c = 0; c < kConsumerCount; ++c)
        queue->push(kStopSignal);

    return 0;
}

/**
 * Consumer thread: pops and prints values until it receives the stop signal.
 *
 * @param lppara  pointer to the BoundedBlockingQueue<int> shared with the producer.
 * @return always 0.
 */
DWORD WINAPI consume(LPVOID lppara)
{
    BoundedBlockingQueue<int> *queue = static_cast<BoundedBlockingQueue<int> *>(lppara);

    for (;;)
    {
        const int d = queue->pop();
        if (d == kStopSignal)
        {
            cout << "OVER!" << endl;
            break;
        }
        cout << GetCurrentThreadId() << " get data: " << d << endl;
        Sleep(10); // pace the consumer
    }
    return 0;
}

/**
 * Test driver: one producer and two consumers share a queue of capacity 20.
 * Press "e" (or close stdin) to exit; the program then waits for the worker
 * threads to finish so the queue is not destroyed while they still use it.
 *
 * @return 0 on success, -1 if a thread could not be created.
 */
int main()
{
    BoundedBlockingQueue<int> queue(20);

    // One producer, kConsumerCount consumers.
    const int kThreadCount = 1 + kConsumerCount;
    HANDLE threads[kThreadCount];

    // Create every thread suspended: if a later creation fails, the earlier
    // threads have not touched the queue yet and bailing out is safe.
    for (int i = 0; i < kThreadCount; ++i)
    {
        DWORD threadId = 0;
        threads[i] = CreateThread(NULL, 0, i == 0 ? produce : consume,
                                  &queue, CREATE_SUSPENDED, &threadId);
        if (threads[i] == NULL)
        {
            for (int j = 0; j < i; ++j)
                CloseHandle(threads[j]);
            return -1;
        }
    }
    for (int i = 0; i < kThreadCount; ++i)
        ResumeThread(threads[i]);

    // getchar() returns int; keeping it as int lets EOF end the loop instead
    // of spinning forever when stdin is closed.
    int ch;
    do
    {
        ch = getchar(); // press "e" to exit
    } while (ch != 'e' && ch != EOF);

    // Join the workers before `queue` goes out of scope, then release handles.
    WaitForMultipleObjects(static_cast<DWORD>(kThreadCount), threads, TRUE, INFINITE);
    for (int i = 0; i < kThreadCount; ++i)
        CloseHandle(threads[i]);

    printf("Program ends successfully\n");
    return 0;
}



 

热点排行