diff --git a/Cargo.lock b/Cargo.lock index 856cf93e8..d8c93a9c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5905,8 +5905,7 @@ dependencies = [ [[package]] name = "object_store" version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6da452820c715ce78221e8202ccc599b4a52f3e1eb3eedb487b680c81a8e3f3" +source = "git+http://github.com/ArroyoSystems/arrow-rs?branch=object_store_0.10.2/arroyo#659a39f8e22e405f5328bbd2ef2e032537419bab" dependencies = [ "async-trait", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index 6b4f4c5ee..629abdc99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafu datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'} datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'} datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'} +object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.10.2/arroyo' } cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" } cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" } diff --git a/crates/arroyo-state/src/parquet.rs b/crates/arroyo-state/src/parquet.rs index 96813e249..ca772dd01 100644 --- a/crates/arroyo-state/src/parquet.rs +++ b/crates/arroyo-state/src/parquet.rs @@ -16,7 +16,6 @@ use std::collections::{HashMap, HashSet}; use std::ops::RangeInclusive; use std::sync::Arc; use std::time::SystemTime; -use tokio::sync::Mutex; use tracing::{debug, info}; pub const FULL_KEY_RANGE: RangeInclusive = 0..=u64::MAX; @@ -122,7 +121,7 @@ impl BackingStore for ParquetBackend { }) .collect(); - let storage_client = Mutex::new(get_storage_provider().await?); + let storage_client = get_storage_provider().await?; // wait for all of the futures to complete while let Some(result) = futures.next().await { @@ -134,7 +133,7 @@ impl BackingStore for ParquetBackend { epoch_to_remove, &operator_id, )); - storage_client.lock().await.delete_if_present(path).await?; + storage_client.delete_if_present(path).await?; } debug!( message = "Finished cleaning operator", @@ -146,8 +145,6 @@ impl BackingStore for ParquetBackend { for epoch_to_remove in old_min_epoch..min_epoch { storage_client - .lock() - .await .delete_if_present(metadata_path(&base_path(&metadata.job_id, epoch_to_remove))) .await?; }