Skip to content

Latest commit

 

History

History
78 lines (55 loc) · 4.73 KB

File metadata and controls

78 lines (55 loc) · 4.73 KB

2.4 确定线程数量

std::thread::hardware_concurrency()在新版C++中非常有用,其会返回并发线程的数量。例如,多核系统中,返回值可以是CPU核芯的数量。返回值也仅仅是一个标识,当无法获取时,函数返回0。

代码2.9实现了并行版的std::accumulate。代码将整体工作拆分成小任务,交给每个线程去做,并设置最小任务数,避免产生太多的线程,程序会在操作数量为0时抛出异常。比如,std::thread无法启动线程,就会抛出异常。

代码2.9 并行版的std::accumulate

template<typename Iterator,typename T>
struct accumulate_block
{
  void operator()(Iterator first,Iterator last,T& result)
  {
    result=std::accumulate(first,last,result);
  }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
  unsigned long const length=std::distance(first,last);

  if(!length) // 1
    return init;

  unsigned long const min_per_thread=25;
  unsigned long const max_threads=
      (length+min_per_thread-1)/min_per_thread; // 2

  unsigned long const hardware_threads=
      std::thread::hardware_concurrency();

  unsigned long const num_threads=  // 3
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);

  unsigned long const block_size=length/num_threads; // 4

  std::vector<T> results(num_threads);
  std::vector<std::thread> threads(num_threads-1);  // 5

  Iterator block_start=first;
  for(unsigned long i=0; i < (num_threads-1); ++i)
  {
    Iterator block_end=block_start;
    std::advance(block_end,block_size);  // 6
    threads[i]=std::thread(     // 7
        accumulate_block<Iterator,T>(),
        block_start,block_end,std::ref(results[i]));
    block_start=block_end;  // 8
  }
  accumulate_block<Iterator,T>()(
      block_start,last,results[num_threads-1]); // 9
      
  for (auto& entry : threads)
    entry.join();  // 10

  return std::accumulate(results.begin(),results.end(),init); // 11
}

函数看起来很长,但不复杂。如果输入的范围为空①,就会得到init的值。如果范围内的元素多于一个时,需要用范围内元素的总数量除以线程(块)中最小任务数,从而确定启动线程的最大数量②。

因为上下文频繁切换会降低线程的性能,所以计算量的最大值和硬件支持线程数,较小的值为启动线程的数量③。std::thread::hardware_concurrency()返回0时,可以选择一个合适的数字。在本例中,我选择了"2"。

每个线程中处理的元素数量,是范围中元素的总量除以线程的个数得出的④,分配是否得当会在后面讨论。

现在,确定了线程个数,创建一个std::vector<T>容器存放中间结果,并为线程创建一个std::vector<std::thread>容器⑤。因为在启动之前已经有了一个线程(主线程),所以启动的线程数必须比num_threads少1。

使用循环来启动线程:block_end迭代器指向当前块的末尾⑥,并启动一个新线程为当前块累加结果⑦。当迭代器指向当前块的末尾时,启动下一个块⑧。

启动所有线程后,⑨中的线程会处理最终块的结果。因为知道最终块是哪一个,所以最终块中有多少个元素就无所谓了。

累加最终块的结果后,可等待std::for_each⑩创建线程(如同在代码2.8中做的那样),之后使用std::accumulate将所有结果进行累加⑪。

结束这个例子之前,需要明确:T类型的加法不满足结合律(比如,对于float型或double型,在进行加法操作时,系统很可能会做截断操作),因为对范围中元素的分组,会导致parallel_accumulate得到的结果可能与std::accumulate的结果不同。同样的,这里对迭代器的要求更加严格:必须是前向迭代器。对于results容器,需要保证T有默认构造函数。可以需要根据算法本身的特性,选择不同的并行方式。算法并行会在第8章更加深入的进行讨论,并在第10章中会介绍C++17中支持的并行算法(其中std::reduce操作等价于这里的parallel_accumulate)。因为不能直接从一个线程中返回值,所以需要传递results容器的引用到线程中去。另一个办法,通过地址来获取线程执行的结果(第4章中,我们将使用future完成这种方案)。

当线程运行时,所有必要的信息都需要传入到线程中去,包括存储计算结果的位置。有时候可以传递一个标识数,例如代码2.8中的i。不过,需要标识的函数在调用栈的底层,同时其他线程也可调用该函数,那么标识数就会变成累赘。好消息是在设计C++的线程库时,就有预见了这种情况,实现中给每个线程附加了唯一标识符。