diff --git a/sycl/source/detail/queue_impl.cpp b/sycl/source/detail/queue_impl.cpp index 966b82387db1a..592141e74eae2 100644 --- a/sycl/source/detail/queue_impl.cpp +++ b/sycl/source/detail/queue_impl.cpp @@ -558,6 +558,10 @@ pi_native_handle queue_impl::getNative(int32_t &NativeHandleDesc) const { return Handle; } +void queue_impl::cleanup_fusion_cmd() { + detail::Scheduler::getInstance().cleanUpCmdFusion(this); +} + bool queue_impl::ext_oneapi_empty() const { // If we have in-order queue where events are not discarded then just check // the status of the last event. diff --git a/sycl/source/detail/queue_impl.hpp b/sycl/source/detail/queue_impl.hpp index 071b1a4c8fd59..0b190fd272042 100644 --- a/sycl/source/detail/queue_impl.hpp +++ b/sycl/source/detail/queue_impl.hpp @@ -313,6 +313,7 @@ class queue_impl { #endif throw_asynchronous(); if (!MHostQueue) { + cleanup_fusion_cmd(); getPlugin()->call(MQueues[0]); } } @@ -693,6 +694,9 @@ class queue_impl { } protected: + // Hook to the scheduler to clean up any fusion command held on destruction. + void cleanup_fusion_cmd(); + // template is needed for proper unit testing template void finalizeHandler(HandlerType &Handler, const CG::CGTYPE &Type, diff --git a/sycl/source/detail/scheduler/commands.cpp b/sycl/source/detail/scheduler/commands.cpp index e7e24ecb536f8..d9c5a08c6f538 100644 --- a/sycl/source/detail/scheduler/commands.cpp +++ b/sycl/source/detail/scheduler/commands.cpp @@ -3156,6 +3156,9 @@ pi_int32 KernelFusionCommand::enqueueImp() { waitForPreparedHostEvents(); waitForEvents(MQueue, MPreparedDepsEvents, MEvent->getHandleRef()); + // We need to release the queue here because KernelFusionCommands are + // held back by the scheduler thus prevent the deallocation of the queue. + resetQueue(); return PI_SUCCESS; } @@ -3163,6 +3166,14 @@ void KernelFusionCommand::setFusionStatus(FusionStatus Status) { MStatus = Status; } +void KernelFusionCommand::resetQueue() { + assert(MStatus != FusionStatus::ACTIVE && + "Cannot release the queue attached to the KernelFusionCommand if it " + "is active."); + MQueue.reset(); + MWorkerQueue.reset(); +} + void KernelFusionCommand::emitInstrumentationData() { #ifdef XPTI_ENABLE_INSTRUMENTATION constexpr uint16_t NotificationTraceType = xpti::trace_node_create; diff --git a/sycl/source/detail/scheduler/commands.hpp b/sycl/source/detail/scheduler/commands.hpp index 1b83042b8294f..80d5b42c30d3a 100644 --- a/sycl/source/detail/scheduler/commands.hpp +++ b/sycl/source/detail/scheduler/commands.hpp @@ -727,6 +727,11 @@ class KernelFusionCommand : public Command { /// only be called under the protection of the scheduler write-lock. void setFusionStatus(FusionStatus Status); + /// Reset the queue. This can be required as the command is held in order + /// to maintain events alive, however this prevent the normal destruction of + /// the queue. + void resetQueue(); + bool isActive() const { return MStatus == FusionStatus::ACTIVE; } bool readyForDeletion() const { return MStatus == FusionStatus::DELETED; } diff --git a/sycl/source/detail/scheduler/graph_builder.cpp b/sycl/source/detail/scheduler/graph_builder.cpp index c4e52a76cd8c5..cc1082fce4f25 100644 --- a/sycl/source/detail/scheduler/graph_builder.cpp +++ b/sycl/source/detail/scheduler/graph_builder.cpp @@ -942,7 +942,7 @@ Scheduler::GraphBuildResult Scheduler::GraphBuilder::addCG( // they create any requirement or event dependency on any of the kernels in // the fusion list, this will lead to cancellation of the fusion in the // GraphProcessor. - auto QUniqueID = std::hash()(Queue); + auto QUniqueID = std::hash()(Queue.get()); if (isInFusionMode(QUniqueID) && !NewCmd->isHostTask()) { auto *FusionCmd = findFusionList(QUniqueID)->second.get(); @@ -1349,7 +1349,14 @@ Command *Scheduler::GraphBuilder::connectDepEvent( } void Scheduler::GraphBuilder::startFusion(QueueImplPtr Queue) { - auto QUniqueID = std::hash()(Queue); + cleanUpCmdFusion(Queue.get()); + auto QUniqueID = std::hash()(Queue.get()); + MFusionMap.emplace(QUniqueID, std::make_unique(Queue)); +} + +void Scheduler::GraphBuilder::cleanUpCmdFusion( + sycl::detail::queue_impl *Queue) { + auto QUniqueID = std::hash()(Queue); if (isInFusionMode(QUniqueID)) { throw sycl::exception{sycl::make_error_code(sycl::errc::invalid), "Queue already in fusion mode"}; @@ -1365,7 +1372,6 @@ void Scheduler::GraphBuilder::startFusion(QueueImplPtr Queue) { cleanupCommand(OldFusionCmd->second.release()); MFusionMap.erase(OldFusionCmd); } - MFusionMap.emplace(QUniqueID, std::make_unique(Queue)); } void Scheduler::GraphBuilder::removeNodeFromGraph( @@ -1404,7 +1410,7 @@ void Scheduler::GraphBuilder::removeNodeFromGraph( void Scheduler::GraphBuilder::cancelFusion(QueueImplPtr Queue, std::vector &ToEnqueue) { - auto QUniqueID = std::hash()(Queue); + auto QUniqueID = std::hash()(Queue.get()); if (!isInFusionMode(QUniqueID)) { return; } @@ -1492,7 +1498,7 @@ EventImplPtr Scheduler::GraphBuilder::completeFusion(QueueImplPtr Queue, std::vector &ToEnqueue, const property_list &PropList) { - auto QUniqueID = std::hash()(Queue); + auto QUniqueID = std::hash()(Queue.get()); #if SYCL_EXT_CODEPLAY_KERNEL_FUSION if (!isInFusionMode(QUniqueID)) { auto InactiveFusionList = findFusionList(QUniqueID); diff --git a/sycl/source/detail/scheduler/scheduler.cpp b/sycl/source/detail/scheduler/scheduler.cpp index b674c3a820a00..250be7c17d350 100644 --- a/sycl/source/detail/scheduler/scheduler.cpp +++ b/sycl/source/detail/scheduler/scheduler.cpp @@ -569,13 +569,22 @@ void Scheduler::cleanupAuxiliaryResources(BlockingT Blocking) { void Scheduler::startFusion(QueueImplPtr Queue) { WriteLockT Lock = acquireWriteLock(); + WriteLockT FusionMapLock = acquireFusionWriteLock(); MGraphBuilder.startFusion(Queue); } +void Scheduler::cleanUpCmdFusion(sycl::detail::queue_impl *Queue) { + // No graph lock, we might be called because the graph builder is releasing + // resources. + WriteLockT FusionMapLock = acquireFusionWriteLock(); + MGraphBuilder.cleanUpCmdFusion(Queue); +} + void Scheduler::cancelFusion(QueueImplPtr Queue) { std::vector ToEnqueue; { WriteLockT Lock = acquireWriteLock(); + WriteLockT FusionMapLock = acquireFusionWriteLock(); MGraphBuilder.cancelFusion(Queue, ToEnqueue); } enqueueCommandForCG(nullptr, ToEnqueue); @@ -587,6 +596,7 @@ EventImplPtr Scheduler::completeFusion(QueueImplPtr Queue, EventImplPtr FusedEvent; { WriteLockT Lock = acquireWriteLock(); + WriteLockT FusionMapLock = acquireFusionWriteLock(); FusedEvent = MGraphBuilder.completeFusion(Queue, ToEnqueue, PropList); } enqueueCommandForCG(nullptr, ToEnqueue); @@ -595,7 +605,7 @@ EventImplPtr Scheduler::completeFusion(QueueImplPtr Queue, } bool Scheduler::isInFusionMode(QueueIdT queue) { - ReadLockT Lock = acquireReadLock(); + ReadLockT Lock = acquireFusionReadLock(); return MGraphBuilder.isInFusionMode(queue); } diff --git a/sycl/source/detail/scheduler/scheduler.hpp b/sycl/source/detail/scheduler/scheduler.hpp index 0681f5f80cff2..6c872abc4ef7a 100644 --- a/sycl/source/detail/scheduler/scheduler.hpp +++ b/sycl/source/detail/scheduler/scheduler.hpp @@ -449,6 +449,8 @@ class Scheduler { void startFusion(QueueImplPtr Queue); + void cleanUpCmdFusion(sycl::detail::queue_impl *Queue); + void cancelFusion(QueueImplPtr Queue); EventImplPtr completeFusion(QueueImplPtr Queue, const property_list &); @@ -488,10 +490,33 @@ class Scheduler { return Lock; } + /// Provides exclusive access to std::shared_timed_mutex object with deadlock + /// avoidance to the Fusion map + WriteLockT acquireFusionWriteLock() { +#ifdef _WIN32 + WriteLockT Lock(MFusionMapLock, std::defer_lock); + while (!Lock.try_lock_for(std::chrono::milliseconds(10))) { + // Without yield while loop acts like endless while loop and occupies the + // whole CPU when multiple command groups are created in multiple host + // threads + std::this_thread::yield(); + } +#else + WriteLockT Lock(MFusionMapLock); + // It is a deadlock on UNIX in implementation of lock and lock_shared, if + // try_lock in the loop above will be executed, so using a single lock here +#endif // _WIN32 + return Lock; + } + /// Provides shared access to std::shared_timed_mutex object with deadlock /// avoidance ReadLockT acquireReadLock() { return ReadLockT{MGraphLock}; } + /// Provides shared access to std::shared_timed_mutex object with deadlock + /// avoidance to the Fusion map + ReadLockT acquireFusionReadLock() { return ReadLockT{MFusionMapLock}; } + void cleanupCommands(const std::vector &Cmds); void NotifyHostTaskCompletion(Command *Cmd); @@ -627,6 +652,10 @@ class Scheduler { void startFusion(QueueImplPtr Queue); + /// Clean up the internal fusion commands held for the given queue. + /// @param Queue the queue for which to remove the fusion commands. + void cleanUpCmdFusion(sycl::detail::queue_impl *Queue); + void cancelFusion(QueueImplPtr Queue, std::vector &ToEnqueue); EventImplPtr completeFusion(QueueImplPtr Queue, @@ -870,6 +899,7 @@ class Scheduler { GraphBuilder MGraphBuilder; RWLockT MGraphLock; + RWLockT MFusionMapLock; std::vector MDeferredCleanupCommands; std::mutex MDeferredCleanupMutex;