1 什么是线程池

线程池从本质上可以看做是一个多生产者多消费者的多线程应用。

一个线程池包括以下四个基本组成部分:

  • 线程池管理器:用于创建并管理线程池,包括创建线程池,销毁线程池,添加新的工作线程,添加工作任务;
  • 工作线程:属于线程池中的线程,用于处理实际任务,在没有工作任务时等待,在任务队列不为空时主动获取任务并处理任务;
  • 任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行;
  • 工作任务队列:用于存放需要处理的工作任务,采用先进先出机制;

线程池根据机器性能预先创建多个工作线程,位于主线程的线程池接收到工作任务并存入到工作任务队列中,工作线程从工作队列中取出工作任务进行处理,如果工作队列为空,则工作线程进入挂起状态。

2 C++实现一个线程池

在C++中实现一个线程池,通过对线程池的特性分析,线程池主要有以下功能:

  • 线程池可以创建给定数量的工作线程,每个工作线程在各自的子线程中运行任务函数,并等待工作队列中的新任务;
  • 线程池可主动关闭线程池并结束所创建的工作线程;
  • 线程池对象会被多个工作线程并发访问,因此可以将线程池实现为一个单例,并保证该单例能被多个线程互斥访问,这需要实现一个线程安全的单例模式;
  • 需要实现一个线程安全的队列,在队列中有新任务压入时通知工作线程领取工作任务,当队列为空时阻塞线程,避免资源开销;并支持主动解锁。

2.1 线程安全的单例类

Singleton.h

#ifndef SINGLETON_H
#define SINGLETON_H

#include <memory>
#include <mutex>


/**
 * Holder for one shared, lazily created instance of T.
 *
 * All access to the stored pointer is serialized by a mutex. The previous
 * implementation tested the std::shared_ptr before taking the lock; reading a
 * shared_ptr object while another thread writes it is a data race, so the
 * unlocked fast path has been removed.
 *
 * Callers on a hot path should keep the returned std::shared_ptr instead of
 * calling GetInstance() repeatedly, because every call takes the mutex.
 *
 * \tparam T Type of the managed instance.
 */
template<typename T>
class Singleton
{
public:
    /**
     * Returns the shared instance, creating it on first use.
     *
     * \param args Forwarded to T's constructor when the instance is created.
     *             They are ignored when an instance already exists.
     * \return Shared ownership of the instance.
     */
    template<typename ...Args>
    static std::shared_ptr<T> GetInstance(Args&&... args)
    {
        std::lock_guard<std::mutex> gLock(m_Mutex);

        if (!m_pSingleton)
        {
            m_pSingleton = std::make_shared<T>(std::forward<Args>(args)...);
        }

        // Copy while the lock is still held so the returned pointer cannot be
        // torn by a concurrent DeleteInstance().
        return m_pSingleton;
    }

    /**
     * Drops the holder's reference to the instance (provided for completeness;
     * calling it explicitly is not recommended).
     *
     * The object itself is destroyed once the last outstanding
     * std::shared_ptr goes away. A later GetInstance() creates a new object.
     */
    static void DeleteInstance()
    {
        std::shared_ptr<T> released;
        {
            std::lock_guard<std::mutex> gLock(m_Mutex);
            released.swap(m_pSingleton);
        }
        // `released` is destroyed here, after the mutex has been unlocked, so
        // a destructor of T that calls GetInstance() cannot deadlock on it.
    }

private:
    // Pure static utility: never instantiated, copied or assigned.
    Singleton() = delete;
    Singleton(const Singleton&) = delete;
    Singleton& operator=(const Singleton&) = delete;
    ~Singleton() = default;

private:
    static std::shared_ptr<T> m_pSingleton;  // the shared instance, null until first use
    static std::mutex m_Mutex;               // guards every access to m_pSingleton
};

template<typename T>
std::shared_ptr<T> Singleton<T>::m_pSingleton = nullptr;

template<typename T>
std::mutex Singleton<T>::m_Mutex;

#endif

2.2 线程安全的队列类

