1 应用场景

在实际的应用过程中,经常会遇到一个大数量的for循环耗时的问题,比如说出现了一个10000000次的for循环,每一次循环处理业务逻辑需要耗时1ms,如:

for (int i = 0; i < 10000000; ++i)
{
    dosomething()
}

那么整个for循环下来需要耗时10000000毫秒,约等于10000秒,167分钟,2.8个小时。这种时间消耗成本是巨大的。当然,上面的假设有点夸张,正常的业务场景下也不会出现这么大数目的循环。当时对于即时网络接口请求,可能会出现50循环,每个循环耗时50ms的,那么也需要还是2500ms,约2.5秒才能从服务返回处理数据,这种时间成本其实也是比较大的。

那么如何缩短for循环下的业务处理时间,加快响应速度呢?

如果能对for循环进行切片并行化,比如将for循环修改为4个并行的子for循环,那么原本耗时2.5秒的业务处理是不是只需要四分之一的时间,约0.63秒就能成功返回。

2 std::threadstd::shared_futurestd::promise并行化/多线程化for循环

下面将介绍一种多线程化for循环的示例方法,使用std::thread对每一次循环做处理,然后使用std::shared_future,std::promise获取单次循环处理的结果。

2.1 无多线程化下的for循环处理

使用ProcessPoint函数处理每一次循环,为模拟逻辑处理要求,每一次循环都睡眠1秒钟,循环10次总共至少需要耗费10秒钟,示例程序如下:

#include <iostream>
#include <vector>

struct Point
{
    double x;
    double y;
    double z;

    Point()
    {
        x = 0.0;
        y = 0.0;
        z = 0.0;
    }
};

std::pair<int, Point> ProcessPoint(int index, const Point& point)
{
    int index_copy = index;

    Point point_copy;

    point_copy.x = index;

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    return std::make_pair(index_copy, point_copy);
}

int main()
{
    auto ProcessPoint_beforeTime = std::chrono::steady_clock::now();
    for (int i = 0; i < 10; ++i)
    {
        Point tempPoint;
        ProcessPoint(i, tempPoint);
    }
    auto ProcessPoint_afterTime = std::chrono::steady_clock::now();
    double ProcessPoint_duration_millsecond = std::chrono::duration<double, std::milli>(ProcessPoint_afterTime - ProcessPoint_beforeTime).count();
    std::cout << "for循环单线程处理耗时:" << ProcessPoint_duration_millsecond << "毫秒" << std::endl;

    return 0;
}

2.2 for循环的多线程化/并行化

在下面的示例程序中,我们对for循环中的每一次循环都开启一个子线程处理业务逻辑,为了确保子线程函数处理的结果能够在主线程获取,使用了std::promise和std::shared_future对子线程处理结果进行绑定,通过std::pair的方式进行返回,在std::pair中也保留了循环的索引,方便主线程在获取到所有的子线程处理结果之后按索引对结果进行排序。

#include <iostream>
#include <thread>
#include <future>
#include <vector>

struct Point
{
    double x;
    double y;
    double z;

    Point()
    {
        x = 0.0;
        y = 0.0;
        z = 0.0;
    }
};

std::pair<int, Point> ProcessPoint(int index, const Point& point)
{
    int index_copy = index;

    Point point_copy;

    point_copy.x = index;

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    return std::make_pair(index_copy, point_copy);
}

void ProcessPointMultiThread(int index,const Point& point, std::promise<std::pair<int,Point>> promise)
{
    int index_copy = index;

    Point point_copy;

    point_copy.x = index;

    promise.set_value(std::make_pair(index_copy, point_copy));

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}


int main()
{
    /*----- 1 未多线程 -----*/
    auto ProcessPoint_beforeTime = std::chrono::steady_clock::now();
    for (int i = 0; i < 10; ++i)
    {
        Point tempPoint;
        ProcessPoint(i, tempPoint);
    }
    auto ProcessPoint_afterTime = std::chrono::steady_clock::now();
    double ProcessPoint_duration_millsecond = std::chrono::duration<double, std::milli>(ProcessPoint_afterTime - ProcessPoint_beforeTime).count();
    std::cout << "for循环单线程处理耗时:" << ProcessPoint_duration_millsecond << "毫秒" << std::endl;

    /*----- 2 多线程 -----*/
    auto ProcessPointMultiThread_beforeTime = std::chrono::steady_clock::now();
    std::vector<std::shared_ptr<std::thread>> threadVector;
    std::vector<std::shared_future<std::pair<int, Point>>> futureVector;

    for (int i = 0; i < 10; ++i)
    {
        std::promise<std::pair<int, Point>> promise;
        std::shared_future<std::pair<int, Point>> result = std::move(promise.get_future());
        Point tempPoint;

        std::shared_ptr<std::thread> pThread = std::make_shared<std::thread>(ProcessPointMultiThread, i, tempPoint, std::move(promise));

        if (pThread != nullptr)
        {
            threadVector.emplace_back(pThread);
            futureVector.push_back(result);
        }   
    }

    for (int i = 0; i < threadVector.size(); ++i)
    {
        if (threadVector[i] != nullptr)
        {
            if (threadVector[i]->joinable())
            {
                threadVector[i]->join();
            }
        }
    }

    for (int i = 0; i < futureVector.size(); ++i)
    {
        std::pair<int, Point> tempPair = futureVector[i].get();
        std::cout << tempPair.first << std::endl;
        std::cout << tempPair.second.x << std::endl;
    }

    auto ProcessPointMultiThread_afterTime = std::chrono::steady_clock::now();
    double ProcessPointMultiThread_duration_millsecond = std::chrono::duration<double, std::milli>(ProcessPointMultiThread_afterTime - ProcessPointMultiThread_beforeTime).count();
    std::cout << "for循环多线程处理耗时:" << ProcessPointMultiThread_duration_millsecond << "毫秒" << std::endl;

    return 0;
}

2.3 单线程for循环与多线程for循环的时间效率对比

2.2小节中的示例程序,每个函数都循环10次,单线程for循环需要处理大约10秒钟,多线程for循环只需要1秒钟。当然这种并行化方式在每一次循环都要新建线程开销较大,这个地方可带优化。不过可参照这种模式,对不同粒度的for循环可使用不同的切片方案,比如说不对每一次循环都新开线程,而是每一个for循环只是用std::thread::hardware_concurrency()个线程进行处理,减少cpu线程调度开销。

C++11 – 使用std::thread,std::shared_future,std::promise并行化/多线程化for循环,提升处理速度-StubbornHuang Blog