From 56f788be524be5d4854b5ff788a8980871e02819 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sat, 23 Mar 2024 21:24:30 -0400 Subject: [PATCH 1/8] Cache client state --- conjure-runtime-config/src/lib.rs | 10 +- conjure-runtime/Cargo.toml | 1 + conjure-runtime/src/builder.rs | 10 +- conjure-runtime/src/client.rs | 7 +- conjure-runtime/src/client_cache.rs | 149 ++++++++++++++++++++++++++ conjure-runtime/src/client_factory.rs | 19 +++- conjure-runtime/src/lib.rs | 1 + conjure-runtime/src/user_agent.rs | 4 +- 8 files changed, 184 insertions(+), 17 deletions(-) create mode 100644 conjure-runtime/src/client_cache.rs diff --git a/conjure-runtime-config/src/lib.rs b/conjure-runtime-config/src/lib.rs index a0e4ef77..33dcb856 100644 --- a/conjure-runtime-config/src/lib.rs +++ b/conjure-runtime-config/src/lib.rs @@ -209,7 +209,7 @@ impl ServiceConfig { } /// Security configuration used to communicate with a service. -#[derive(Debug, Clone, PartialEq, Default, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default, Deserialize)] #[serde(rename_all = "kebab-case")] #[staged_builder] #[builder(update)] @@ -247,7 +247,7 @@ impl SecurityConfig { } /// Proxy configuration used to connect to a service. -#[derive(Debug, Clone, PartialEq, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] #[serde(rename_all = "kebab-case", tag = "type")] #[non_exhaustive] pub enum ProxyConfig { @@ -265,7 +265,7 @@ impl Default for ProxyConfig { } /// Configuration for an HTTP proxy. -#[derive(Debug, Clone, PartialEq, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] #[serde(rename_all = "kebab-case")] #[staged_builder] #[builder(update)] @@ -288,7 +288,7 @@ impl HttpProxyConfig { } /// A host and port identifier of a server. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct HostAndPort { host: String, port: u16, @@ -343,7 +343,7 @@ impl HostAndPort { } /// Credentials used to authenticate with an HTTP proxy. -#[derive(Debug, Clone, PartialEq, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] pub struct BasicCredentials { username: String, password: String, diff --git a/conjure-runtime/Cargo.toml b/conjure-runtime/Cargo.toml index 62a44f4e..c77dd708 100644 --- a/conjure-runtime/Cargo.toml +++ b/conjure-runtime/Cargo.toml @@ -29,6 +29,7 @@ hyper-rustls = { version = "0.24.0", default-features = false, features = [ "tls12", ] } hyper = { version = "0.14", features = ["http1", "http2", "client", "tcp"] } +linked-hash-map = "0.5" once_cell = "1.0" parking_lot = "0.12" percent-encoding = "2.1" diff --git a/conjure-runtime/src/builder.rs b/conjure-runtime/src/builder.rs index dfc70a35..8446b3f9 100644 --- a/conjure-runtime/src/builder.rs +++ b/conjure-runtime/src/builder.rs @@ -550,7 +550,7 @@ where } /// Specifies the beahavior of client-side sympathetic rate limiting. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] #[non_exhaustive] pub enum ClientQos { /// Enable client side rate limiting. @@ -568,7 +568,7 @@ pub enum ClientQos { /// Specifies the behavior of a client in response to a `QoS` error from a server. /// /// QoS errors have status codes 429 or 503. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] #[non_exhaustive] pub enum ServerQos { /// The client will automatically retry the request when possible in response to a QoS error. @@ -586,7 +586,7 @@ pub enum ServerQos { /// Specifies the behavior of the client in response to a service error from a server. /// /// Service errors are encoded as responses with a 4xx or 5xx response code and a body containing a `SerializableError`. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] #[non_exhaustive] pub enum ServiceError { /// The service error will be propagated as a new internal service error. @@ -605,7 +605,7 @@ pub enum ServiceError { } /// Specifies the manner in which the client decides if a request is idempotent or not. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] #[non_exhaustive] pub enum Idempotency { /// All requests are assumed to be idempotent. @@ -622,7 +622,7 @@ pub enum Idempotency { } /// Specifies the strategy used to select a node of a service to use for a request attempt. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] #[non_exhaustive] pub enum NodeSelectionStrategy { /// Pin to a single host as long as it continues to successfully respond to requests. diff --git a/conjure-runtime/src/client.rs b/conjure-runtime/src/client.rs index 9c1df2bf..2740b76b 100644 --- a/conjure-runtime/src/client.rs +++ b/conjure-runtime/src/client.rs @@ -1,3 +1,4 @@ +use crate::client_cache::CacheEvictor; // Copyright 2020 Palantir Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -65,6 +66,7 @@ pub(crate) type BaseBody = WaitForSpansBody>; pub(crate) struct ClientState { service: BaseService, + pub(crate) evictor: Option, } impl ClientState { @@ -93,7 +95,10 @@ impl ClientState { .layer(MapErrorLayer) .service(client); - Ok(ClientState { service }) + Ok(ClientState { + service, + evictor: None, + }) } } diff --git a/conjure-runtime/src/client_cache.rs b/conjure-runtime/src/client_cache.rs new file mode 100644 index 00000000..1fea493a --- /dev/null +++ b/conjure-runtime/src/client_cache.rs @@ -0,0 +1,149 @@ +// Copyright 2024 Palantir Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + sync::{Arc, Weak}, + time::Duration, +}; + +use conjure_error::Error; +use conjure_runtime_config::{ProxyConfig, SecurityConfig}; +use linked_hash_map::LinkedHashMap; +use parking_lot::Mutex; +use url::Url; + +use crate::{ + raw::DefaultRawClient, Builder, ClientQos, ClientState, Idempotency, NodeSelectionStrategy, + ServerQos, ServiceError, UserAgent, +}; + +const MAX_CACHED_CHANNELS: usize = 1_000; + +struct CachedState { + state: Weak>, + id: usize, +} + +struct Inner { + cache: LinkedHashMap, CachedState>, + next_id: usize, +} + +#[derive(Clone)] +pub struct ClientCache { + inner: Arc>, +} + +impl ClientCache { + pub fn new() -> Self { + ClientCache { + inner: Arc::new(Mutex::new(Inner { + cache: LinkedHashMap::new(), + next_id: 0, + })), + } + } + + pub fn get(&self, builder: &Builder) -> Result>, Error> { + let key = Arc::new(CacheKey { + service: builder.get_service().to_string(), + user_agent: builder.get_user_agent().clone(), + uris: builder.get_uris().to_vec(), + security: builder.get_security().clone(), + proxy: builder.get_proxy().clone(), + connect_timeout: builder.get_connect_timeout(), + read_timeout: builder.get_read_timeout(), + write_timeout: builder.get_write_timeout(), + backoff_slot_size: builder.get_backoff_slot_size(), + max_num_retries: builder.get_max_num_retries(), + client_qos: builder.get_client_qos(), + server_qos: builder.get_server_qos(), + service_error: builder.get_service_error(), + idempotency: builder.get_idempotency(), + node_selection_strategy: builder.get_node_selection_strategy(), + rng_seed: builder.get_rng_seed(), + }); + + let mut inner = self.inner.lock(); + if let Some(state) = inner + .cache + .get_refresh(&key) + .and_then(|w| w.state.upgrade()) + { + return Ok(state.clone()); + } + + let mut state = ClientState::new(builder)?; + let id = inner.next_id; + inner.next_id += 1; + state.evictor = Some(CacheEvictor { + inner: Arc::downgrade(&self.inner), + key: key.clone(), + id, + }); + let state = Arc::new(state); + let cached_state = CachedState { + state: Arc::downgrade(&state), + id, + }; + inner.cache.insert(key, cached_state); + + while inner.cache.len() > MAX_CACHED_CHANNELS { + inner.cache.pop_front(); + } + + Ok(state) + } +} + +#[derive(Clone, PartialEq, Eq, Hash)] +struct CacheKey { + service: String, + user_agent: UserAgent, + uris: Vec, + security: SecurityConfig, + proxy: ProxyConfig, + connect_timeout: Duration, + read_timeout: Duration, + write_timeout: Duration, + backoff_slot_size: Duration, + max_num_retries: u32, + client_qos: ClientQos, + server_qos: ServerQos, + service_error: ServiceError, + idempotency: Idempotency, + node_selection_strategy: NodeSelectionStrategy, + rng_seed: Option, +} + +pub struct CacheEvictor { + inner: Weak>, + key: Arc, + id: usize, +} + +impl Drop for CacheEvictor { + fn drop(&mut self) { + let Some(inner) = self.inner.upgrade() else { + return; + }; + let mut inner = inner.lock(); + + if let Some(cached_state) = inner.cache.get(&self.key) { + if cached_state.id == self.id { + inner.cache.remove(&self.key); + } + } + } +} diff --git a/conjure-runtime/src/client_factory.rs b/conjure-runtime/src/client_factory.rs index cc27ab98..67faf7f7 100644 --- a/conjure-runtime/src/client_factory.rs +++ b/conjure-runtime/src/client_factory.rs @@ -13,7 +13,7 @@ // limitations under the License. //! The client factory. use crate::blocking; -use crate::client::ClientState; +use crate::client_cache::ClientCache; use crate::config::{ServiceConfig, ServicesConfig}; use crate::{ Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos, @@ -52,6 +52,7 @@ pub struct Complete { idempotency: Idempotency, node_selection_strategy: NodeSelectionStrategy, blocking_handle: Option, + cache: ClientCache, } impl Default for ClientFactory { @@ -95,11 +96,17 @@ impl ClientFactory { idempotency: Idempotency::ByMethod, node_selection_strategy: NodeSelectionStrategy::PinUntilError, blocking_handle: None, + cache: ClientCache::new(), }) } } impl ClientFactory { + // Some state can't be tracked in the cache key, so we instead swap to a new cache. + fn swap_cache(&mut self) { + self.0.cache = ClientCache::new(); + } + /// Sets the user agent sent by clients. #[inline] pub fn user_agent(mut self, user_agent: UserAgent) -> Self { @@ -199,6 +206,7 @@ impl ClientFactory { #[inline] pub fn metrics(mut self, metrics: Arc) -> Self { self.0.metrics = Some(metrics); + self.swap_cache(); self } @@ -214,6 +222,7 @@ impl ClientFactory { #[inline] pub fn host_metrics(mut self, host_metrics: Arc) -> Self { self.0.host_metrics = Some(host_metrics); + self.swap_cache(); self } @@ -231,6 +240,7 @@ impl ClientFactory { #[inline] pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self { self.0.blocking_handle = Some(blocking_handle); + self.swap_cache(); self } @@ -275,6 +285,7 @@ impl ClientFactory { let service_error = self.0.service_error; let idempotency = self.0.idempotency; let node_selection_strategy = self.0.node_selection_strategy; + let cache = self.0.cache.clone(); let make_state = move |config: &ServiceConfig| { let mut builder = Client::builder() @@ -295,17 +306,17 @@ impl ClientFactory { builder = builder.host_metrics(host_metrics); } - ClientState::new(&builder) + cache.get(&builder) }; let state = make_state(&service_config.get())?; - let state = Arc::new(ArcSwap::new(Arc::new(state))); + let state = Arc::new(ArcSwap::new(state)); let subscription = service_config.subscribe({ let state = state.clone(); move |config| { let new_state = make_state(config)?; - state.store(Arc::new(new_state)); + state.store(new_state); Ok(()) } })?; diff --git a/conjure-runtime/src/lib.rs b/conjure-runtime/src/lib.rs index 60db39e2..25ecd51d 100644 --- a/conjure-runtime/src/lib.rs +++ b/conjure-runtime/src/lib.rs @@ -204,6 +204,7 @@ pub mod blocking; mod body; pub mod builder; mod client; +mod client_cache; pub mod client_factory; pub mod errors; mod host_metrics; diff --git a/conjure-runtime/src/user_agent.rs b/conjure-runtime/src/user_agent.rs index 558ff86d..e9aee9f2 100644 --- a/conjure-runtime/src/user_agent.rs +++ b/conjure-runtime/src/user_agent.rs @@ -24,7 +24,7 @@ static VALID_VERSION: Lazy = const DEFAULT_VERSION: &str = "0.0.0"; /// A representation of an HTTP `User-Agent` header value. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct UserAgent { node_id: Option, primary: Agent, @@ -91,7 +91,7 @@ impl UserAgent { } /// A component of a [`UserAgent`]. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Agent { name: String, version: String, From 9739c5b78ce8d4158ff1dd3fc33f962f68b436fc Mon Sep 17 00:00:00 2001 From: svc-changelog Date: Thu, 28 Mar 2024 15:30:10 +0000 Subject: [PATCH 2/8] Add generated changelog entries --- changelog/@unreleased/pr-198.v2.yml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/@unreleased/pr-198.v2.yml diff --git a/changelog/@unreleased/pr-198.v2.yml b/changelog/@unreleased/pr-198.v2.yml new file mode 100644 index 00000000..b64e6d9c --- /dev/null +++ b/changelog/@unreleased/pr-198.v2.yml @@ -0,0 +1,5 @@ +type: feature +feature: + description: Client state is now cached and shared between clients. + links: + - https://github.com/palantir/conjure-rust-runtime/pull/198 From 680b66a617b23f9f0d5a5be1010eb18e58338ee9 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sat, 20 Apr 2024 19:12:08 -0400 Subject: [PATCH 3/8] Explicitly partition cachable and uncachable config --- conjure-runtime/src/builder.rs | 184 +++++++++++++------------- conjure-runtime/src/client_cache.rs | 55 +------- conjure-runtime/src/client_factory.rs | 66 +++++---- 3 files changed, 140 insertions(+), 165 deletions(-) diff --git a/conjure-runtime/src/builder.rs b/conjure-runtime/src/builder.rs index 8446b3f9..f388ac71 100644 --- a/conjure-runtime/src/builder.rs +++ b/conjure-runtime/src/builder.rs @@ -39,8 +39,8 @@ pub struct UserAgentStage { service: String, } -/// The complete builder stage. -pub struct Complete { +#[derive(Clone, PartialEq, Eq, Hash)] +pub(crate) struct CachedConfig { service: String, user_agent: UserAgent, uris: Vec, @@ -56,11 +56,21 @@ pub struct Complete { service_error: ServiceError, idempotency: Idempotency, node_selection_strategy: NodeSelectionStrategy, - metrics: Option>, - host_metrics: Option>, rng_seed: Option, - blocking_handle: Option, - raw_client_builder: T, +} + +#[derive(Clone)] +pub(crate) struct UncachedConfig { + pub(crate) metrics: Option>, + pub(crate) host_metrics: Option>, + pub(crate) blocking_handle: Option, + pub(crate) raw_client_builder: T, +} + +/// The complete builder stage. +pub struct Complete { + cached: CachedConfig, + uncached: UncachedConfig, } impl Default for Builder { @@ -93,26 +103,30 @@ impl Builder { #[inline] pub fn user_agent(self, user_agent: UserAgent) -> Builder { Builder(Complete { - service: self.0.service, - user_agent, - uris: vec![], - security: SecurityConfig::builder().build(), - proxy: ProxyConfig::Direct, - connect_timeout: Duration::from_secs(10), - read_timeout: Duration::from_secs(5 * 60), - write_timeout: Duration::from_secs(5 * 60), - backoff_slot_size: Duration::from_millis(250), - max_num_retries: 4, - client_qos: ClientQos::Enabled, - server_qos: ServerQos::AutomaticRetry, - service_error: ServiceError::WrapInNewError, - idempotency: Idempotency::ByMethod, - node_selection_strategy: NodeSelectionStrategy::PinUntilError, - metrics: None, - host_metrics: None, - rng_seed: None, - blocking_handle: None, - raw_client_builder: DefaultRawClientBuilder, + cached: CachedConfig { + service: self.0.service, + user_agent, + uris: vec![], + security: SecurityConfig::builder().build(), + proxy: ProxyConfig::Direct, + connect_timeout: Duration::from_secs(10), + read_timeout: Duration::from_secs(5 * 60), + write_timeout: Duration::from_secs(5 * 60), + backoff_slot_size: Duration::from_millis(250), + max_num_retries: 4, + client_qos: ClientQos::Enabled, + server_qos: ServerQos::AutomaticRetry, + service_error: ServiceError::WrapInNewError, + idempotency: Idempotency::ByMethod, + node_selection_strategy: NodeSelectionStrategy::PinUntilError, + rng_seed: None, + }, + uncached: UncachedConfig { + metrics: None, + host_metrics: None, + blocking_handle: None, + raw_client_builder: DefaultRawClientBuilder, + }, }) } } @@ -129,6 +143,10 @@ impl Builder { } impl Builder> { + pub(crate) fn cached_config(&self) -> &CachedConfig { + &self.0.cached + } + /// Applies configuration settings from a `ServiceConfig` to the builder. #[inline] pub fn from_config(mut self, config: &ServiceConfig) -> Self { @@ -168,13 +186,13 @@ impl Builder> { /// Returns the builder's configured service name. #[inline] pub fn get_service(&self) -> &str { - &self.0.service + &self.0.cached.service } /// Returns the builder's configured user agent. #[inline] pub fn get_user_agent(&self) -> &UserAgent { - &self.0.user_agent + &self.0.cached.user_agent } /// Appends a URI to the URIs list. @@ -182,7 +200,7 @@ impl Builder> { /// Defaults to an empty list. #[inline] pub fn uri(mut self, uri: Url) -> Self { - self.0.uris.push(uri); + self.0.cached.uris.push(uri); self } @@ -191,14 +209,14 @@ impl Builder> { /// Defaults to an empty list. #[inline] pub fn uris(mut self, uris: Vec) -> Self { - self.0.uris = uris; + self.0.cached.uris = uris; self } /// Returns the builder's configured URIs list. #[inline] pub fn get_uris(&self) -> &[Url] { - &self.0.uris + &self.0.cached.uris } /// Sets the security configuration. @@ -206,14 +224,14 @@ impl Builder> { /// Defaults to an empty configuration. #[inline] pub fn security(mut self, security: SecurityConfig) -> Self { - self.0.security = security; + self.0.cached.security = security; self } /// Returns the builder's configured security configuration. #[inline] pub fn get_security(&self) -> &SecurityConfig { - &self.0.security + &self.0.cached.security } /// Sets the proxy configuration. @@ -221,14 +239,14 @@ impl Builder> { /// Defaults to `ProxyConfig::Direct` (i.e. no proxy). #[inline] pub fn proxy(mut self, proxy: ProxyConfig) -> Self { - self.0.proxy = proxy; + self.0.cached.proxy = proxy; self } /// Returns the builder's configured proxy configuration. #[inline] pub fn get_proxy(&self) -> &ProxyConfig { - &self.0.proxy + &self.0.cached.proxy } /// Sets the connect timeout. @@ -236,14 +254,14 @@ impl Builder> { /// Defaults to 10 seconds. #[inline] pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self { - self.0.connect_timeout = connect_timeout; + self.0.cached.connect_timeout = connect_timeout; self } /// Returns the builder's configured connect timeout. #[inline] pub fn get_connect_timeout(&self) -> Duration { - self.0.connect_timeout + self.0.cached.connect_timeout } /// Sets the read timeout. @@ -253,14 +271,14 @@ impl Builder> { /// Defaults to 5 minutes. #[inline] pub fn read_timeout(mut self, read_timeout: Duration) -> Self { - self.0.read_timeout = read_timeout; + self.0.cached.read_timeout = read_timeout; self } /// Returns the builder's configured read timeout. #[inline] pub fn get_read_timeout(&self) -> Duration { - self.0.read_timeout + self.0.cached.read_timeout } /// Sets the write timeout. @@ -270,14 +288,14 @@ impl Builder> { /// Defaults to 5 minutes. #[inline] pub fn write_timeout(mut self, write_timeout: Duration) -> Self { - self.0.write_timeout = write_timeout; + self.0.cached.write_timeout = write_timeout; self } /// Returns the builder's configured write timeout. #[inline] pub fn get_write_timeout(&self) -> Duration { - self.0.write_timeout + self.0.cached.write_timeout } /// Sets the backoff slot size. @@ -288,14 +306,14 @@ impl Builder> { /// Defaults to 250 milliseconds. #[inline] pub fn backoff_slot_size(mut self, backoff_slot_size: Duration) -> Self { - self.0.backoff_slot_size = backoff_slot_size; + self.0.cached.backoff_slot_size = backoff_slot_size; self } /// Returns the builder's configured backoff slot size. #[inline] pub fn get_backoff_slot_size(&self) -> Duration { - self.0.backoff_slot_size + self.0.cached.backoff_slot_size } /// Sets the maximum number of times a request attempt will be retried before giving up. @@ -303,14 +321,14 @@ impl Builder> { /// Defaults to 4. #[inline] pub fn max_num_retries(mut self, max_num_retries: u32) -> Self { - self.0.max_num_retries = max_num_retries; + self.0.cached.max_num_retries = max_num_retries; self } /// Returns the builder's configured maximum number of retries. #[inline] pub fn get_max_num_retries(&self) -> u32 { - self.0.max_num_retries + self.0.cached.max_num_retries } /// Sets the client's internal rate limiting behavior. @@ -318,14 +336,14 @@ impl Builder> { /// Defaults to `ClientQos::Enabled`. #[inline] pub fn client_qos(mut self, client_qos: ClientQos) -> Self { - self.0.client_qos = client_qos; + self.0.cached.client_qos = client_qos; self } /// Returns the builder's configured internal rate limiting behavior. #[inline] pub fn get_client_qos(&self) -> ClientQos { - self.0.client_qos + self.0.cached.client_qos } /// Sets the client's behavior in response to a QoS error from the server. @@ -333,14 +351,14 @@ impl Builder> { /// Defaults to `ServerQos::AutomaticRetry`. #[inline] pub fn server_qos(mut self, server_qos: ServerQos) -> Self { - self.0.server_qos = server_qos; + self.0.cached.server_qos = server_qos; self } /// Returns the builder's configured server QoS behavior. #[inline] pub fn get_server_qos(&self) -> ServerQos { - self.0.server_qos + self.0.cached.server_qos } /// Sets the client's behavior in response to a service error from the server. @@ -348,14 +366,14 @@ impl Builder> { /// Defaults to `ServiceError::WrapInNewError`. #[inline] pub fn service_error(mut self, service_error: ServiceError) -> Self { - self.0.service_error = service_error; + self.0.cached.service_error = service_error; self } /// Returns the builder's configured service error handling behavior. #[inline] pub fn get_service_error(&self) -> ServiceError { - self.0.service_error + self.0.cached.service_error } /// Sets the client's behavior to determine if a request is idempotent or not. @@ -365,14 +383,14 @@ impl Builder> { /// Defaults to `Idempotency::ByMethod`. #[inline] pub fn idempotency(mut self, idempotency: Idempotency) -> Self { - self.0.idempotency = idempotency; + self.0.cached.idempotency = idempotency; self } /// Returns the builder's configured idempotency handling behavior. #[inline] pub fn get_idempotency(&self) -> Idempotency { - self.0.idempotency + self.0.cached.idempotency } /// Sets the client's strategy for selecting a node for a request. @@ -383,14 +401,14 @@ impl Builder> { mut self, node_selection_strategy: NodeSelectionStrategy, ) -> Self { - self.0.node_selection_strategy = node_selection_strategy; + self.0.cached.node_selection_strategy = node_selection_strategy; self } /// Returns the builder's configured node selection strategy. #[inline] pub fn get_node_selection_strategy(&self) -> NodeSelectionStrategy { - self.0.node_selection_strategy + self.0.cached.node_selection_strategy } /// Sets the metric registry used to register client metrics. @@ -398,14 +416,14 @@ impl Builder> { /// Defaults to no registry. #[inline] pub fn metrics(mut self, metrics: Arc) -> Self { - self.0.metrics = Some(metrics); + self.0.uncached.metrics = Some(metrics); self } /// Returns the builder's configured metric registry. #[inline] pub fn get_metrics(&self) -> Option<&Arc> { - self.0.metrics.as_ref() + self.0.uncached.metrics.as_ref() } /// Sets the host metrics registry used to track host performance. @@ -413,14 +431,14 @@ impl Builder> { /// Defaults to no registry. #[inline] pub fn host_metrics(mut self, host_metrics: Arc) -> Self { - self.0.host_metrics = Some(host_metrics); + self.0.uncached.host_metrics = Some(host_metrics); self } /// Returns the builder's configured host metrics registry. #[inline] pub fn get_host_metrics(&self) -> Option<&Arc> { - self.0.host_metrics.as_ref() + self.0.uncached.host_metrics.as_ref() } /// Sets a seed used to initialize the client's random number generators. @@ -432,14 +450,14 @@ impl Builder> { /// Defaults to no seed. #[inline] pub fn rng_seed(mut self, rng_seed: u64) -> Self { - self.0.rng_seed = Some(rng_seed); + self.0.cached.rng_seed = Some(rng_seed); self } /// Returns the builder's configured RNG seed. #[inline] pub fn get_rng_seed(&self) -> Option { - self.0.rng_seed + self.0.cached.rng_seed } /// Returns the `Handle` to the tokio `Runtime` to be used by blocking clients. @@ -449,14 +467,14 @@ impl Builder> { /// Defaults to a `conjure-runtime` internal `Runtime`. #[inline] pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self { - self.0.blocking_handle = Some(blocking_handle); + self.0.uncached.blocking_handle = Some(blocking_handle); self } /// Returns the builder's configured blocking handle. #[inline] pub fn get_blocking_handle(&self) -> Option<&Handle> { - self.0.blocking_handle.as_ref() + self.0.uncached.blocking_handle.as_ref() } /// Sets the raw client builder. @@ -465,37 +483,25 @@ impl Builder> { #[inline] pub fn raw_client_builder(self, raw_client_builder: U) -> Builder> { Builder(Complete { - service: self.0.service, - user_agent: self.0.user_agent, - uris: self.0.uris, - security: self.0.security, - proxy: self.0.proxy, - connect_timeout: self.0.connect_timeout, - read_timeout: self.0.read_timeout, - write_timeout: self.0.write_timeout, - backoff_slot_size: self.0.backoff_slot_size, - max_num_retries: self.0.max_num_retries, - client_qos: self.0.client_qos, - server_qos: self.0.server_qos, - service_error: self.0.service_error, - idempotency: self.0.idempotency, - node_selection_strategy: self.0.node_selection_strategy, - metrics: self.0.metrics, - host_metrics: self.0.host_metrics, - rng_seed: self.0.rng_seed, - blocking_handle: self.0.blocking_handle, - raw_client_builder, + cached: self.0.cached, + uncached: UncachedConfig { + metrics: self.0.uncached.metrics, + host_metrics: self.0.uncached.host_metrics, + blocking_handle: self.0.uncached.blocking_handle, + raw_client_builder, + }, }) } /// Returns the builder's configured raw client builder. #[inline] pub fn get_raw_client_builder(&self) -> &T { - &self.0.raw_client_builder + &self.0.uncached.raw_client_builder } pub(crate) fn mesh_mode(&self) -> bool { self.0 + .cached .uris .iter() .any(|uri| uri.scheme().starts_with(MESH_PREFIX)) @@ -503,12 +509,12 @@ impl Builder> { pub(crate) fn postprocessed_uris(&self) -> Result, Error> { if self.mesh_mode() { - if self.0.uris.len() != 1 { + if self.0.cached.uris.len() != 1 { return Err(Error::internal_safe("mesh mode expects exactly one URI") - .with_safe_param("uris", &self.0.uris)); + .with_safe_param("uris", &self.0.cached.uris)); } - let uri = self.0.uris[0] + let uri = self.0.cached.uris[0] .as_str() .strip_prefix(MESH_PREFIX) .unwrap() @@ -517,7 +523,7 @@ impl Builder> { Ok(Cow::Owned(vec![uri])) } else { - Ok(Cow::Borrowed(&self.0.uris)) + Ok(Cow::Borrowed(&self.0.cached.uris)) } } } @@ -544,7 +550,7 @@ where pub fn build_blocking(&self) -> Result, Error> { self.build().map(|client| blocking::Client { client, - handle: self.0.blocking_handle.clone(), + handle: self.0.uncached.blocking_handle.clone(), }) } } diff --git a/conjure-runtime/src/client_cache.rs b/conjure-runtime/src/client_cache.rs index 1fea493a..ae269f78 100644 --- a/conjure-runtime/src/client_cache.rs +++ b/conjure-runtime/src/client_cache.rs @@ -12,21 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{ - sync::{Arc, Weak}, - time::Duration, -}; +use std::sync::{Arc, Weak}; use conjure_error::Error; -use conjure_runtime_config::{ProxyConfig, SecurityConfig}; use linked_hash_map::LinkedHashMap; use parking_lot::Mutex; -use url::Url; -use crate::{ - raw::DefaultRawClient, Builder, ClientQos, ClientState, Idempotency, NodeSelectionStrategy, - ServerQos, ServiceError, UserAgent, -}; +use crate::{builder::CachedConfig, raw::DefaultRawClient, Builder, ClientState}; const MAX_CACHED_CHANNELS: usize = 1_000; @@ -36,7 +28,7 @@ struct CachedState { } struct Inner { - cache: LinkedHashMap, CachedState>, + cache: LinkedHashMap, CachedState>, next_id: usize, } @@ -56,24 +48,7 @@ impl ClientCache { } pub fn get(&self, builder: &Builder) -> Result>, Error> { - let key = Arc::new(CacheKey { - service: builder.get_service().to_string(), - user_agent: builder.get_user_agent().clone(), - uris: builder.get_uris().to_vec(), - security: builder.get_security().clone(), - proxy: builder.get_proxy().clone(), - connect_timeout: builder.get_connect_timeout(), - read_timeout: builder.get_read_timeout(), - write_timeout: builder.get_write_timeout(), - backoff_slot_size: builder.get_backoff_slot_size(), - max_num_retries: builder.get_max_num_retries(), - client_qos: builder.get_client_qos(), - server_qos: builder.get_server_qos(), - service_error: builder.get_service_error(), - idempotency: builder.get_idempotency(), - node_selection_strategy: builder.get_node_selection_strategy(), - rng_seed: builder.get_rng_seed(), - }); + let key = Arc::new(builder.cached_config().clone()); let mut inner = self.inner.lock(); if let Some(state) = inner @@ -107,29 +82,9 @@ impl ClientCache { } } -#[derive(Clone, PartialEq, Eq, Hash)] -struct CacheKey { - service: String, - user_agent: UserAgent, - uris: Vec, - security: SecurityConfig, - proxy: ProxyConfig, - connect_timeout: Duration, - read_timeout: Duration, - write_timeout: Duration, - backoff_slot_size: Duration, - max_num_retries: u32, - client_qos: ClientQos, - server_qos: ServerQos, - service_error: ServiceError, - idempotency: Idempotency, - node_selection_strategy: NodeSelectionStrategy, - rng_seed: Option, -} - pub struct CacheEvictor { inner: Weak>, - key: Arc, + key: Arc, id: usize, } diff --git a/conjure-runtime/src/client_factory.rs b/conjure-runtime/src/client_factory.rs index 67faf7f7..9aae22c9 100644 --- a/conjure-runtime/src/client_factory.rs +++ b/conjure-runtime/src/client_factory.rs @@ -13,8 +13,10 @@ // limitations under the License. //! The client factory. use crate::blocking; +use crate::builder::UncachedConfig; use crate::client_cache::ClientCache; use crate::config::{ServiceConfig, ServicesConfig}; +use crate::raw::DefaultRawClientBuilder; use crate::{ Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos, ServiceError, UserAgent, @@ -39,20 +41,34 @@ pub struct UserAgentStage { config: Arc>, } +#[derive(Clone)] +struct CacheManager { + uncached_inner: UncachedConfig, + cache: ClientCache, +} + +impl CacheManager { + fn uncached(&self) -> &UncachedConfig { + &self.uncached_inner + } + + fn uncached_mut(&mut self) -> &mut UncachedConfig { + self.cache = ClientCache::new(); + &mut self.uncached_inner + } +} + /// The complete builder stage. #[derive(Clone)] pub struct Complete { config: Arc>, user_agent: UserAgent, - metrics: Option>, - host_metrics: Option>, client_qos: ClientQos, server_qos: ServerQos, service_error: ServiceError, idempotency: Idempotency, node_selection_strategy: NodeSelectionStrategy, - blocking_handle: Option, - cache: ClientCache, + cache_manager: CacheManager, } impl Default for ClientFactory { @@ -88,25 +104,26 @@ impl ClientFactory { ClientFactory(Complete { config: self.0.config, user_agent, - metrics: None, - host_metrics: None, client_qos: ClientQos::Enabled, server_qos: ServerQos::AutomaticRetry, service_error: ServiceError::WrapInNewError, idempotency: Idempotency::ByMethod, node_selection_strategy: NodeSelectionStrategy::PinUntilError, - blocking_handle: None, - cache: ClientCache::new(), + cache_manager: CacheManager { + uncached_inner: UncachedConfig { + metrics: None, + host_metrics: None, + + blocking_handle: None, + raw_client_builder: DefaultRawClientBuilder, + }, + cache: ClientCache::new(), + }, }) } } impl ClientFactory { - // Some state can't be tracked in the cache key, so we instead swap to a new cache. - fn swap_cache(&mut self) { - self.0.cache = ClientCache::new(); - } - /// Sets the user agent sent by clients. #[inline] pub fn user_agent(mut self, user_agent: UserAgent) -> Self { @@ -205,15 +222,14 @@ impl ClientFactory { /// Defaults to no registry. #[inline] pub fn metrics(mut self, metrics: Arc) -> Self { - self.0.metrics = Some(metrics); - self.swap_cache(); + self.0.cache_manager.uncached_mut().metrics = Some(metrics); self } /// Returns the configured metrics registry. #[inline] pub fn get_metrics(&self) -> Option<&Arc> { - self.0.metrics.as_ref() + self.0.cache_manager.uncached().metrics.as_ref() } /// Sets the host metrics registry used to track host performance. @@ -221,15 +237,14 @@ impl ClientFactory { /// Defaults to no registry. #[inline] pub fn host_metrics(mut self, host_metrics: Arc) -> Self { - self.0.host_metrics = Some(host_metrics); - self.swap_cache(); + self.0.cache_manager.uncached_mut().host_metrics = Some(host_metrics); self } /// Returns the configured host metrics registry. #[inline] pub fn get_host_metrics(&self) -> Option<&Arc> { - self.0.host_metrics.as_ref() + self.0.cache_manager.uncached().host_metrics.as_ref() } /// Returns the `Handle` to the tokio `Runtime` to be used by blocking clients. @@ -239,15 +254,14 @@ impl ClientFactory { /// Defaults to a `conjure-runtime` internal `Runtime`. #[inline] pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self { - self.0.blocking_handle = Some(blocking_handle); - self.swap_cache(); + self.0.cache_manager.uncached_mut().blocking_handle = Some(blocking_handle); self } /// Returns the configured blocking handle. #[inline] pub fn get_blocking_handle(&self) -> Option<&Handle> { - self.0.blocking_handle.as_ref() + self.0.cache_manager.uncached().blocking_handle.as_ref() } /// Creates a new client for the specified service. @@ -278,14 +292,14 @@ impl ClientFactory { let service = service.to_string(); let user_agent = self.0.user_agent.clone(); - let metrics = self.0.metrics.clone(); - let host_metrics = self.0.host_metrics.clone(); + let metrics = self.0.cache_manager.uncached().metrics.clone(); + let host_metrics = self.0.cache_manager.uncached().host_metrics.clone(); let client_qos = self.0.client_qos; let server_qos = self.0.server_qos; let service_error = self.0.service_error; let idempotency = self.0.idempotency; let node_selection_strategy = self.0.node_selection_strategy; - let cache = self.0.cache.clone(); + let cache = self.0.cache_manager.cache.clone(); let make_state = move |config: &ServiceConfig| { let mut builder = Client::builder() @@ -347,7 +361,7 @@ impl ClientFactory { fn blocking_client_inner(&self, service: &str) -> Result { self.client_inner(service).map(|client| blocking::Client { client, - handle: self.0.blocking_handle.clone(), + handle: self.0.cache_manager.uncached().blocking_handle.clone(), }) } } From e4871e440d679a9fde451f685479a93c3c29e1f0 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sat, 20 Apr 2024 19:22:34 -0400 Subject: [PATCH 4/8] avoid unnecessary clones --- conjure-runtime/src/client_cache.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/conjure-runtime/src/client_cache.rs b/conjure-runtime/src/client_cache.rs index ae269f78..84630a27 100644 --- a/conjure-runtime/src/client_cache.rs +++ b/conjure-runtime/src/client_cache.rs @@ -48,17 +48,14 @@ impl ClientCache { } pub fn get(&self, builder: &Builder) -> Result>, Error> { - let key = Arc::new(builder.cached_config().clone()); + let key = builder.cached_config(); let mut inner = self.inner.lock(); - if let Some(state) = inner - .cache - .get_refresh(&key) - .and_then(|w| w.state.upgrade()) - { + if let Some(state) = inner.cache.get_refresh(key).and_then(|w| w.state.upgrade()) { return Ok(state.clone()); } + let key = Arc::new(key.clone()); let mut state = ClientState::new(builder)?; let id = inner.next_id; inner.next_id += 1; From 168bfe19d27092b0f25f9d0c9b144aa2c1d6d3fa Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sat, 20 Apr 2024 20:20:25 -0400 Subject: [PATCH 5/8] parameterize the cache --- conjure-runtime/src/builder.rs | 6 +- conjure-runtime/src/client.rs | 13 +-- conjure-runtime/src/client_cache.rs | 160 ++++++++++++++++++-------- conjure-runtime/src/client_factory.rs | 10 +- 4 files changed, 130 insertions(+), 59 deletions(-) diff --git a/conjure-runtime/src/builder.rs b/conjure-runtime/src/builder.rs index f388ac71..e235ac77 100644 --- a/conjure-runtime/src/builder.rs +++ b/conjure-runtime/src/builder.rs @@ -14,6 +14,7 @@ //! The client builder. use crate::blocking; use crate::client::ClientState; +use crate::client_cache::Cached; use crate::config::{ProxyConfig, SecurityConfig, ServiceConfig}; use crate::raw::{BuildRawClient, DefaultRawClientBuilder}; use crate::{Client, HostMetricsRegistry, UserAgent}; @@ -539,7 +540,10 @@ where /// Panics if `service` or `user_agent` is not set. pub fn build(&self) -> Result, Error> { let state = ClientState::new(self)?; - Ok(Client::new(Arc::new(ArcSwap::new(Arc::new(state))), None)) + Ok(Client::new( + Arc::new(ArcSwap::new(Arc::new(Cached::uncached(state)))), + None, + )) } /// Creates a new `blocking::Client`. diff --git a/conjure-runtime/src/client.rs b/conjure-runtime/src/client.rs index 2740b76b..0ae00bbe 100644 --- a/conjure-runtime/src/client.rs +++ b/conjure-runtime/src/client.rs @@ -1,4 +1,3 @@ -use crate::client_cache::CacheEvictor; // Copyright 2020 Palantir Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,6 +11,8 @@ use crate::client_cache::CacheEvictor; // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use crate::builder::CachedConfig; +use crate::client_cache::Cached; use crate::raw::{BuildRawClient, DefaultRawClient, RawBody, Service}; use crate::service::gzip::{DecodedBody, GzipLayer}; use crate::service::http_error::HttpErrorLayer; @@ -66,7 +67,6 @@ pub(crate) type BaseBody = WaitForSpansBody>; pub(crate) struct ClientState { service: BaseService, - pub(crate) evictor: Option, } impl ClientState { @@ -95,10 +95,7 @@ impl ClientState { .layer(MapErrorLayer) .service(client); - Ok(ClientState { - service, - evictor: None, - }) + Ok(ClientState { service }) } } @@ -107,7 +104,7 @@ impl ClientState { /// It implements the Conjure `AsyncClient` trait, but also offers a "raw" request interface for use with services that /// don't provide Conjure service definitions. pub struct Client { - state: Arc>>, + state: Arc>>>, subscription: Option>>, } @@ -130,7 +127,7 @@ impl Client { impl Client { pub(crate) fn new( - state: Arc>>, + state: Arc>>>, subscription: Option>, ) -> Client { Client { diff --git a/conjure-runtime/src/client_cache.rs b/conjure-runtime/src/client_cache.rs index 84630a27..f648f594 100644 --- a/conjure-runtime/src/client_cache.rs +++ b/conjure-runtime/src/client_cache.rs @@ -12,32 +12,116 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Arc, Weak}; +use std::{ + hash::Hash, + ops::Deref, + sync::{Arc, Weak}, +}; use conjure_error::Error; use linked_hash_map::LinkedHashMap; use parking_lot::Mutex; -use crate::{builder::CachedConfig, raw::DefaultRawClient, Builder, ClientState}; - const MAX_CACHED_CHANNELS: usize = 1_000; -struct CachedState { - state: Weak>, +pub struct Cached +where + K: Eq + Hash, +{ + value: V, + _evictor: Option>, +} + +impl Cached +where + K: Eq + Hash, +{ + pub fn uncached(value: V) -> Self { + Cached { + value, + _evictor: None, + } + } +} + +impl Deref for Cached +where + K: Eq + Hash, +{ + type Target = V; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.value + } +} + +pub struct CacheEvictor +where + K: Eq + Hash, +{ + inner: Weak>>, + key: Arc, id: usize, } -struct Inner { - cache: LinkedHashMap, CachedState>, +impl Drop for CacheEvictor +where + K: Eq + Hash, +{ + fn drop(&mut self) { + let Some(inner) = self.inner.upgrade() else { + return; + }; + let mut inner = inner.lock(); + + if let Some(cached_state) = inner.cache.get(&self.key) { + if cached_state.id == self.id { + inner.cache.remove(&self.key); + } + } + } +} + +struct CachedValue +where + K: Eq + Hash, +{ + value: Weak>, + id: usize, +} + +struct Inner +where + K: Eq + Hash, +{ + cache: LinkedHashMap, CachedValue>, next_id: usize, } -#[derive(Clone)] -pub struct ClientCache { - inner: Arc>, +pub struct ClientCache +where + K: Eq + Hash, +{ + inner: Arc>>, +} + +impl Clone for ClientCache +where + K: Eq + Hash, +{ + #[inline] + fn clone(&self) -> Self { + ClientCache { + inner: self.inner.clone(), + } + } } -impl ClientCache { +impl ClientCache +where + K: Eq + Hash + Clone, +{ pub fn new() -> Self { ClientCache { inner: Arc::new(Mutex::new(Inner { @@ -47,55 +131,41 @@ impl ClientCache { } } - pub fn get(&self, builder: &Builder) -> Result>, Error> { - let key = builder.cached_config(); + pub fn get( + &self, + builder: &T, + get_key: impl FnOnce(&T) -> &K, + make_value: impl FnOnce(&T) -> Result, + ) -> Result>, Error> { + let key = get_key(builder); let mut inner = self.inner.lock(); - if let Some(state) = inner.cache.get_refresh(key).and_then(|w| w.state.upgrade()) { + if let Some(state) = inner.cache.get_refresh(key).and_then(|w| w.value.upgrade()) { return Ok(state.clone()); } let key = Arc::new(key.clone()); - let mut state = ClientState::new(builder)?; + let value = make_value(builder)?; let id = inner.next_id; inner.next_id += 1; - state.evictor = Some(CacheEvictor { - inner: Arc::downgrade(&self.inner), - key: key.clone(), - id, + let value = Arc::new(Cached { + value, + _evictor: Some(CacheEvictor { + inner: Arc::downgrade(&self.inner), + key: key.clone(), + id, + }), }); - let state = Arc::new(state); - let cached_state = CachedState { - state: Arc::downgrade(&state), + let cached_value = CachedValue { + value: Arc::downgrade(&value), id, }; - inner.cache.insert(key, cached_state); + inner.cache.insert(key, cached_value); while inner.cache.len() > MAX_CACHED_CHANNELS { inner.cache.pop_front(); } - Ok(state) - } -} - -pub struct CacheEvictor { - inner: Weak>, - key: Arc, - id: usize, -} - -impl Drop for CacheEvictor { - fn drop(&mut self) { - let Some(inner) = self.inner.upgrade() else { - return; - }; - let mut inner = inner.lock(); - - if let Some(cached_state) = inner.cache.get(&self.key) { - if cached_state.id == self.id { - inner.cache.remove(&self.key); - } - } + Ok(value) } } diff --git a/conjure-runtime/src/client_factory.rs b/conjure-runtime/src/client_factory.rs index 9aae22c9..4b916dfe 100644 --- a/conjure-runtime/src/client_factory.rs +++ b/conjure-runtime/src/client_factory.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. //! The client factory. -use crate::blocking; -use crate::builder::UncachedConfig; +use crate::builder::{CachedConfig, UncachedConfig}; use crate::client_cache::ClientCache; use crate::config::{ServiceConfig, ServicesConfig}; -use crate::raw::DefaultRawClientBuilder; +use crate::raw::{DefaultRawClient, DefaultRawClientBuilder}; +use crate::{blocking, ClientState}; use crate::{ Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos, ServiceError, UserAgent, @@ -44,7 +44,7 @@ pub struct UserAgentStage { #[derive(Clone)] struct CacheManager { uncached_inner: UncachedConfig, - cache: ClientCache, + cache: ClientCache>, } impl CacheManager { @@ -320,7 +320,7 @@ impl ClientFactory { builder = builder.host_metrics(host_metrics); } - cache.get(&builder) + cache.get(&builder, |b| b.cached_config(), ClientState::new) }; let state = make_state(&service_config.get())?; From f434c93d5e7d2dd78c0f9027804b72202e33d963 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sun, 21 Apr 2024 10:46:02 -0400 Subject: [PATCH 6/8] rename to weak cache and test --- conjure-runtime/src/builder.rs | 2 +- conjure-runtime/src/client.rs | 2 +- conjure-runtime/src/client_factory.rs | 14 ++-- conjure-runtime/src/lib.rs | 2 +- .../src/{client_cache.rs => weak_cache.rs} | 67 +++++++++++++++---- 5 files changed, 66 insertions(+), 21 deletions(-) rename conjure-runtime/src/{client_cache.rs => weak_cache.rs} (67%) diff --git a/conjure-runtime/src/builder.rs b/conjure-runtime/src/builder.rs index e235ac77..7ffaa305 100644 --- a/conjure-runtime/src/builder.rs +++ b/conjure-runtime/src/builder.rs @@ -14,9 +14,9 @@ //! The client builder. use crate::blocking; use crate::client::ClientState; -use crate::client_cache::Cached; use crate::config::{ProxyConfig, SecurityConfig, ServiceConfig}; use crate::raw::{BuildRawClient, DefaultRawClientBuilder}; +use crate::weak_cache::Cached; use crate::{Client, HostMetricsRegistry, UserAgent}; use arc_swap::ArcSwap; use conjure_error::Error; diff --git a/conjure-runtime/src/client.rs b/conjure-runtime/src/client.rs index 0ae00bbe..743b5fea 100644 --- a/conjure-runtime/src/client.rs +++ b/conjure-runtime/src/client.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. use crate::builder::CachedConfig; -use crate::client_cache::Cached; use crate::raw::{BuildRawClient, DefaultRawClient, RawBody, Service}; use crate::service::gzip::{DecodedBody, GzipLayer}; use crate::service::http_error::HttpErrorLayer; @@ -27,6 +26,7 @@ use crate::service::trace_propagation::TracePropagationLayer; use crate::service::user_agent::UserAgentLayer; use crate::service::wait_for_spans::{WaitForSpansBody, WaitForSpansLayer}; use crate::service::{Identity, Layer, ServiceBuilder, Stack}; +use crate::weak_cache::Cached; use crate::{builder, BodyWriter, Builder, ResponseBody}; use arc_swap::ArcSwap; use async_trait::async_trait; diff --git a/conjure-runtime/src/client_factory.rs b/conjure-runtime/src/client_factory.rs index 4b916dfe..5dc688b5 100644 --- a/conjure-runtime/src/client_factory.rs +++ b/conjure-runtime/src/client_factory.rs @@ -13,10 +13,10 @@ // limitations under the License. //! The client factory. use crate::builder::{CachedConfig, UncachedConfig}; -use crate::client_cache::ClientCache; use crate::config::{ServiceConfig, ServicesConfig}; use crate::raw::{DefaultRawClient, DefaultRawClientBuilder}; -use crate::{blocking, ClientState}; +use crate::weak_cache::WeakCache; +use crate::{blocking, Builder, ClientState}; use crate::{ Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos, ServiceError, UserAgent, @@ -29,6 +29,8 @@ use std::sync::Arc; use tokio::runtime::Handle; use witchcraft_metrics::MetricRegistry; +const STATE_CACHE_CAPACITY: usize = 10_000; + /// A factory type which can create clients that will live-reload in response to configuration updates. #[derive(Clone)] pub struct ClientFactory(T); @@ -44,7 +46,7 @@ pub struct UserAgentStage { #[derive(Clone)] struct CacheManager { uncached_inner: UncachedConfig, - cache: ClientCache>, + cache: WeakCache>, } impl CacheManager { @@ -53,7 +55,7 @@ impl CacheManager { } fn uncached_mut(&mut self) -> &mut UncachedConfig { - self.cache = ClientCache::new(); + self.cache = WeakCache::new(STATE_CACHE_CAPACITY); &mut self.uncached_inner } } @@ -117,7 +119,7 @@ impl ClientFactory { blocking_handle: None, raw_client_builder: DefaultRawClientBuilder, }, - cache: ClientCache::new(), + cache: WeakCache::new(STATE_CACHE_CAPACITY), }, }) } @@ -320,7 +322,7 @@ impl ClientFactory { builder = builder.host_metrics(host_metrics); } - cache.get(&builder, |b| b.cached_config(), ClientState::new) + cache.get(&builder, Builder::cached_config, ClientState::new) }; let state = make_state(&service_config.get())?; diff --git a/conjure-runtime/src/lib.rs b/conjure-runtime/src/lib.rs index 25ecd51d..5150da15 100644 --- a/conjure-runtime/src/lib.rs +++ b/conjure-runtime/src/lib.rs @@ -204,7 +204,6 @@ pub mod blocking; mod body; pub mod builder; mod client; -mod client_cache; pub mod client_factory; pub mod errors; mod host_metrics; @@ -215,6 +214,7 @@ mod service; mod test; mod user_agent; mod util; +mod weak_cache; /// Client configuration. /// diff --git a/conjure-runtime/src/client_cache.rs b/conjure-runtime/src/weak_cache.rs similarity index 67% rename from conjure-runtime/src/client_cache.rs rename to conjure-runtime/src/weak_cache.rs index f648f594..9e7cee5a 100644 --- a/conjure-runtime/src/client_cache.rs +++ b/conjure-runtime/src/weak_cache.rs @@ -22,8 +22,6 @@ use conjure_error::Error; use linked_hash_map::LinkedHashMap; use parking_lot::Mutex; -const MAX_CACHED_CHANNELS: usize = 1_000; - pub struct Cached where K: Eq + Hash, @@ -96,36 +94,38 @@ where K: Eq + Hash, { cache: LinkedHashMap, CachedValue>, + capacity: usize, next_id: usize, } -pub struct ClientCache +pub struct WeakCache where K: Eq + Hash, { inner: Arc>>, } -impl Clone for ClientCache +impl Clone for WeakCache where K: Eq + Hash, { #[inline] fn clone(&self) -> Self { - ClientCache { + WeakCache { inner: self.inner.clone(), } } } -impl ClientCache +impl WeakCache where K: Eq + Hash + Clone, { - pub fn new() -> Self { - ClientCache { + pub fn new(capacity: usize) -> Self { + WeakCache { inner: Arc::new(Mutex::new(Inner { cache: LinkedHashMap::new(), + capacity, next_id: 0, })), } @@ -133,11 +133,11 @@ where pub fn get( &self, - builder: &T, + seed: &T, get_key: impl FnOnce(&T) -> &K, make_value: impl FnOnce(&T) -> Result, ) -> Result>, Error> { - let key = get_key(builder); + let key = get_key(seed); let mut inner = self.inner.lock(); if let Some(state) = inner.cache.get_refresh(key).and_then(|w| w.value.upgrade()) { @@ -145,7 +145,7 @@ where } let key = Arc::new(key.clone()); - let value = make_value(builder)?; + let value = make_value(seed)?; let id = inner.next_id; inner.next_id += 1; let value = Arc::new(Cached { @@ -162,10 +162,53 @@ where }; inner.cache.insert(key, cached_value); - while inner.cache.len() > MAX_CACHED_CHANNELS { + while inner.cache.len() > inner.capacity { inner.cache.pop_front(); } Ok(value) } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn cleanup_after_drop() { + let cache = WeakCache::new(2); + let value1 = cache.get(&(), |_| &0, |_| Ok(1)).unwrap(); + let value2 = cache.get(&(), |_| &0, |_| panic!()).unwrap(); + assert_eq!(**value1, 1); + assert_eq!(**value2, 1); + + drop((value1, value2)); + + let value3 = cache.get(&(), |_| &0, |_| Ok(2)).unwrap(); + assert_eq!(**value3, 2); + } + + #[test] + fn lru_eviction() { + let cache = WeakCache::new(2); + let _value1 = cache.get(&(), |_| &0, |_| Ok(1)).unwrap(); + let _value2 = cache.get(&(), |_| &1, |_| Ok(2)).unwrap(); + + // insert 2, evict 0 + let _value3 = cache.get(&(), |_| &2, |_| Ok(3)).unwrap(); + + // insert 0, evict 1 + let value4 = cache.get(&(), |_| &0, |_| Ok(4)).unwrap(); + assert_eq!(**value4, 4); + + // refresh 2 + cache.get(&(), |_| &2, |_| panic!()).unwrap(); + + // insert 3, evict 0 + cache.get(&(), |_| &3, |_| Ok(5)).unwrap(); + + // insert 0, evict 2 + let value5 = cache.get(&(), |_| &0, |_| Ok(6)).unwrap(); + assert_eq!(**value5, 6); + } +} From 705cc649a14983bfa3b611b050e993f404c5d186 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sun, 21 Apr 2024 10:49:23 -0400 Subject: [PATCH 7/8] newline --- conjure-runtime/src/client_factory.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/conjure-runtime/src/client_factory.rs b/conjure-runtime/src/client_factory.rs index 5dc688b5..5ffb5085 100644 --- a/conjure-runtime/src/client_factory.rs +++ b/conjure-runtime/src/client_factory.rs @@ -115,7 +115,6 @@ impl ClientFactory { uncached_inner: UncachedConfig { metrics: None, host_metrics: None, - blocking_handle: None, raw_client_builder: DefaultRawClientBuilder, }, From fc267fbdeeb11769f9cb1ef13e39fe2cba6d1fcb Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sun, 21 Apr 2024 10:52:23 -0400 Subject: [PATCH 8/8] check that entries are cleaned on drop --- conjure-runtime/src/weak_cache.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/conjure-runtime/src/weak_cache.rs b/conjure-runtime/src/weak_cache.rs index 9e7cee5a..cd2aee4a 100644 --- a/conjure-runtime/src/weak_cache.rs +++ b/conjure-runtime/src/weak_cache.rs @@ -183,6 +183,7 @@ mod test { assert_eq!(**value2, 1); drop((value1, value2)); + assert_eq!(cache.inner.lock().cache.len(), 0); let value3 = cache.get(&(), |_| &0, |_| Ok(2)).unwrap(); assert_eq!(**value3, 2);