Skip to content

Commit 5803d12

Browse files
committed
Add distributed::reduce() algorithm
1 parent a1008fe commit 5803d12

File tree

3 files changed

+563
-0
lines changed

3 files changed

+563
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,387 @@
1+
//---------------------------------------------------------------------------//
2+
// Copyright (c) 2016 Jakub Szuppe <j.szuppe@gmail.com>
3+
//
4+
// Distributed under the Boost Software License, Version 1.0
5+
// See accompanying file LICENSE_1_0.txt or copy at
6+
// http://www.boost.org/LICENSE_1_0.txt
7+
//
8+
// See http://boostorg.github.com/compute for more information.
9+
//---------------------------------------------------------------------------//
10+
11+
#ifndef BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP
12+
#define BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP
13+
14+
#include <algorithm>
#include <vector>

#include <boost/assert.hpp>
#include <boost/utility/enable_if.hpp>

#include <boost/mpl/and.hpp>
#include <boost/mpl/not.hpp>
#include <boost/mpl/or.hpp>

#include <boost/compute/buffer.hpp>
#include <boost/compute/algorithm/reduce.hpp>
#include <boost/compute/algorithm/copy_n.hpp>
#include <boost/compute/iterator/buffer_iterator.hpp>
#include <boost/compute/type_traits/is_device_iterator.hpp>

