Skip to content

Commit

Permalink
refactor(571): Observe queue length on semaphore blocking and acquisi…
Browse files Browse the repository at this point in the history
…tion

Fixes #571.
  • Loading branch information
joshua-spacetime committed Nov 20, 2023
1 parent adcff98 commit be452a3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 113 deletions.
40 changes: 12 additions & 28 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ use crate::identity::Identity;
use crate::json::client_api::{SubscriptionUpdateJson, TableRowOperationJson, TableUpdateJson};
use crate::protobuf::client_api::{table_row_operation, SubscriptionUpdate, TableRowOperation, TableUpdate};
use crate::subscription::module_subscription_actor::ModuleSubscriptionManager;
use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed, WaiterGauge};
use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed};
use crate::util::notify_once::NotifyOnce;
use crate::util::prometheus_handle::{GaugeInc, IntGaugeExt};
use crate::worker_metrics::WORKER_METRICS;
use base64::{engine::general_purpose::STANDARD as BASE_64_STD, Engine as _};
use futures::{Future, FutureExt};
use indexmap::IndexMap;
Expand Down Expand Up @@ -336,7 +334,7 @@ impl fmt::Debug for ModuleHost {
trait DynModuleHost: Send + Sync + 'static {
async fn get_instance(
&self,
waiter_gauge_context: <InstancePoolGauge as WaiterGauge>::Context<'_>,
ctx: (Identity, Hash, Address, &str),
) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule>;
fn inject_logs(&self, log_level: LogLevel, message: &str);
fn one_off_query(
Expand All @@ -353,7 +351,7 @@ trait DynModuleHost: Send + Sync + 'static {
struct HostControllerActor<T: Module> {
module: Arc<T>,
threadpool: Arc<HostThreadpool>,
instance_pool: LendingPool<T::Instance, InstancePoolGauge>,
instance_pool: LendingPool<T::Instance>,
start: NotifyOnce,
}

Expand Down Expand Up @@ -381,38 +379,24 @@ async fn select_first<A: Future, B: Future<Output = ()>>(fut_a: A, fut_b: B) ->
}
}

#[derive(Clone)]
struct InstancePoolGauge;

impl WaiterGauge for InstancePoolGauge {
type Context<'a> = &'a (&'a Identity, &'a Hash, &'a Address, &'a str);
type IncGuard = GaugeInc;
fn inc(&self, &(identity, module_hash, database_address, reducer_symbol): Self::Context<'_>) -> Self::IncGuard {
WORKER_METRICS
.instance_queue_length
.with_label_values(identity, module_hash, database_address, reducer_symbol)
.inc_scope()
}
}

