Linux C++ 实现线 程池
发布时间:2022-12-08 11:36:34 所属栏目:Linux 来源:
导读: 线程池中的线程,在任务队列为空的时候线程池linux,等待任务的到来,任务队列中有任务时,则依次获取任务来执行,任务队列需要同步。
Linux线程同步有多种方法:互斥量、信号量、条件变量等。
Linux线程同步有多种方法:互斥量、信号量、条件变量等。
|
线程池中的线程,在任务队列为空的时候线程池linux,等待任务的到来,任务队列中有任务时,则依次获取任务来执行,任务队列需要同步。 Linux线程同步有多种方法:互斥量、信号量、条件变量等。 下面是根据互斥量、信号量、条件变量封装的三个类。 线程池中用到了互斥量和信号量。 #ifndef _LOCKER_H_ #define _LOCKER_H_ #include #include #include /*信号量的类*/ class sem_locker { private: sem_t m_sem; public: //初始化信号量 sem_locker() { if(sem_init(&m_sem, 0, 0) != 0) printf("sem init error\n"); } //销毁信号量 ~sem_locker() { sem_destroy(&m_sem); } //等待信号量 bool wait() { return sem_wait(&m_sem) == 0; } //添加信号量 bool add() { return sem_post(&m_sem) == 0; } }; /*互斥 locker*/ class mutex_locker { private: pthread_mutex_t m_mutex; public: mutex_locker() { if(pthread_mutex_init(&m_mutex, NULL) != 0) printf("mutex init error!"); } ~mutex_locker() { pthread_mutex_destroy(&m_mutex); } bool mutex_lock() //lock mutex { return pthread_mutex_lock(&m_mutex) == 0; } bool mutex_unlock() //unlock { return pthread_mutex_unlock(&m_mutex) == 0; } }; /*条件变量 locker*/ class cond_locker { private: pthread_mutex_t m_mutex; pthread_cond_t m_cond; public: // 初始化 m_mutex and m_cond cond_locker() { if(pthread_mutex_init(&m_mutex, NULL) != 0) printf("mutex init error"); if(pthread_cond_init(&m_cond, NULL) != 0) { //条件变量初始化是被,释放初始化成功的mutex pthread_mutex_destroy(&m_mutex); printf("cond init error"); } } // destroy mutex and cond ~cond_locker() { pthread_mutex_destroy(&m_mutex); pthread_cond_destroy(&m_cond); } //等待条件变量 bool wait() { int ans = 0; pthread_mutex_lock(&m_mutex); ans = pthread_cond_wait(&m_cond, &m_mutex); pthread_mutex_unlock(&m_mutex); return ans == 0; } //唤醒等待条件变量的线程 bool signal() { return pthread_cond_signal(&m_cond) == 0; } }; #endif 下面的是线程池类,是一个模版类: #ifndef _PTHREAD_POOL_ #define _PTHREAD_POOL_ #include "locker.h" #include #include #include #include #include #include template class threadpool { private: int thread_number; //线程池的线程数 int max_task_number; //任务队列中的最大任务数 pthread_t *all_threads; //线程数组 std::list task_queue; //任务队列 mutex_locker queue_mutex_locker; //互斥锁 sem_locker queue_sem_locker; //信号量 bool is_stop; //是否结束线程 public: threadpool(int thread_num = 20, int max_task_num = 30); ~threadpool(); bool append_task(T *task); void start(); void stop(); private: //线程运行的函数。执行run()函数 static void *worker(void *arg); void run(); }; template threadpool::threadpool(int thread_num, int max_task_num): thread_number(thread_num), max_task_number(max_task_num), is_stop(false), all_threads(NULL) { if((thread_num <= 0) || max_task_num <= 0) printf("threadpool can't init because thread_number = 0" " or max_task_number = 0"); all_threads = new pthread_t[thread_number]; if(!all_threads) printf("can't init threadpool because thread array can't new"); } template threadpool::~threadpool() { delete []all_threads; is_stop = true; } template void threadpool::stop() { is_stop = true; //queue_sem_locker.add(); } template void threadpool::start() { for(int i = 0; i < thread_number; ++i) { printf("create the %dth pthread\n", i); if(pthread_create(all_threads + i, NULL, worker, this) != 0) {//创建线程失败,清除成功申请的资源并抛出异常 delete []all_threads; throw std::exception(); } if(pthread_detach(all_threads[i])) {//将线程设置为脱离线程,失败则清除成功申请的资源并抛出异常 delete []all_threads; throw std::exception(); } } } //添加任务进入任务队列 template bool threadpool::append_task(T *task) { //获取互斥锁 queue_mutex_locker.mutex_lock(); //判断队列中任务数是否大于最大任务数 if(task_queue.size() > max_task_number) {//是则释放互斥锁 queue_mutex_locker.mutex_unlock(); return false; } //添加进入队列 task_queue.push_back(task); queue_mutex_locker.mutex_unlock(); //唤醒等待任务的线程 queue_sem_locker.add(); return true; } template void *threadpool::worker(void *arg) { threadpool *pool = (threadpool *)arg; pool->run(); return pool; } template void threadpool::run() { while(!is_stop) { //等待任务 queue_sem_locker.wait(); if(errno == EINTR) { printf("errno"); continue; } //获取互斥锁 queue_mutex_locker.mutex_lock(); //判断任务队列是否为空 if(task_queue.empty()) { queue_mutex_locker.mutex_unlock(); continue; } //获取队头任务并执行 T *task = task_queue.front(); task_queue.pop_front(); queue_mutex_locker.mutex_unlock(); if(!task) continue; // printf("pthreadId = %ld\n", (unsigned long)pthread_self()); task->doit(); //doit是T对象中的方法 } //测试用 printf("close %ld\n", (unsigned long)pthread_self()); } #endif 以上参考《Linux高性能服务器编程》 写个程序对线程池进行测试: #include #include #include #include "thread_pool.h" class task { private: int number; public: task(int num) : number(num) { } ~task() { } void doit() { printf("this is the %dth task\n", number); } }; int main() { task *ta; threadpool pool(10, 15); // pool.start(); for(int i = 0; i < 20; ++i) { ta = new task(i); // sleep(2); pool.append_task(ta); } pool.start(); sleep(10); printf("close the thread pool\n"); pool.stop(); pause(); return 0; } 经测试,线程池可以正常使用。 下一篇博客,使用线程池来实现回射服务器,测试可以达到多大的并发量。 (编辑:我爱制作网_沈阳站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
站长推荐


浙公网安备 33038102330576号