FixedThreadPool 固定线程池:从原理到工业级实现

张开发
2026/4/15 15:38:13 15 分钟阅读

分享文章

FixedThreadPool 固定线程池:从原理到工业级实现
前言在高并发后端服务中固定线程池FixedThreadPool是最常用、最稳定的线程池实现之一。它通过预先创建固定数量的工作线程、复用线程执行任务、统一管理任务队列完美解决了频繁创建销毁线程的性能问题是异步任务、批量计算、IO 并发等场景的工业级标准方案。本文基于半同步 / 半异步架构从需求分析、同步队列设计、线程池实现到性能优化带你从零实现一个可直接上线、带命名空间、支持日志、支持批量任务、支持优雅停止的 C11 FixedThreadPool。一、FixedThreadPool 核心需求与架构1.1 什么是 FixedThreadPoolFixedThreadPool 是固定大小的线程池核心特性创建时指定线程数量线程池生命周期内线程数量不变任务提交时若有空闲线程则立即执行无空闲线程则任务入队等待线程执行完任务后不销毁归还线程池等待下一个任务支持任务队列上限控制避免内存暴涨支持优雅停止确保任务不丢失、线程安全退出1.2 三层架构半同步 / 半异步模式FixedThreadPool 采用经典的生产者 - 消费者三层架构同步服务层生产者业务线程提交任务将任务加入同步队列排队层同步队列 SyncQueue线程安全的任务队列负责任务缓存、限流、线程同步异步服务层消费者预先创建的工作线程从队列中取出任务并行执行二、核心组件SyncQueue 同步队列设计同步队列是线程池的核心中间层是线程安全的保障也是性能优化的关键。2.1 同步队列核心需求线程安全多线程并发添加 / 取出任务无数据竞争任务同步空队列时阻塞消费者线程满队列时阻塞生产者线程队列限流设置队列上限避免任务过多导致内存溢出优雅停止支持停止队列唤醒所有等待线程安全退出性能优化减少加锁次数避免数据拷贝提升吞吐量2.2 核心技术选型std::mutex互斥锁保护队列操作std::condition_variable条件变量实现线程阻塞 / 唤醒std::unique_lockRAII 锁管理避免死锁std::move / std::forward移动语义 完美转发避免数据拷贝std::atomic_bool原子变量实现停止标志线程安全std::deque双端队列头尾 O (1) 操作适合任务存储三、完整工程化代码实现你的原版代码3.1 SyncQueue_1.hpp 同步队列#include list #include vector #include deque #includequeue #include mutex #include condition_variable #include iostream using namespace std; #include Logger.hpp #ifndef SYNC_QUEUE_1_HPP #define SYNC_QUEUE_1_HPP namespace tulun { static const size_t MaxTaskCount 500; template class T//Task 队列里存的类型 class SyncQueue { private: std::dequeT m_queue; // 双端队列存任务头尾O(1) std::mutex m_mutex; // 互斥锁保证多线程安全 std::condition_variable m_notEmpty; // 条件队列不为空消费者唤醒 std::condition_variable m_notFull; // 条件队列不满生产者唤醒 int m_maxSize; // 队列最大容量 bool m_needStop; // true停止标记初始为false bool IsFull() const { bool full m_queue.size() m_maxSize; LOG_INFO full: full; return full; } bool IsEmpty() const { bool empty m_queue.size(); LOG_INFO empty: empty; return empty; } template class F// F 传进来的参数类型 void Add(F task) { std::unique_lockstd::mutex locker(m_mutex); // 加锁 // 队列满 不停止 → 等待 while (!m_needStop IsFull()) { m_notFull.wait(locker); // 阻塞等待不满 } if (m_needStop) { return; } m_queue.push_back(std::forwardF(task)); // 放入队列 m_notEmpty.notify_all(); // 唤醒消费者有活干了 } public: SyncQueue(int maxsize MaxTaskCount) : m_maxSize(maxsize), m_needStop(false) { } ~SyncQueue() { if (!m_needStop) { Stop(); } } SyncQueue(const SyncQueue ) delete; SyncQueue operator(const SyncQueue ) delete; // 生产者 Put放任务 void Put(const T task) { Add(task); } void Put(T task) { Add(std::forwardT(task)); } // 消费者 Take取任务 void Take(T task) { std::unique_lockstd::mutex locker(m_mutex); // 加锁 // 队列空 不停止 → 等待 while (!m_needStop IsEmpty()) { m_notEmpty.wait(locker); // 阻塞等待不为空 } if (m_needStop) { return; } task m_queue.front(); m_queue.pop_front(); // 取出任务 m_notFull.notify_all(); // 唤醒生产者有空位啦 } //批量取任务高性能 void Task(std::queueT tqu) { std::unique_lockstd::mutex locker(m_mutex); while (!m_needStop IsEmpty()) { m_notEmpty.wait(locker); } if (m_needStop) { return; } tqu std::move(m_queue);//【一次性拿走所有任务】 m_notFull.notify_all(); } void Stop() { { std::unique_lockstd::mutex locker(m_mutex); m_needStop true; } // 唤醒所有等待的线程 m_notEmpty.notify_all(); m_notFull.notify_all(); } bool Empty() const { std::unique_lockstd::mutex locker(m_mutex); return m_queue.empty(); } bool Full() const { std::unique_lockstd::mutex locker(m_mutex); return m_queue.size() m_maxSize; } size_t Size() const { std::unique_lockstd::mutex locker(m_mutex); return m_queue.size(); } size_t Count() const { return m_queue.size(); } }; } // namespace tulun #endif3.2 FixedThreadPool.hpp 线程池定义#include SyncQueue_1.hpp #include functional #include thread #include vector #include queue #include list #include memory #include atomic using namespace std; #ifndef FIXED_THREAD_POOL_HPP #define FIXED_THREAD_POOL_HPP namespace tulun { class FixedThreadPool { public: using TaskType std::functionvoid(void); // std::bind; private: std::liststd::shared_ptrstd::thread m_threadgroup; tulun::SyncQueueTaskType m_queue; std::atomicbool m_running; std::once_flag m_flag; void Start(int numthreads); void RunInThread(); void StopThreadGroup(); public: FixedThreadPool(size_t m_TaskQueSize 500, int numthreads std::thread::hardware_concurrency()); ~FixedThreadPool(); void Stop(); void AddTask(TaskType task); void AddTask(const TaskType task); }; } // namespace tulun #endif3.3 FixedThreadPool.cpp 线程池实现#include FixedThreadPool.hpp namespace tulun { void FixedThreadPool::Start(int numthreads) { m_running true; for (int i 0; i numthreads; i) { // std::shared_ptrstd::thread tha( // new std::thread(FixedThreadPool::RunInThread,this)); m_threadgroup.push_back( std::shared_ptrstd::thread( new std::thread(FixedThreadPool::RunInThread, this))); } } void FixedThreadPool::RunInThread() { while (m_running) { TaskType task; m_queue.Take(task); if (m_running task) { task(); } } } void FixedThreadPool::StopThreadGroup() { m_queue.Stop(); m_running false; for (auto tha : m_threadgroup) { tha-join(); } } FixedThreadPool::FixedThreadPool(size_t m_TaskQueSize ,int numthreads) : m_queue(m_TaskQueSize), m_running(false) { Start(numthreads); } FixedThreadPool::~FixedThreadPool() { Stop(); } void FixedThreadPool::Stop() { std::call_once(m_flag, FixedThreadPool::StopThreadGroup, this); } void FixedThreadPool::AddTask(TaskType task) { m_queue.Put(std::forwardTaskType(task)); } void FixedThreadPool::AddTask(const TaskType task) { m_queue.Put(task); } } // namespace tulun3.4 main.cpp 测试用例#includeFixedThreadPool.hpp #includeiostream #includethread using namespace std; void funa() { coutfunaendl; } void funb() { coutfunbendl; } int main() { tulun::FixedThreadPool mythpool; mythpool.AddTask(funa); mythpool.AddTask(funb); std::this_thread::sleep_for(std::chrono::milliseconds(100)); return 0; }四、代码核心亮点解析4.1 工程化规范使用命名空间tulun封装避免命名冲突分离头文件 / 源文件符合工业级项目规范支持日志打印方便调试与问题定位禁止拷贝构造与赋值保证线程池单例安全4.2 线程管理设计使用std::shared_ptrstd::thread管理线程生命周期构造函数自动创建线程析构函数自动停止线程线程数量默认 CPU 核心数自动适配机器性能4.3 任务队列设计基于std::deque实现头尾操作 O (1)支持单个任务取放 批量任务取放满队列阻塞生产者空队列阻塞消费者完美转发 移动语义零拷贝任务传递4.4 优雅停止机制使用std::atomicbool保证停止标志线程安全使用std::once_flagstd::call_once保证只停止一次停止时唤醒所有等待线程等待线程执行完毕再退出无资源泄漏、无死锁、无任务丢失4.5 高性能优化批量取任务一次加锁拿走所有任务加锁次数 O (1)锁外通知减少线程竞争提升唤醒效率条件变量等待避免空轮询降低 CPU 占用五、核心功能运行流程5.1 线程池启动流程构造函数传入线程数、队列大小Start()创建指定数量工作线程工作线程进入RunInThread()循环等待任务5.2 任务提交流程业务调用AddTask()提交任务任务进入SyncQueue队列满则阻塞唤醒等待的工作线程取任务执行5.3 任务执行流程工作线程从队列取任务队列空则阻塞等待新任务取到任务后执行执行完毕继续等待收到停止信号后退出循环5.4 优雅停止流程调用Stop()触发停止逻辑队列停止唤醒所有等待线程线程执行完当前任务后退出主线程join所有工作线程安全回收资源六、适用场景场景选型理由CPU 密集型计算线程数 CPU 核心数最大化并行效率批量异步任务固定线程数稳定可控无线程爆炸IO 密集型并发线程复用避免频繁创建销毁后端服务接口保护系统资源防止过载日志 / 数据处理任务排队执行有序稳定

更多文章