Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache client state #198

Merged
merged 8 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-198.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: feature
feature:
description: Client state is now cached and shared between clients.
links:
- https://github.com/palantir/conjure-rust-runtime/pull/198
10 changes: 5 additions & 5 deletions conjure-runtime-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl ServiceConfig {
}

/// Security configuration used to communicate with a service.
#[derive(Debug, Clone, PartialEq, Default, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[staged_builder]
#[builder(update)]
Expand Down Expand Up @@ -247,7 +247,7 @@ impl SecurityConfig {
}

/// Proxy configuration used to connect to a service.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
#[serde(rename_all = "kebab-case", tag = "type")]
#[non_exhaustive]
pub enum ProxyConfig {
Expand All @@ -265,7 +265,7 @@ impl Default for ProxyConfig {
}

/// Configuration for an HTTP proxy.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[staged_builder]
#[builder(update)]
Expand All @@ -288,7 +288,7 @@ impl HttpProxyConfig {
}

/// A host and port identifier of a server.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HostAndPort {
host: String,
port: u16,
Expand Down Expand Up @@ -343,7 +343,7 @@ impl HostAndPort {
}

/// Credentials used to authenticate with an HTTP proxy.
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)]
pub struct BasicCredentials {
username: String,
password: String,
Expand Down
1 change: 1 addition & 0 deletions conjure-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ hyper-rustls = { version = "0.24.0", default-features = false, features = [
"tls12",
] }
hyper = { version = "0.14", features = ["http1", "http2", "client", "tcp"] }
linked-hash-map = "0.5"
once_cell = "1.0"
parking_lot = "0.12"
percent-encoding = "2.1"
Expand Down
10 changes: 5 additions & 5 deletions conjure-runtime/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ where
}

