转:线程池实现
看群里有同志老是在找线程池的实现,听说网上曾经发布的都是不正确的,今天我就自己弄了一个,不正确的地方大家指点指点
mutex.hxx 互斥类
?1
#ifndef?INCLUDE_MUTEX_HH
?2
#define?INCLUDE_MUTEX_HH
?3
#include?<pthread.h>
?4
?5
// Thin wrapper around a POSIX mutex (pthread_mutex_t).
//
// The constructor initialises the underlying mutex and the destructor
// destroys it; lock()/unlock() forward to the matching pthread calls and
// get_mutex() exposes the raw handle for APIs that need it, such as
// pthread_cond_timedwait().
//
// NOTE(review): no copy operations are declared, so the compiler-generated
// ones copy the raw pthread_mutex_t. POSIX leaves the use of a copied mutex
// undefined, so holders of a Mutex should not rely on copying it.
class Mutex
{
public:
  // Initialises the wrapped mutex.
  Mutex();
  // Virtual so that the class can be used as a base class.
  virtual ~Mutex();
  // Acquires the mutex, blocking until it is available.
  void lock();
  // Releases the mutex.
  void unlock();
  // Returns a pointer to the wrapped pthread mutex (never owned by the caller).
  pthread_mutex_t *get_mutex();
private:
  pthread_mutex_t mutex;
};
16
17
#endif
18
mutex.cxx 互斥类实现
#include "mutex.hxx"
#include <cerrno>
#include <cstdio>
#include "error.hxx"
// Initialises the wrapped pthread mutex with default attributes.
//
// Throws MutexError when pthread_mutex_init() reports a failure.
Mutex::Mutex()
{
  const int rc = pthread_mutex_init(&mutex, NULL);
  if (rc != 0)
    {
      // pthread functions return the error number instead of setting errno,
      // so publish it to errno for perror() to print the real cause.
      errno = rc;
      perror("pthread_mutex_init error");
      throw MutexError("pthread_mutex_init error");
    }
}
// Destroys the wrapped pthread mutex.
//
// A failure is reported on stderr only: a destructor must not throw, because
// an exception escaping it terminates the program (always under C++11 and
// later, and during stack unwinding in any C++ version).
Mutex::~Mutex()
{
  const int rc = pthread_mutex_destroy(&mutex);
  if (rc != 0)
    {
      // pthread functions return the error number instead of setting errno.
      errno = rc;
      perror("pthread_mutex_destroy error");
    }
}
void?Mutex::lock()
{
??pthread_mutex_lock(&mutex);
}
void?Mutex::unlock()
{
??pthread_mutex_unlock(&mutex);
}
pthread_mutex_t?*Mutex::get_mutex()
{
??return?&mutex;
}
error.hxx 异常类型
#ifndef?INCLUDE_ERROR_HH
#define?INCLUDE_ERROR_HH
#include?<stdexcept>
// Exception type thrown when a mutex operation fails.
// Derives from std::runtime_error, so what() returns the message passed in.
class MutexError : public std::runtime_error
{
public:
  // Builds the error from a std::string message. The string is forwarded
  // as-is instead of going through c_str(), which avoids a needless
  // conversion back to a C string.
  MutexError(const std::string& what)
    : std::runtime_error(what)
  {}
  // Builds the error from a NUL-terminated message.
  MutexError(const char* const what)
    : std::runtime_error(what)
  {}
};
#endif
task.hxx 任务类,所有的任务需要实现此接口
#ifndef?INCLUDE_TASK_HH
#define?INCLUDE_TASK_HH
#include?<string>
#include?"mutex.hxx"
//class?Mutex;
class?Task
{
??friend?bool?operator<(const?Task&?t1,const?Task&?t2);
public:
??Task(const?std::string&?taskName=std::string(),int?level=0);
??virtual?~Task()
{};
??void?setLevel(int?level);
??std::string?taskName()const;
??std::string?taskName();
??void?setName(const?std::string&);
??virtual?void?run()=0;
private:
??Mutex?mutex;
??int?level_;
??std::string?taskName_;
};
#endif
task.cxx 任务实现代码
#include?"task.hxx"
//#include?"mutex.hxx"
// Stores the task's level and name.
// Members are listed in their declaration order (level_ before taskName_),
// which is the order in which they are actually initialised.
Task::Task(const std::string& name, int level)
  : level_(level), taskName_(name)
{
}
void?Task::setLevel(int?level)
{
??mutex.lock();
??level_=level;
??mutex.unlock();
}
std::string?Task::taskName()const
{
??return?taskName_;
}
std::string?Task::taskName()
{
??return?taskName_;
}
void?Task::setName(const?std::string&?name)
{
??mutex.lock();
??taskName_=name;
??mutex.unlock();
}
bool?operator<(const?Task&?t1,const?Task&?t2)
{
??return?t1.level_<t2.level_;
}
池头文件 pool.hxx
#ifndef?INCLUDE_POOL_HH
#define?INCLUDE_POOL_HH
#include?<pthread.h>
#include?<queue>
#include?<list>
#include?"mutex.hxx"
class?Task;
class?ThreadPool
??:private?Mutex
{
public:
??ThreadPool(int);
??~ThreadPool();
??void?addTask(Task*);
??void?wait();
??void?release(const?pthread_t&);
??Task*?get();
??void?setTimeout(long?t);
private:
??typedef?std::list<pthread_t>::iterator?ThreadIterator;
??pthread_cond_t?release_cond;
??pthread_cond_t?task_cond;
??static?void*?threadFunc(void*);
??void?init(int);
??std::priority_queue<Task*>?tasks;
??std::list<pthread_t>?idleThreads;
??std::list<pthread_t>?busyThreads;
??long?timeout_second;
};
#endif
池实现文件
#include?"pool.hxx"
#include?"task.hxx"
#include?<algorithm>
#include?<ctime>
#include?<iostream>
// Creates the pool with a default task timeout of 10 seconds.
//
// Both condition variables are initialised before init() starts the workers,
// because the workers begin waiting on task_cond right away.
ThreadPool::ThreadPool(int threadNumber)
  : timeout_second(10)
{
  pthread_cond_init(&release_cond, NULL);
  pthread_cond_init(&task_cond, NULL);
  init(threadNumber);
}
// Destroys the two condition variables.
// NOTE(review): the destructor does not stop or join worker threads itself;
// it looks like callers are expected to call wait() first - confirm.
ThreadPool::~ThreadPool()
{
  pthread_cond_destroy(&release_cond);
  pthread_cond_destroy(&task_cond);
}
void?ThreadPool::init(int?threadNumber)
{
??for(int?i=0;i<threadNumber;i++)
????
{
??????pthread_t?t;
??????pthread_create(&t,NULL,threadFunc,this);
??????busyThreads.push_back(t);
????}
}
void?ThreadPool::setTimeout(long?t)
{
??if(t>0)
????timeout_second=t;
}
void?ThreadPool::addTask(Task*?task)
{
??lock();
??tasks.push(task);
??pthread_cond_signal(&task_cond);
??unlock();
}
// Removes and returns the highest-priority queued task.
//
// Waits up to timeout_second seconds for a task to arrive. Returns NULL (after
// printing "empty") when the queue is still empty once the wait ends.
Task* ThreadPool::get()
{
  lock();
  // Absolute deadline, computed under the lock because timeout_second is
  // shared with setTimeout().
  struct timespec deadline;
  deadline.tv_sec = time(NULL) + timeout_second;
  deadline.tv_nsec = 0;
  // Condition variables may wake up spuriously, so keep waiting until a task
  // is really available or the wait fails (timeout or error).
  while (tasks.empty())
    {
      if (pthread_cond_timedwait(&task_cond, get_mutex(), &deadline) != 0)
        break;
    }
  const bool queueEmpty = tasks.empty();
  Task* task = NULL;
  if (!queueEmpty)
    {
      task = tasks.top();
      tasks.pop();
    }
  unlock();
  // Print outside the critical section so I/O does not stall other threads.
  if (queueEmpty)
    std::cout << "empty" << std::endl;
  return task;
}
void?*?ThreadPool::threadFunc(void*?args)
{
??ThreadPool*?pool=static_cast<ThreadPool*>(args);
??Task*?task;
??while((task=pool->get())!=NULL)
????
{
??????task->run();
????}
??pool->release(pthread_self());
}
void?ThreadPool::release(const?pthread_t&?t)
{
??lock();
??ThreadIterator?it;
??it=std::find(busyThreads.begin(),busyThreads.end(),t);
??if(it!=busyThreads.end())
????
{
??????busyThreads.erase(it);
????}
??idleThreads.push_back(t);
??pthread_cond_signal(&release_cond);
??unlock();
}
void?ThreadPool::wait()
{
??lock();
??while(!busyThreads.empty())
????
{
??????struct?timespec?timeout;
??????timeout.tv_sec=time(NULL)+10;
??????timeout.tv_nsec=0;
??????pthread_cond_timedwait(&release_cond,get_mutex(),&timeout);
????}
??for(ThreadIterator?it=idleThreads.begin();it!=idleThreads.end();it++)
????
{
??????pthread_join(*it,NULL);
????}
??unlock();
??
}
测试文件
#include?"pool.hxx"
#include?"task.hxx"
#include?<unistd.h>
#include?<iostream>
#include?<string>
#include?<vector>
#include?<sstream>
#include?<memory>
#include?"mutex.hxx"
class?WorkTask
??:public?Task
{
public:
??WorkTask(int?level,void?*data):Task(std::string(),level)
??
{
????this->data_=data;
??}
??~WorkTask()
{}
??virtual?void?run()
??
{
????std::cout<<taskName()<<(char*)data_<<std::endl;
????sleep(2);
????std::cout<<taskName()<<"?ok"<<std::endl;
??}
private:
??void?*data_;
??Mutex?mutex;
};
int?main(void)
{
??ThreadPool?pool(5);
??char?szTemp[]="aaaaaaaaaaaaaaabbbbbbbbbbbccccccccccdddddddddd";
??WorkTask?task(1,szTemp);
??char?buf[20];
??std::vector<Task*>?tasks;
??for(int?i=0;i<10;i++)
????
{
??????snprintf(buf,sizeof(buf),"%s?%d","task",i);
??????task.setName(buf);
??????std::auto_ptr<Task>?t(new?WorkTask(task));
??????pool.addTask(t.get());
??????tasks.push_back(t.release());
????}
??pool.wait();
??for(std::vector<Task*>::iterator?it=tasks.begin();it!=tasks.end();it++)
????
{
??????delete?*it;
????}
??return?0;
}


测试的结果
代码没有加注释,直接阅读源码即可理解。
注:使用本人代码请注明本人的信息