Administrator
发布于 2022-03-28 / 4 阅读
0
0

手写线程池-第一版

手写线程池-第一版

线程池是什么

举个例子,在以前文章又提到过 好几种reactor模式

!单Reactor多线程模式

可以大概分为这几种

单 Reactor 单进程 / 线程: \\接收数据 –> 解析数据 –> 发送数据 \\

单 Reactor 多线程 : 接收数据 –> fd 交给线程 –> 线程全程处理

接收数据 –\> 从fd中抽出数据 –> 数据交给线程

如果接收一次数据,创建一次线程处理完后又销毁线程,这样太消耗cpu资源了,为了改善这个性能,可以先创建好一定数量的线程,线程处理饥渴的模式,有业务需要处理了,就派一个线程去处理,处理完后,又处于饥渴的转态,其实典型的空间换时间的问题,有很设计都是这样,如内存池(减少系统调用的次数),连接池等。

如何设计呢一个池呢? 好像Java有现成的类可以调,这次用C++ 写一个线程池,第一次写,有很多东西没考虑到,以后继续迭代吧

看了一个视频,用c写线程池,大致理解了思路,于是自己 用 c++写一遍( c with class …)

思路:

  • 一个线程池类, 可以初始初始化线程 ,任务队列,锁
  • 工作类, 一个线程对应个工作类,工作类从任务队列中抽取任务
  • 任务类 ,产生一个任务 插入任务队列中
  • 线程锁类, 用的还是 POSIX标准下的互斥锁,不过封装一遍(我看别人写的代码好多都是基本api在封装一遍的,所以我也尝试下), 还有一个条件变量,没任务全部线程等待,有任务 发送信号,通知一个等待的线程获取任务

class pool{
	private:
		list<Job*> Jobs;  //用双向链表 存放任务  应该用队列比较好的
		mylocker *mutelock = new mylocker(); // 初始化一个锁
	public:
         //初始化线程
		pool(int maxNumber){
			for(int i = 0; i < maxNumber; i++){
                Worker *work = new Worker(); //工作类
                work->lock = mutelock;       //全部线程公用一个锁
                work->Jobs = &Jobs;          //任务队列包含    我想着尽量不要用到 全局的变量,全部归到一个类里面
                int ret = pthread_create(&work->tid, NULL, fun, (void *)work);  // 开启线程  还有传参,工作类传进去
                pthread_detach(work->tid);  //线程分离,脱离主程序

			}
		}
		void addjob(Job *job){ //添加一个任务
            mutelock->mutelock(); //此时还是需要锁 保持同步
            if(job->state == run){ //添加了任务的转态,不是需要运行的就加进去
                				  //这里做的不是很好,是不是应该分为待处理,马上处理的任务队列呢,以后在补充
                cout << "添加队列中" << endl;
                Jobs.push_back(job);
                mutelock->condsignal();  //通知线程
            }
            mutelock->muteunlock();
		}
        int jobSize(){
            return Jobs.size();
        }
		~pool(){
            delete(mutelock);
		}
};



//任务类
class Job{
    public:
        int state = run;
        int data;
        void (*job_function)(int fd);  // 这里 添加个回调函数,可以自定义 要怎样的任务类

    public:
        Job(){
			//这里初始化
        }
        void runFun(){
            job_function(data);
        }
};


void *fun(void *arg){
    //cout << "创建成功 " << pthread_self() << endl;
    Worker *work = (Worker *)arg;  //参数类型转换   这个arg参数太好用了!
    while (1){
        work->lock->mutelock();    //条件变量需要 互斥锁
        // cout << "原地待命,等通知" << endl;
        // 队列空的话 等待等通知
        while (work->Jobs->empty()){
            cout << "等待任务队列 " << work->Jobs->empty() << endl;
            work->lock->condwait();
        }
        //走到这说明有任务了,此时还是锁的转态可以保持线程同步
        if(work->getTer() == 1){
            pthread_exit(NULL);
        }
        //从任务队列中 抽一个任务
        Job *job = work->Jobs->front();
        //pop掉任务
        work->Jobs->pop_front();
        work->lock->muteunlock();
        //此时解锁,因为我已经拿到任务里,应该不锁了,好像也不是很安全,如果是对处理的是共享的数据呢....
        cout << pthread_self() << "   data : ";
        job->runFun(); //线程运行任务的回调函数
        delete(job);
    }
}


class Worker{
    private:
		int terminate;
	public:
		Worker(){
			terminate = 0;
		};
		int getTer(){
            return terminate;
        }
        void run(){

        }
	public:
	    pthread_t tid;
        mylocker *lock;
        list<Job*> *Jobs;

};


//封装 互斥锁 和 条件变量
class mylocker{
    private:
        pthread_mutex_t mutexlock;
		pthread_cond_t cond;
    public:
        mylocker(){
            mutexlock = PTHREAD_MUTEX_INITIALIZER;
			cond = PTHREAD_COND_INITIALIZER;
        }
        ~mylocker(){
            pthread_mutex_destroy(&mutexlock);
            pthread_cond_destroy(&cond);
        }
        bool mutelock(){
            return pthread_mutex_lock(&mutexlock) == 0;
        }
        bool muteunlock(){
            return pthread_mutex_unlock(&mutexlock) == 0;
        }
		bool condwait(){
			return pthread_cond_wait(&cond,&mutexlock);
		}
        bool condsignal(){
            return pthread_cond_signal(&cond);
        }
        pthread_mutex_t *get(){
            return &mutexlock;
        }
};


评论