参考项目链接:https://github.com/alfredopons/queue-thread-safe

并对该线程安全类进行了魔改,主要增加了不管当前队列是否有工作任务,都可强制退出的新功能,方便线程池退出。

SafeQueue.hpp

/*
 * SafeQueue.hpp
 * Copyright (C) 2019 Alfredo Pons Menargues <apons@linucleus.com>
 *
 * This program is free software: you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#ifndef SAFEQUEUE_HPP_
#define SAFEQUEUE_HPP_

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>



/**
 * A thread-safe asynchronous FIFO queue.
 *
 * Every access to the underlying std::queue is serialized by an internal
 * mutex. Consumers blocked in pop(), move_pop(), timeout_pop() or
 * timeout_move_pop() are woken when an item is pushed or when unlock() is
 * called; lock() re-arms blocking.
 *
 * \tparam T         Element type.
 * \tparam Container Sequence container handed to std::queue.
 */
template <class T, class Container = std::list<T>>
class SafeQueue
{

    using value_type = typename Container::value_type;
    using size_type = typename Container::size_type;
    using container_type = Container;

  public:

    /*! Create an empty queue with no size limit. */
    SafeQueue() = default;

    /*! Move-construct from \p sq, taking over its queued items. */
    SafeQueue (SafeQueue&& sq)
    {
      // The source may still be in use by other threads, so read it locked.
      std::lock_guard<std::mutex> guard (sq.m_mutex);
      m_queue = std::move (sq.m_queue);
    }

    /*! Copy-construct from \p sq, copying its queued items. */
    SafeQueue (const SafeQueue& sq)
    {
      std::lock_guard<std::mutex> guard (sq.m_mutex);
      m_queue = sq.m_queue;
    }

    /*! Destroy safe queue. Blocks until no other operation holds the mutex. */
    ~SafeQueue()
    {
      std::lock_guard<std::mutex> guard (m_mutex);
    }

    /**
     * Sets the maximum number of items in the queue. Default is 0: no limit.
     * \param[in] max_num_items Maximum number of queued items, 0 for unlimited.
     */
    void set_max_num_items (unsigned int max_num_items)
    {
      // push() reads the limit under the mutex, so write it under the mutex.
      std::lock_guard<std::mutex> guard (m_mutex);
      m_max_num_items = max_num_items;
    }

    /**
     *  Pushes a copy of the item into the queue.
     * \param[in] item An item.
     * \return true if the item was pushed, false if the queue is full.
     */
    bool push (const value_type& item)
    {
      std::lock_guard<std::mutex> guard (m_mutex);

      if (is_full())
        return false;

      m_queue.push (item);
      m_condition.notify_one();
      return true;
    }

    /**
     *  Pushes the item into the queue, moving from it.
     * \param[in] item An item.
     * \return true if the item was pushed, false if the queue is full
     *         (in which case \p item is left untouched).
     */
    bool push (value_type&& item)
    {
      std::lock_guard<std::mutex> guard (m_mutex);

      if (is_full())
        return false;

      m_queue.push (std::move (item));
      m_condition.notify_one();
      return true;
    }

    /**
     *  Pops item from the queue. If the queue is empty, this function blocks
     *  until an item becomes available or unlock() is called.
     * \param[out] item The item. Left unchanged when no item was popped.
     * \return true if an item was popped, false if the call returned only
     *         because the queue was unlocked while empty.
     */
    bool pop (value_type& item)
    {
      std::unique_lock<std::mutex> guard (m_mutex);
      m_condition.wait (guard, [this]() { return is_ready(); });

      if (m_queue.empty())
        return false;

      item = m_queue.front();
      m_queue.pop();
      return true;
    }

    /**
     *  Same as pop(), but uses the contained type's move assignment operator
     *  if it has one.
     * \param[out] item The item. Left unchanged when no item was popped.
     * \return true if an item was popped, false if the call returned only
     *         because the queue was unlocked while empty.
     */
    bool move_pop (value_type& item)
    {
      std::unique_lock<std::mutex> guard (m_mutex);
      m_condition.wait (guard, [this]() { return is_ready(); });

      if (m_queue.empty())
        return false;

      item = std::move (m_queue.front());
      m_queue.pop();
      return true;
    }

