Skip to content

Commit

Permalink
[SYCL][CUDA][HIP][PI] Fix barrier (#6490)
Browse files Browse the repository at this point in the history
Fixes a bug in the barrier implementation in the CUDA and HIP plugins that
often caused the barrier not to work. The new implementation is also faster.

Tests in: intel/llvm-test-suite#1122
  • Loading branch information
t4c1 authored Oct 6, 2022
1 parent 9b02506 commit 1c3d598
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 61 deletions.
117 changes: 89 additions & 28 deletions sycl/plugins/cuda/pi_cuda.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,25 @@ pi_result cuda_piEventRetain(pi_event event);

/// \endcond

// Make the given compute stream wait for a pending queue-wide barrier, at
// most once per stream: the per-stream flag records that the wait was already
// enqueued, so later round-robin selections of the same stream are no-ops.
// NOTE(review): barrier_event_ and compute_applied_barrier_ are also written
// by the barrier-enqueue path under barrier_mutex_; this reader takes no lock
// -- presumably callers serialize against that path, confirm.
void _pi_queue::compute_stream_wait_for_barrier_if_needed(CUstream stream,
                                                          pi_uint32 stream_i) {
  if (!barrier_event_ || compute_applied_barrier_[stream_i])
    return;
  PI_CHECK_ERROR(cuStreamWaitEvent(stream, barrier_event_, 0));
  compute_applied_barrier_[stream_i] = true;
}

// Transfer-stream counterpart of compute_stream_wait_for_barrier_if_needed:
// enqueue a wait on the pending queue-wide barrier event exactly once per
// transfer stream, tracked by the per-stream applied flag.
// NOTE(review): same unlocked read of barrier_event_ /
// transfer_applied_barrier_ as on the compute path -- presumably callers
// serialize against the barrier-enqueue path, confirm.
void _pi_queue::transfer_stream_wait_for_barrier_if_needed(CUstream stream,
                                                           pi_uint32 stream_i) {
  if (!barrier_event_ || transfer_applied_barrier_[stream_i])
    return;
  PI_CHECK_ERROR(cuStreamWaitEvent(stream, barrier_event_, 0));
  transfer_applied_barrier_[stream_i] = true;
}

CUstream _pi_queue::get_next_compute_stream(pi_uint32 *stream_token) {
pi_uint32 stream_i;
pi_uint32 token;
while (true) {
if (num_compute_streams_ < compute_streams_.size()) {
// the check above is for performance - so as not to lock mutex every time
Expand All @@ -398,40 +415,46 @@ CUstream _pi_queue::get_next_compute_stream(pi_uint32 *stream_token) {
cuStreamCreate(&compute_streams_[num_compute_streams_++], flags_));
}
}
stream_i = compute_stream_idx_++;
token = compute_stream_idx_++;
stream_i = token % compute_streams_.size();
// if a stream has been reused before it was next selected round-robin
// fashion, we want to delay its next use and instead select another one
// that is more likely to have completed all the enqueued work.
if (delay_compute_[stream_i % compute_streams_.size()]) {
delay_compute_[stream_i % compute_streams_.size()] = false;
if (delay_compute_[stream_i]) {
delay_compute_[stream_i] = false;
} else {
break;
}
}
if (stream_token) {
*stream_token = stream_i;
*stream_token = token;
}
return compute_streams_[stream_i % compute_streams_.size()];
CUstream res = compute_streams_[stream_i];
compute_stream_wait_for_barrier_if_needed(res, stream_i);
return res;
}

CUstream _pi_queue::get_next_compute_stream(pi_uint32 num_events_in_wait_list,
const pi_event *event_wait_list,
_pi_stream_guard &guard,
pi_uint32 *stream_token) {
for (pi_uint32 i = 0; i < num_events_in_wait_list; i++) {
pi_uint32 token = event_wait_list[i]->get_stream_token();
pi_uint32 token = event_wait_list[i]->get_compute_stream_token();
if (event_wait_list[i]->get_queue() == this && can_reuse_stream(token)) {
std::unique_lock<std::mutex> compute_sync_guard(
compute_stream_sync_mutex_);
// redo the check after lock to avoid data races on
// last_sync_compute_streams_
if (can_reuse_stream(token)) {
delay_compute_[token % delay_compute_.size()] = true;
pi_uint32 stream_i = token % delay_compute_.size();
delay_compute_[stream_i] = true;
if (stream_token) {
*stream_token = token;
}
guard = _pi_stream_guard{std::move(compute_sync_guard)};
return event_wait_list[i]->get_stream();
CUstream res = event_wait_list[i]->get_stream();
compute_stream_wait_for_barrier_if_needed(res, stream_i);
return res;
}
}
}
Expand All @@ -453,7 +476,10 @@ CUstream _pi_queue::get_next_transfer_stream() {
cuStreamCreate(&transfer_streams_[num_transfer_streams_++], flags_));
}
}
return transfer_streams_[transfer_stream_idx_++ % transfer_streams_.size()];
pi_uint32 stream_i = transfer_stream_idx_++ % transfer_streams_.size();
CUstream res = transfer_streams_[stream_i];
transfer_stream_wait_for_barrier_if_needed(res, stream_i);
return res;
}

