Skip to content

Commit

Permalink
Simplify MemoryManager (#4522)
Browse files Browse the repository at this point in the history
* Simplify MemoryManager

* Fix tests

* Add MemoryPool abstraction

* Misc fixes

* Remove MemoryManager

* Tweak doc

* Rename module

* Format

* Review feedback

* Further tweaks

* Fix Drop
  • Loading branch information
tustvold authored Dec 19, 2022
1 parent 30de028 commit dba34fc
Show file tree
Hide file tree
Showing 22 changed files with 747 additions and 1,080 deletions.
30 changes: 20 additions & 10 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ use url::Url;

use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
use uuid::Uuid;

use super::options::{
Expand Down Expand Up @@ -1960,6 +1961,11 @@ impl TaskContext {
self.task_id.clone()
}

/// Return the [`MemoryPool`] associated with this [TaskContext]
pub fn memory_pool(&self) -> &Arc<dyn MemoryPool> {
&self.runtime.memory_pool
}

/// Return the [RuntimeEnv] associated with this [TaskContext]
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
self.runtime.clone()
Expand Down Expand Up @@ -2025,6 +2031,7 @@ mod tests {
use super::*;
use crate::assert_batches_eq;
use crate::execution::context::QueryPlanner;
use crate::execution::memory_pool::MemoryConsumer;
use crate::execution::runtime_env::RuntimeConfig;
use crate::physical_plan::expressions::AvgAccumulator;
use crate::test;
Expand All @@ -2046,24 +2053,27 @@ mod tests {
#[tokio::test]
async fn shared_memory_and_disk_manager() {
// Demonstrate the ability to share DiskManager and
// MemoryManager between two different executions.
// MemoryPool between two different executions.
let ctx1 = SessionContext::new();

// configure with same memory / disk manager
let memory_manager = ctx1.runtime_env().memory_manager.clone();
let memory_pool = ctx1.runtime_env().memory_pool.clone();

let mut reservation = MemoryConsumer::new("test").register(&memory_pool);
reservation.grow(100);

let disk_manager = ctx1.runtime_env().disk_manager.clone();

let ctx2 =
SessionContext::with_config_rt(SessionConfig::new(), ctx1.runtime_env());

assert!(std::ptr::eq(
Arc::as_ptr(&memory_manager),
Arc::as_ptr(&ctx1.runtime_env().memory_manager)
));
assert!(std::ptr::eq(
Arc::as_ptr(&memory_manager),
Arc::as_ptr(&ctx2.runtime_env().memory_manager)
));
assert_eq!(ctx1.runtime_env().memory_pool.reserved(), 100);
assert_eq!(ctx2.runtime_env().memory_pool.reserved(), 100);

drop(reservation);

assert_eq!(ctx1.runtime_env().memory_pool.reserved(), 0);
assert_eq!(ctx2.runtime_env().memory_pool.reserved(), 0);

assert!(std::ptr::eq(
Arc::as_ptr(&disk_manager),
Expand Down
Loading

0 comments on commit dba34fc

Please sign in to comment.