    /**
     *  Tries to pop item from the queue without blocking.
     * \param[out] item The item.
     * \return False is returned if no item is available.
     */
    bool try_pop (value_type& item)
    {
      std::lock_guard<std::mutex> guard (m_mutex);

      if (m_queue.empty())
        return false;

      item = m_queue.front();
      m_queue.pop();
      return true;
    }

    /**
     *  Same as try_pop(), but uses the contained type's move assignment
     *  operator if it has one.
     * \param[out] item The item.
     * \return False is returned if no item is available.
     */
    bool try_move_pop (value_type& item)
    {
      std::lock_guard<std::mutex> guard (m_mutex);

      if (m_queue.empty())
        return false;

      item = std::move (m_queue.front());
      m_queue.pop();
      return true;
    }

    /**
     *  Pops item from the queue. If the queue is empty, blocks for at most
     *  timeout microseconds, until an item becomes available or until
     *  unlock() is called.
     * \param[out] item The item. Left unchanged when false is returned.
     * \param[in] timeout The number of microseconds to wait; 0 does not block.
     * \return true if an item was popped, false otherwise.
     */
    bool timeout_pop (value_type& item, std::uint64_t timeout)
    {
      std::unique_lock<std::mutex> guard (m_mutex);

      if (!wait_ready_for (guard, timeout))
        return false;

      item = m_queue.front();
      m_queue.pop();
      return true;
    }

    /**
     *  Same as timeout_pop(), but uses the contained type's move assignment
     *  operator if it has one.
     * \param[out] item The item. Left unchanged when false is returned.
     * \param[in] timeout The number of microseconds to wait; 0 does not block.
     * \return true if an item was popped, false otherwise.
     */
    bool timeout_move_pop (value_type& item, std::uint64_t timeout)
    {
      std::unique_lock<std::mutex> guard (m_mutex);

      if (!wait_ready_for (guard, timeout))
        return false;

      item = std::move (m_queue.front());
      m_queue.pop();
      return true;
    }

    /**
     *  Gets the number of items in the queue.
     * \return Number of items in the queue.
     */
    size_type size() const
    {
      std::lock_guard<std::mutex> guard (m_mutex);
      return m_queue.size();
    }

    /**
     *  Check if the queue is empty.
     * \return true if queue is empty.
     */
    bool empty() const
    {
      std::lock_guard<std::mutex> guard (m_mutex);
      return m_queue.empty();
    }

    /**
     *  Swaps the queued items with another queue.
     * \param[in,out] sq The SafeQueue to swap with 'this'.
     */
    void swap (SafeQueue& sq)
    {
      if (this != &sq)
        {
          // std::lock acquires both mutexes without deadlocking, even when
          // another thread runs b.swap(a) at the same time as a.swap(b).
          std::unique_lock<std::mutex> guard1 (m_mutex, std::defer_lock);
          std::unique_lock<std::mutex> guard2 (sq.m_mutex, std::defer_lock);
          std::lock (guard1, guard2);
          m_queue.swap (sq.m_queue);

          if (!m_queue.empty())
            m_condition.notify_all();

          if (!sq.m_queue.empty())
            sq.m_condition.notify_all();
        }
    }

    /*! The copy assignment operator */
    SafeQueue& operator= (const SafeQueue& sq)
    {
      if (this != &sq)
        {
          std::unique_lock<std::mutex> guard1 (m_mutex, std::defer_lock);
          std::unique_lock<std::mutex> guard2 (sq.m_mutex, std::defer_lock);
          std::lock (guard1, guard2);
          std::queue<T, Container> temp {sq.m_queue};
          m_queue.swap (temp);

          if (!m_queue.empty())
            m_condition.notify_all();
        }

      return *this;
    }

