C++线程池:从基础到实践
在现代C++应用程序中,线程池是一种常用的并发编程模式,它允许我们管理一组工作线程,以并行方式执行多个任务,从而提高程序的执行效率和响应速度。在本篇博客中,我将从基础出发,介绍如何构建一个简单而高效的C++线程池,并展示其在实际应用中的使用方式。
线程池的基本概念
线程池是一种预先创建一组工作线程,并在需要时从池中分配线程来执行任务的机制。这样做的好处包括减少线程的创建和销毁开销、提高线程的复用性,以及更好地控制并发执行的线程数量。
线程池的实现
类设计
我们的线程池将包含以下几个主要部分:
任务基类 (
Xtask
):所有任务都必须继承这个基类,并实现run()
方法。线程池类 (
Thread_pool
):管理线程和任务,包括初始化线程、执行任务队列等。任务类(MyTask):模拟实际任务
线程池类的实现
初始化 (
Init
):设置线程池中的线程数量,并启动这些线程。注意,在这个实现中,Init
方法同时也调用了start
方法来创建和启动工作线程。这是一种简化处理,但在更复杂的设计中,你可能希望将线程池的初始化和线程的启动分开。运行 (
run
):这是每个工作线程执行的方法。它不断从任务队列中取出任务并执行,直到没有更多任务可执行为止(在这个简单的实现中,工作线程实际上会永远循环下去,因为我们没有实现一个明确的停止机制)。获取任务 (
GetTask
):这是一个同步方法,用于从任务队列中安全地获取一个任务。它使用互斥锁和条件变量来确保在队列为空时线程会等待,直到有新任务被添加到队列中。添加任务 (
AddTask
):向任务队列中添加一个新任务,并通知一个等待的工作线程(如果有的话)。启动 (
start
):根据thread_num
创建并启动指定数量的工作线程。析构函数 (
~Thread_pool
):在析构时,设置thread_num
为0以通知工作线程停止执行新任务(尽管在这个简单的实现中,我们并没有在run
方法中检查这个条件)。然后,它等待所有工作线程完成当前任务并退出。
任务类 (MyTask
):
继承自Xtask
,并实现了run
方法以执行具体的任务逻辑。在这个例子中,它仅仅是打印出当前线程ID和任务编号,并休眠一秒钟来模拟耗时操作。
具体实现:
1. 定义任务基类
首先,我们需要定义一个任务基类,这个基类将包含一个纯虚函数run
,所有具体的任务类都将继承这个基类并实现run
函数。
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#include <list>
#include <mutex>
#include <condition_variable>
#include <chrono>
class Xtask {
public:
virtual void run() = 0;
virtual ~Xtask() {}
};
2. 实现线程池
接下来,我们实现线程池类Thread_pool
。这个类将包含初始化、添加任务、启动线程池和执行任务的逻辑。
class Thread_pool {
public:
void Init(int n) {
std::unique_lock<std::mutex> lock(mtx_);
std::cout << "init start" << std::endl;
thread_num = n;
std::cout << "init end" << std::endl;
}
void run() {
std::unique_lock<std::mutex> lock(mtx_);
std::cout << "begin threadpool run " << std::this_thread::get_id() << std::endl;
while (true) {
auto task = GetTask();
if (!task)
continue;
try {
task->run();
} catch (...) {
std::cout << "catch exception" << std::endl;
}
}
}
std::shared_ptr<Xtask> GetTask() {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]{ return !tasks_.empty(); });
auto task = tasks_.front();
tasks_.pop_front();
return task;
}
void AddTask(std::shared_ptr<Xtask> task) {
std::unique_lock<std::mutex> lock(mtx_);
tasks_.push_back(task);
cv_.notify_one();
}
void start() {
std::unique_lock<std::mutex> lock(mtx_);
for (int i = 0; i < thread_num; i++) {
auto thr = std::make_shared<std::thread>(&Thread_pool::run, this);
threads_.push_back(thr);
}
}
~Thread_pool() {
// 注意:这里没有实现优雅关闭线程池的逻辑
// 在实际使用中,需要实现适当的关闭逻辑
}
private:
int thread_num = 0;
std::vector<std::shared_ptr<std::thread>> threads_;
std::list<std::shared_ptr<Xtask>> tasks_;
std::condition_variable cv_;
std::mutex mtx_;
};
3. 创建具体任务类
然后,我们定义一个具体的任务类MyTask
,它继承自Xtask
并实现了run
函数。
class MyTask : public Xtask {
public:
void run() override {
std::cout << std::this_thread::get_id() << " my task " << name << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
void setName(int n) {
name = n;
}
private:
int name = 0;
};
4. 使用线程池
最后,在main
函数中,我们创建一个线程池,初始化它,启动它,并向它添加一些任务。
int main() {
Thread_pool pool;
pool.Init(8);
pool.start();
for (int i = 0; i < 4; i++) {
auto task = std::make_shared<MyTask>();
task->setName(i);
pool.AddTask(task);
}
std::getchar(); // 等待用户