/// Specifies the beahavior of client-side sympathetic rate limiting.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ClientQos {
/// Enable client side rate limiting.
Expand All @@ -568,7 +568,7 @@ pub enum ClientQos {
/// Specifies the behavior of a client in response to a `QoS` error from a server.
///
/// QoS errors have status codes 429 or 503.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ServerQos {
/// The client will automatically retry the request when possible in response to a QoS error.
Expand All @@ -586,7 +586,7 @@ pub enum ServerQos {
/// Specifies the behavior of the client in response to a service error from a server.
///
/// Service errors are encoded as responses with a 4xx or 5xx response code and a body containing a `SerializableError`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ServiceError {
/// The service error will be propagated as a new internal service error.
Expand All @@ -605,7 +605,7 @@ pub enum ServiceError {
}

/// Specifies the manner in which the client decides if a request is idempotent or not.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Idempotency {
/// All requests are assumed to be idempotent.
Expand All @@ -622,7 +622,7 @@ pub enum Idempotency {
}

/// Specifies the strategy used to select a node of a service to use for a request attempt.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum NodeSelectionStrategy {
/// Pin to a single host as long as it continues to successfully respond to requests.
Expand Down
7 changes: 6 additions & 1 deletion conjure-runtime/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::client_cache::CacheEvictor;
// Copyright 2020 Palantir Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -65,6 +66,7 @@ pub(crate) type BaseBody<B> = WaitForSpansBody<DecodedBody<B>>;

pub(crate) struct ClientState<T> {
service: BaseService<T>,
pub(crate) evictor: Option<CacheEvictor>,
}

impl<T> ClientState<T> {
Expand Down Expand Up @@ -93,7 +95,10 @@ impl<T> ClientState<T> {
.layer(MapErrorLayer)
.service(client);

Ok(ClientState { service })
Ok(ClientState {
service,
evictor: None,
})
}
}

Expand Down
149 changes: 149 additions & 0 deletions conjure-runtime/src/client_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2024 Palantir Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
sync::{Arc, Weak},
time::Duration,
};

use conjure_error::Error;
use conjure_runtime_config::{ProxyConfig, SecurityConfig};
use linked_hash_map::LinkedHashMap;
use parking_lot::Mutex;
use url::Url;

use crate::{
raw::DefaultRawClient, Builder, ClientQos, ClientState, Idempotency, NodeSelectionStrategy,
ServerQos, ServiceError, UserAgent,
};

const MAX_CACHED_CHANNELS: usize = 1_000;

struct CachedState {
state: Weak<ClientState<DefaultRawClient>>,
id: usize,
}

struct Inner {
cache: LinkedHashMap<Arc<CacheKey>, CachedState>,
next_id: usize,
}

#[derive(Clone)]
pub struct ClientCache {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably worth adding some tests for the ClientCache logic, especially since the evictor logic is a bit tricky

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I think the best route is probably to make the cache generic so I can use a different type than DefaultRawClient for testing?

Copy link

@TheKeveloper TheKeveloper Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that sounds right. If you make the cache key generic it might also make it easier to move that closer to the builder logic, where the builder comes with a method to generate the cache key.

inner: Arc<Mutex<Inner>>,
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of having this inner field on the public struct? Does it hide your Inner cache implementation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!


impl ClientCache {
pub fn new() -> Self {
ClientCache {
inner: Arc::new(Mutex::new(Inner {
cache: LinkedHashMap::new(),
next_id: 0,
})),
}
}

pub fn get(&self, builder: &Builder) -> Result<Arc<ClientState<DefaultRawClient>>, Error> {
let key = Arc::new(CacheKey {
service: builder.get_service().to_string(),
user_agent: builder.get_user_agent().clone(),
uris: builder.get_uris().to_vec(),
security: builder.get_security().clone(),
proxy: builder.get_proxy().clone(),
connect_timeout: builder.get_connect_timeout(),
read_timeout: builder.get_read_timeout(),
write_timeout: builder.get_write_timeout(),
backoff_slot_size: builder.get_backoff_slot_size(),
max_num_retries: builder.get_max_num_retries(),
client_qos: builder.get_client_qos(),
server_qos: builder.get_server_qos(),
service_error: builder.get_service_error(),
idempotency: builder.get_idempotency(),
node_selection_strategy: builder.get_node_selection_strategy(),
rng_seed: builder.get_rng_seed(),
});

let mut inner = self.inner.lock();
if let Some(state) = inner
.cache
.get_refresh(&key)
.and_then(|w| w.state.upgrade())
{
return Ok(state.clone());
}

let mut state = ClientState::new(builder)?;
let id = inner.next_id;
inner.next_id += 1;
state.evictor = Some(CacheEvictor {
inner: Arc::downgrade(&self.inner),
key: key.clone(),
id,
});
let state = Arc::new(state);
let cached_state = CachedState {
state: Arc::downgrade(&state),
id,
};
inner.cache.insert(key, cached_state);

while inner.cache.len() > MAX_CACHED_CHANNELS {
inner.cache.pop_front();
}

Ok(state)
}
}

#[derive(Clone, PartialEq, Eq, Hash)]
struct CacheKey {
service: String,
user_agent: UserAgent,
uris: Vec<Url>,
security: SecurityConfig,
proxy: ProxyConfig,
connect_timeout: Duration,
read_timeout: Duration,
write_timeout: Duration,
backoff_slot_size: Duration,
max_num_retries: u32,
client_qos: ClientQos,
server_qos: ServerQos,
service_error: ServiceError,
idempotency: Idempotency,
node_selection_strategy: NodeSelectionStrategy,
rng_seed: Option<u64>,
}

pub struct CacheEvictor {
inner: Weak<Mutex<Inner>>,
key: Arc<CacheKey>,
id: usize,
}

impl Drop for CacheEvictor {
fn drop(&mut self) {
let Some(inner) = self.inner.upgrade() else {
return;
};
let mut inner = inner.lock();

if let Some(cached_state) = inner.cache.get(&self.key) {
if cached_state.id == self.id {
inner.cache.remove(&self.key);
}
}
}
}
19 changes: 15 additions & 4 deletions conjure-runtime/src/client_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.
//! The client factory.
use crate::blocking;
use crate::client::ClientState;
use crate::client_cache::ClientCache;
use crate::config::{ServiceConfig, ServicesConfig};
use crate::{
Client, ClientQos, HostMetricsRegistry, Idempotency, NodeSelectionStrategy, ServerQos,
Expand Down Expand Up @@ -52,6 +52,7 @@ pub struct Complete {
idempotency: Idempotency,
node_selection_strategy: NodeSelectionStrategy,
blocking_handle: Option<Handle>,
cache: ClientCache,
}

impl Default for ClientFactory<ConfigStage> {
Expand Down Expand Up @@ -95,11 +96,17 @@ impl ClientFactory<UserAgentStage> {
idempotency: Idempotency::ByMethod,
node_selection_strategy: NodeSelectionStrategy::PinUntilError,
blocking_handle: None,
cache: ClientCache::new(),
})
}
}

impl ClientFactory {
// Some state can't be tracked in the cache key, so we instead swap to a new cache.
fn swap_cache(&mut self) {
self.0.cache = ClientCache::new();
}

/// Sets the user agent sent by clients.
#[inline]
pub fn user_agent(mut self, user_agent: UserAgent) -> Self {
Expand Down Expand Up @@ -199,6 +206,7 @@ impl ClientFactory {
#[inline]
pub fn metrics(mut self, metrics: Arc<MetricRegistry>) -> Self {
self.0.metrics = Some(metrics);
self.swap_cache();
self
}

Expand All @@ -214,6 +222,7 @@ impl ClientFactory {
#[inline]
pub fn host_metrics(mut self, host_metrics: Arc<HostMetricsRegistry>) -> Self {
self.0.host_metrics = Some(host_metrics);
self.swap_cache();
self
}

Expand All @@ -231,6 +240,7 @@ impl ClientFactory {
#[inline]
pub fn blocking_handle(mut self, blocking_handle: Handle) -> Self {
self.0.blocking_handle = Some(blocking_handle);
self.swap_cache();
self
}

Expand Down Expand Up @@ -275,6 +285,7 @@ impl ClientFactory {
let service_error = self.0.service_error;
let idempotency = self.0.idempotency;
let node_selection_strategy = self.0.node_selection_strategy;
let cache = self.0.cache.clone();

let make_state = move |config: &ServiceConfig| {
let mut builder = Client::builder()
Expand All @@ -295,17 +306,17 @@ impl ClientFactory {
builder = builder.host_metrics(host_metrics);
}

ClientState::new(&builder)
cache.get(&builder)
};

let state = make_state(&service_config.get())?;
let state = Arc::new(ArcSwap::new(Arc::new(state)));
let state = Arc::new(ArcSwap::new(state));

let subscription = service_config.subscribe({
let state = state.clone();
move |config| {
let new_state = make_state(config)?;
state.store(Arc::new(new_state));
state.store(new_state);
Ok(())
}
})?;
Expand Down
1 change: 1 addition & 0 deletions conjure-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ pub mod blocking;
mod body;
pub mod builder;
mod client;
mod client_cache;
pub mod client_factory;
pub mod errors;
mod host_metrics;
Expand Down
4 changes: 2 additions & 2 deletions conjure-runtime/src/user_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ static VALID_VERSION: Lazy<Regex> =
const DEFAULT_VERSION: &str = "0.0.0";

/// A representation of an HTTP `User-Agent` header value.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct UserAgent {
node_id: Option<String>,
primary: Agent,
Expand Down Expand Up @@ -91,7 +91,7 @@ impl UserAgent {
}

/// A component of a [`UserAgent`].
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Agent {
name: String,
version: String,
Expand Down