手写线程池-第一版

Last updated on 9 months ago

线程池是什么

举个例子,在以前文章又提到过 好几种reactor模式

单Reactor多线程模式

可以大概分为这几种

单 Reactor 单进程 / 线程: **接收数据 –> 解析数据 –> 发送数据 **

单 Reactor 多线程 : 接收数据 –> fd 交给线程 –> 线程全程处理

​ 接收数据 –> 从fd中抽出数据 –> 数据交给线程

如果接收一次数据,创建一次线程处理完后又销毁线程,这样太消耗cpu资源了,为了改善这个性能,可以先创建好一定数量的线程,线程处理饥渴的模式,有业务需要处理了,就派一个线程去处理,处理完后,又处于饥渴的转态,其实典型的空间换时间的问题,有很设计都是这样,如内存池(减少系统调用的次数),连接池等。

如何设计呢一个池呢? 好像Java有现成的类可以调,这次用C++ 写一个线程池,第一次写,有很多东西没考虑到,以后继续迭代吧

看了一个视频,用c写线程池,大致理解了思路,于是自己 用 c++写一遍( c with class …)

思路:

  • 一个线程池类, 可以初始初始化线程 ,任务队列,锁
  • 工作类, 一个线程对应个工作类,工作类从任务队列中抽取任务
  • 任务类 ,产生一个任务 插入任务队列中
  • 线程锁类, 用的还是 POSIX标准下的互斥锁,不过封装一遍(我看别人写的代码好多都是基本api在封装一遍的,所以我也尝试下), 还有一个条件变量,没任务全部线程等待,有任务 发送信号,通知一个等待的线程获取任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

class pool{
private:
list<Job*> Jobs; //用双向链表 存放任务 应该用队列比较好的
mylocker *mutelock = new mylocker(); // 初始化一个锁
public:
//初始化线程
pool(int maxNumber){
for(int i = 0; i < maxNumber; i++){
Worker *work = new Worker(); //工作类
work->lock = mutelock; //全部线程公用一个锁
work->Jobs = &Jobs; //任务队列包含 我想着尽量不要用到 全局的变量,全部归到一个类里面
int ret = pthread_create(&work->tid, NULL, fun, (void *)work); // 开启线程 还有传参,工作类传进去
pthread_detach(work->tid); //线程分离,脱离主程序

}
}
void addjob(Job *job){ //添加一个任务
mutelock->mutelock(); //此时还是需要锁 保持同步
if(job->state == run){ //添加了任务的转态,不是需要运行的就加进去
//这里做的不是很好,是不是应该分为待处理,马上处理的任务队列呢,以后在补充
cout << "添加队列中" << endl;
Jobs.push_back(job);
mutelock->condsignal(); //通知线程
}
mutelock->muteunlock();
}
int jobSize(){
return Jobs.size();
}
~pool(){
delete(mutelock);
}
};

1
	
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//任务类
class Job{
public:
int state = run;
int data;
void (*job_function)(int fd); // 这里 添加个回调函数,可以自定义 要怎样的任务类

public:
Job(){
//这里初始化
}
void runFun(){
job_function(data);
}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

void *fun(void *arg){
//cout << "创建成功 " << pthread_self() << endl;
Worker *work = (Worker *)arg; //参数类型转换 这个arg参数太好用了!
while (1){
work->lock->mutelock(); //条件变量需要 互斥锁
// cout << "原地待命,等通知" << endl;
// 队列空的话 等待等通知
while (work->Jobs->empty()){
cout << "等待任务队列 " << work->Jobs->empty() << endl;
work->lock->condwait();
}
//走到这说明有任务了,此时还是锁的转态可以保持线程同步
if(work->getTer() == 1){
pthread_exit(NULL);
}
//从任务队列中 抽一个任务
Job *job = work->Jobs->front();
//pop掉任务
work->Jobs->pop_front();
work->lock->muteunlock();
//此时解锁,因为我已经拿到任务里,应该不锁了,好像也不是很安全,如果是对处理的是共享的数据呢....
cout << pthread_self() << " data : ";
job->runFun(); //线程运行任务的回调函数
delete(job);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Worker{
private:
int terminate;
public:
Worker(){
terminate = 0;
};
int getTer(){
return terminate;
}
void run(){

}
public:
pthread_t tid;
mylocker *lock;
list<Job*> *Jobs;


};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//封装 互斥锁 和 条件变量
class mylocker{
private:
pthread_mutex_t mutexlock;
pthread_cond_t cond;
public:
mylocker(){
mutexlock = PTHREAD_MUTEX_INITIALIZER;
cond = PTHREAD_COND_INITIALIZER;
}
~mylocker(){
pthread_mutex_destroy(&mutexlock);
pthread_cond_destroy(&cond);
}
bool mutelock(){
return pthread_mutex_lock(&mutexlock) == 0;
}
bool muteunlock(){
return pthread_mutex_unlock(&mutexlock) == 0;
}
bool condwait(){
return pthread_cond_wait(&cond,&mutexlock);
}
bool condsignal(){
return pthread_cond_signal(&cond);
}
pthread_mutex_t *get(){
return &mutexlock;
}
};