使用系统原生Win32_API编写高效好用的线程池


cpu.jpg

软件开发过程中经常需要使用线程池,特别是做并行计算的时候,做并行计算的时候OpenMp并不是一个好的选择,OpenMp并不能充分利用多核心CPU的算力,使用线程池是更好的选择。然而让人苦恼的是C++标准库并没有提供线程池,我不得不自己写了一个线程池,为了能管理线程,连线程类也一并写了。

ThreadPool.h


class ThreadPool
{
public:
    ThreadPool(int maxThread, std::function<void(void*)> threadProc);
    virtual ~ThreadPool();
public:
    void addTask(void* data);
private:
    static unsigned long __stdcall _threadProc(void* parameter);
private:
    std::function<void(void*)> mThreadProc;
    std::vector<HANDLE> mThreads;
    std::vector<void*> mDatas;
    CRITICAL_SECTION mDatasLock;
    HANDLE mTaskAddedEvent = 0;
    volatile bool mExitThread = false;
};

ThreadPool.cpp

#include "windows.h"
#include <functional>
#include <vector>
#include "ThreadPool.h"

ThreadPool::ThreadPool(int maxThread, std::function<void(void*)> threadProc)
    : mThreadProc(threadProc)
{
    InitializeCriticalSection(&mDatasLock);
    mTaskAddedEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    mThreads.resize(maxThread);
    for (int i = 0; i < maxThread; i++)
    {
        mThreads[i] = CreateThread(NULL, 0, _threadProc, this, 0, NULL);
    }
}
ThreadPool::~ThreadPool()
{
    mExitThread = true;
    for (size_t i = 0; i < mThreads.size(); i++)
    {
        WaitForSingleObject(mThreads[i], INFINITE);
    }
    mThreads.clear();

    DeleteCriticalSection(&mDatasLock);
    if (mTaskAddedEvent)
    {
        CloseHandle(mTaskAddedEvent);
        mTaskAddedEvent = 0;
    }
}

void ThreadPool::addTask(void* data)
{
    EnterCriticalSection(&mDatasLock);
    mDatas.push_back(data);
    SetEvent(mTaskAddedEvent);
    LeaveCriticalSection(&mDatasLock);
}

unsigned long __stdcall ThreadPool::_threadProc(void* parameter)
{
    ThreadPool* pThis = (ThreadPool*)parameter;
    while (pThis->mExitThread == false)
    {
        DWORD waitResult = WaitForSingleObject(pThis->mTaskAddedEvent, 100);
        if (waitResult != WAIT_OBJECT_0)
        {
            continue;
        }
        EnterCriticalSection(&pThis->mDatasLock);
        if (pThis->mDatas.empty())
        {
            ResetEvent(pThis->mTaskAddedEvent);
            LeaveCriticalSection(&pThis->mDatasLock);
            continue;
        }
        void* data = pThis->mDatas.back();
        pThis->mDatas.pop_back();
        if (pThis->mDatas.empty())
        {
            ResetEvent(pThis->mTaskAddedEvent);
        }
        LeaveCriticalSection(&pThis->mDatasLock);

        pThis->mThreadProc(data);
    }

    return 0;
}

使用方法

这个线程池被设计为使用多个线程按同样的方式处理不同的数据,线程数量由构造函数的maxThread参数指定。处理数据的函数由构造函数的threadProc参数指定,运行的时侯threadProc会被多次调用,每次调用都将传入不同的参数。当没有数据需要处理时线程池创建的所有线程都将进入休眠状态等待数据,不会浪费CPU算力。具体使用方法参考代码:

在函数中使用

int main()
{
    // 创建线程池
    ThreadPool pool(12, [](void* data)
    {
        printf("%d", *(int*)data);
        // 释放new分配的内存
        delete (int*)data;
    });
    // 添加要处理的数据
    for (int i = 0; i < 1000; i++)
    {
        int* data = new int;
        *data = rand();
        pool.addTask(data);
    }
    return 0;
}

在类中使用

 class ThreadPoolDemo
 {
 private:
     ThreadPool* mThreadPool = nullptr;
 public:
     ThreadPoolDemo()
     {
         // 创建线程池
         mThreadPool = new ThreadPool(12, [this](void* data)
         {
             processData(data);
         });
     }
     ~ThreadPoolDemo()
     {
         if (mThreadPool)
         {
             delete mThreadPool;
             mThreadPool = nullptr;
         }
     }
 public:
     void processData(void* data)
     {
         printf("%d", *(int*)data);
         // 释放new分配的内存
         delete (int*)data;
     }
 public:
     void execute()
     {
         // 添加要处理的数据
         for (int i = 0; i < 1000; i++)
         {
             int* data = new int;
             *data = rand();
            mThreadPool->addTask(data);
         }
     }
 };

 int main()
 {
     ThreadPoolDemo demo;
     demo.execute();

     return 0;
 }

上述所有代码在Visual Studio 2019中编译通过。


芸芸小站首发,阅读原文:


最后编辑:2021年05月10日 ©版权所有,转载须保留原文链接