加入收藏 | 设为首页 | 会员中心 | 我要投稿 我爱制作网_沈阳站长网 (https://www.024zz.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

linux系统线程池

发布时间:2022-09-26 14:50:01 所属栏目:Linux 来源:
导读:  本文简单介绍了线程池的概念和特点,对线程池的结构体和相关操作接口进行了设计,并提供了接口的具体实现,最后通过示例程序演示了线程池的运行过程。 简述 一个进程中的线程

  本文简单介绍了线程池的概

  本文简单介绍了线程池的概念和特点,对线程池的结构体和相关操作接口进行了设计,并提供了接口的具体实现,最后通过示例程序演示了线程池的运行过程。 简述 一个进程中的线程
 
  本文简单介绍了线程池的概念和特点,对线程池的结构体和相关操作接口进行了设计,并提供了接口的具体实现,最后通过示例程序演示了线程池的运行过程。简述
 
  一个进程中的线程就好比是一家公司里的员工,员工的数目应该根据公司的业务多少来定线程池linux,太少了忙不过来,但是太多了也浪费资源。最理想的情况是让进程有一些初始数目的线程(线程池),当没有任务时这些线程自动进入睡眠,有了任务它们会立即执行任务,不断循环。进程还应根据自身任务的繁重与否来增删线程的数目,当所有的任务都完成之后,所有的线程还能妥当地收官。
 
  有以下几点需要注意:
 
  (1)任务队列中刚开始没有任何任务,是一个具有头结点的空链队列
 
  (2)使用互斥锁来保护这个队列
 
  (3)使用条件变量来代表任务队列中的任务个数的变化,将来如果主线程向队列中投放任务,那么可以通过条件变量来唤醒那么睡着了的线程
 
  (4)通过一个公共开关----shutdown,来控制线程退出,进而销毁整个线程池
 
  接口设计
 
  线程池相关结构体如下表所示。
 
  原型struct task功能描述任务节点,包含需要执行的函数及其参数,通过链表连成一个任务队列成员列表void * (* task)(void *arg);void *arg;struct task *next;备注任务实例最终形成一条单向链表原型thread_pool功能描述线程池实例,包含一个线程池的所有信息成员列表pthread_mutex_t lock; //互斥锁,保护任务队列pthread_cond_t cond; //条件变量,同步所有线程bool shutdown; //线程池销毁标记struct task *task_list; //任务链队列指针pthread_t *tids; //线程ID存放位置unsigned int waiting tasks; //任务链队列中等待的任务个数unsigned int active_threads; //当前活跃线程个数备注活跃线程个数可以修改,但至少有1条活跃线程
 
  下面是线程池的接口说明
 
  (1)线程池初始化:init_pool()
 
  原型bool init_pool(thread_pool *pool, unsigned int threads_number);功能描述创建一个新的线程池,包含threads_number个活跃线程参数pool:线程池指针threads_number:初始活跃线程个数(大于或等于1)返回值成功返回true,失败返回false头文件thread_pool.h备注线程池最少线程个数为1
 
  (2)投放任务:add_task()
 
  原型bool add_task(thread_pool pool, void * (do_task)(void *arg), void *arg);功能描述往线程池投放任务参数pool:线程池指针do_task:投放至线程池的执行例程arg:执行例程do_task的参数,若该执行例程不需要参数可设置为NULL返回值成功返回true,失败返回false头文件thread_pool.h备注任务队列中最大任务个数为MAX_WAITING_TASKS
 
  (3) 增加活跃线程:add_thread()
 
  原型int add_thread(thread_pool *pool, unsigned int additional_threads);功能描述增加线程池中活跃线程的个数参数pool:需要增加线程的线程池指针additional_threads:新增线程个数返回值>0 : 实际新增线程个数-1 :失败头文件thread_pool.h
 
  (4)删除活跃线程:remove_thread()
 
  原型int remove_thread(thread_pool *pool, unsigned int removing threads);功能描述删除线程池中活跃线程的个数参数pool:需要删除线程的线程池指针removing_threads:要删除的线程个数;该参数设置为0时直接返回当前线程池线程总数,对线程池不造成任何其他影响返回值>0 : 当前线程池剩余线程个数-1: 失败头文件thread_pool.h备注线程池至少会存在1条活跃线程如果被删除的线程正在执行任务,就等待其完成任务之后删除
 
  (5)销毁线程池:destroy_pool()
 
  原型bool destroy_pool(thread_pool *pool);功能描述阻塞等待所有任务完成,然后立即销毁整个线程池,释放所有资源和内存参数pool:将要销毁的线程池返回值成功返回true,失败返回false头文件thread_pool.h源代码实现
 
  头文件thread_pool.h
 
  //////////////////////////////////////////////////////////////////
  //  
  //  Description: 本文件包含了线程池基本结构体定义,以及各个操作函
  //               数的声明
  //
  //////////////////////////////////////////////////////////////////
  #ifndef _THREAD_POOL_H_
  #define _THREAD_POOL_H_
  #include
  #include
  #include
  #include
  #include
  #include
  #include
  #include
  #define MAX_WAITING_TASKS 1000
  #define MAX_ACTIVE_THREADS 20
  struct task
  {
   void *(*task)(void *arg);
   void *arg;
   struct task *next;
  };
  typedef struct thread_pool
  {
   pthread_mutex_t lock;
   pthread_cond_t  cond;
   struct task *task_list;
   pthread_t *tids;
   unsigned waiting_tasks;
   unsigned active_threads;
   bool shutdown;
  }thread_pool;
  bool
  init_pool(thread_pool *pool,
            unsigned int threads_number);
  bool
  add_task(thread_pool *pool,
           void *(*task)(void *arg),
           void *arg);
  int
  add_thread(thread_pool *pool,
             unsigned int additional_threads_number);
  int
  remove_thread(thread_pool *pool,
                unsigned int removing_threads_number);
  bool destroy_pool(thread_pool *pool);
  void *routine(void *arg);
  #endif
  接口实现:thread_pool.c
 
  //////////////////////////////////////////////////////////////////
  //  Description: 本文件包含了线程池操作函数的定义
  //////////////////////////////////////////////////////////////////
  #include "thread_pool.h"
  void handler(void *arg)
  {
   pthread_mutex_unlock((pthread_mutex_t *)arg);
  }
  void *routine(void *arg)
  {
   thread_pool *pool = (thread_pool *)arg;
   struct task *p;
   while(1)
   {
   pthread_cleanup_push(handler, (void *)&pool->lock);
   pthread_mutex_lock(&pool->lock);
   while(pool->waiting_tasks == 0 && !pool->shutdown)
   {
   pthread_cond_wait(&pool->cond, &pool->lock);
   }
   if(pool->waiting_tasks == 0 && pool->shutdown == true)
   {
   pthread_mutex_unlock(&pool->lock);
   pthread_exit(NULL);
   }
   p = pool->task_list->next;
   pool->task_list->next = p->next;
   pool->waiting_tasks--;
   pthread_mutex_unlock(&pool->lock);
 
   pthread_cleanup_pop(0);
   pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
   (p->task)(p->arg);
   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
   free(p);
   }
   pthread_exit(NULL);
  }
  bool init_pool(thread_pool *pool, unsigned int threads_number)
  {
   pthread_mutex_init(&pool->lock, NULL);
   pthread_cond_init(&pool->cond, NULL);
   pool->shutdown = false;
   pool->task_list = malloc(sizeof(struct task));
   pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
   if(pool->task_list == NULL || pool->tids == NULL)
   {
   perror("allocate memory error");
   return false;
   }
   pool->task_list->next = NULL;
   pool->waiting_tasks = 0;
   pool->active_threads = threads_number;
   int i;
   for(i=0; iactive_threads; i++)
   {
   if(pthread_create(&((pool->tids)[i]), NULL,
   routine, (void *)pool) != 0)
   {
   perror("create threads error");
   return false;
   }
   }
   return true;
  }
  bool add_task(thread_pool *pool,
   void *(*task)(void *arg), void *arg)
  {
   struct task *new_task = malloc(sizeof(struct task));
   if(new_task == NULL)
   {
   perror("allocate memory error");
   return false;
   }
   new_task->task = task;
   new_task->arg = arg;
   new_task->next = NULL;
   pthread_mutex_lock(&pool->lock);
   if(pool->waiting_tasks >= MAX_WAITING_TASKS)
   {
   pthread_mutex_unlock(&pool->lock);
   fprintf(stderr, "too many tasks.\n");
   free(new_task);
   return false;
   }
  
   struct task *tmp = pool->task_list;
   while(tmp->next != NULL)
   tmp = tmp->next;
   tmp->next = new_task;
   pool->waiting_tasks++;
   pthread_mutex_unlock(&pool->lock);
   pthread_cond_signal(&pool->cond);
   return true;
  }
  int add_thread(thread_pool *pool, unsigned additional_threads)
  {
   if(additional_threads == 0)
   return 0;
   unsigned total_threads =
        pool->active_threads + additional_threads;
   int i, actual_increment = 0;
   for(i = pool->active_threads;
       i < total_threads && i < MAX_ACTIVE_THREADS;
       i++)
   {
   if(pthread_create(&((pool->tids)[i]),
   NULL, routine, (void *)pool) != 0)
   {
   perror("add threads error");
   if(actual_increment == 0)
   return -1;
   break;
   }
   actual_increment++;
   }
   pool->active_threads += actual_increment;
   return actual_increment;
  }
  int remove_thread(thread_pool *pool, unsigned int removing_threads)
  {
   if(removing_threads == 0)
   return pool->active_threads;
   int remain_threads = pool->active_threads - removing_threads;
   remain_threads = remain_threads>0 ? remain_threads:1;
   int i;
   for(i=pool->active_threads-1; i>remain_threads-1; i--)
   {
   errno = pthread_cancel(pool->tids[i]);
   if(errno != 0)
   break;
   }
   if(i == pool->active_threads-1)
   return -1;
   else
   {
   pool->active_threads = i+1;
   return i+1;
   }
  }
 
  bool destroy_pool(thread_pool *pool)
  {
   pool->shutdown = true;
   pthread_cond_broadcast(&pool->cond);
   int i;
   for(i=0; iactive_threads; i++)
   {
   errno = pthread_join(pool->tids[i], NULL);
   if(errno != 0)
   {
   printf("join tids[%d] error: %s\n",
   i, strerror(errno));
   }
   else
   printf("[%u] is joined\n", (unsigned)pool->tids[i]);
  
   }
   free(pool->task_list);
   free(pool->tids);
   free(pool);
   return true;
  }
  测试程序
 
  main.c
 
  //////////////////////////////////////////////////////////////////
  //  Description: 一个使用了线程池的示例
  //////////////////////////////////////////////////////////////////
  #include "thread_pool.h"
  void *mytask(void *arg)
  {
   int n = (int)arg;
   printf("[%u][%s] ==> job will be done in %d sec...\n",
   (unsigned)pthread_self(), __FUNCTION__, n);
   sleep(n);
   printf("[%u][%s] ==> job done!\n",
   (unsigned)pthread_self(), __FUNCTION__);
   return NULL;
  }
  void *count_time(void *arg)
  {
   int i = 0;
   while(1)
   {
   sleep(1);
   printf("sec: %d\n", ++i);
   }
  }
  int main(void)
  {
   pthread_t a;
   pthread_create(&a, NULL, count_time, NULL);
   // 1, initialize the pool
   thread_pool *pool = malloc(sizeof(thread_pool));
   init_pool(pool, 2);
   // 2, throw tasks
   printf("throwing 3 tasks...\n");
   add_task(pool, mytask, (void *)(rand()));
   add_task(pool, mytask, (void *)(rand()));
   add_task(pool, mytask, (void *)(rand()));
   // 3, check active threads number
   printf("current thread number: %d\n",
   remove_thread(pool, 0));
   sleep(9);
   // 4, throw tasks
   printf("throwing another 2 tasks...\n");
   add_task(pool, mytask, (void *)(rand()));
   add_task(pool, mytask, (void *)(rand()));
   // 5, add threads
   add_thread(pool, 2);
   sleep(5);
   // 6, remove threads
   printf("remove 3 threads from the pool, "
          "current thread number: %d\n",
   remove_thread(pool, 3));
   // 7, destroy the pool
   destroy_pool(pool);
   return 0;
  }
  运行结果
 
  zzc@zzc-virtual-machine:~/share/example/thread$ ./test
  throwing 3 tasks...
  current thread number: 2
  [3981727488][mytask] ==> job will be done in 3 sec...
  [3990120192][mytask] ==> job will be done in 6 sec...
  sec: 1
  sec: 2
  [3981727488][mytask] ==> job done!
  [3981727488][mytask] ==> job will be done in 7 sec...
  sec: 3
  sec: 4
  sec: 5
  [3990120192][mytask] ==> job done!
  sec: 6
  sec: 7
  sec: 8
  throwing another 2 tasks...
  [3990120192][mytask] ==> job will be done in 5 sec...
  [3892311808][mytask] ==> job will be done in 3 sec...
  sec: 9
  [3981727488][mytask] ==> job done!
  sec: 10
  sec: 11
  [3892311808][mytask] ==> job done!
  sec: 12
  sec: 13
  remove 3 threads from the pool, current thread number: 1
  [3990120192][mytask] ==> job done!
  [3990120192] is joined
  延伸
 
  实际使用的时候,只需将上述代码中的mytask函数改成我们需要实现的功能函数即可。
 
  总结
 
  本文简单介绍了线程池的概念和特点,对线程池的结构体和相关操作接口进行了设计,并提供了接口的具体实现,最后通过示例程序演示了线程池的运行过程。
 

(编辑:我爱制作网_沈阳站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!