_pi_event::_pi_event(pi_command_type type, pi_context context, pi_queue queue,
Expand Down Expand Up @@ -2549,7 +2575,7 @@ pi_result cuda_piQueueFinish(pi_queue command_queue) {
nullptr); // need PI_ERROR_INVALID_EXTERNAL_HANDLE error code
ScopedContext active(command_queue->get_context());

command_queue->sync_streams([&result](CUstream s) {
command_queue->sync_streams</*ResetUsed=*/true>([&result](CUstream s) {
result = PI_CHECK_ERROR(cuStreamSynchronize(s));
});

Expand Down Expand Up @@ -3875,35 +3901,70 @@ pi_result cuda_piEnqueueEventsWaitWithBarrier(pi_queue command_queue,
pi_uint32 num_events_in_wait_list,
const pi_event *event_wait_list,
pi_event *event) {
// This function makes one stream work on the previous work (or work
// represented by input events) and then all future work waits on that stream.
if (!command_queue) {
return PI_ERROR_INVALID_QUEUE;
}

pi_result result;

try {
ScopedContext active(command_queue->get_context());
pi_uint32 stream_token;
_pi_stream_guard guard;
CUstream cuStream = command_queue->get_next_compute_stream(
num_events_in_wait_list, event_wait_list, guard, &stream_token);
{
std::lock_guard(command_queue->barrier_mutex_);
if (command_queue->barrier_event_ == nullptr) {
PI_CHECK_ERROR(cuEventCreate(&command_queue->barrier_event_,
CU_EVENT_DISABLE_TIMING));
}
if (num_events_in_wait_list == 0) { // wait on all work
if (command_queue->barrier_tmp_event_ == nullptr) {
PI_CHECK_ERROR(cuEventCreate(&command_queue->barrier_tmp_event_,
CU_EVENT_DISABLE_TIMING));
}
command_queue->sync_streams(
[cuStream,
tmp_event = command_queue->barrier_tmp_event_](CUstream s) {
if (cuStream != s) {
// record a new CUDA event on every stream and make one stream
// wait for these events
PI_CHECK_ERROR(cuEventRecord(tmp_event, s));
PI_CHECK_ERROR(cuStreamWaitEvent(cuStream, tmp_event, 0));
}
});
} else { // wait just on given events
forLatestEvents(event_wait_list, num_events_in_wait_list,
[cuStream](pi_event event) -> pi_result {
if (event->get_queue()->has_been_synchronized(
event->get_compute_stream_token())) {
return PI_SUCCESS;
} else {
return PI_CHECK_ERROR(
cuStreamWaitEvent(cuStream, event->get(), 0));
}
});
}

if (event_wait_list) {
auto result =
forLatestEvents(event_wait_list, num_events_in_wait_list,
[command_queue](pi_event event) -> pi_result {
if (event->get_queue()->has_been_synchronized(
event->get_stream_token())) {
return PI_SUCCESS;
} else {
return enqueueEventWait(command_queue, event);
}
});

if (result != PI_SUCCESS) {
return result;
result = PI_CHECK_ERROR(
cuEventRecord(command_queue->barrier_event_, cuStream));
for (unsigned int i = 0;
i < command_queue->compute_applied_barrier_.size(); i++) {
command_queue->compute_applied_barrier_[i] = false;
}
for (unsigned int i = 0;
i < command_queue->transfer_applied_barrier_.size(); i++) {
command_queue->transfer_applied_barrier_[i] = false;
}
}
if (result != PI_SUCCESS) {
return result;
}

if (event) {
pi_uint32 stream_token;
_pi_stream_guard guard;
CUstream cuStream = command_queue->get_next_compute_stream(
num_events_in_wait_list, event_wait_list, guard, &stream_token);
*event = _pi_event::make_native(PI_COMMAND_TYPE_MARKER, command_queue,
cuStream, stream_token);
(*event)->start();
Expand Down
27 changes: 22 additions & 5 deletions sycl/plugins/cuda/pi_cuda.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,14 @@ struct _pi_queue {
// will be skipped the next time it would be selected round-robin style. When
// skipped, its delay flag is cleared.
std::vector<bool> delay_compute_;
// keep track of which streams have applied barrier
std::vector<bool> compute_applied_barrier_;
std::vector<bool> transfer_applied_barrier_;
_pi_context *context_;
_pi_device *device_;
pi_queue_properties properties_;
CUevent barrier_event_ = nullptr;
CUevent barrier_tmp_event_ = nullptr;
std::atomic_uint32_t refCount_;
std::atomic_uint32_t eventCount_;
std::atomic_uint32_t compute_stream_idx_;
Expand All @@ -420,6 +425,7 @@ struct _pi_queue {
std::mutex compute_stream_sync_mutex_;
std::mutex compute_stream_mutex_;
std::mutex transfer_stream_mutex_;
std::mutex barrier_mutex_;
bool has_ownership_;

_pi_queue(std::vector<CUstream> &&compute_streams,
Expand All @@ -428,7 +434,9 @@ struct _pi_queue {
unsigned int flags, bool backend_owns = true)
: compute_streams_{std::move(compute_streams)},
transfer_streams_{std::move(transfer_streams)},
delay_compute_(compute_streams_.size(), false), context_{context},
delay_compute_(compute_streams_.size(), false),
compute_applied_barrier_(compute_streams_.size()),
transfer_applied_barrier_(transfer_streams_.size()), context_{context},
device_{device}, properties_{properties}, refCount_{1}, eventCount_{0},
compute_stream_idx_{0}, transfer_stream_idx_{0},
num_compute_streams_{0}, num_transfer_streams_{0},
Expand All @@ -443,6 +451,11 @@ struct _pi_queue {
cuda_piDeviceRelease(device_);
}

void compute_stream_wait_for_barrier_if_needed(CUstream stream,
pi_uint32 stream_i);
void transfer_stream_wait_for_barrier_if_needed(CUstream stream,
pi_uint32 stream_i);

// get_next_compute/transfer_stream() functions return streams from
// appropriate pools in round-robin fashion
native_type get_next_compute_stream(pi_uint32 *stream_token = nullptr);
Expand Down Expand Up @@ -507,7 +520,7 @@ struct _pi_queue {
}
}

template <typename T> void sync_streams(T &&f) {
template <bool ResetUsed = false, typename T> void sync_streams(T &&f) {
auto sync_compute = [&f, &streams = compute_streams_,
&delay = delay_compute_](unsigned int start,
unsigned int stop) {
Expand All @@ -530,7 +543,9 @@ struct _pi_queue {
unsigned int end = num_compute_streams_ < size
? num_compute_streams_
: compute_stream_idx_.load();
last_sync_compute_streams_ = end;
if (ResetUsed) {
last_sync_compute_streams_ = end;
}
if (end - start >= size) {
sync_compute(0, size);
} else {
Expand All @@ -552,7 +567,9 @@ struct _pi_queue {
unsigned int end = num_transfer_streams_ < size
? num_transfer_streams_
: transfer_stream_idx_.load();
last_sync_transfer_streams_ = end;
if (ResetUsed) {
last_sync_transfer_streams_ = end;
}
if (end - start >= size) {
sync_transfer(0, size);
} else {
Expand Down Expand Up @@ -604,7 +621,7 @@ struct _pi_event {

CUstream get_stream() const noexcept { return stream_; }

pi_uint32 get_stream_token() const noexcept { return streamToken_; }
pi_uint32 get_compute_stream_token() const noexcept { return streamToken_; }

pi_command_type get_command_type() const noexcept { return commandType_; }

Expand Down
Loading

0 comments on commit 1c3d598

Please sign in to comment.