加入收藏 | 设为首页 | 会员中心 | 我要投稿 我爱制作网_沈阳站长网 (https://www.024zz.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

Linux C++ 实现线 程池

发布时间:2022-12-08 11:36:34 所属栏目:Linux 来源:
导读:  线程池中的线程,在任务队列为空的时候线程池linux,等待任务的到来,任务队列中有任务时,则依次获取任务来执行,任务队列需要同步。

  Linux线程同步有多种方法:互斥量、信号量、条件变量等。

  
  线程池中的线程,在任务队列为空的时候线程池linux,等待任务的到来,任务队列中有任务时,则依次获取任务来执行,任务队列需要同步。
 
  Linux线程同步有多种方法:互斥量、信号量、条件变量等。
 
  下面是根据互斥量、信号量、条件变量封装的三个类。
 
  线程池中用到了互斥量和信号量。
 
  #ifndef _LOCKER_H_
  #define _LOCKER_H_
  #include
  #include
  #include
  /*信号量的类*/
  class sem_locker
  {
  private:
      sem_t m_sem;
  public:
      //初始化信号量
      sem_locker()
      {
   if(sem_init(&m_sem, 0, 0) != 0)
       printf("sem init error\n");
      }
      //销毁信号量
      ~sem_locker()
      {
   sem_destroy(&m_sem);
      }
      //等待信号量
      bool wait()
      {
   return sem_wait(&m_sem) == 0;
      }
      //添加信号量
      bool add()
      {
   return sem_post(&m_sem) == 0;
      }
  };
  /*互斥 locker*/
  class mutex_locker
  {
  private:
      pthread_mutex_t m_mutex;
  public:
      mutex_locker()
      {
       if(pthread_mutex_init(&m_mutex, NULL) != 0)
       printf("mutex init error!");
      }
      ~mutex_locker()
      {
   pthread_mutex_destroy(&m_mutex);
      }
      bool mutex_lock()  //lock mutex
      {
   return pthread_mutex_lock(&m_mutex) == 0;
      }
      bool mutex_unlock()   //unlock
      {
   return pthread_mutex_unlock(&m_mutex) == 0;
      }
  };
  /*条件变量 locker*/
  class cond_locker
  {
  private:
      pthread_mutex_t m_mutex;
      pthread_cond_t m_cond;
  public:
      // 初始化 m_mutex and m_cond
      cond_locker()
      {
   if(pthread_mutex_init(&m_mutex, NULL) != 0)
       printf("mutex init error");
   if(pthread_cond_init(&m_cond, NULL) != 0)
   {   //条件变量初始化是被,释放初始化成功的mutex
       pthread_mutex_destroy(&m_mutex);
       printf("cond init error");
   }
      }
      // destroy mutex and cond
      ~cond_locker()
      {
   pthread_mutex_destroy(&m_mutex);
   pthread_cond_destroy(&m_cond);
      }
      //等待条件变量
      bool wait()
      {
   int ans = 0;
   pthread_mutex_lock(&m_mutex);
   ans = pthread_cond_wait(&m_cond, &m_mutex);
   pthread_mutex_unlock(&m_mutex);
   return ans == 0;
      }
      //唤醒等待条件变量的线程
      bool signal()
      {
   return pthread_cond_signal(&m_cond) == 0;
      }
  };
  #endif
  下面的是线程池类,是一个模版类:
 
  #ifndef _PTHREAD_POOL_
  #define _PTHREAD_POOL_
  #include "locker.h"
  #include
  #include
  #include
  #include
  #include
  #include
  template
  class threadpool
  {
  private:
      int thread_number;  //线程池的线程数
      int max_task_number;  //任务队列中的最大任务数
      pthread_t *all_threads;   //线程数组
      std::list task_queue; //任务队列
      mutex_locker queue_mutex_locker;  //互斥锁
      sem_locker queue_sem_locker;   //信号量
      bool is_stop; //是否结束线程
  public:
      threadpool(int thread_num = 20, int max_task_num = 30);
      ~threadpool();
      bool append_task(T *task);
      void start();
      void stop();
  private:
      //线程运行的函数。执行run()函数
      static void *worker(void *arg);
      void run();
  };
  template
  threadpool::threadpool(int thread_num, int max_task_num):
   thread_number(thread_num), max_task_number(max_task_num),
   is_stop(false), all_threads(NULL)
  {
      if((thread_num <= 0) || max_task_num <= 0)
   printf("threadpool can't init because thread_number = 0"
   " or max_task_number = 0");
      all_threads = new pthread_t[thread_number];
      if(!all_threads)
       printf("can't init threadpool because thread array can't new");
  }
  template
  threadpool::~threadpool()
  {
      delete []all_threads;
      is_stop = true;
  }
  template
  void threadpool::stop()
  {
      is_stop = true;
      //queue_sem_locker.add();
  }
  template
  void threadpool::start()
  {
      for(int i = 0; i < thread_number; ++i)
      {
   printf("create the %dth pthread\n", i);
   if(pthread_create(all_threads + i, NULL, worker, this) != 0)
   {//创建线程失败,清除成功申请的资源并抛出异常
       delete []all_threads;
       throw std::exception();
   }
   if(pthread_detach(all_threads[i]))
   {//将线程设置为脱离线程,失败则清除成功申请的资源并抛出异常
       delete []all_threads;
       throw std::exception();
   }
      }
  }
  //添加任务进入任务队列
  template
  bool threadpool::append_task(T *task)
  {   //获取互斥锁
      queue_mutex_locker.mutex_lock();
      //判断队列中任务数是否大于最大任务数
      if(task_queue.size() > max_task_number)
      {//是则释放互斥锁
   queue_mutex_locker.mutex_unlock();
   return false;
      }
      //添加进入队列
      task_queue.push_back(task);
      queue_mutex_locker.mutex_unlock();
      //唤醒等待任务的线程
      queue_sem_locker.add();
      return true;
  }
  template
  void *threadpool::worker(void *arg)
  {
      threadpool *pool = (threadpool *)arg;
      pool->run();
      return pool;
  }
  template
  void threadpool::run()
  {
      while(!is_stop)
      {   //等待任务
   queue_sem_locker.wait();
   if(errno == EINTR)
   {
       printf("errno");
       continue;
   }
   //获取互斥锁
   queue_mutex_locker.mutex_lock();
   //判断任务队列是否为空
   if(task_queue.empty())
   {
       queue_mutex_locker.mutex_unlock();
       continue;
   }
   //获取队头任务并执行
     T *task = task_queue.front();
   task_queue.pop_front();
   queue_mutex_locker.mutex_unlock();
   if(!task)
       continue;
  // printf("pthreadId = %ld\n", (unsigned long)pthread_self());
   task->doit();  //doit是T对象中的方法
      }
      //测试用
      printf("close %ld\n", (unsigned long)pthread_self());
  }
  #endif
  以上参考《Linux高性能服务器编程》
 
  写个程序对线程池进行测试:
 
  #include
  #include
  #include
  #include "thread_pool.h"
  class task
  {
  private:
      int number;
  public:
      task(int num) : number(num)
      {
      }
      ~task()
      {
      }
      void doit()
      {
   printf("this is the %dth task\n", number);
      }
  };
  int main()
  {
      task *ta;
      threadpool pool(10, 15);
  //    pool.start();
      for(int i = 0; i < 20; ++i)
      {
   ta = new task(i);
  // sleep(2);
   pool.append_task(ta);
      }
      pool.start();
      sleep(10);
      printf("close the thread pool\n");
      pool.stop();
      pause();
      return 0;
  }
  经测试,线程池可以正常使用。
 
  下一篇博客,使用线程池来实现回射服务器,测试可以达到多大的并发量。
 

(编辑:我爱制作网_沈阳站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!