在现代C++应用程序中,线程池是一种常用的并发编程模式,它允许我们管理一组工作线程,以并行方式执行多个任务,从而提高程序的执行效率和响应速度。在本篇博客中,我将从基础出发,介绍如何构建一个简单而高效的C++线程池,并展示其在实际应用中的使用方式。

线程池的基本概念

线程池是一种预先创建一组工作线程,并在需要时从池中分配线程来执行任务的机制。这样做的好处包括减少线程的创建和销毁开销、提高线程的复用性,以及更好地控制并发执行的线程数量。

线程池的实现

类设计

我们的线程池将包含以下几个主要部分:

  • 任务基类 (Xtask):所有任务都必须继承这个基类,并实现run()方法。

  • 线程池类 (Thread_pool):管理线程和任务,包括初始化线程、执行任务队列等。

  • 任务类(MyTask):模拟实际任务

线程池类的实现

  • 初始化 (Init):设置线程池中的线程数量,并启动这些线程。注意,在这个实现中,Init 方法同时也调用了 start 方法来创建和启动工作线程。这是一种简化处理,但在更复杂的设计中,你可能希望将线程池的初始化和线程的启动分开。

  • 运行 (run):这是每个工作线程执行的方法。它不断从任务队列中取出任务并执行,直到没有更多任务可执行为止(在这个简单的实现中,工作线程实际上会永远循环下去,因为我们没有实现一个明确的停止机制)。

  • 获取任务 (GetTask):这是一个同步方法,用于从任务队列中安全地获取一个任务。它使用互斥锁和条件变量来确保在队列为空时线程会等待,直到有新任务被添加到队列中。

  • 添加任务 (AddTask):向任务队列中添加一个新任务,并通知一个等待的工作线程(如果有的话)。

  • 启动 (start):根据thread_num创建并启动指定数量的工作线程。

  • 析构函数 (~Thread_pool):在析构时,设置thread_num为0以通知工作线程停止执行新任务(尽管在这个简单的实现中,我们并没有在run方法中检查这个条件)。然后,它等待所有工作线程完成当前任务并退出。

任务类 (MyTask):

继承自Xtask,并实现了run方法以执行具体的任务逻辑。在这个例子中,它仅仅是打印出当前线程ID和任务编号,并休眠一秒钟来模拟耗时操作。

具体实现:

1. 定义任务基类

首先,我们需要定义一个任务基类,这个基类将包含一个纯虚函数run,所有具体的任务类都将继承这个基类并实现run函数。

#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#include <list>
#include <mutex>
#include <condition_variable>
#include <chrono>

class Xtask {
public:
    virtual void run() = 0;
    virtual ~Xtask() {}
};

2. 实现线程池

接下来,我们实现线程池类Thread_pool。这个类将包含初始化、添加任务、启动线程池和执行任务的逻辑。

class Thread_pool {
public:
    void Init(int n) {
        std::unique_lock<std::mutex> lock(mtx_);
        std::cout << "init start" << std::endl;
        thread_num = n;
        std::cout << "init end" << std::endl;
    }

    void run() {
        std::unique_lock<std::mutex> lock(mtx_);
        std::cout << "begin threadpool run " << std::this_thread::get_id() << std::endl;

        while (true) {
            auto task = GetTask();
            if (!task)
                continue;

            try {
                task->run();
            } catch (...) {
                std::cout << "catch exception" << std::endl;
            }
        }
    }

    std::shared_ptr<Xtask> GetTask() {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this]{ return !tasks_.empty(); });

        auto task = tasks_.front();
        tasks_.pop_front();
        return task;
    }

    void AddTask(std::shared_ptr<Xtask> task) {
        std::unique_lock<std::mutex> lock(mtx_);
        tasks_.push_back(task);
        cv_.notify_one();
    }

    void start() {
        std::unique_lock<std::mutex> lock(mtx_);
        for (int i = 0; i < thread_num; i++) {
            auto thr = std::make_shared<std::thread>(&Thread_pool::run, this);
            threads_.push_back(thr);
        }
    }

    ~Thread_pool() {
        // 注意:这里没有实现优雅关闭线程池的逻辑
        // 在实际使用中,需要实现适当的关闭逻辑
    }

private:
    int thread_num = 0;
    std::vector<std::shared_ptr<std::thread>> threads_;
    std::list<std::shared_ptr<Xtask>> tasks_;
    std::condition_variable cv_;
    std::mutex mtx_;
};

3. 创建具体任务类

然后,我们定义一个具体的任务类MyTask,它继承自Xtask并实现了run函数。

class MyTask : public Xtask {
public:
    void run() override {
        std::cout << std::this_thread::get_id() << " my task " << name << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    void setName(int n) {
        name = n;
    }

private:
    int name = 0;
};

4. 使用线程池

最后,在main函数中,我们创建一个线程池,初始化它,启动它,并向它添加一些任务。

int main() {
    Thread_pool pool;
    pool.Init(8);
    pool.start();

    for (int i = 0; i < 4; i++) {
        auto task = std::make_shared<MyTask>();
        task->setName(i);
        pool.AddTask(task);
    }

    std::getchar(); // 等待用户