加入收藏 | 设为首页 | 会员中心 | 我要投稿 我爱制作网_沈阳站长网 (https://www.024zz.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

Linux C实现线程池

发布时间:2022-10-04 13:42:15 所属栏目:Linux 来源:
导读:  主要目的

  主要是在linux下C语言代码实现线程池,关于链表的操作,多线程,以及多线程锁、条件变量等知识点请自行参考其他博客。

  线程池 使用场景

  高性能服务器处理大量客户端的情景,比
  主要目的
 
  主要是在linux下C语言代码实现线程池,关于链表的操作,多线程,以及多线程锁、条件变量等知识点请自行参考其他博客。
 
  线程池 使用场景
 
  高性能服务器处理大量客户端的情景,比如火车售票系统,购物网,炒股网站等。
 
  为什么使用线程池
 
  想想在一个百万级客户端使用的服务器,客户集中在某个时刻访问服务器,服务器是否在某一时刻为所有客户开启一个线程去处理任务,显然不现实,比如1W个客户同时访问服务器,以posix来说,每个线程需要的内存资源在8M左右,那么1w个需要多少内存?
 
  线程池作用
 
  前文说了为什么要使用线程池,那么线程池的主要作用除了上面说的避免线程太多,导致服务器内存耗尽。那么另外创建线程的另外2个作用是:避免创建于销毁线程的代价和任务与执行分离的作用
 
  或许有的朋友对任务与执行分离感到疑惑,那么举个简单例子吧。对于游戏服务器来说,某个时刻有大量客户登录,这时候需要将所有的客户的登录信息(登录时间等)记录下来,这些信息是需要记录到数据库文件的。我们知道,磁盘写入和内存读写相比,是很慢的,那么我们只需要在主线程告诉某个客户登录了,然后将客户登录的信息记录到数据库由线程池来实现写入。这就是任务与执行分离的一个例子。
 
  生活例子来说明线程池
 
  我们以银行办理业务来说明线程池线程池linux,通过这个例子我们可以知道线程池的主要数据结构。
 
  银行为客户办理业务的过程中,主要有客户任务、柜员、公示牌(排队号)3个角色,客户是办理业务的,因此对应于线程要执行的任务,柜员为客户服务,即柜员是线程池中线程的概念,公示牌的作用是连接柜员与办理业务的桥梁,主要的作用是柜员呼叫客户的标志。在深入一点,我们将客户的任务当做线程中的临界资源,所有的柜员都有机会去为某个客户办理服务, 这取决于柜员当前是否处于忙碌中,我们可以将柜员的服务过程用伪代码来表示
 
  while(1)
  {
   加锁
   while(当前没有客户)
   {
   释放锁,让公示牌有机会去增加客户号码
   (如果这里不释放锁,那么就会造成死锁)
  
   摸鱼中....
    
   当银行来客户了,那么柜员就呼叫客户去服务
   (这里要注意,是所有柜员去争夺这个客户,
   当然现实柜员没有那么认真,一个柜员去服务即可,
   想象力更丰富一点,洗脚店...多个技师就你一个客户...)
       
       如果柜员中途有事或者被银行经理叫走,
       那么应该直接将自己服务的标志设置为停止,
       并且将自己从线程池中取出,直接退出循环
       
       加锁(我抢到了这个客户,其他柜员不允许为其服务)  
   }
  
   从任务列表中取出一个任务(该客户的号码不会
      出现等待队列中,被其他柜员呼叫)
      
   释放锁
   为取出的任务服务。
  }
  释放自身的线程资源
  不知道你们对上面的伪代码是否能理解上文的代码,是否可以根据上文的描述,知道使用C语言的相关代码来描述柜员(即线程)的主要逻辑。
 
  通过上面的描述,我们知道了得到了线程池中主要需要3个结构体:
 
  (1) 执行队列(线程的概念): 对应柜员
 
  (2)任务队列: 对应客户任务
 
  (3)管理组件(线程池):对应公示牌
 
  先来的客户先服务,我们将所有的客户进行排队,因此想到任务结构体应该使用队列的形式来表示,所有的柜员使用链表来表示,为了代码的简单性,将柜员和客户都使用双向链表表示,公示牌是连接二者的桥梁,因此应该具有线程和任务,在数据结构中即是将2个对象放入到线程池中,在后面的代码很容易理解。另外,银行服务分为VIP客户和普通客户,那么我们知道柜员被分为2类,即有2种线程池,一类线程池(服务VIP的柜员),一类线程池(服务普通客户的柜员),因此在线程的结构中还应该有线程池的对象。因此如果任务有优先级的特性,还应该使用多种线程池(这是自我理解,可能是错误的)
 
  线程池主要API
 
  一般线程池在一个软件中,是作为基础组件为上层服务的,那么实现一个线程池需要实现哪些API供上层调用呢?
 
  (1)首先,我们从线程池的名字可以看出,线程池是由一些线程构成的,通常,我们在使用线程之前,需要创建线程,那么肯定线程池也少不了初始化的功能,其包括了创建线程的功能,初始化锁和条件变量等
 
  (2)其次,线程池的主要功能是告诉线程有任务执行,因此必须要有一个接口用于插入任务的接口。
 
  (3)有创建,那当然少不了释放线程池的接口
 
  其次,像获取线程的个数,或者空闲线程都是一些为线程池锦上添花的一些功能,在我们的代码中没有实现。
 
  代码实现
 
  #include
  #include
  #include
  #include
  //以银行办理业务来说明
  //柜员的处理--对应线程(执行)逻辑
  //客户的任务--对应任务
  //所有柜员-对应执行队列
  //所有客户的任务-对应任务队列
  //要明白客户的任务相对于柜员是临界资源,柜员叫号就相当于去争取资源。
  //公示牌-对应线程池(通知某个客户到某个柜员处办理业务-
  //柜员主要去拉客户的任务来执行
  //--对应任务的线程池来说就是某一个线程会在某一时刻去执行任务
  //执行队列
  typedef struct NWORKER
  {
      pthread_t id;  //线程id(用来控制线程)
      int terminate; //是否停止的标志
      struct NWORKER *prev;
      struct NWORKER *next;
      struct NTHREADPOLL *pool; //线程所属的线程池
  } nworker;
  //任务队列
  typedef struct NJOB
  {
      void *user_data;用来做任务的参数
      void (*job_func)(struct NJOB *job);//任务执行的函数指针
      struct NJOB *prev;
      struct NJOB *next;
  } njob;
  //(管理组件)线程池
  typedef struct NTHREADPOLL
  {
      pthread_mutex_t mutex;  //线程锁(为了使任务有序的执行)
      //条件变量(等待任务的到来的变量,如果没有任务时,会释放锁,
      //有任务时,又会去争夺任务)
      pthread_cond_t cond;    
      struct NWORKER *workers; //执行队列(指向首节点)
      struct NJOB *njobs;      //任务队列(指向首节点)
  } nthreadpoll;
  //头插法-插入结点
  //(list)加()是保证*的优先于->,否则不会得到正确的结果
  #define LL_ADD(item, list)     \
      do                         \
      {                          \
          item->prev = NULL;     \
          item->next = (list);     \
          if ((list) != NULL)      \
              (list)->prev = item; \
          (list) = item;           \
      } while (0);
  //删除结点(不释放内存)
  #define LL_REMOVE(item, list)              \
      do                                     \
      {                                      \
          if (item->prev != NULL)            \
              item->prev->next = item->next; \
          if (item->next != NULL)            \
              item->next->prev = item->prev; \
          if (item == (list))                  \
              (list) = (list)->next;             \
          item->next = item->prev = NULL;    \
      } while (0);
  /**
   * @description: 线程的回调逻辑
   *(银行柜员的工作逻辑:有客户就为客户执行任务,没有客户就等待客户来)
   * @param {*arg worker}
   * @return {*}
   */
  void *thread_callback(void *arg)
  {
      nworker *worker = (nworker *)arg;
      while (1)
      {
          pthread_mutex_lock(&worker->pool->mutex);
          while (worker->pool->njobs == NULL)
          { //没有执行的任务
              if (worker->terminate)
                  break;  
              pthread_cond_wait(&worker->pool->cond,
              &worker->pool->mutex);
          }
          //柜员下班或者中途有事,释放自己占有的资源(客户的任务)
          //让其他柜员去执行
          if (worker->terminate)//
          {
              pthread_mutex_unlock(&worker->pool->mutex);
              break;
          }
          struct NJOB *job = worker->pool->njobs;//取出队列首任务
          if(job)
          {
              LL_REMOVE(job, worker->pool->njobs);
          }
          pthread_mutex_unlock(&worker->pool->mutex);
          //暂时不清楚这里为啥还要判断!
          //老师讲的是如果有一个任务,有多个线程去争夺,
          //可能njob *job = worker->poll->njobs取出来时为空
          //那么pthread_cond_wait在释放锁时,资源被其他线程争夺了,
          //任务队列再次为空,
          //那么上面的while (!worker->poll->njobs)会往下走吗?
          //难道是pthread_cond_wait还没加锁
          //完成时,这时候while循环判断不为空,
          //其他线程执行了 njob *job = worker->poll->njobs,这样
          //导致该线程执行 njob *job = worker->poll->njobs为空?
          if(!job) continue;
          执行任务(这里的job一定不会被其他线程获取到,
          //因为前面使用LL_REMOVE从任务队列中删除了)
          job->job_func(job);//参数也是自己
      }
      free(worker);//释放线程内存空间
  }
  /**
   * @description:
   * @param {poll 要创建的线程池对象}
   * @param {thread_num 要创建的线程数量}
   * @return {成功创建线程的个数,小于0为错误}}
   */
  int pthreadpool_create(nthreadpoll *pool, int thread_num)
  {
      //参数判断
      if (!pool)
          return -1;
      memset(pool, 0, sizeof(nthreadpoll)); //
      if (thread_num < 1)
          thread_num = 1;
      //初始化poll参数
      // cond
      pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
      memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
      //metex
      pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
      memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
      //构造线程
      int idx;
      for (idx = 0; idx < thread_num; idx++)
      {
          nworker *worker = (nworker *)malloc(sizeof(nworker));
          if (worker == NULL)
          {
              perror("malloc worker error!");
              return idx;
          }
          memset(worker, 0, sizeof(nworker));
          worker->pool = pool;
          int ret = pthread_create(&worker->id, NULL,
          thread_callback, worker);
          if (ret)
          {
              perror("pthread create error!");
               //释放最后一个分配poll失败的空间,
               //前面的线程都创建成功了,不用释放空间。
              free(worker);
              return idx;   //
          }
          LL_ADD(worker, pool->workers);
      }
      return idx;
  }
  //往线程池丢任务
  int pthreadpool_push_task(nthreadpoll *pool, njob *njob)
  {
      pthread_mutex_lock(&pool->mutex);
      LL_ADD(njob, pool->njobs);
      //通知等待的线程,已经有任务可以执行了
      pthread_cond_signal(&pool->cond);
      pthread_mutex_unlock(&pool->mutex);
      return 0;
  }
  //释放线程池资源
  int pthreadpool_destory(nthreadpoll *pool)
  {
      nworker *worker = NULL;
      for(worker = pool->workers; worker != NULL;
       worker = pool->workers->next)
      {
          worker->terminate = 1;
      }
      pthread_mutex_lock(&pool->mutex);
      
      //广播给所有线程,告诉他们应该停止工作,释放自己的空间
      pthread_cond_broadcast(&pool->cond);
      pthread_mutex_unlock(&pool->mutex);
      return 0;
  }
  //debug 以下是测试代码
  #define TASK_COUNT 1000
  //要完成的任务
  void counter(struct NJOB *job)
  {
      if (job == NULL) return ;
   int idx = *(int*)job->user_data;
   printf("idx : %d, selfid: %lu\n", idx, pthread_self());
   free(job->user_data);
   free(job);
  }
  int main(int argc, char *argv[])
  {
      int thread_num = 50;
      nthreadpoll pool = {0};
      pthreadpool_create(&pool, thread_num);
      int idx;
      for(idx = 0; idx < TASK_COUNT; ++idx)
      {   
          njob *job = (njob *)malloc(sizeof(njob));
          if(job == NULL) exit(0);
          job->job_func = counter;
          //任务的参数需要在其他函数中使用,需要在堆上分配内存
          job->user_data = malloc(sizeof(int));
   *(int *)(job->user_data) = idx;//任务编号
           pthreadpool_push_task(&pool, job);
      }
      getchar();
      pthreadpool_destory(&pool);
      return 0;
  }
  代码说明
 
  代码来源:腾讯课堂-零声学院king老师(尊重他人成果,不是为了该学院打广告)
 
  个人感觉代码不合理的地方:任务队列使用头插法的双链表感觉不合适,使用队列更合适,或者使用尾插法也可以。否则可能会导致后来的请求被先执行的问题,不过这里保证任务不丢失特性即可
 

(编辑:我爱制作网_沈阳站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!