diff --git a/sycl/plugins/cuda/pi_cuda.cpp b/sycl/plugins/cuda/pi_cuda.cpp
index 2b1014e125122..103f7df5bed89 100644
--- a/sycl/plugins/cuda/pi_cuda.cpp
+++ b/sycl/plugins/cuda/pi_cuda.cpp
@@ -300,7 +300,11 @@ pi_result enqueueEventsWait(pi_queue command_queue, CUstream stream,
   auto result = forLatestEvents(
       event_wait_list, num_events_in_wait_list,
       [stream](pi_event event) -> pi_result {
-        return PI_CHECK_ERROR(cuStreamWaitEvent(stream, event->get(), 0));
+        if (event->get_stream() == stream) {
+          return PI_SUCCESS;
+        } else {
+          return PI_CHECK_ERROR(cuStreamWaitEvent(stream, event->get(), 0));
+        }
       });

   if (result != PI_SUCCESS) {
@@ -367,18 +371,58 @@ pi_result cuda_piEventRetain(pi_event event);

 /// \endcond

-CUstream _pi_queue::get_next_compute_stream() {
-  if (num_compute_streams_ < compute_streams_.size()) {
-    // the check above is for performance - so as not to lock mutex every time
-    std::lock_guard<std::mutex> guard(compute_stream_mutex_);
-    // The second check is done after mutex is locked so other threads can not
-    // change num_compute_streams_ after that
+CUstream _pi_queue::get_next_compute_stream(pi_uint32 *stream_token) {
+  pi_uint32 stream_i;
+  while (true) {
     if (num_compute_streams_ < compute_streams_.size()) {
-      PI_CHECK_ERROR(
-          cuStreamCreate(&compute_streams_[num_compute_streams_++], flags_));
+      // the check above is for performance - so as not to lock mutex every time
+      std::lock_guard<std::mutex> guard(compute_stream_mutex_);
+      // The second check is done after mutex is locked so other threads can not
+      // change num_compute_streams_ after that
+      if (num_compute_streams_ < compute_streams_.size()) {
+        PI_CHECK_ERROR(
+            cuStreamCreate(&compute_streams_[num_compute_streams_++], flags_));
+      }
+    }
+    stream_i = compute_stream_idx_++;
+    // if a stream has been reused before it was next selected in round-robin
+    // fashion, we want to delay its next use and instead select another one
+    // that is more likely to have completed all the enqueued work.
+    if (delay_compute_[stream_i % compute_streams_.size()]) {
+      delay_compute_[stream_i % compute_streams_.size()] = false;
+    } else {
+      break;
     }
   }
-  return compute_streams_[compute_stream_idx_++ % compute_streams_.size()];
+  if (stream_token) {
+    *stream_token = stream_i;
+  }
+  return compute_streams_[stream_i % compute_streams_.size()];
+}
+
+CUstream _pi_queue::get_next_compute_stream(pi_uint32 num_events_in_wait_list,
+                                            const pi_event *event_wait_list,
+                                            _pi_stream_guard &guard,
+                                            pi_uint32 *stream_token) {
+  for (pi_uint32 i = 0; i < num_events_in_wait_list; i++) {
+    pi_uint32 token = event_wait_list[i]->get_stream_token();
+    if (event_wait_list[i]->get_queue() == this && can_reuse_stream(token)) {
+      std::unique_lock<std::mutex> compute_sync_guard(
+          compute_stream_sync_mutex_);
+      // redo the check after lock to avoid data races on
+      // last_sync_compute_streams_
+      if (can_reuse_stream(token)) {
+        delay_compute_[token % delay_compute_.size()] = true;
+        if (stream_token) {
+          *stream_token = token;
+        }
+        guard = _pi_stream_guard{std::move(compute_sync_guard)};
+        return event_wait_list[i]->get_stream();
+      }
+    }
+  }
+  guard = {};
+  return get_next_compute_stream(stream_token);
 }

 CUstream _pi_queue::get_next_transfer_stream() {
@@ -399,9 +443,10 @@ CUstream _pi_queue::get_next_transfer_stream() {
 }

 _pi_event::_pi_event(pi_command_type type, pi_context context, pi_queue queue,
-                     CUstream stream)
+                     CUstream stream, pi_uint32 stream_token)
     : commandType_{type}, refCount_{1}, hasBeenWaitedOn_{false},
-      isRecorded_{false}, isStarted_{false}, evEnd_{nullptr}, evStart_{nullptr},
+      isRecorded_{false}, isStarted_{false},
+      streamToken_{stream_token}, evEnd_{nullptr}, evStart_{nullptr},
       evQueued_{nullptr}, queue_{queue}, stream_{stream}, context_{context} {

   bool profilingEnabled = queue_->properties_ & PI_QUEUE_PROFILING_ENABLE;
@@ -2838,7 +2883,10 @@ pi_result cuda_piEnqueueKernelLaunch(

     std::unique_ptr<_pi_event> retImplEv{nullptr};

-    CUstream cuStream = command_queue->get_next_compute_stream();
+    pi_uint32 stream_token;
+    _pi_stream_guard guard;
+    CUstream cuStream = command_queue->get_next_compute_stream(
+        num_events_in_wait_list, event_wait_list, guard, &stream_token);
     CUfunction cuFunc = kernel->get();

     retError = enqueueEventsWait(command_queue, cuStream,
@@ -2863,8 +2911,9 @@
     auto &argIndices = kernel->get_arg_indices();

     if (event) {
-      retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native(
-          PI_COMMAND_TYPE_NDRANGE_KERNEL, command_queue, cuStream));
+      retImplEv = std::unique_ptr<_pi_event>(
+          _pi_event::make_native(PI_COMMAND_TYPE_NDRANGE_KERNEL, command_queue,
+                                 cuStream, stream_token));
       retImplEv->start();
     }

@@ -3699,7 +3748,12 @@ pi_result cuda_piEnqueueEventsWaitWithBarrier(pi_queue command_queue,
     auto result =
         forLatestEvents(event_wait_list, num_events_in_wait_list,
                         [command_queue](pi_event event) -> pi_result {
-                          return enqueueEventWait(command_queue, event);
+                          if (event->get_queue()->has_been_synchronized(
+                                  event->get_stream_token())) {
+                            return PI_SUCCESS;
+                          } else {
+                            return enqueueEventWait(command_queue, event);
+                          }
                         });

     if (result != PI_SUCCESS) {
@@ -3708,8 +3762,12 @@
     }

     if (event) {
+      pi_uint32 stream_token;
+      _pi_stream_guard guard;
+      CUstream cuStream = command_queue->get_next_compute_stream(
+          num_events_in_wait_list, event_wait_list, guard, &stream_token);
       *event = _pi_event::make_native(PI_COMMAND_TYPE_MARKER, command_queue,
-                                      command_queue->get_next_compute_stream());
+                                      cuStream, stream_token);
       (*event)->start();
       (*event)->record();
     }
@@ -4767,12 +4825,15 @@ pi_result cuda_piextUSMEnqueueMemset(pi_queue queue, void *ptr, pi_int32 value,

   try {
     ScopedContext active(queue->get_context());
-    CUstream cuStream = queue->get_next_compute_stream();
+    pi_uint32 stream_token;
+    _pi_stream_guard guard;
+    CUstream cuStream = queue->get_next_compute_stream(
+        num_events_in_waitlist, events_waitlist, guard, &stream_token);
     result = enqueueEventsWait(queue, cuStream, num_events_in_waitlist,
                                events_waitlist);
     if (event) {
       event_ptr = std::unique_ptr<_pi_event>(_pi_event::make_native(
-          PI_COMMAND_TYPE_MEM_BUFFER_FILL, queue, cuStream));
+          PI_COMMAND_TYPE_MEM_BUFFER_FILL, queue, cuStream, stream_token));
       event_ptr->start();
     }
     result = PI_CHECK_ERROR(cuMemsetD8Async(
diff --git a/sycl/plugins/cuda/pi_cuda.hpp b/sycl/plugins/cuda/pi_cuda.hpp
index 24d9ac9827d2f..f47f515ffdafe 100644
--- a/sycl/plugins/cuda/pi_cuda.hpp
+++ b/sycl/plugins/cuda/pi_cuda.hpp
@@ -55,6 +55,8 @@ pi_result cuda_piKernelGetGroupInfo(pi_kernel kernel, pi_device device,
 /// \endcond
 }

+using _pi_stream_guard = std::unique_lock<std::mutex>;
+
 /// A PI platform stores all known PI devices,
 /// in the CUDA plugin this is just a vector of
 /// available devices since initialization is done
@@ -382,6 +384,11 @@ struct _pi_queue {
   std::vector<native_type> compute_streams_;
   std::vector<native_type> transfer_streams_;
+  // delay_compute_ keeps track of which streams have been recently reused and
+  // their next use should be delayed. If a stream has been recently reused it
+  // will be skipped the next time it would be selected round-robin style. When
+  // skipped, its delay flag is cleared.
+  std::vector<bool> delay_compute_;
   _pi_context *context_;
   _pi_device *device_;
   pi_queue_properties properties_;
@@ -394,6 +401,10 @@ struct _pi_queue {
   unsigned int last_sync_compute_streams_;
   unsigned int last_sync_transfer_streams_;
   unsigned int flags_;
+  // When compute_stream_sync_mutex_ and compute_stream_mutex_ both need to be
+  // locked at the same time, compute_stream_sync_mutex_ should be locked first
+  // to avoid deadlocks
+  std::mutex compute_stream_sync_mutex_;
   std::mutex compute_stream_mutex_;
   std::mutex transfer_stream_mutex_;
@@ -402,7 +413,8 @@ struct _pi_queue {
             _pi_device *device, pi_queue_properties properties,
             unsigned int flags)
       : compute_streams_{std::move(compute_streams)},
-        transfer_streams_{std::move(transfer_streams)}, context_{context},
+        transfer_streams_{std::move(transfer_streams)},
+        delay_compute_(compute_streams_.size(), false), context_{context},
         device_{device}, properties_{properties}, refCount_{1}, eventCount_{0},
         compute_stream_idx_{0}, transfer_stream_idx_{0},
         num_compute_streams_{0}, num_transfer_streams_{0},
@@ -419,10 +431,47 @@ struct _pi_queue {

   // get_next_compute/transfer_stream() functions return streams from
   // appropriate pools in round-robin fashion
-  native_type get_next_compute_stream();
+  native_type get_next_compute_stream(pi_uint32 *stream_token = nullptr);
+  // this overload tries to select a stream that was used by one of the
+  // dependencies. If that is not possible, it returns a new stream.
+  // If a stream is reused, it returns a lock that needs to remain locked as
+  // long as the stream is in use
+  native_type get_next_compute_stream(pi_uint32 num_events_in_wait_list,
+                                      const pi_event *event_wait_list,
+                                      _pi_stream_guard &guard,
+                                      pi_uint32 *stream_token = nullptr);
   native_type get_next_transfer_stream();
   native_type get() { return get_next_compute_stream(); };

+  bool has_been_synchronized(pi_uint32 stream_token) {
+    // stream token not associated with one of the compute streams
+    if (stream_token == std::numeric_limits<pi_uint32>::max()) {
+      return false;
+    }
+    return last_sync_compute_streams_ >= stream_token;
+  }
+
+  bool can_reuse_stream(pi_uint32 stream_token) {
+    // stream token not associated with one of the compute streams
+    if (stream_token == std::numeric_limits<pi_uint32>::max()) {
+      return true;
+    }
+    // If the command represented by the stream token was not the last command
+    // enqueued to the stream, we cannot reuse the stream - we need to allow
+    // for commands enqueued after it and the one we are about to enqueue to
+    // run concurrently
+    bool is_last_command =
+        (compute_stream_idx_ - stream_token) <= compute_streams_.size();
+    // If there was a barrier enqueued to the queue after the command
+    // represented by the stream token, we should not reuse the stream, as we
+    // cannot take that stream into account for the bookkeeping for the next
+    // barrier - such a stream would not be synchronized with. Performance-wise
+    // it does not matter that we do not reuse the stream, as the work
+    // represented by the stream token is guaranteed to be complete by the
+    // barrier before any work we are about to enqueue to the stream will
+    // start, so the event does not need to be synchronized with.
+    return is_last_command && !has_been_synchronized(stream_token);
+  }
+
   template <typename T> void for_each_stream(T &&f) {
     {
       std::lock_guard<std::mutex> compute_guard(compute_stream_mutex_);
@@ -445,14 +494,23 @@
   }

   template <typename T> void sync_streams(T &&f) {
-    auto sync = [&f](const std::vector<CUstream> &streams, unsigned int start,
-                     unsigned int stop) {
+    auto sync_compute = [&f, &streams = compute_streams_,
+                         &delay = delay_compute_](unsigned int start,
+                                                  unsigned int stop) {
+      for (unsigned int i = start; i < stop; i++) {
+        f(streams[i]);
+        delay[i] = false;
+      }
+    };
+    auto sync_transfer = [&f, &streams = transfer_streams_](unsigned int start,
+                                                            unsigned int stop) {
       for (unsigned int i = start; i < stop; i++) {
         f(streams[i]);
       }
     };
     {
       unsigned int size = static_cast<unsigned int>(compute_streams_.size());
+      std::lock_guard<std::mutex> compute_sync_guard(compute_stream_sync_mutex_);
       std::lock_guard<std::mutex> compute_guard(compute_stream_mutex_);
       unsigned int start = last_sync_compute_streams_;
       unsigned int end = num_compute_streams_ < size
@@ -460,15 +518,15 @@
                              : compute_stream_idx_.load();
       last_sync_compute_streams_ = end;
       if (end - start >= size) {
-        sync(compute_streams_, 0, size);
+        sync_compute(0, size);
       } else {
         start %= size;
         end %= size;
         if (start < end) {
-          sync(compute_streams_, start, end);
+          sync_compute(start, end);
         } else {
-          sync(compute_streams_, start, size);
-          sync(compute_streams_, 0, end);
+          sync_compute(start, size);
+          sync_compute(0, end);
         }
       }
     }
@@ -482,15 +540,15 @@
                              : transfer_stream_idx_.load();
       last_sync_transfer_streams_ = end;
       if (end - start >= size) {
-        sync(transfer_streams_, 0, size);
+        sync_transfer(0, size);
       } else {
         start %= size;
         end %= size;
         if (start < end) {
-          sync(transfer_streams_, start, end);
+          sync_transfer(start, end);
         } else {
-          sync(transfer_streams_, start, size);
-          sync(transfer_streams_, 0, end);
+          sync_transfer(start, size);
+          sync_transfer(0, end);
         }
       }
     }
@@ -530,6 +588,8 @@ struct _pi_event {
   CUstream get_stream() const noexcept { return stream_; }

+  pi_uint32 get_stream_token() const noexcept { return streamToken_; }
+
   pi_command_type get_command_type() const noexcept { return commandType_; }

   pi_uint32 get_reference_count() const noexcept { return refCount_; }

@@ -573,9 +633,11 @@ struct _pi_event {
   pi_uint64 get_end_time() const;

   // construct a native CUDA. This maps closely to the underlying CUDA event.
-  static pi_event make_native(pi_command_type type, pi_queue queue,
-                              CUstream stream) {
-    return new _pi_event(type, queue->get_context(), queue, stream);
+  static pi_event
+  make_native(pi_command_type type, pi_queue queue, CUstream stream,
+              pi_uint32 stream_token = std::numeric_limits<pi_uint32>::max()) {
+    return new _pi_event(type, queue->get_context(), queue, stream,
+                         stream_token);
   }

   pi_result release();
@@ -586,7 +648,7 @@ struct _pi_event {

   // This constructor is private to force programmers to use the make_native /
   // make_user static members in order to create a pi_event for CUDA.
   _pi_event(pi_command_type type, pi_context context, pi_queue queue,
-            CUstream stream);
+            CUstream stream, pi_uint32 stream_token);

   pi_command_type commandType_; // The type of command associated with event.

@@ -602,6 +664,7 @@ struct _pi_event {
                    // PI event has started or not
                    //

+  pi_uint32 streamToken_;
   pi_uint32 eventId_; // Queue identifier of the event.

   native_type evEnd_; // CUDA event handle. If this _pi_event represents a user
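
Note (editorial, not part of the patch): below is a minimal, self-contained sketch of the stream-selection policy the patch implements - round-robin selection that skips a recently reused stream once, plus token-based reuse of a dependency's stream. The names StreamPool, pick_stream, can_reuse and reuse_stream are hypothetical; the real _pi_queue additionally creates CUstreams lazily, guards the pools with compute_stream_mutex_/compute_stream_sync_mutex_, and folds barrier bookkeeping (last_sync_compute_streams_) into can_reuse_stream().

#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

// Simplified model of the compute-stream pool: streams are identified by
// their index, and a monotonically increasing counter hands out tokens.
class StreamPool {
public:
  explicit StreamPool(std::uint32_t size) : delay_(size, false), size_(size) {}

  // Round-robin pick; a stream whose delay flag is set is skipped once
  // (mirrors get_next_compute_stream(pi_uint32 *stream_token)).
  std::uint32_t pick_stream(std::uint32_t &token) {
    while (true) {
      std::uint32_t i = next_++;
      if (delay_[i % size_]) {
        delay_[i % size_] = false; // skipped once, eligible again next time
        continue;
      }
      token = i;
      return i % size_;
    }
  }

  // A dependency's stream may be reused only if the command behind the token
  // was the last one enqueued to that stream (barrier bookkeeping omitted).
  bool can_reuse(std::uint32_t token) const {
    if (token == std::numeric_limits<std::uint32_t>::max())
      return true; // token not tied to any compute stream
    return (next_ - token) <= size_;
  }

  // Reusing a stream out of order delays its next round-robin selection.
  std::uint32_t reuse_stream(std::uint32_t token) {
    delay_[token % size_] = true;
    return token % size_;
  }

private:
  std::vector<bool> delay_;
  std::uint32_t size_;
  std::uint32_t next_ = 0;
};

int main() {
  StreamPool pool(4);
  std::uint32_t token = std::numeric_limits<std::uint32_t>::max();
  std::uint32_t first = pool.pick_stream(token); // stream 0, token 0
  // A command depending on `token` lands on the same stream, so no
  // cuStreamWaitEvent-style wait would be needed between the two commands.
  std::uint32_t reused = pool.can_reuse(token) ? pool.reuse_stream(token)
                                               : pool.pick_stream(token);
  std::cout << "first: " << first << ", reused: " << reused << "\n";
  return 0;
}

Putting a dependent command back on its dependency's stream is what lets the first hunk of the patch skip the cuStreamWaitEvent call in enqueueEventsWait: work enqueued to the same CUDA stream is already ordered.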