From efb799bf5218f501438bff460f0222d34c156345 Mon Sep 17 00:00:00 2001
From: Micah Wylde
Date: Thu, 25 Jul 2024 16:14:56 -0700
Subject: [PATCH] Avoid recreating storage providers

---
 crates/arroyo-server-common/src/lib.rs        |  4 +-
 crates/arroyo-state/src/lib.rs                | 27 +++++++-
 crates/arroyo-state/src/parquet.rs            | 20 ++----
 .../arroyo-state/src/tables/table_manager.rs  | 18 +-----
 crates/arroyo-storage/src/aws.rs              |  3 +-
 crates/arroyo-storage/src/lib.rs              | 63 +------------------
 6 files changed, 38 insertions(+), 97 deletions(-)

diff --git a/crates/arroyo-server-common/src/lib.rs b/crates/arroyo-server-common/src/lib.rs
index 0606476ec..72899cc1e 100644
--- a/crates/arroyo-server-common/src/lib.rs
+++ b/crates/arroyo-server-common/src/lib.rs
@@ -61,7 +61,9 @@ pub fn init_logging_with_filter(_name: &str, filter: EnvFilter) -> WorkerGuard {
         eprintln!("Failed to initialize log tracer {:?}", e);
     }
 
-    let filter = filter.add_directive("refinery_core=warn".parse().unwrap());
+    let filter = filter
+        .add_directive("refinery_core=warn".parse().unwrap())
+        .add_directive("aws_config::profile::credentials=warn".parse().unwrap());
 
     let (nonblocking, guard) = tracing_appender::non_blocking(std::io::stderr());
 
diff --git a/crates/arroyo-state/src/lib.rs b/crates/arroyo-state/src/lib.rs
index 0149dc1da..1b1fc23f3 100644
--- a/crates/arroyo-state/src/lib.rs
+++ b/crates/arroyo-state/src/lib.rs
@@ -1,4 +1,4 @@
-use anyhow::Result;
+use anyhow::{Context, Result};
 use arrow_array::RecordBatch;
 use arroyo_rpc::grpc::rpc::{
     CheckpointMetadata, ExpiringKeyedTimeTableConfig, GlobalKeyedTableConfig,
@@ -9,12 +9,15 @@ use async_trait::async_trait;
 use bincode::config::Configuration;
 use bincode::{Decode, Encode};
 
+use arroyo_rpc::config::config;
 use arroyo_rpc::df::ArroyoSchema;
+use arroyo_storage::StorageProvider;
 use prost::Message;
 use std::collections::hash_map::DefaultHasher;
 use std::collections::HashMap;
 use std::hash::{Hash, Hasher};
 use std::ops::RangeInclusive;
+use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
 pub mod checkpoint_state;
@@ -160,3 +163,25 @@ pub fn hash_key<K: Hash>(key: &K) -> u64 {
     key.hash(&mut hasher);
     hasher.finish()
 }
+
+static STORAGE_PROVIDER: tokio::sync::OnceCell<Arc<StorageProvider>> =
+    tokio::sync::OnceCell::const_new();
+
+pub(crate) async fn get_storage_provider() -> Result<&'static Arc<StorageProvider>> {
+    // TODO: this should be encoded in the config so that the controller doesn't need
+    // to be synchronized with the workers
+
+    STORAGE_PROVIDER
+        .get_or_try_init(|| async {
+            let storage_url = &config().checkpoint_url;
+
+            StorageProvider::for_url(storage_url)
+                .await
+                .context(format!(
+                    "failed to construct checkpoint backend for URL {}",
+                    storage_url
+                ))
+                .map(Arc::new)
+        })
+        .await
+}
diff --git a/crates/arroyo-state/src/parquet.rs b/crates/arroyo-state/src/parquet.rs
index ef4312385..4b7c94974 100644
--- a/crates/arroyo-state/src/parquet.rs
+++ b/crates/arroyo-state/src/parquet.rs
@@ -1,12 +1,11 @@
 use crate::tables::expiring_time_key_map::ExpiringTimeKeyTable;
 use crate::tables::global_keyed_map::GlobalKeyedTable;
 use crate::tables::{CompactionConfig, ErasedTable};
-use crate::BackingStore;
-use anyhow::{bail, Context, Result};
+use crate::{get_storage_provider, BackingStore};
+use anyhow::{bail, Result};
 use arroyo_rpc::grpc::rpc::{
     CheckpointMetadata, OperatorCheckpointMetadata, TableCheckpointMetadata,
 };
-use arroyo_storage::StorageProvider;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 
@@ -23,17 +22,6 @@ use tracing::{debug, info};
 pub const FULL_KEY_RANGE: RangeInclusive<u64> = 0..=u64::MAX;
 
 pub const GENERATIONS_TO_COMPACT: u32 = 1; // only compact generation 0 files
-async fn get_storage_provider() -> anyhow::Result<StorageProvider> {
-    // TODO: this should be encoded in the config so that the controller doesn't need
-    // to be synchronized with the workers
-    let storage_url = &config().checkpoint_url;
-
-    StorageProvider::for_url(storage_url).await.context(format!(
-        "failed to construct checkpoint backend for URL {}",
-        storage_url
-    ))
-}
-
 pub struct ParquetBackend;
 
 fn base_path(job_id: &str, epoch: u32) -> String {
@@ -178,11 +166,11 @@ impl ParquetBackend {
             Self::load_operator_metadata(&job_id, &operator_id, epoch)
                 .await?
                 .expect("expect operator metadata to still be present");
-        let storage_provider = Arc::new(get_storage_provider().await?);
+        let storage_provider = get_storage_provider().await?;
         let compaction_config = CompactionConfig {
-            storage_provider,
             compact_generations: vec![0].into_iter().collect(),
             min_compaction_epochs: min_files_to_compact,
+            storage_provider: Arc::clone(storage_provider),
         };
         let operator_metadata = operator_checkpoint_metadata.operator_metadata.unwrap();
 
diff --git a/crates/arroyo-state/src/tables/table_manager.rs b/crates/arroyo-state/src/tables/table_manager.rs
index 39d98f495..7a5c8bf0d 100644
--- a/crates/arroyo-state/src/tables/table_manager.rs
+++ b/crates/arroyo-state/src/tables/table_manager.rs
@@ -21,7 +21,7 @@ use tokio::sync::{
 use arroyo_rpc::config::config;
 use tracing::{debug, error, info, warn};
 
-use crate::{tables::global_keyed_map::GlobalKeyedTable, StateMessage};
+use crate::{get_storage_provider, tables::global_keyed_map::GlobalKeyedTable, StateMessage};
 use crate::{CheckpointMessage, TableData};
 
 use super::expiring_time_key_map::{
@@ -225,20 +225,6 @@ impl BackendWriter {
     }
 }
 
-async fn get_storage_provider() -> anyhow::Result<Arc<StorageProvider>> {
-    // TODO: this should be encoded in the config so that the controller doesn't need
-    // to be synchronized with the workers
-
-    Ok(Arc::new(
-        StorageProvider::for_url(&config().checkpoint_url)
-            .await
-            .context(format!(
-                "failed to construct checkpoint backend for URL {}",
-                config().checkpoint_url
-            ))?,
-    ))
-}
-
 impl TableManager {
     pub async fn new(
         task_info: TaskInfoRef,
@@ -320,7 +306,7 @@ impl TableManager {
             tables,
             writer,
             task_info,
-            storage,
+            storage: Arc::clone(storage),
             caches: HashMap::new(),
         })
     }
diff --git a/crates/arroyo-storage/src/aws.rs b/crates/arroyo-storage/src/aws.rs
index 7b2ee6c6e..e5bba4d90 100644
--- a/crates/arroyo-storage/src/aws.rs
+++ b/crates/arroyo-storage/src/aws.rs
@@ -1,10 +1,9 @@
+use crate::StorageError;
 use aws_config::BehaviorVersion;
 use aws_credential_types::provider::ProvideCredentials;
 use object_store::{aws::AwsCredential, CredentialProvider};
 use std::sync::Arc;
 
-use crate::StorageError;
-
 pub struct ArroyoCredentialProvider {
     provider: aws_credential_types::provider::SharedCredentialsProvider,
 }
diff --git a/crates/arroyo-storage/src/lib.rs b/crates/arroyo-storage/src/lib.rs
index da6e2db83..89d4f808a 100644
--- a/crates/arroyo-storage/src/lib.rs
+++ b/crates/arroyo-storage/src/lib.rs
@@ -16,12 +16,9 @@ use object_store::multipart::PartId;
 use object_store::path::Path;
 use object_store::{aws::AmazonS3Builder, local::LocalFileSystem, ObjectStore};
 use object_store::{CredentialProvider, MultipartId};
-use once_cell::sync::Lazy;
 use regex::{Captures, Regex};
-use std::time::{Duration, Instant};
 use thiserror::Error;
-use tokio::sync::RwLock;
-use tracing::{debug, error, trace};
+use tracing::{debug, error};
 
 mod aws;
 
@@ -301,18 +298,6 @@ pub async fn get_current_credentials() -> Result<Arc<AwsCredential>, StorageError> {
     Ok(credentials)
 }
 
-static OBJECT_STORE_CACHE: Lazy<RwLock<HashMap<String, CacheEntry<Arc<GoogleCloudStorage>>>>> =
-    Lazy::new(Default::default);
-
-struct CacheEntry<T> {
-    value: T,
-    inserted_at: Instant,
-}
-
-// The bearer token should last for 3600 seconds,
-// but regenerating it every 5 minutes to avoid token expiry
-const GCS_CACHE_TTL: Duration = Duration::from_secs(5 * 60);
-
 impl StorageProvider {
     pub async fn for_url(url: &str) -> Result<Self, StorageError> {
         Self::for_url_with_options(url, HashMap::new()).await
@@ -360,11 +345,6 @@ impl StorageProvider {
         Ok(key.clone())
     }
 
-    pub async fn url_exists(url: &str) -> Result<bool, StorageError> {
-        let provider = Self::for_url(url).await?;
-        provider.exists("").await
-    }
-
     async fn construct_s3(
         mut config: S3Config,
         options: HashMap<String, String>,
@@ -443,45 +423,6 @@ impl StorageProvider {
         })
     }
 
-    async fn get_or_create_object_store(
-        builder: GoogleCloudStorageBuilder,
-        bucket: &str,
-    ) -> Result<Arc<GoogleCloudStorage>, StorageError> {
-        let mut cache = OBJECT_STORE_CACHE.write().await;
-
-        if let Some(entry) = cache.get(bucket) {
-            if entry.inserted_at.elapsed() < GCS_CACHE_TTL {
-                trace!(
-                    "Cache hit - using cached object store for bucket {}",
-                    bucket
-                );
-                return Ok(entry.value.clone());
-            } else {
-                debug!(
-                    "Cache expired - constructing new object store for bucket {}",
-                    bucket
-                );
-            }
-        } else {
-            debug!(
-                "Cache miss - constructing new object store for bucket {}",
-                bucket
-            );
-        }
-
-        let new_store = Arc::new(builder.build().map_err(Into::<StorageError>::into)?);
-
-        cache.insert(
-            bucket.to_string(),
-            CacheEntry {
-                value: new_store.clone(),
-                inserted_at: Instant::now(),
-            },
-        );
-
-        Ok(new_store)
-    }
-
     async fn construct_gcs(config: GCSConfig) -> Result<Self, StorageError> {
         let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(&config.bucket);
 
@@ -497,7 +438,7 @@ impl StorageProvider {
 
         let object_store_base_url = format!("https://{}.storage.googleapis.com", config.bucket);
 
-        let object_store = Self::get_or_create_object_store(builder, &config.bucket).await?;
+        let object_store = Arc::new(builder.build()?);
 
         Ok(Self {
             config: BackendConfig::GCS(config),
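
Below is a minimal, self-contained sketch of the caching pattern this patch introduces in
crates/arroyo-state/src/lib.rs: a process-wide tokio::sync::OnceCell that builds an expensive
resource once and hands out &'static Arc references afterwards. Backend, connect, and the URL
are hypothetical stand-ins rather than Arroyo APIs; only the get_or_try_init shape mirrors the
patch.

use std::sync::Arc;

use anyhow::{Context, Result};
use tokio::sync::OnceCell;

struct Backend {
    url: String,
}

impl Backend {
    // Hypothetical stand-in for an expensive async constructor such as
    // StorageProvider::for_url.
    async fn connect(url: &str) -> Result<Self> {
        Ok(Self {
            url: url.to_string(),
        })
    }
}

static BACKEND: OnceCell<Arc<Backend>> = OnceCell::const_new();

async fn get_backend() -> Result<&'static Arc<Backend>> {
    BACKEND
        .get_or_try_init(|| async {
            // The initializer runs at most once, even if many tasks race to
            // call get_backend(); if it fails, the cell stays empty and a
            // later call can retry.
            Backend::connect("s3://checkpoints")
                .await
                .context("failed to construct backend")
                .map(Arc::new)
        })
        .await
}

#[tokio::main]
async fn main() -> Result<()> {
    let a = get_backend().await?;
    let b = get_backend().await?;
    // Every caller observes the same instance instead of recreating it.
    assert!(Arc::ptr_eq(a, b));
    println!("connected to {}", a.url);
    Ok(())
}

Callers that need owned access clone the Arc, as the patch does with Arc::clone(storage) in
table_manager.rs and parquet.rs, which bumps a reference count rather than rebuilding the
provider.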