Skip to content

Commit

Permalink
Update object_store to pull in GCS token refresh fix (#770)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Oct 29, 2024
1 parent 545a52b commit 201ae6d
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 7 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafu
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.10.2/arroyo' }

cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
7 changes: 2 additions & 5 deletions crates/arroyo-state/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::{HashMap, HashSet};
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::Mutex;
use tracing::{debug, info};

pub const FULL_KEY_RANGE: RangeInclusive<u64> = 0..=u64::MAX;
Expand Down Expand Up @@ -122,7 +121,7 @@ impl BackingStore for ParquetBackend {
})
.collect();

let storage_client = Mutex::new(get_storage_provider().await?);
let storage_client = get_storage_provider().await?;

// wait for all of the futures to complete
while let Some(result) = futures.next().await {
Expand All @@ -134,7 +133,7 @@ impl BackingStore for ParquetBackend {
epoch_to_remove,
&operator_id,
));
storage_client.lock().await.delete_if_present(path).await?;
storage_client.delete_if_present(path).await?;
}
debug!(
message = "Finished cleaning operator",
Expand All @@ -146,8 +145,6 @@ impl BackingStore for ParquetBackend {

for epoch_to_remove in old_min_epoch..min_epoch {
storage_client
.lock()
.await
.delete_if_present(metadata_path(&base_path(&metadata.job_id, epoch_to_remove)))
.await?;
}
Expand Down

0 comments on commit 201ae6d

Please sign in to comment.