#[async_trait::async_trait]
impl<T: Module> DynModuleHost for HostControllerActor<T> {
async fn get_instance(
&self,
waiter_gauge_context: <InstancePoolGauge as WaiterGauge>::Context<'_>,
ctx: (Identity, Hash, Address, &str),
) -> Result<(&HostThreadpool, Box<dyn ModuleInstance>), NoSuchModule> {
self.start.notified().await;
// in the future we should do something like in the else branch here -- add more instances based on load.
// we need to do write-skew retries first - right now there's only ever once instance per module.
let inst = if true {
self.instance_pool
.request_with_context(waiter_gauge_context)
.request_with_context(ctx)
.await
.map_err(|_| NoSuchModule)?
} else {
const GET_INSTANCE_TIMEOUT: Duration = Duration::from_millis(500);
select_first(
self.instance_pool.request_with_context(waiter_gauge_context),
self.instance_pool.request_with_context(ctx),
tokio::time::sleep(GET_INSTANCE_TIMEOUT).map(|()| self.spinup_new_instance()),
)
.await
Expand Down Expand Up @@ -500,7 +484,7 @@ pub enum InitDatabaseError {
impl ModuleHost {
pub fn new(threadpool: Arc<HostThreadpool>, mut module: impl Module) -> Self {
let info = module.info();
let instance_pool = LendingPool::with_gauge(InstancePoolGauge);
let instance_pool = LendingPool::new();
instance_pool.add_multiple(module.initial_instances()).unwrap();
let inner = Arc::new(HostControllerActor {
module: Arc::new(module),
Expand Down Expand Up @@ -530,13 +514,13 @@ impl ModuleHost {
F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static,
R: Send + 'static,
{
let waiter_gauge_context = (
&self.info.identity,
&self.info.module_hash,
&self.info.address,
let context = (
self.info.identity,
self.info.module_hash,
self.info.address,
reducer_name,
);
let (threadpool, mut inst) = self.inner.get_instance(&waiter_gauge_context).await?;
let (threadpool, mut inst) = self.inner.get_instance(context).await?;

let (tx, rx) = oneshot::channel();
threadpool.spawn(move || {
Expand Down
124 changes: 40 additions & 84 deletions crates/core/src/util/lending_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,28 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use parking_lot::Mutex;
use prometheus::IntGauge;
use spacetimedb_lib::{Address, Hash, Identity};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

use crate::util::prometheus_handle::IntGaugeExt;
use crate::worker_metrics::WORKER_METRICS;

use super::notify_once::{NotifiedOnce, NotifyOnce};
use super::prometheus_handle::GaugeInc;

pub struct LendingPool<T, G = ()> {
pub struct LendingPool<T> {
sem: Arc<Semaphore>,
waiter_gauge: G,
inner: Arc<LendingPoolInner<T>>,
}

impl<T, G: WaiterGauge + Default> Default for LendingPool<T, G> {
impl<T> Default for LendingPool<T> {
fn default() -> Self {
Self::new()
}
}

impl<T, G: Clone> Clone for LendingPool<T, G> {
impl<T> Clone for LendingPool<T> {
fn clone(&self) -> Self {
Self {
sem: self.sem.clone(),
waiter_gauge: self.waiter_gauge.clone(),
inner: self.inner.clone(),
}
}
Expand All @@ -52,50 +49,38 @@ struct PoolVec<T> {
#[derive(Debug)]
pub struct PoolClosed;

impl<T, G: WaiterGauge> LendingPool<T, G> {
pub fn new() -> Self
where
G: Default,
{
Self::with_gauge(G::default())
}

pub fn with_gauge(waiter_gauge: G) -> Self {
Self::from_iter_with_gauge(std::iter::empty(), waiter_gauge)
}

pub fn from_iter_with_gauge<I: IntoIterator<Item = T>>(iter: I, waiter_gauge: G) -> Self {
let deque = VecDeque::from_iter(iter);
Self {
sem: Arc::new(Semaphore::new(deque.len())),
waiter_gauge,
inner: Arc::new(LendingPoolInner {
closed_notify: NotifyOnce::new(),
vec: Mutex::new(PoolVec {
total_count: deque.len(),
deque: Some(deque),
}),
}),
}
}

pub fn request(&self) -> impl Future<Output = Result<LentResource<T>, PoolClosed>>
where
G: for<'a> WaiterGauge<Context<'a> = ()>,
{
self.request_with_context(())
impl<T> LendingPool<T> {
pub fn new() -> Self {
Self::from_iter(std::iter::empty())
}

pub fn request_with_context(
&self,
context: G::Context<'_>,
(identity, module_hash, database_address, reducer_symbol): (Identity, Hash, Address, &str),
) -> impl Future<Output = Result<LentResource<T>, PoolClosed>> {
let acq = self.sem.clone().acquire_owned();
let pool_inner = self.inner.clone();
let waiter_guard = self.waiter_gauge.inc(context);

let queue_len = WORKER_METRICS.instance_queue_length.with_label_values(
&identity,
&module_hash,
&database_address,
reducer_symbol,
);
let queue_len_histogram = WORKER_METRICS.instance_queue_length_histogram.with_label_values(
&identity,
&module_hash,
&database_address,
reducer_symbol,
);

queue_len.inc();
queue_len_histogram.observe(queue_len.get() as f64);

async move {
let permit = acq.await.map_err(|_| PoolClosed)?;
drop(waiter_guard);
queue_len.dec();
queue_len_histogram.observe(queue_len.get() as f64);
let resource = pool_inner
.vec
.lock()
Expand Down Expand Up @@ -154,9 +139,19 @@ impl<T, G: WaiterGauge> LendingPool<T, G> {
}
}

impl<T, G: WaiterGauge + Default> FromIterator<T> for LendingPool<T, G> {
impl<T> FromIterator<T> for LendingPool<T> {
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
Self::from_iter_with_gauge(iter, G::default())
let deque = VecDeque::from_iter(iter);
Self {
sem: Arc::new(Semaphore::new(deque.len())),
inner: Arc::new(LendingPoolInner {
closed_notify: NotifyOnce::new(),
vec: Mutex::new(PoolVec {
total_count: deque.len(),
deque: Some(deque),
}),
}),
}
}
}

Expand Down Expand Up @@ -194,17 +189,6 @@ impl<T> DerefMut for LentResource<T> {
}
}

// impl<T> LentResource<T> {
// fn keep(this: Self) -> T {
// let mut this = ManuallyDrop::new(this);
// let resource = unsafe { ManuallyDrop::take(&mut this.resource) };
// let permit = unsafe { ManuallyDrop::take(&mut this.permit) };
// permit.forget();
// let prev_count = this.pool.total_count.fetch_sub(1, SeqCst);
// resource
// }
// }

impl<T> Drop for LentResource<T> {
fn drop(&mut self) {
let resource = unsafe { ManuallyDrop::take(&mut self.resource) };
Expand All @@ -225,31 +209,3 @@ impl<T> Drop for LentResource<T> {
}
}
}

pub trait WaiterGauge {
type Context<'a>;
type IncGuard;
fn inc(&self, context: Self::Context<'_>) -> Self::IncGuard;
}

impl WaiterGauge for () {
type Context<'a> = ();
type IncGuard = ();
fn inc(&self, (): ()) -> Self::IncGuard {}
}

impl<G: WaiterGauge> WaiterGauge for Arc<G> {
type Context<'a> = G::Context<'a>;
type IncGuard = G::IncGuard;
fn inc(&self, context: Self::Context<'_>) -> Self::IncGuard {
(**self).inc(context)
}
}

impl WaiterGauge for IntGauge {
type Context<'a> = ();
type IncGuard = GaugeInc;
fn inc(&self, (): ()) -> Self::IncGuard {
self.inc_scope()
}
}
2 changes: 1 addition & 1 deletion crates/core/src/util/prometheus_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub struct GaugeInc {
impl Drop for GaugeInc {
#[inline]
fn drop(&mut self) {
self.gauge.dec()
self.gauge.dec();
}
}

Expand Down

1 comment on commit be452a3

@github-actions
Copy link

@github-actions github-actions bot commented on be452a3 Nov 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark results

Error when comparing benchmarks:

Caused by:

Please sign in to comment.