    /*! The move assignment operator */
    SafeQueue& operator= (SafeQueue && sq)
    {
      if (this != &sq)
        {
          std::unique_lock<std::mutex> guard1 (m_mutex, std::defer_lock);
          std::unique_lock<std::mutex> guard2 (sq.m_mutex, std::defer_lock);
          std::lock (guard1, guard2);
          m_queue = std::move (sq.m_queue);

          if (!m_queue.empty())
            m_condition.notify_all();
        }

      return *this;
    }

    /*! Re-arms blocking: blocking pops wait again while the queue is empty. */
    void lock()
    {
      std::lock_guard<std::mutex> guard (m_mutex);
      m_Unlock = false;
    }

    /*!
     * Releases every blocked consumer, even if the queue is empty, and keeps
     * blocking pops from waiting until lock() is called.
     */
    void unlock()
    {
      {
        // Set the flag under the mutex: otherwise a consumer that has just
        // evaluated the wait predicate could miss this notification.
        std::lock_guard<std::mutex> guard (m_mutex);
        m_Unlock = true;
      }
      m_condition.notify_all();
    }

    /*! Removes all queued items. */
    void clear()
    {
      std::lock_guard<std::mutex> guard (m_mutex);
      std::queue<T, Container> drained;
      m_queue.swap (drained);
    }

  private:

    /*! True when a size limit is set and reached. Caller must hold m_mutex. */
    bool is_full() const
    {
      return m_max_num_items > 0 && m_queue.size() >= m_max_num_items;
    }

    /*! Wait predicate: an item is queued or the queue was unlocked. Caller must hold m_mutex. */
    bool is_ready() const
    {
      return !m_queue.empty() || m_Unlock;
    }

    /*!
     * Waits up to \p timeout microseconds for is_ready().
     * \return true only if an item is available to pop afterwards.
     */
    bool wait_ready_for (std::unique_lock<std::mutex>& guard, std::uint64_t timeout)
    {
      // The predicate overload re-checks after spurious wake-ups; with a zero
      // timeout it simply evaluates the predicate once.
      m_condition.wait_for (guard, std::chrono::microseconds (timeout),
                            [this]() { return is_ready(); });
      return !m_queue.empty();
    }

    std::queue<T, Container> m_queue;       // queued items, guarded by m_mutex
    mutable std::mutex m_mutex;             // serializes all member access
    std::condition_variable m_condition;    // signalled on push and unlock()
    unsigned int m_max_num_items = 0;       // 0 means unlimited
    std::atomic<bool> m_Unlock{false};      // true: blocking pops return even when empty
};

/*! Swaps the contents of two SafeQueue objects. */
template <class T, class Container>
void swap (SafeQueue<T, Container>& q1, SafeQueue<T, Container>& q2)
{
  q1.swap (q2);
}
#endif /* SAFEQUEUE_HPP_ */

2.3 线程池

ThreadPool.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <thread>
#include <atomic>

#include "SafeQueue.hpp"
#include "ThreadWorker.h"
#include "Singleton.h"

class ThreadPool
{
public:
    ThreadPool():
        m_bThreadPoolStop(false)
    {

    }

    static std::shared_ptr<ThreadPool> GetSingleton()
    {
        return Singleton<ThreadPool>::GetInstance();
    }

    void CreateThreads(unsigned int worker_num)
    {
        for (int i = 0; i < worker_num; i++)
        {
            std::shared_ptr<ThreadWorker> pThreadWorker = std::make_shared<ThreadWorker>(i);

            std::shared_ptr<std::thread> pThread = std::make_shared<std::thread>(&ThreadWorker::Run, pThreadWorker);

            m_ThreadWorkers.push_back(pThreadWorker);

            m_Threads.push_back(pThread);
        }
    }

    virtual~ThreadPool()
    {
        m_TaskQueue.clear();
    }

    void AddTask(const std::string& task_str)
    {
        m_TaskQueue.push(task_str);
    }

    bool IsStop()
    {
        return m_bThreadPoolStop;
    }

    void Stop()
    {
        m_bThreadPoolStop = true;
        m_TaskQueue.unlock();

        for (int i = 0; i < m_Threads.size(); ++i)
        {
            if (m_Threads[i]->joinable())
            {
                m_Threads[i]->join();
            }
        }
    }