#include <boost/compute/distributed/command_queue.hpp>
#include <boost/compute/distributed/vector.hpp>
31+
namespace boost {
32+
namespace compute {
33+
namespace distributed {
34+
namespace detail {
35+
36+
template<class OutputIterator>
37+
inline ::boost::compute::command_queue&
38+
final_reduce_queue(OutputIterator result,
39+
command_queue &queue,
40+
typename boost::enable_if_c<
41+
!is_device_iterator<OutputIterator>::value
42+
>::type* = 0)
43+
{
44+
(void) result;
45+
46+
::boost::compute::command_queue& device_queue = queue.get(0);
47+
// CPU device is preferred, however if there is none, the first device
48+
// queue is used
49+
for(size_t i = 0; i < queue.size(); i++)
50+
{
51+
if(queue.get(i).get_device().type() & ::boost::compute::device::cpu)
52+
{
53+
device_queue = queue.get(i);
54+
break;
55+
}
56+
}
57+
return device_queue;
58+
}
59+
60+
template<class OutputIterator>
61+
inline ::boost::compute::command_queue&
62+
final_reduce_queue(OutputIterator result,
63+
command_queue &queue,
64+
typename boost::enable_if_c<
65+
is_device_iterator<OutputIterator>::value
66+
>::type* = 0)
67+
{
68+
// first, find all queues that can be used with result iterator
69+
const ::boost::compute::context& result_context =
70+
result.get_buffer().get_context();
71+
std::vector<size_t> compatible_queues;
72+
for(size_t i = 0; i < queue.size(); i++)
73+
{
74+
if(queue.get(i).get_context() == result_context)
75+
{
76+
compatible_queues.push_back(i);
77+
}
78+
}
79+
BOOST_ASSERT_MSG(
80+
!compatible_queues.empty(),
81+
"There is no device command queue that can be use to copy to result"
82+
);
83+
84+
// then choose device queue from compatible device queues
85+
86+
// CPU device is preferred, however if there is none, the first
87+
// compatible device queue is used
88+
::boost::compute::command_queue& device_queue = queue.get(compatible_queues[0]);
89+
for(size_t i = 0; i < compatible_queues.size(); i++)
90+
{
91+
size_t n = compatible_queues[i];
92+
if(queue.get(n).get_device().type() & ::boost::compute::device::cpu)
93+
{
94+
device_queue = queue.get(n);
95+
break;
96+
}
97+
}
98+
return device_queue;
99+
}
100+
101+
template<
102+
class InputType, weight_func weight, class Alloc,
103+
class OutputIterator,
104+
class BinaryFunction
105+
>
106+
inline void
107+
dispatch_reduce(const vector<InputType, weight, Alloc> &input,
108+
OutputIterator result,
109+
BinaryFunction function,
110+
command_queue &queue)
111+
{
112+
typedef typename
113+
boost::compute::result_of<BinaryFunction(InputType, InputType)>::type
114+
result_type;
115+
116+
// find device queue for the final reduction
117+
::boost::compute::command_queue& device_queue =
118+
final_reduce_queue(result, queue);
119+
120+
::boost::compute::buffer parts_results_device(
121+
device_queue.get_context(), input.parts() * sizeof(result_type)
122+
);
123+
124+
// if all devices queues are in the same OpenCL context we can
125+
// save part reduction directly into parts_results_device buffer
126+
size_t reduced = 0;
127+
if(queue.one_context())
128+
{
129+
// reduce each part of input vector
130+
for(size_t i = 0; i < input.parts(); i++)
131+
{
132+
if(input.begin(i) != input.end(i))
133+
{
134+
// async, because it stores result on device
135+
::boost::compute::reduce(
136+
input.begin(i),
137+
input.end(i),
138+
::boost::compute::make_buffer_iterator<result_type>(
139+
parts_results_device, reduced
140+
),
141+
function,
142+
queue.get(i)
143+
);
144+
reduced++;
145+
}
146+
}
147+
// add marker on every queue that is not device_queue, because
148+
// we need to know when reductions are done
149+
wait_list reduce_markers;
150+
reduce_markers.reserve(reduced);
151+
for(size_t i = 0; i < input.parts(); i++)
152+
{
153+
if(input.begin(i) != input.end(i) && queue.get(i) != device_queue)
154+
{
155+
reduce_markers.insert(queue.get(i).enqueue_marker());
156+
}
157+
}
158+
// if it is possible we enqueue a barrier in device_queue which waits
159+
// for reduce_markers (we can do this since all events are in the same
160+
// context); otherwise, we need to sync. wait for those events
161+
#ifdef CL_VERSION_1_2
162+
if(device_queue.check_device_version(1, 2)) {
163+
device_queue.enqueue_barrier(reduce_markers);
164+
}
165+
#endif
166+
{
167+
reduce_markers.wait();
168+
}
169+
}
170+
else
171+
{
172+
// reduce each part of input vector
173+
std::vector<result_type> parts_results_host(input.parts());
174+
for(size_t i = 0; i < input.parts(); i++)
175+
{
176+
if(input.begin(i) != input.end(i))
177+
{
178+
// sync, because it stores result on host
179+
::boost::compute::reduce(
180+
input.begin(i),
181+
input.end(i),
182+
&parts_results_host[reduced],
183+
function,
184+
queue.get(i)
185+
);
186+
reduced++;
187+
}
188+
}
189+
// sync, because it copies from host to device
190+
::boost::compute::copy_n(
191+
parts_results_host.begin(),
192+
reduced,
193+
::boost::compute::make_buffer_iterator<result_type>(
194+
parts_results_device
195+
),
196+
device_queue
197+
);
198+
}
199+
// final reduction
200+
// async if result is device_iterator, sync otherwise
201+
::boost::compute::reduce(
202+
::boost::compute::make_buffer_iterator<result_type>(
203+
parts_results_device
204+
),
205+
::boost::compute::make_buffer_iterator<result_type>(
206+
parts_results_device, reduced
207+
),
208+
result,
209+
function,
210+
device_queue
211+
);
212+
}
213+
214+
// special case for when OutputIterator is a host iterator
215+
// and binary operator is plus<T>
216+
template<
217+
class InputType, weight_func weight, class Alloc,
218+
class OutputIterator,
219+
class T
220+
>
221+
inline void
222+
dispatch_reduce(const vector<InputType, weight, Alloc> &input,
223+
OutputIterator result,
224+
::boost::compute::plus<T> function,
225+
command_queue &queue,
226+
typename boost::enable_if_c<
227+
!is_device_iterator<OutputIterator>::value
228+
>::type* = 0)
229+
{
230+
// reduce each part of input vector
231+
std::vector<T> parts_results_host(input.parts());
232+
for(size_t i = 0; i < input.parts(); i++)
233+
{
234+
::boost::compute::reduce(
235+
input.begin(i),
236+
input.end(i),
237+
&parts_results_host[i],
238+
function,
239+
queue.get(i)
240+
);
241+
}
242+
243+
// final reduction
244+
*result = parts_results_host[0];
245+
for(size_t i = 1; i < input.parts(); i++)
246+
{
247+
*result += static_cast<T>(parts_results_host[i]);
248+
}
249+
}
250+
251+
// special case for when OutputIterator is a host iterator
252+
// and binary operator is min<T>
253+
template<
254+
class InputType, weight_func weight, class Alloc,
255+
class OutputIterator,
256+
class T
257+
>
258+
inline void
259+
dispatch_reduce(vector<InputType, weight, Alloc> &input,
260+
OutputIterator result,
261+
::boost::compute::min<T> function,
262+
command_queue &queue,
263+
typename boost::enable_if_c<
264+
!is_device_iterator<OutputIterator>::value
265+
>::type* = 0)
266+
{
267+
// reduce each part of input vector
268+
std::vector<T> parts_results_host(input.parts());
269+
for(size_t i = 0; i < input.parts(); i++)
270+
{
271+
::boost::compute::reduce(
272+
input.begin(i),
273+
input.end(i),
274+
&parts_results_host[i],
275+
function,
276+
queue.get(i)
277+
);
278+
}
279+
280+
// final reduction
281+
*result = parts_results_host[0];
282+
for(size_t i = 1; i < input.parts(); i++)
283+
{
284+
*result = (std::min)(static_cast<T>(*result), parts_results_host[i]);
285+
}
286+
}
287+
288+
// special case for when OutputIterator is a host iterator
289+
// and binary operator is max<T>
290+
template<
291+
class InputType, weight_func weight, class Alloc,
292+
class OutputIterator,
293+
class T
294+
>
295+
inline void
296+
dispatch_reduce(const vector<InputType, weight, Alloc> &input,
297+
OutputIterator result,
298+
::boost::compute::max<T> function,
299+
command_queue &queue,
300+
typename boost::enable_if_c<
301+
!is_device_iterator<OutputIterator>::value
302+
>::type* = 0)
303+
{
304+
// reduce each part of input vector
305+
std::vector<T> parts_results_host(input.parts());
306+
for(size_t i = 0; i < input.parts(); i++)
307+
{
308+
::boost::compute::reduce(
309+
input.begin(i),
310+
input.end(i),
311+
&parts_results_host[i],
312+
function,
313+
queue.get(i)
314+
);
315+
}
316+
317+
// final reduction
318+
*result = parts_results_host[0];
319+
for(size_t i = 1; i < input.parts(); i++)
320+
{
321+
*result = (std::max)(static_cast<T>(*result), parts_results_host[i]);
322+
}
323+
}
324+
325+
} // end detail namespace
326+
327+
/// Returns the result of applying \p function to the elements in the
328+
/// \p input vector.
329+
///
330+
/// If no function is specified, \c plus will be used.
331+
///
332+
/// \param input input vector
333+
/// \param result iterator pointing to the output
334+
/// \param function binary reduction function
335+
/// \param queue distributed command queue to perform the operation
336+
///
337+
/// Distributed command queue \p queue has to span same set of compute
338+
/// devices as distributed command queue used to create \p input vector.
339+
///
340+
/// If \p result is a device iterator, its underlying buffer must be allocated
341+
/// in context of at least one device command queue from \p queue.
342+
///
343+
/// The \c reduce() algorithm assumes that the binary reduction function is
344+
/// associative. When used with non-associative functions the result may
345+
/// be non-deterministic and vary in precision. Notably this affects the
346+
/// \c plus<float>() function as floating-point addition is not associative
347+
/// and may produce slightly different results than a serial algorithm.
348+
///
349+
/// This algorithm supports both host and device iterators for the
350+
/// result argument. This allows for values to be reduced and copied
351+
/// to the host all with a single function call.
352+
template<
353+
class InputType, weight_func weight, class Alloc,
354+
class OutputIterator,
355+
class BinaryFunction
356+
>
357+
inline void
358+
reduce(const vector<InputType, weight, Alloc> &input,
359+
OutputIterator result,
360+
BinaryFunction function,
361+
command_queue &queue)
362+
{
363+
if(input.empty()) {
364+
return;
365+
}
366+
367+
detail::dispatch_reduce(input, result, function, queue);
368+
}
369+
370+
/// \overload
371+
template<
372+
class InputType, weight_func weight, class Alloc,
373+
class OutputIterator
374+
>
375+
inline void
376+
reduce(const vector<InputType, weight, Alloc> &input,
377+
OutputIterator result,
378+
command_queue &queue)
379+
{
380+
return reduce(input, result, ::boost::compute::plus<InputType>(), queue);
381+
}
382+
383+
} // end distributed namespace
384+
} // end compute namespace
385+
} // end boost namespace
386+
387+
#endif /* BOOST_COMPUTE_DISTRIBUTED_REDUCE_HPP */

test/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ add_compute_test("distributed.context" test_distributed_context.cpp)
8585
add_compute_test("distributed.command_queue" test_distributed_command_queue.cpp)
8686
add_compute_test("distributed.vector" test_distributed_vector.cpp)
8787
add_compute_test("distributed.copy" test_distributed_copy.cpp)
88+
add_compute_test("distributed.reduce" test_distributed_reduce.cpp)
8889
add_compute_test("distributed.transform" test_distributed_transform.cpp)
8990

9091
add_compute_test("utility.extents" test_extents.cpp)

0 commit comments

Comments
 (0)