From 7022e069cfe8da2ca0a57505f8e3217455869de3 Mon Sep 17 00:00:00 2001 From: ronnywang Date: Fri, 14 Jul 2023 14:30:38 +0800 Subject: [PATCH] [CustomDevice] add stream safe allocator support (#55393) --- paddle/fluid/memory/allocation/CMakeLists.txt | 3 +- .../memory/allocation/allocator_facade.cc | 127 +++++++++- .../memory/allocation/allocator_facade.h | 9 + .../stream_safe_custom_device_allocator.cc | 232 ++++++++++++++++++ .../stream_safe_custom_device_allocator.h | 90 +++++++ paddle/fluid/platform/device_context.cc | 5 +- 6 files changed, 460 insertions(+), 6 deletions(-) create mode 100644 paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.cc create mode 100644 paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.h diff --git a/paddle/fluid/memory/allocation/CMakeLists.txt b/paddle/fluid/memory/allocation/CMakeLists.txt index 1a39590398911d..21ffde20022afc 100644 --- a/paddle/fluid/memory/allocation/CMakeLists.txt +++ b/paddle/fluid/memory/allocation/CMakeLists.txt @@ -51,7 +51,8 @@ if(UNIX AND NOT APPLE) endif() if(WITH_CUSTOM_DEVICE) - list(APPEND ALLOCATOR_SRCS custom_allocator.cc) + list(APPEND ALLOCATOR_SRCS custom_allocator.cc + stream_safe_custom_device_allocator.cc) endif() if(WITH_XPU) diff --git a/paddle/fluid/memory/allocation/allocator_facade.cc b/paddle/fluid/memory/allocation/allocator_facade.cc index 07e55115ba1308..804136e57b7e19 100644 --- a/paddle/fluid/memory/allocation/allocator_facade.cc +++ b/paddle/fluid/memory/allocation/allocator_facade.cc @@ -61,9 +61,9 @@ #endif #ifdef PADDLE_WITH_CUSTOM_DEVICE -#include "paddle/fluid/memory/allocation/custom_allocator.h" -#include "paddle/fluid/platform/device/device_wrapper.h" +#include "paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.h" #endif + #include "paddle/fluid/platform/flags.h" PADDLE_DEFINE_EXPORTED_int64( @@ -174,6 +174,11 @@ class AllocatorFacadePrivate { std::map>>; #endif +#ifdef PADDLE_WITH_CUSTOM_DEVICE + using 
CustomDeviceAllocatorMap = + std::map>>; +#endif explicit AllocatorFacadePrivate(bool allow_free_idle_chunk = true) { strategy_ = GetAllocatorStrategy(); @@ -564,6 +569,46 @@ class AllocatorFacadePrivate { } #endif +#ifdef PADDLE_WITH_CUSTOM_DEVICE + bool HasCustomDevice(const platform::CustomPlace& place, + phi::stream::stream_t stream) { + auto it = custom_device_allocators_.find(place); + if (it == custom_device_allocators_.end()) { + return false; + } + auto& allocator_map = it->second; + return allocator_map.find(stream) != allocator_map.end(); + } + + const std::shared_ptr& GetAllocator( + const platform::CustomPlace& place, + phi::stream::stream_t stream, + bool create_if_not_found = false) { + /* shared_lock_guard */ { + std::shared_lock lock_guard( + custom_device_allocator_mutex_); + if (LIKELY(HasCustomDevice(place, stream))) { + return custom_device_allocators_[place][stream]; + } else { + PADDLE_ENFORCE_NE(create_if_not_found, + false, + platform::errors::NotFound( + "No allocator found for stream %s in place %s " + "with create_if_not_found = false", + stream, + place)); + } + } + + /* unique_lock_guard */ { + std::unique_lock lock_guard( + custom_device_allocator_mutex_); + InitStreamSafeCustomDeviceAllocator(place, stream); + return custom_device_allocators_[place][stream]; + } + } +#endif + private: class ZeroSizeAllocator : public Allocator { public: @@ -1008,9 +1053,17 @@ class AllocatorFacadePrivate { allocators_[p] = std::make_shared(p); } + void InitNaiveBestFitCustomDeviceAllocator(platform::CustomPlace p, + phi::stream::stream_t stream) { + custom_device_allocators_[p][stream] = + std::make_shared(p); + } + void InitAutoGrowthCustomDeviceAllocator(platform::CustomPlace p, bool allow_free_idle_chunk) { auto chunk_size = FLAGS_auto_growth_chunk_size_in_mb << 20; + VLOG(4) << "FLAGS_auto_growth_chunk_size_in_mb is " + << FLAGS_auto_growth_chunk_size_in_mb; auto custom_allocator = std::make_shared(p); allocators_[p] = std::make_shared( @@ 
-1019,6 +1072,40 @@ class AllocatorFacadePrivate { /*chunk_size=*/chunk_size, allow_free_idle_chunk); } + + void InitAutoGrowthCustomDeviceAllocator(platform::CustomPlace p, + phi::stream::stream_t stream) { + auto chunk_size = FLAGS_auto_growth_chunk_size_in_mb << 20; + VLOG(4) << "FLAGS_auto_growth_chunk_size_in_mb is " + << FLAGS_auto_growth_chunk_size_in_mb; + + auto custom_allocator = + std::make_shared(p); + auto alignment = phi::DeviceManager::GetMinChunkSize(p); + custom_device_allocators_[p][stream] = + std::make_shared( + custom_allocator, alignment, chunk_size, allow_free_idle_chunk_); + } + + void WrapStreamSafeCustomDeviceAllocator(platform::CustomPlace p, + phi::stream::stream_t stream) { + std::shared_ptr& allocator = + custom_device_allocators_[p][stream]; + allocator = + std::make_shared(allocator, p, stream); + } + + void InitStreamSafeCustomDeviceAllocator(platform::CustomPlace p, + phi::stream::stream_t stream) { + VLOG(8) << "Init CustomDevice allocator for stream " << stream + << " in place " << p; + if (strategy_ == AllocatorStrategy::kAutoGrowth) { + InitAutoGrowthCustomDeviceAllocator(p, stream); + } else { + InitNaiveBestFitCustomDeviceAllocator(p, stream); + } + WrapStreamSafeCustomDeviceAllocator(p, stream); + } #endif void InitSystemAllocators() { @@ -1161,6 +1248,15 @@ class AllocatorFacadePrivate { std::shared_timed_mutex xpu_allocator_mutex_; #endif +#ifdef PADDLE_WITH_CUSTOM_DEVICE + // a standalone custom device allocator to support multi-stream GC in new + // executor + std::map> + default_stream_safe_custom_device_allocators_; + CustomDeviceAllocatorMap custom_device_allocators_; + std::shared_timed_mutex custom_device_allocator_mutex_; +#endif + AllocatorStrategy strategy_; AllocatorMap allocators_; static AllocatorMap zero_size_allocators_; @@ -1252,6 +1348,16 @@ std::shared_ptr AllocatorFacade::AllocShared( AllocationPtr AllocatorFacade::Alloc(const platform::Place& place, size_t size, const phi::Stream& stream) { +#ifdef 
PADDLE_WITH_CUSTOM_DEVICE + if (platform::is_custom_place(place)) { + platform::CustomPlace p(place); + phi::stream::stream_t s = + reinterpret_cast(stream.id()); + return GetPrivate() + ->GetAllocator(p, s, /* create_if_not_found = */ true) + ->Allocate(size); + } +#endif #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) AllocatorFacadePrivate* m = GetPrivate(); if (!m->IsStreamSafeCUDAAllocatorUsed()) { @@ -1270,8 +1376,8 @@ AllocationPtr AllocatorFacade::Alloc(const platform::Place& place, #elif defined(PADDLE_WITH_XPU) return GetAllocator(place)->Allocate(size); #else - PADDLE_THROW( - platform::errors::PreconditionNotMet("Not compiled with GPU or XPU.")); + PADDLE_THROW(platform::errors::PreconditionNotMet( + "Not compiled with GPU or XPU or CustomDevice.")); #endif } @@ -1376,6 +1482,19 @@ void AllocatorFacade::RemoveMemoryPoolOfCUDAGraph(int64_t id) { #endif #endif +#ifdef PADDLE_WITH_CUSTOM_DEVICE +const std::shared_ptr& AllocatorFacade::GetAllocator( + const platform::Place& place, phi::stream::stream_t stream) { + AllocatorFacadePrivate* m = GetPrivate(); + if (!FLAGS_use_stream_safe_cuda_allocator) { + return m->GetAllocator(place, + stream, + /*create_if_not_found=*/true); + } + return m->GetAllocator(place, /* A non-zero num to choose allocator_ */ 1); +} +#endif + UNUSED static std::shared_ptr unused_obj = std::make_shared(platform::CPUPlace()); diff --git a/paddle/fluid/memory/allocation/allocator_facade.h b/paddle/fluid/memory/allocation/allocator_facade.h index a1f21a5e69359a..986222a1b03c46 100644 --- a/paddle/fluid/memory/allocation/allocator_facade.h +++ b/paddle/fluid/memory/allocation/allocator_facade.h @@ -25,6 +25,11 @@ #include "paddle/fluid/platform/place.h" #include "paddle/phi/core/stream.h" +#ifdef PADDLE_WITH_CUSTOM_DEVICE +#include "paddle/fluid/memory/allocation/custom_allocator.h" +#include "paddle/phi/backends/device_manager.h" +#endif + namespace paddle { namespace memory { namespace allocation { @@ -91,6 +96,10 @@ class 
AllocatorFacade { void RemoveMemoryPoolOfCUDAGraph(int64_t id); #endif +#ifdef PADDLE_WITH_CUSTOM_DEVICE + const std::shared_ptr& GetAllocator(const platform::Place& place, + phi::stream::stream_t stream); +#endif // TODO(yy): Allocate a Copy-On-Write allocation? private: AllocatorFacade(); diff --git a/paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.cc b/paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.cc new file mode 100644 index 00000000000000..59f1ec86a56c36 --- /dev/null +++ b/paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.cc @@ -0,0 +1,232 @@ +// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include "paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.h"
+#include <thread>
+
+#include "paddle/fluid/platform/profiler/event_tracing.h"
+
+namespace paddle {
+namespace memory {
+namespace allocation {
+
+// Builds a stream-aware allocation on top of `underlying_allocation`.
+// The ptr/base_ptr/size/place of the underlying allocation are copied into
+// the Allocation base before ownership of it is taken. `allocator` must
+// already be managed by a std::shared_ptr, because shared_from_this() is
+// called on it to keep the allocator alive as long as this allocation.
+StreamSafeCustomDeviceAllocation::StreamSafeCustomDeviceAllocation(
+    DecoratedAllocationPtr underlying_allocation,
+    phi::stream::stream_t owning_stream,
+    StreamSafeCustomDeviceAllocator* allocator)
+    : Allocation(underlying_allocation->ptr(),
+                 underlying_allocation->base_ptr(),
+                 underlying_allocation->size(),
+                 underlying_allocation->place()),
+      underlying_allocation_(std::move(underlying_allocation)),
+      owning_stream_(std::move(owning_stream)),
+      allocator_(allocator->shared_from_this()) {}
+
+// Registers `stream` as a user of this allocation. Nothing is recorded for
+// the owning stream. For any other stream an event is created and recorded
+// the first time that stream is seen.
+// NOTE(review): a stream that already has an event is not re-recorded here;
+// confirm that later work on the same stream does not need to be tracked.
+void StreamSafeCustomDeviceAllocation::RecordStream(
+    phi::stream::stream_t stream) {
+  VLOG(8) << "Try record stream " << stream << " for address " << ptr();
+  if (stream == owning_stream_) {
+    return;
+  }
+  std::call_once(once_flag_, [this] { phi::DeviceManager::SetDevice(place_); });
+  std::lock_guard<SpinLock> lock_guard(outstanding_event_map_lock_);
+
+  if (outstanding_event_map_.find(stream) == outstanding_event_map_.end()) {
+    // The lookup returned end(), so the iterator must not be dereferenced.
+    // Insert the entry and work through a reference to it instead.
+    auto& event = outstanding_event_map_[stream];
+    event.Init(place());
+    VLOG(9) << "Create a new event " << event.raw_event();
+    auto stream_wrapper = phi::stream::Stream(place(), stream);
+    VLOG(8) << "Record event " << event.raw_event() << " to stream "
+            << stream;
+    event.Record(&stream_wrapper);
+  }
+}
+
+// Marks the allocation as pending release. On the first call only, an event
+// is recorded on the owning stream (unless one is already tracked for it) so
+// that CanBeFreed() also waits for the owning stream.
+void StreamSafeCustomDeviceAllocation::MarkAsWillBeFreed() {
+  std::call_once(once_flag_, [this] { phi::DeviceManager::SetDevice(place_); });
+  std::lock_guard<SpinLock> lock_guard(outstanding_event_map_lock_);
+  if (!will_be_freed_) {
+    // Latch the flag so that repeated calls do not record again.
+    will_be_freed_ = true;
+    VLOG(8) << "ptr: " << ptr() << " will be freed";
+    if (phi::DeviceManager::HasDeviceType(place_.GetDeviceType()) &&
+        outstanding_event_map_.find(owning_stream_) ==
+            outstanding_event_map_.end()) {
+      auto& event = outstanding_event_map_[owning_stream_];
+      event.Init(place_);
+      VLOG(9) << "Create a new event " << event.raw_event();
+      auto stream_wrapper = phi::stream::Stream(place_, owning_stream_);
+      VLOG(8) << "Record event " << event.raw_event() << " to stream "
+              << owning_stream_;
+      event.Record(&stream_wrapper);
+    }
+  }
+}
+
+// Returns true when every tracked event has completed (or when the device
+// type is no longer registered). Completed events are destroyed as they are
+// visited. Returns false at the first event that is still pending.
+bool StreamSafeCustomDeviceAllocation::CanBeFreed() {
+  std::call_once(once_flag_, [this] { phi::DeviceManager::SetDevice(place_); });
+  std::lock_guard<SpinLock> lock_guard(outstanding_event_map_lock_);
+  if (!phi::DeviceManager::HasDeviceType(place_.GetDeviceType())) {
+    return true;
+  }
+  for (auto it = outstanding_event_map_.begin();
+       it != outstanding_event_map_.end();
+       ++it) {
+    auto& event = it->second;
+    if (!event.Query()) {
+      VLOG(9) << "Event " << event.raw_event() << " for " << ptr()
+              << " is not completed";
+      // Every entry before `it` was destroyed earlier in this loop. Remove
+      // them now so the next call does not query a destroyed event.
+      outstanding_event_map_.erase(outstanding_event_map_.begin(), it);
+      return false;
+    }
+    VLOG(8) << "Destroy event " << event.raw_event();
+    event.Destroy();
+  }
+  return true;
+}
+
+// Returns the stream this allocation was created for.
+phi::stream::stream_t StreamSafeCustomDeviceAllocation::GetOwningStream()
+    const {
+  return owning_stream_;
+}
+
+// Creates the allocator and registers it in the per-place static registry
+// that ReleaseImpl() walks.
+StreamSafeCustomDeviceAllocator::StreamSafeCustomDeviceAllocator(
+    std::shared_ptr<Allocator> underlying_allocator,
+    platform::CustomPlace place,
+    phi::stream::stream_t default_stream)
+    : underlying_allocator_(std::move(underlying_allocator)),
+      place_(std::move(place)),
+      default_stream_(std::move(default_stream)) {
+  std::lock_guard<SpinLock> lock_guard(allocator_map_lock_);
+  // Use the member, not the moved-from parameter, so that registration and
+  // the destructor's de-registration use the same key.
+  allocator_map_[place_].emplace_back(this);
+}
+
+// Removes this allocator from the per-place static registry.
+StreamSafeCustomDeviceAllocator::~StreamSafeCustomDeviceAllocator() {
+  std::lock_guard<SpinLock> lock_guard(allocator_map_lock_);
+  std::vector<StreamSafeCustomDeviceAllocator*>& allocators =
+      allocator_map_[place_];
+  allocators.erase(std::remove(allocators.begin(), allocators.end(), this),
+                   allocators.end());
+}
+
+// Returns the stream assigned to new allocations.
+phi::stream::stream_t StreamSafeCustomDeviceAllocator::GetDefaultStream()
+    const {
+  return default_stream_;
+}
+
+// Changes the stream assigned to allocations created from now on.
+void StreamSafeCustomDeviceAllocator::SetDefaultStream(
+    phi::stream::stream_t stream) {
+  default_stream_ = stream;
+}
+
+// Allocates `size` bytes from the underlying allocator and wraps the result
+// in a StreamSafeCustomDeviceAllocation owned by the default stream.
+// On BadAlloc, memory is released from all allocators registered for this
+// place and the allocation is retried once. A second failure is rethrown.
+phi::Allocation* StreamSafeCustomDeviceAllocator::AllocateImpl(size_t size) {
+  platform::RecordEvent record("StreamSafeCustomDeviceAllocator::Allocate",
+                               platform::TracerEventType::UserDefined,
+                               9 /*level*/);
+  // Opportunistically reclaim allocations whose events have completed.
+  ProcessUnfreedAllocations();
+  VLOG(8) << "Try allocate " << size << " bytes";
+  AllocationPtr underlying_allocation;
+  try {
+    underlying_allocation = underlying_allocator_->Allocate(size);
+  } catch (BadAlloc&) {
+    VLOG(4) << "Allocation failed when allocating " << size << " bytes";
+    ReleaseImpl(place_);
+    try {
+      underlying_allocation = underlying_allocator_->Allocate(size);
+    } catch (...) {
+      VLOG(3)
+          << "Still allocation failed after release memory from all streams";
+      throw;
+    }
+  }
+  StreamSafeCustomDeviceAllocation* allocation =
+      new StreamSafeCustomDeviceAllocation(
+          static_unique_ptr_cast<Allocation>(std::move(underlying_allocation)),
+          default_stream_,
+          this);
+  VLOG(8) << "Thread " << std::this_thread::get_id() << " Allocate "
+          << allocation->size() << " bytes at address " << allocation->ptr()
+          << " , stream: " << default_stream_;
+  return allocation;
+}
+
+// Frees `allocation` immediately when all of its events have completed.
+// Otherwise it is queued in unfreed_allocations_ and reclaimed later by
+// ProcessUnfreedAllocations().
+void StreamSafeCustomDeviceAllocator::FreeImpl(phi::Allocation* allocation) {
+  platform::RecordEvent record("StreamSafeCustomDeviceAllocator::Free",
+                               platform::TracerEventType::UserDefined,
+                               9 /*level*/);
+  StreamSafeCustomDeviceAllocation* stream_safe_allocation =
+      static_cast<StreamSafeCustomDeviceAllocation*>(allocation);
+
+  VLOG(8) << "Try free allocation " << stream_safe_allocation->ptr();
+  stream_safe_allocation->MarkAsWillBeFreed();
+  if (stream_safe_allocation->CanBeFreed()) {
+    VLOG(9) << "Directly delete allocation";
+    delete stream_safe_allocation;
+  } else {
+    VLOG(9) << "Put into unfreed_allocation list";
+    std::lock_guard<SpinLock> lock_guard(unfreed_allocation_lock_);
+    unfreed_allocations_.emplace_back(stream_safe_allocation);
+  }
+}
+
+// Reclaims pending allocations and releases idle memory for every allocator
+// registered under `place`. Returns the total number of bytes released.
+uint64_t StreamSafeCustomDeviceAllocator::ReleaseImpl(
+    const platform::Place& place) {
+  std::lock_guard<SpinLock> lock_guard(allocator_map_lock_);
+  std::vector<StreamSafeCustomDeviceAllocator*>& allocators =
+      allocator_map_[place];
+  uint64_t released_size = 0;
+  for (StreamSafeCustomDeviceAllocator* allocator : allocators) {
+    released_size += allocator->ProcessUnfreedAllocationsAndRelease();
+  }
+  VLOG(8) << "Release " << released_size << " bytes memory from all streams";
+  return released_size;
+}
+
+// Deletes every queued allocation whose events have all completed.
+void StreamSafeCustomDeviceAllocator::ProcessUnfreedAllocations() {
+  // NOTE(Ruibiao): This condition is to reduce lock competition. It does not
+  // need to be thread-safe since here occasional misjudgments are permissible.
+  if (unfreed_allocations_.empty()) {
+    return;
+  }
+
+  std::lock_guard<SpinLock> lock_guard(unfreed_allocation_lock_);
+  for (auto it = unfreed_allocations_.begin();
+       it != unfreed_allocations_.end();) {
+    if ((*it)->CanBeFreed()) {
+      delete *it;
+      it = unfreed_allocations_.erase(it);
+    } else {
+      ++it;
+    }
+  }
+}
+
+// Reclaims completed allocations, then asks the underlying allocator to
+// release memory for this place. Returns the underlying allocator's result.
+uint64_t
+StreamSafeCustomDeviceAllocator::ProcessUnfreedAllocationsAndRelease() {
+  ProcessUnfreedAllocations();
+  return underlying_allocator_->Release(place_);
+}
+
+thread_local std::once_flag StreamSafeCustomDeviceAllocation::once_flag_;
+
+std::map<platform::Place, std::vector<StreamSafeCustomDeviceAllocator*>>
+    StreamSafeCustomDeviceAllocator::allocator_map_;
+SpinLock StreamSafeCustomDeviceAllocator::allocator_map_lock_;
+
+}  // namespace allocation
+}  // namespace memory
+}  // namespace paddle
diff --git a/paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.h b/paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.h
new file mode 100644
index 00000000000000..997075e124e61f
--- /dev/null
+++ b/paddle/fluid/memory/allocation/stream_safe_custom_device_allocator.h
@@ -0,0 +1,90 @@
+// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <list>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "paddle/fluid/memory/allocation/allocator.h"
+#include "paddle/fluid/memory/allocation/spin_lock.h"
+#include "paddle/fluid/platform/place.h"
+#include "paddle/phi/backends/device_manager.h"
+
+namespace paddle {
+namespace memory {
+namespace allocation {
+
+class StreamSafeCustomDeviceAllocator;
+
+// An allocation that remembers which streams use it. It holds one event per
+// recorded stream and reports, through CanBeFreed(), whether all of those
+// events have completed.
+class StreamSafeCustomDeviceAllocation : public Allocation {
+ public:
+  // `allocator` must be managed by a std::shared_ptr; the implementation
+  // calls shared_from_this() on it.
+  StreamSafeCustomDeviceAllocation(DecoratedAllocationPtr underlying_allocation,
+                                   phi::stream::stream_t owning_stream,
+                                   StreamSafeCustomDeviceAllocator *allocator);
+
+  // Records that `stream` uses this allocation (no-op for the owning stream).
+  void RecordStream(phi::stream::stream_t stream);
+  // Returns true once every tracked event has completed.
+  bool CanBeFreed();
+  // Marks the allocation as pending release; see the implementation for the
+  // event recorded on the owning stream.
+  void MarkAsWillBeFreed();
+  phi::stream::stream_t GetOwningStream() const;
+
+ private:
+  // Used with std::call_once so each thread sets the device once.
+  thread_local static std::once_flag once_flag_;
+  DecoratedAllocationPtr underlying_allocation_;
+  // One event per stream that has been recorded for this allocation.
+  std::map<phi::stream::stream_t, phi::event::Event> outstanding_event_map_;
+  phi::stream::stream_t owning_stream_;
+  // Guards outstanding_event_map_ and will_be_freed_.
+  SpinLock outstanding_event_map_lock_;
+  // Keeps the creating allocator alive while this allocation exists.
+  std::shared_ptr<Allocator> allocator_;
+  bool will_be_freed_{false};
+};
+
+// Allocator decorator that defers freeing an allocation until the events
+// recorded for it have completed. Instances register themselves in a static
+// per-place registry so that ReleaseImpl() can reach every allocator of a
+// place.
+class StreamSafeCustomDeviceAllocator
+    : public Allocator,
+      public std::enable_shared_from_this<StreamSafeCustomDeviceAllocator> {
+ public:
+  StreamSafeCustomDeviceAllocator(
+      std::shared_ptr<Allocator> underlying_allocator,
+      platform::CustomPlace place,
+      phi::stream::stream_t default_stream);
+  ~StreamSafeCustomDeviceAllocator();
+
+  bool IsAllocThreadSafe() const override { return true; }
+  phi::stream::stream_t GetDefaultStream() const;
+  void SetDefaultStream(phi::stream::stream_t stream);
+
+ protected:
+  phi::Allocation *AllocateImpl(size_t size) override;
+  void FreeImpl(phi::Allocation *allocation) override;
+  uint64_t ReleaseImpl(const platform::Place &place) override;
+
+ private:
+  // Deletes queued allocations whose events have completed.
+  void ProcessUnfreedAllocations();
+  // As above, then releases memory from the underlying allocator.
+  uint64_t ProcessUnfreedAllocationsAndRelease();
+
+  // Registry of live allocators per place, guarded by allocator_map_lock_.
+  static std::map<platform::Place,
+                  std::vector<StreamSafeCustomDeviceAllocator *>>
+      allocator_map_;
+  static SpinLock allocator_map_lock_;
+
+  std::shared_ptr<Allocator> underlying_allocator_;
+  platform::CustomPlace place_;
+  phi::stream::stream_t default_stream_;
+  // Allocations freed by the user but still waiting on events; guarded by
+  // unfreed_allocation_lock_.
+  std::list<StreamSafeCustomDeviceAllocation *> unfreed_allocations_;
+  SpinLock unfreed_allocation_lock_;
+};
+
+}  // namespace allocation
+}  // namespace memory
+}  // namespace paddle
diff --git a/paddle/fluid/platform/device_context.cc b/paddle/fluid/platform/device_context.cc
index 456abd55ef68fe..01acd27386a2de 100644
--- a/paddle/fluid/platform/device_context.cc
+++ b/paddle/fluid/platform/device_context.cc
@@ -108,9 +108,12 @@ inline std::unique_ptr CreateDeviceContext(
     dev_ctx->SetAllocator(instance.GetAllocator(p).get());
     dev_ctx->SetGenerator(phi::DefaultXPUGenerator(p.GetDeviceId()).get());
 #endif
+#ifdef PADDLE_WITH_CUSTOM_DEVICE
   } else if (p.GetType() == phi::AllocationType::CUSTOM) {
-    dev_ctx->SetAllocator(instance.GetAllocator(p).get());
+    auto* custom_ctx = dynamic_cast(dev_ctx);
+    dev_ctx->SetAllocator(instance.GetAllocator(p, custom_ctx->stream()).get());
     dev_ctx->SetGenerator(phi::DefaultCustomDeviceGenerator(p).get());
+#endif
   } else {
     dev_ctx->SetAllocator(instance.GetAllocator(p).get());
     dev_ctx->SetGenerator(phi::DefaultCPUGenerator().get());