    SafeQueue<std::string>& GetTaskQueue()
    {
        return m_TaskQueue;
    }

private:
    std::atomic<bool> m_bThreadPoolStop;
    SafeQueue<std::string> m_TaskQueue;
    std::vector<std::shared_ptr<ThreadWorker>> m_ThreadWorkers;
    std::vector<std::shared_ptr<std::thread>> m_Threads;

};

#endif // !THREAD_POOL_H

2.4 工作线程

2.4.1 ThreadWorker.h

#ifndef THREAD_WORKER_H
#define THREAD_WORKER_H

class ThreadPool;

/**
 * A worker whose Run() method is executed on a pool thread and processes
 * tasks taken from the ThreadPool's task queue.
 */
class ThreadWorker
{
public:
    /** Creates a worker without an assigned index (index is -1). */
    ThreadWorker();

    /**
     * Creates a worker with the given index.
     * \param thread_index Index used to identify this worker in its output.
     */
    ThreadWorker(int thread_index);

    virtual~ThreadWorker();

    /** Thread entry point: processes tasks until the pool reports it is stopped. */
    void Run();

private:
    // Default member initializer so a default-constructed worker never
    // exposes an indeterminate index.
    int m_ThreadIndex = -1;
};

#endif // !THREAD_WORKER_H

2.4.2 ThreadWorker.cpp

#include <iostream>
#include "ThreadWorker.h"
#include "ThreadPool.h"

// Default constructor: the worker has no assigned index yet. Initialize the
// member explicitly so Run() never prints an indeterminate value.
ThreadWorker::ThreadWorker()
    :m_ThreadIndex(-1)
{
}

// Constructs a worker identified by thread_index and announces its creation
// on standard output.
ThreadWorker::ThreadWorker(int thread_index)
    :m_ThreadIndex(thread_index)
{
    std::cout << "线程" << m_ThreadIndex << "被创建" << std::endl;
}

// Nothing to release: the worker holds only its index.
ThreadWorker::~ThreadWorker() = default;

// Worker loop: keeps taking tasks from the pool's queue until the pool
// reports that it has been stopped, then announces its exit.
void ThreadWorker::Run()
{
    for (;;)
    {
        if (ThreadPool::GetSingleton()->IsStop())
        {
            break;
        }

        // pop() blocks until a task arrives or the queue is unlocked; in the
        // latter case the string stays empty and is skipped.
        std::string task_str;
        ThreadPool::GetSingleton()->GetTaskQueue().pop(task_str);
        if (task_str.empty())
        {
            continue;
        }

        std::cout << "线程" << m_ThreadIndex << "处理了" << task_str << std::endl;
    }

    std::cout << "子线程"<<m_ThreadIndex<< "退出" << std::endl;
}

2.5 线程池测试代码

main.cpp

#include <vld.h>
#include <iostream>
#include <string>

#include "ThreadPool.h"


// Demo driver: starts 10 workers, runs two producer threads that each submit
// 3000 tasks (one per millisecond), then stops the pool.
int main()
{
    // Shared producer body; the only difference between the two producers is
    // the message printed when they finish.
    const auto produce = [](const char* exit_message) {
        for (int task_id = 0; task_id < 3000; ++task_id)
        {
            ThreadPool::GetSingleton()->AddTask(std::to_string(task_id));
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }

        std::cout << exit_message << std::endl;
    };

    ThreadPool::GetSingleton()->CreateThreads(10);

    std::thread prod_thread1(produce, "生产线程1退出");
    std::thread prod_thread2(produce, "生产线程2退出");

    // Wait for both producers before shutting the pool down.
    if (prod_thread1.joinable())
    {
        prod_thread1.join();
    }

    if (prod_thread2.joinable())
    {
        prod_thread2.join();
    }

    ThreadPool::GetSingleton()->Stop();

    std::cout << "执行完成" << std::endl;

    return 0;
}


执行结果:
C++11 – 构建一个符合实际应用要求的线程池-StubbornHuang Blog