diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 7a96c86ded20..ab7b11336e99 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -17,11 +17,9 @@ use pageserver::config::PageserverIdentity; use pageserver::control_plane_client::ControlPlaneClient; use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task}; use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING}; -use pageserver::task_mgr::WALRECEIVER_RUNTIME; +use pageserver::task_mgr::{COMPUTE_REQUEST_RUNTIME, WALRECEIVER_RUNTIME}; use pageserver::tenant::{secondary, TenantSharedResources}; -use pageserver::{ - CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, LibpqEndpointListener, -}; +use pageserver::{CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener}; use remote_storage::GenericRemoteStorage; use tokio::signal::unix::SignalKind; use tokio::time::Instant; @@ -31,11 +29,9 @@ use tracing::*; use metrics::set_build_info_metric; use pageserver::{ config::PageServerConf, - context::{DownloadBehavior, RequestContext}, deletion_queue::DeletionQueue, http, page_cache, page_service, task_mgr, - task_mgr::TaskKind, - task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME}, + task_mgr::{BACKGROUND_RUNTIME, MGMT_REQUEST_RUNTIME}, tenant::mgr, virtual_file, }; @@ -593,30 +589,13 @@ fn start_pageserver( // Spawn a task to listen for libpq connections. It will spawn further tasks // for each connection. We created the listener earlier already. - let libpq_listener = { - let cancel = CancellationToken::new(); - let libpq_ctx = RequestContext::todo_child( - TaskKind::LibpqEndpointListener, - // listener task shouldn't need to download anything. (We will - // create a separate sub-contexts for each connection, with their - // own download behavior. This context is used only to listen and - // accept connections.) - DownloadBehavior::Error, - ); - - let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( - "libpq listener", - page_service::libpq_listener_main( - tenant_manager.clone(), - pg_auth, - pageserver_listener, - conf.pg_auth_type, - libpq_ctx, - cancel.clone(), - ), - )); - LibpqEndpointListener(CancellableTask { task, cancel }) - }; + let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, { + let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it + pageserver_listener + .set_nonblocking(true) + .context("set listener to nonblocking")?; + tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")? + }); let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard()); @@ -644,7 +623,7 @@ fn start_pageserver( shutdown_pageserver.take(); pageserver::shutdown_pageserver( http_endpoint_listener, - libpq_listener, + page_service, consumption_metrics_tasks, disk_usage_eviction_task, &tenant_manager, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7935aeb5e903..d7da61e7cf98 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -296,6 +296,11 @@ impl From for ApiError { GetActiveTenantError::WaitForActiveTimeout { .. 
} => { ApiError::ResourceUnavailable(format!("{}", e).into()) } + GetActiveTenantError::SwitchedTenant => { + // in our HTTP handlers, this error doesn't happen + // TODO: separate error types + ApiError::ResourceUnavailable("switched tenant".into()) + } } } } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index d944019641e3..f729cad3c3b2 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -30,7 +30,6 @@ pub mod walingest; pub mod walrecord; pub mod walredo; -use crate::task_mgr::TaskKind; use camino::Utf8Path; use deletion_queue::DeletionQueue; use tenant::{ @@ -63,7 +62,6 @@ pub struct CancellableTask { pub cancel: CancellationToken, } pub struct HttpEndpointListener(pub CancellableTask); -pub struct LibpqEndpointListener(pub CancellableTask); pub struct ConsumptionMetricsTasks(pub CancellableTask); pub struct DiskUsageEvictionTask(pub CancellableTask); impl CancellableTask { @@ -77,7 +75,7 @@ impl CancellableTask { #[allow(clippy::too_many_arguments)] pub async fn shutdown_pageserver( http_listener: HttpEndpointListener, - libpq_listener: LibpqEndpointListener, + page_service: page_service::Listener, consumption_metrics_worker: ConsumptionMetricsTasks, disk_usage_eviction_task: Option, tenant_manager: &TenantManager, @@ -89,8 +87,8 @@ pub async fn shutdown_pageserver( use std::time::Duration; // Shut down the libpq endpoint task. This prevents new connections from // being accepted. - timed( - libpq_listener.0.shutdown(), + let remaining_connections = timed( + page_service.stop_accepting(), "shutdown LibpqEndpointListener", Duration::from_secs(1), ) @@ -108,7 +106,7 @@ pub async fn shutdown_pageserver( // Shut down any page service tasks: any in-progress work for particular timelines or tenants // should already have been canclled via mgr::shutdown_all_tenants timed( - task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None), + remaining_connections.shutdown(), "shutdown PageRequestHandlers", Duration::from_secs(1), ) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6353f713e030..5344b83e0d1d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -4,9 +4,8 @@ use anyhow::Context; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; -use futures::stream::FuturesUnordered; -use futures::StreamExt; -use pageserver_api::key::Key; +use futures::FutureExt; +use once_cell::sync::OnceCell; use pageserver_api::models::TenantState; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, @@ -15,28 +14,23 @@ use pageserver_api::models::{ PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, PagestreamProtocolVersion, }; -use pageserver_api::shard::ShardIndex; -use pageserver_api::shard::ShardNumber; use pageserver_api::shard::TenantShardId; use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError}; use pq_proto::framed::ConnectionError; use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::borrow::Cow; -use std::collections::HashMap; use std::io; -use std::net::TcpListener; use std::str; use std::str::FromStr; use std::sync::Arc; -use std::time::Duration; -use std::time::Instant; use std::time::SystemTime; +use std::time::{Duration, Instant}; use tokio::io::AsyncWriteExt; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; -use 
utils::sync::gate::GateGuard; use utils::{ auth::{Claims, Scope, SwappableJwtAuth}, id::{TenantId, TimelineId}, @@ -47,61 +41,130 @@ use utils::{ use crate::auth::check_permission; use crate::basebackup; use crate::basebackup::BasebackupError; +use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics; use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS}; use crate::pgdatadir_mapping::Version; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id; -use crate::task_mgr; use crate::task_mgr::TaskKind; -use crate::tenant::mgr::GetActiveTenantError; -use crate::tenant::mgr::GetTenantError; -use crate::tenant::mgr::ShardResolveResult; +use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME}; use crate::tenant::mgr::ShardSelector; use crate::tenant::mgr::TenantManager; -use crate::tenant::timeline::WaitLsnError; +use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult}; +use crate::tenant::timeline::{self, WaitLsnError}; use crate::tenant::GetTimelineError; use crate::tenant::PageReconstructError; -use crate::tenant::Tenant; use crate::tenant::Timeline; use pageserver_api::key::rel_block_to_key; use pageserver_api::reltag::SlruKind; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; -// How long we may wait for a [`TenantSlot::InProgress`]` and/or a [`Tenant`] which -// is not yet in state [`TenantState::Active`]. +/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::Tenant`] which +/// is not yet in state [`TenantState::Active`]. +/// +/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`]. const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000); /////////////////////////////////////////////////////////////////////////////// +pub struct Listener { + cancel: CancellationToken, + /// Cancel the listener task through `listen_cancel` to shut down the listener + /// and get a handle on the existing connections. + task: JoinHandle, +} + +pub struct Connections { + cancel: CancellationToken, + tasks: tokio::task::JoinSet, +} + +pub fn spawn( + conf: &'static PageServerConf, + tenant_manager: Arc, + pg_auth: Option>, + tcp_listener: tokio::net::TcpListener, +) -> Listener { + let cancel = CancellationToken::new(); + let libpq_ctx = RequestContext::todo_child( + TaskKind::LibpqEndpointListener, + // listener task shouldn't need to download anything. (We will + // create a separate sub-contexts for each connection, with their + // own download behavior. This context is used only to listen and + // accept connections.) 
+ DownloadBehavior::Error, + ); + let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( + "libpq listener", + libpq_listener_main( + tenant_manager, + pg_auth, + tcp_listener, + conf.pg_auth_type, + libpq_ctx, + cancel.clone(), + ) + .map(anyhow::Ok), + )); + + Listener { cancel, task } +} + +impl Listener { + pub async fn stop_accepting(self) -> Connections { + self.cancel.cancel(); + self.task + .await + .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error") + } +} +impl Connections { + pub async fn shutdown(self) { + let Self { cancel, mut tasks } = self; + cancel.cancel(); + while let Some(res) = tasks.join_next().await { + // the logging done here mimics what was formerly done by task_mgr + match res { + Ok(Ok(())) => {} + Ok(Err(e)) => error!("error in page_service connection task: {:?}", e), + Err(e) => error!("page_service connection task panicked: {:?}", e), + } + } + } +} + /// /// Main loop of the page service. /// /// Listens for connections, and launches a new handler task for each. /// +/// Returns Ok(()) upon cancellation via `cancel`, returning the set of +/// open connections. +/// pub async fn libpq_listener_main( tenant_manager: Arc, auth: Option>, - listener: TcpListener, + listener: tokio::net::TcpListener, auth_type: AuthType, listener_ctx: RequestContext, - cancel: CancellationToken, -) -> anyhow::Result<()> { - listener.set_nonblocking(true)?; - let tokio_listener = tokio::net::TcpListener::from_std(listener)?; + listener_cancel: CancellationToken, +) -> Connections { + let connections_cancel = CancellationToken::new(); + let mut connection_handler_tasks = tokio::task::JoinSet::default(); // Wait for a new connection to arrive, or for server shutdown. while let Some(res) = tokio::select! { biased; - _ = cancel.cancelled() => { + _ = listener_cancel.cancelled() => { // We were requested to shut down. None } - res = tokio_listener.accept() => { + res = listener.accept() => { Some(res) } } { @@ -110,28 +173,16 @@ pub async fn libpq_listener_main( // Connection established. Spawn a new task to handle it. debug!("accepted connection from {}", peer_addr); let local_auth = auth.clone(); - let connection_ctx = listener_ctx .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download); - - // PageRequestHandler tasks are not associated with any particular - // timeline in the task manager. In practice most connections will - // only deal with a particular timeline, but we don't know which one - // yet. - task_mgr::spawn( - &tokio::runtime::Handle::current(), - TaskKind::PageRequestHandler, - None, - None, - "serving compute connection task", - page_service_conn_main( - tenant_manager.clone(), - local_auth, - socket, - auth_type, - connection_ctx, - ), - ); + connection_handler_tasks.spawn(page_service_conn_main( + tenant_manager.clone(), + local_auth, + socket, + auth_type, + connection_ctx, + connections_cancel.child_token(), + )); } Err(err) => { // accept() failed. Log the error, and loop back to retry on next connection. 
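// A sketch (not part of the patch) of the pattern the hunks above adopt: task_mgr-based
// connection tracking is replaced by a CancellationToken per listener plus a JoinSet of
// connection tasks. `handle_conn` and this `Connections` are illustrative stand-ins for
// the PR's API, assuming tokio, tokio-util, anyhow, and tracing are available.
use tokio::net::{TcpListener, TcpStream};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

struct Connections {
    cancel: CancellationToken,
    tasks: JoinSet<anyhow::Result<()>>,
}

// Accept until `listener_cancel` fires, then hand the still-running connection
// tasks back to the caller so they can be drained in a second shutdown phase.
async fn accept_loop(listener: TcpListener, listener_cancel: CancellationToken) -> Connections {
    let cancel = CancellationToken::new();
    let mut tasks = JoinSet::new();
    loop {
        tokio::select! {
            biased;
            _ = listener_cancel.cancelled() => break, // stop accepting; keep existing connections
            res = listener.accept() => match res {
                Ok((socket, _peer)) => {
                    // every connection gets a child token so it can be cancelled in bulk later
                    tasks.spawn(handle_conn(socket, cancel.child_token()));
                }
                Err(e) => tracing::error!("accept failed: {e}"),
            },
        }
    }
    Connections { cancel, tasks }
}

impl Connections {
    // Second phase: cancel all connections and wait for them to finish.
    async fn shutdown(mut self) {
        self.cancel.cancel();
        while let Some(res) = self.tasks.join_next().await {
            match res {
                Ok(Ok(())) => {}
                Ok(Err(e)) => tracing::error!("connection task failed: {e:#}"),
                Err(e) => tracing::error!("connection task panicked: {e}"),
            }
        }
    }
}

// Placeholder for the real per-connection protocol loop.
async fn handle_conn(_socket: TcpStream, cancel: CancellationToken) -> anyhow::Result<()> {
    cancel.cancelled().await;
    Ok(())
}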
@@ -140,11 +191,16 @@ pub async fn libpq_listener_main( } } - debug!("page_service loop terminated"); + debug!("page_service listener loop terminated"); - Ok(()) + Connections { + cancel: connections_cancel, + tasks: connection_handler_tasks, + } } +type ConnectionHandlerResult = anyhow::Result<()>; + #[instrument(skip_all, fields(peer_addr))] async fn page_service_conn_main( tenant_manager: Arc, @@ -152,7 +208,8 @@ async fn page_service_conn_main( socket: tokio::net::TcpStream, auth_type: AuthType, connection_ctx: RequestContext, -) -> anyhow::Result<()> { + cancel: CancellationToken, +) -> ConnectionHandlerResult { let _guard = LIVE_CONNECTIONS .with_label_values(&["page_service"]) .guard(); @@ -200,13 +257,11 @@ async fn page_service_conn_main( // and create a child per-query context when it invokes process_query. // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler // and create the per-query context in process_query ourselves. - let mut conn_handler = PageServerHandler::new(tenant_manager, auth, connection_ctx); + let mut conn_handler = + PageServerHandler::new(tenant_manager, auth, connection_ctx, cancel.clone()); let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; - match pgbackend - .run(&mut conn_handler, &task_mgr::shutdown_token()) - .await - { + match pgbackend.run(&mut conn_handler, &cancel).await { Ok(()) => { // we've been requested to shut down Ok(()) @@ -223,32 +278,154 @@ async fn page_service_conn_main( } } -/// While a handler holds a reference to a Timeline, it also holds a the -/// timeline's Gate open. -struct HandlerTimeline { - timeline: Arc, - _guard: GateGuard, -} - struct PageServerHandler { auth: Option>, claims: Option, - tenant_manager: Arc, - /// The context created for the lifetime of the connection /// services by this PageServerHandler. /// For each query received over the connection, /// `process_query` creates a child context from this one. connection_ctx: RequestContext, - /// See [`Self::cache_timeline`] for usage. - /// + cancel: CancellationToken, + + timeline_handles: TimelineHandles, +} + +struct TimelineHandles { + wrapper: TenantManagerWrapper, /// Note on size: the typical size of this map is 1. The largest size we expect /// to see is the number of shards divided by the number of pageservers (typically < 2), /// or the ratio used when splitting shards (i.e. how many children created from one) /// parent shard, where a "large" number might be ~8. 
- shard_timelines: HashMap, + handles: timeline::handle::Cache, +} + +impl TimelineHandles { + fn new(tenant_manager: Arc) -> Self { + Self { + wrapper: TenantManagerWrapper { + tenant_manager, + tenant_id: OnceCell::new(), + }, + handles: Default::default(), + } + } + async fn get( + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, + shard_selector: ShardSelector, + ) -> Result, GetActiveTimelineError> { + if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id { + return Err(GetActiveTimelineError::Tenant( + GetActiveTenantError::SwitchedTenant, + )); + } + self.handles + .get(timeline_id, shard_selector, &self.wrapper) + .await + .map_err(|e| match e { + timeline::handle::GetError::TenantManager(e) => e, + timeline::handle::GetError::TimelineGateClosed => { + trace!("timeline gate closed"); + GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) + } + timeline::handle::GetError::PerTimelineStateShutDown => { + trace!("per-timeline state shut down"); + GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) + } + }) + } +} + +pub(crate) struct TenantManagerWrapper { + tenant_manager: Arc, + // We do not support switching tenant_id on a connection at this point. + // We can can add support for this later if needed without changing + // the protocol. + tenant_id: once_cell::sync::OnceCell, +} + +#[derive(Debug)] +pub(crate) struct TenantManagerTypes; + +impl timeline::handle::Types for TenantManagerTypes { + type TenantManagerError = GetActiveTimelineError; + type TenantManager = TenantManagerWrapper; + type Timeline = Arc; +} + +impl timeline::handle::ArcTimeline for Arc { + fn gate(&self) -> &utils::sync::gate::Gate { + &self.gate + } + + fn shard_timeline_id(&self) -> timeline::handle::ShardTimelineId { + Timeline::shard_timeline_id(self) + } + + fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState { + &self.handles + } + + fn get_shard_identity(&self) -> &pageserver_api::shard::ShardIdentity { + Timeline::get_shard_identity(self) + } +} + +impl timeline::handle::TenantManager for TenantManagerWrapper { + async fn resolve( + &self, + timeline_id: TimelineId, + shard_selector: ShardSelector, + ) -> Result, GetActiveTimelineError> { + let tenant_id = self.tenant_id.get().expect("we set this in get()"); + let timeout = ACTIVE_TENANT_TIMEOUT; + let wait_start = Instant::now(); + let deadline = wait_start + timeout; + let tenant_shard = loop { + let resolved = self + .tenant_manager + .resolve_attached_shard(tenant_id, shard_selector); + match resolved { + ShardResolveResult::Found(tenant_shard) => break tenant_shard, + ShardResolveResult::NotFound => { + return Err(GetActiveTimelineError::Tenant( + GetActiveTenantError::NotFound(GetTenantError::NotFound(*tenant_id)), + )); + } + ShardResolveResult::InProgress(barrier) => { + // We can't authoritatively answer right now: wait for InProgress state + // to end, then try again + tokio::select! 
{ + _ = barrier.wait() => { + // The barrier completed: proceed around the loop to try looking up again + }, + _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => { + return Err(GetActiveTimelineError::Tenant(GetActiveTenantError::WaitForActiveTimeout { + latest_state: None, + wait_time: timeout, + })); + } + } + } + }; + }; + + tracing::debug!("Waiting for tenant to enter active state..."); + tenant_shard + .wait_to_become_active(deadline.duration_since(Instant::now())) + .await + .map_err(GetActiveTimelineError::Tenant)?; + + let timeline = tenant_shard + .get_timeline(timeline_id, true) + .map_err(GetActiveTimelineError::Timeline)?; + set_tracing_field_shard_id(&timeline); + Ok(timeline) + } } #[derive(thiserror::Error, Debug)] @@ -292,7 +469,11 @@ impl From for PageStreamError { impl From for PageStreamError { fn from(value: GetActiveTimelineError) -> Self { match value { - GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) => Self::Shutdown, + GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled) + | GetActiveTimelineError::Tenant(GetActiveTenantError::WillNotBecomeActive( + TenantState::Stopping { .. }, + )) + | GetActiveTimelineError::Timeline(GetTimelineError::ShuttingDown) => Self::Shutdown, GetActiveTimelineError::Tenant(e) => Self::NotFound(format!("{e}").into()), GetActiveTimelineError::Timeline(e) => Self::NotFound(format!("{e}").into()), } @@ -324,64 +505,17 @@ impl PageServerHandler { tenant_manager: Arc, auth: Option>, connection_ctx: RequestContext, + cancel: CancellationToken, ) -> Self { PageServerHandler { - tenant_manager, auth, claims: None, connection_ctx, - shard_timelines: HashMap::new(), + timeline_handles: TimelineHandles::new(tenant_manager), + cancel, } } - /// Future that completes when we need to shut down the connection. - /// - /// We currently need to shut down when any of the following happens: - /// 1. any of the timelines we hold GateGuards for in `shard_timelines` is cancelled - /// 2. task_mgr requests shutdown of the connection - /// - /// NB on (1): the connection's lifecycle is not actually tied to any of the - /// `shard_timelines`s' lifecycles. But it's _necessary_ in the current - /// implementation to be responsive to timeline cancellation because - /// the connection holds their `GateGuards` open (sored in `shard_timelines`). - /// We currently do the easy thing and terminate the connection if any of the - /// shard_timelines gets cancelled. But really, we cuold spend more effort - /// and simply remove the cancelled timeline from the `shard_timelines`, thereby - /// dropping the guard. - /// - /// NB: keep in sync with [`Self::is_connection_cancelled`] - async fn await_connection_cancelled(&self) { - // A short wait before we expend the cycles to walk our timeline map. This avoids incurring - // that cost every time we check for cancellation. - tokio::time::sleep(Duration::from_millis(10)).await; - - // This function is never called concurrently with code that adds timelines to shard_timelines, - // which is enforced by the borrow checker (the future returned by this function carries the - // immutable &self). So it's fine to evaluate shard_timelines after the sleep, we don't risk - // missing any inserts to the map. 
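// A sketch (not the PR's code) of the deadline handling used in
// `TenantManagerWrapper::resolve` above: a single overall timeout spans all retries
// while a slot is in progress, instead of restarting the clock on every attempt.
// `Slot` and `look_up` are hypothetical stand-ins for `resolve_attached_shard` and
// its result; a watch channel stands in for the barrier.
use std::time::Duration;
use tokio::time::Instant;

enum Slot<T> {
    Found(T),
    InProgress(tokio::sync::watch::Receiver<bool>),
}

async fn resolve_with_deadline<T>(
    mut look_up: impl FnMut() -> Slot<T>,
    timeout: Duration,
) -> Result<T, &'static str> {
    let deadline = Instant::now() + timeout;
    loop {
        match look_up() {
            Slot::Found(v) => return Ok(v),
            Slot::InProgress(mut barrier) => {
                tokio::select! {
                    _ = barrier.changed() => { /* slot settled: loop and look up again */ }
                    _ = tokio::time::sleep_until(deadline) => return Err("timed out waiting for slot"),
                }
            }
        }
    }
}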
- - let mut cancellation_sources = Vec::with_capacity(1 + self.shard_timelines.len()); - use futures::future::Either; - cancellation_sources.push(Either::Left(task_mgr::shutdown_watcher())); - cancellation_sources.extend( - self.shard_timelines - .values() - .map(|ht| Either::Right(ht.timeline.cancel.cancelled())), - ); - FuturesUnordered::from_iter(cancellation_sources) - .next() - .await; - } - - /// Checking variant of [`Self::await_connection_cancelled`]. - fn is_connection_cancelled(&self) -> bool { - task_mgr::is_shutdown_requested() - || self - .shard_timelines - .values() - .any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping()) - } - /// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in /// a cancellation token at the next scope up (such as a tenant cancellation token) to ensure we respect /// cancellation if there aren't any timelines in the cache. @@ -400,15 +534,21 @@ impl PageServerHandler { flush_r = pgb.flush() => { Ok(flush_r?) }, - _ = self.await_connection_cancelled() => { - Err(QueryError::Shutdown) - } _ = cancel.cancelled() => { Err(QueryError::Shutdown) } ) } + /// Pagestream sub-protocol handler. + /// + /// It is a simple request-response protocol inside a COPYBOTH session. + /// + /// # Coding Discipline + /// + /// Coding discipline within this function: all interaction with the `pgb` connection + /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`]. + /// This is so that we can shutdown page_service quickly. #[instrument(skip_all)] async fn handle_pagerequests( &mut self, @@ -423,27 +563,27 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - let tenant = self - .get_active_tenant_with_timeout(tenant_id, ShardSelector::First, ACTIVE_TENANT_TIMEOUT) - .await?; - // switch client to COPYBOTH pgb.write_message_noflush(&BeMessage::CopyBothResponse)?; - self.flush_cancellable(pgb, &tenant.cancel).await?; + tokio::select! { + biased; + _ = self.cancel.cancelled() => { + return Err(QueryError::Shutdown) + } + res = pgb.flush() => { + res?; + } + } loop { + // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData) let msg = tokio::select! { biased; - - _ = self.await_connection_cancelled() => { - // We were requested to shut down. - info!("shutdown request received in page handler"); + _ = self.cancel.cancelled() => { return Err(QueryError::Shutdown) } - msg = pgb.read_message() => { msg } }; - let copy_data_bytes = match msg? { Some(FeMessage::CopyData(bytes)) => bytes, Some(FeMessage::Terminate) => break, @@ -458,13 +598,12 @@ impl PageServerHandler { trace!("query: {copy_data_bytes:?}"); fail::fail_point!("ps::handle-pagerequest-message"); + // parse request let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?; - // TODO: We could create a new per-request context here, with unique ID. 
- // Currently we use the same per-timeline context for all requests - - let (response, span) = match neon_fe_msg { + // invoke handler function + let (handler_result, span) = match neon_fe_msg { PagestreamFeMessage::Exists(req) => { fail::fail_point!("ps::handle-pagerequest-message::exists"); let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); @@ -518,31 +657,26 @@ impl PageServerHandler { } }; - match response { - Err(PageStreamError::Shutdown) => { - // If we fail to fulfil a request during shutdown, which may be _because_ of - // shutdown, then do not send the error to the client. Instead just drop the - // connection. - span.in_scope(|| info!("dropping connection due to shutdown")); - return Err(QueryError::Shutdown); - } - Err(PageStreamError::Reconnect(reason)) => { - span.in_scope(|| info!("handler requested reconnect: {reason}")); - return Err(QueryError::Reconnect); - } - Err(e) if self.is_connection_cancelled() => { - // This branch accomodates code within request handlers that returns an anyhow::Error instead of a clean - // shutdown error, this may be buried inside a PageReconstructError::Other for example. - // - // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet, - // because wait_lsn etc will drop out - // is_stopping(): [`Timeline::flush_and_shutdown`] has entered - // is_canceled(): [`Timeline::shutdown`]` has entered - span.in_scope(|| info!("dropped error response during shutdown: {e:#}")); - return Err(QueryError::Shutdown); - } - r => { - let response_msg = r.unwrap_or_else(|e| { + // Map handler result to protocol behavior. + // Some handler errors cause exit from pagestream protocol. + // Other handler errors are sent back as an error message and we stay in pagestream protocol. + let response_msg = match handler_result { + Err(e) => match &e { + PageStreamError::Shutdown => { + // If we fail to fulfil a request during shutdown, which may be _because_ of + // shutdown, then do not send the error to the client. Instead just drop the + // connection. + span.in_scope(|| info!("dropping connection due to shutdown")); + return Err(QueryError::Shutdown); + } + PageStreamError::Reconnect(reason) => { + span.in_scope(|| info!("handler requested reconnect: {reason}")); + return Err(QueryError::Reconnect); + } + PageStreamError::Read(_) + | PageStreamError::LsnTimeout(_) + | PageStreamError::NotFound(_) + | PageStreamError::BadRequest(_) => { // print the all details to the log with {:#}, but for the client the // error message is enough. Do not log if shutting down, as the anyhow::Error // here includes cancellation which is not an error. @@ -553,10 +687,22 @@ impl PageServerHandler { PagestreamBeMessage::Error(PagestreamErrorResponse { message: e.to_string(), }) - }); + } + }, + Ok(response_msg) => response_msg, + }; - pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; - self.flush_cancellable(pgb, &tenant.cancel).await?; + // marshal & transmit response message + pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; + tokio::select! { + biased; + _ = self.cancel.cancelled() => { + // We were requested to shut down. 
+ info!("shutdown request received in page handler"); + return Err(QueryError::Shutdown) + } + res = pgb.flush() => { + res?; } } } @@ -644,7 +790,7 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id, %lsn))] async fn handle_make_lsn_lease( - &self, + &mut self, pgb: &mut PostgresBackend, tenant_shard_id: TenantShardId, timeline_id: TimelineId, @@ -654,10 +800,16 @@ impl PageServerHandler { where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { - let shard_selector = ShardSelector::Known(tenant_shard_id.to_index()); let timeline = self - .get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector) + .timeline_handles + .get( + tenant_shard_id.tenant_id, + timeline_id, + ShardSelector::Known(tenant_shard_id.to_index()), + ) .await?; + set_tracing_field_shard_id(&timeline); + let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?; let valid_until = lease .valid_until @@ -683,14 +835,17 @@ impl PageServerHandler { req: &PagestreamExistsRequest, ctx: &RequestContext, ) -> Result { - let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; + let timeline = self + .timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .await?; let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetRelExists, ctx); let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - timeline, + &timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -715,7 +870,10 @@ impl PageServerHandler { req: &PagestreamNblocksRequest, ctx: &RequestContext, ) -> Result { - let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; + let timeline = self + .timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .await?; let _timer = timeline .query_metrics @@ -723,7 +881,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - timeline, + &timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -748,7 +906,10 @@ impl PageServerHandler { req: &PagestreamDbSizeRequest, ctx: &RequestContext, ) -> Result { - let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; + let timeline = self + .timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .await?; let _timer = timeline .query_metrics @@ -756,7 +917,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - timeline, + &timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -774,122 +935,6 @@ impl PageServerHandler { })) } - /// For most getpage requests, we will already have a Timeline to serve the request: this function - /// looks up such a Timeline synchronously and without touching any global state. - fn get_cached_timeline_for_page( - &mut self, - req: &PagestreamGetPageRequest, - ) -> Result<&Arc, Key> { - let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() { - // Fastest path: single sharded case - if first_idx.shard_count.count() == 1 { - return Ok(&first_timeline.timeline); - } - - let key = rel_block_to_key(req.rel, req.blkno); - let shard_num = first_timeline - .timeline - .get_shard_identity() - .get_shard_number(&key); - - // Fast path: matched the first timeline in our local handler map. This case is common if - // only one shard per tenant is attached to this pageserver. 
- if first_timeline.timeline.get_shard_identity().number == shard_num { - return Ok(&first_timeline.timeline); - } - - let shard_index = ShardIndex { - shard_number: shard_num, - shard_count: first_timeline.timeline.get_shard_identity().count, - }; - - // Fast-ish path: timeline is in the connection handler's local cache - if let Some(found) = self.shard_timelines.get(&shard_index) { - return Ok(&found.timeline); - } - - key - } else { - rel_block_to_key(req.rel, req.blkno) - }; - - Err(key) - } - - /// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable - /// use in future requests without having to traverse [`crate::tenant::mgr::TenantManager`] - /// again. - /// - /// Note that all the Timelines in this cache are for the same timeline_id: they're differ - /// in which shard they belong to. When we serve a getpage@lsn request, we choose a shard - /// based on key. - /// - /// The typical size of this cache is 1, as we generally create shards to distribute work - /// across pageservers, so don't tend to have multiple shards for the same tenant on the - /// same pageserver. - fn cache_timeline( - &mut self, - timeline: Arc, - ) -> Result<&Arc, GetActiveTimelineError> { - let gate_guard = timeline - .gate - .enter() - .map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?; - - let shard_index = timeline.tenant_shard_id.to_index(); - let entry = self - .shard_timelines - .entry(shard_index) - .or_insert(HandlerTimeline { - timeline, - _guard: gate_guard, - }); - - Ok(&entry.timeline) - } - - /// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with - /// a Timeline to serve requests for this key, if such a Timeline is present on this pageserver. If no such - /// Timeline is found, then we will return an error (this indicates that the client is talking to the wrong node). - async fn load_timeline_for_page( - &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, - key: Key, - ) -> anyhow::Result<&Arc, GetActiveTimelineError> { - // Slow path: we must call out to the TenantManager to find the timeline for this Key - let timeline = self - .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key)) - .await?; - - self.cache_timeline(timeline) - } - - async fn get_timeline_shard_zero( - &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, - ) -> anyhow::Result<&Arc, GetActiveTimelineError> { - // This is a borrow-checker workaround: we can't return from inside of the `if let Some` because - // that would be an immutable-borrow-self return, whereas later in the function we will use a mutable - // ref to salf. So instead, we first build a bool, and then return while not borrowing self. - let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() { - idx.shard_number == ShardNumber(0) - } else { - false - }; - - if have_cached { - let entry = self.shard_timelines.iter().next().unwrap(); - Ok(&entry.1.timeline) - } else { - let timeline = self - .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) - .await?; - Ok(self.cache_timeline(timeline)?) 
- } - } - #[instrument(skip_all, fields(shard_id))] async fn handle_get_page_at_lsn_request( &mut self, @@ -898,33 +943,30 @@ impl PageServerHandler { req: &PagestreamGetPageRequest, ctx: &RequestContext, ) -> Result { - let timeline = match self.get_cached_timeline_for_page(req) { - Ok(tl) => { - set_tracing_field_shard_id(tl); - tl - } - Err(key) => { - match self - .load_timeline_for_page(tenant_id, timeline_id, key) - .await - { - Ok(t) => t, - Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { - // We already know this tenant exists in general, because we resolved it at - // start of connection. Getting a NotFound here indicates that the shard containing - // the requested page is not present on this node: the client's knowledge of shard->pageserver - // mapping is out of date. - // - // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via - // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration - // and talk to a different pageserver. - return Err(PageStreamError::Reconnect( - "getpage@lsn request routed to wrong shard".into(), - )); - } - Err(e) => return Err(e.into()), - } + let timeline = match self + .timeline_handles + .get( + tenant_id, + timeline_id, + ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)), + ) + .await + { + Ok(tl) => tl, + Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { + // We already know this tenant exists in general, because we resolved it at + // start of connection. Getting a NotFound here indicates that the shard containing + // the requested page is not present on this node: the client's knowledge of shard->pageserver + // mapping is out of date. + // + // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via + // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration + // and talk to a different pageserver. + return Err(PageStreamError::Reconnect( + "getpage@lsn request routed to wrong shard".into(), + )); } + Err(e) => return Err(e.into()), }; let _timer = timeline @@ -933,7 +975,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - timeline, + &timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -958,7 +1000,10 @@ impl PageServerHandler { req: &PagestreamGetSlruSegmentRequest, ctx: &RequestContext, ) -> Result { - let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?; + let timeline = self + .timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) + .await?; let _timer = timeline .query_metrics @@ -966,7 +1011,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( - timeline, + &timeline, req.request_lsn, req.not_modified_since, &latest_gc_cutoff_lsn, @@ -987,6 +1032,15 @@ impl PageServerHandler { /// Full basebackups should only be used for debugging purposes. /// Originally, it was introduced to enable breaking storage format changes, /// but that is not applicable anymore. + /// + /// # Coding Discipline + /// + /// Coding discipline within this function: all interaction with the `pgb` connection + /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`]. + /// This is so that we can shutdown page_service quickly. 
+ /// + /// TODO: wrap the pgb that we pass to the basebackup handler so that it's sensitive + /// to connection cancellation. #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(shard_id, ?lsn, ?prev_lsn, %full_backup))] async fn handle_basebackup_request( @@ -1012,10 +1066,11 @@ impl PageServerHandler { let started = std::time::Instant::now(); - // check that the timeline exists let timeline = self - .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) + .timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Zero) .await?; + let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); if let Some(lsn) = lsn { // Backup was requested at a particular LSN. Wait for it to arrive. @@ -1037,7 +1092,7 @@ impl PageServerHandler { // switch client to COPYOUT pgb.write_message_noflush(&BeMessage::CopyOutResponse) .map_err(QueryError::Disconnected)?; - self.flush_cancellable(pgb, &timeline.cancel).await?; + self.flush_cancellable(pgb, &self.cancel).await?; // Send a tarball of the latest layer on the timeline. Compress if not // fullbackup. TODO Compress in that case too (tests need to be updated) @@ -1128,77 +1183,6 @@ impl PageServerHandler { .expect("claims presence already checked"); check_permission(claims, tenant_id).map_err(|e| QueryError::Unauthorized(e.0)) } - - /// Shorthand for getting a reference to a Timeline of an Active tenant. - async fn get_active_tenant_timeline( - &self, - tenant_id: TenantId, - timeline_id: TimelineId, - selector: ShardSelector, - ) -> Result, GetActiveTimelineError> { - let tenant = self - .get_active_tenant_with_timeout(tenant_id, selector, ACTIVE_TENANT_TIMEOUT) - .await - .map_err(GetActiveTimelineError::Tenant)?; - let timeline = tenant.get_timeline(timeline_id, true)?; - set_tracing_field_shard_id(&timeline); - Ok(timeline) - } - - /// Get a shard's [`Tenant`] in its active state, if present. If we don't find the shard and some - /// slots for this tenant are `InProgress` then we will wait. - /// If we find the [`Tenant`] and it's not yet in state [`TenantState::Active`], we will wait. - /// - /// `timeout` is used as a total timeout for the whole wait operation. - async fn get_active_tenant_with_timeout( - &self, - tenant_id: TenantId, - shard_selector: ShardSelector, - timeout: Duration, - ) -> Result, GetActiveTenantError> { - let wait_start = Instant::now(); - let deadline = wait_start + timeout; - - // Resolve TenantId to TenantShardId. This is usually a quick one-shot thing, the loop is - // for handling the rare case that the slot we're accessing is InProgress. - let tenant_shard = loop { - let resolved = self - .tenant_manager - .resolve_attached_shard(&tenant_id, shard_selector); - match resolved { - ShardResolveResult::Found(tenant_shard) => break tenant_shard, - ShardResolveResult::NotFound => { - return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( - tenant_id, - ))); - } - ShardResolveResult::InProgress(barrier) => { - // We can't authoritatively answer right now: wait for InProgress state - // to end, then try again - tokio::select! 
{ - _ = self.await_connection_cancelled() => { - return Err(GetActiveTenantError::Cancelled) - }, - _ = barrier.wait() => { - // The barrier completed: proceed around the loop to try looking up again - }, - _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => { - return Err(GetActiveTenantError::WaitForActiveTimeout { - latest_state: None, - wait_time: timeout, - }); - } - } - } - }; - }; - - tracing::debug!("Waiting for tenant to enter active state..."); - tenant_shard - .wait_to_become_active(deadline.duration_since(Instant::now())) - .await?; - Ok(tenant_shard) - } } #[async_trait::async_trait] @@ -1505,7 +1489,7 @@ impl From for QueryError { } #[derive(Debug, thiserror::Error)] -enum GetActiveTimelineError { +pub(crate) enum GetActiveTimelineError { #[error(transparent)] Tenant(GetActiveTenantError), #[error(transparent)] diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index e5ac6725ad4d..76e127e12220 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -386,6 +386,8 @@ impl WalRedoManager { #[derive(Debug, thiserror::Error, PartialEq, Eq)] pub enum GetTimelineError { + #[error("Timeline is shutting down")] + ShuttingDown, #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")] NotActive { tenant_id: TenantShardId, diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 75c8682c97a4..9bf865c7e1d2 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -116,8 +116,6 @@ pub(crate) enum ShardSelector { /// Only return the 0th shard, if it is present. If a non-0th shard is present, /// ignore it. Zero, - /// Pick the first shard we find for the TenantId - First, /// Pick the shard that holds this key Page(Key), /// The shard ID is known: pick the given shard @@ -2090,7 +2088,6 @@ impl TenantManager { }; match selector { - ShardSelector::First => return ShardResolveResult::Found(tenant.clone()), ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => { return ShardResolveResult::Found(tenant.clone()) } @@ -2172,6 +2169,9 @@ pub(crate) enum GetActiveTenantError { /// never happen. #[error("Tenant is broken: {0}")] Broken(String), + + #[error("reconnect to switch tenant id")] + SwitchedTenant, } #[derive(Debug, thiserror::Error)] diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 862ca42188a9..c55faa58bd99 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3,6 +3,7 @@ pub(crate) mod compaction; pub mod delete; pub(crate) mod detach_ancestor; mod eviction_task; +pub(crate) mod handle; mod init; pub mod layer_manager; pub(crate) mod logical_size; @@ -17,6 +18,7 @@ use camino::Utf8Path; use chrono::{DateTime, Utc}; use enumset::EnumSet; use fail::fail_point; +use handle::ShardTimelineId; use once_cell::sync::Lazy; use pageserver_api::{ key::{ @@ -443,6 +445,8 @@ pub struct Timeline { pub(crate) extra_test_dense_keyspace: ArcSwap, pub(crate) l0_flush_global_state: L0FlushGlobalState, + + pub(crate) handles: handle::PerTimelineState, } pub struct WalReceiverInfo { @@ -1929,6 +1933,9 @@ impl Timeline { tracing::debug!("Cancelling CancellationToken"); self.cancel.cancel(); + // Ensure Prevent new page service requests from starting. + self.handles.shutdown(); + // Transition the remote_client into a state where it's only useful for timeline deletion. // (The deletion use case is why we can't just hook up remote_client to Self::cancel).) 
self.remote_client.stop(); @@ -2454,6 +2461,8 @@ impl Timeline { extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())), l0_flush_global_state: resources.l0_flush_global_state, + + handles: Default::default(), }; result.repartition_threshold = result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; @@ -3723,6 +3732,17 @@ impl Timeline { &self.shard_identity } + #[inline(always)] + pub(crate) fn shard_timeline_id(&self) -> ShardTimelineId { + ShardTimelineId { + shard_index: ShardIndex { + shard_number: self.shard_identity.number, + shard_count: self.shard_identity.count, + }, + timeline_id: self.timeline_id, + } + } + /// /// Get a handle to the latest layer for appending. /// diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs new file mode 100644 index 000000000000..e82559b8b3af --- /dev/null +++ b/pageserver/src/tenant/timeline/handle.rs @@ -0,0 +1,967 @@ +//! An efficient way to keep the timeline gate open without preventing +//! timeline shutdown for longer than a single call to a timeline method. +//! +//! # Motivation +//! +//! On a single page service connection, we're typically serving a single TenantTimelineId. +//! +//! Without sharding, there is a single Timeline object to which we dispatch +//! all requests. For example, a getpage request gets dispatched to the +//! Timeline::get method of the Timeline object that represents the +//! (tenant,timeline) of that connection. +//! +//! With sharding, for each request that comes in on the connection, +//! we first have to perform shard routing based on the requested key (=~ page number). +//! The result of shard routing is a Timeline object. +//! We then dispatch the request to that Timeline object. +//! +//! Regardless of whether the tenant is sharded or not, we want to ensure that +//! we hold the Timeline gate open while we're invoking the method on the +//! Timeline object. +//! +//! However, we want to avoid the overhead of entering the gate for every +//! method invocation. +//! +//! Further, for shard routing, we want to avoid calling the tenant manager to +//! resolve the shard for every request. Instead, we want to cache the +//! routing result so we can bypass the tenant manager for all subsequent requests +//! that get routed to that shard. +//! +//! Regardless of how we accomplish the above, it should not +//! prevent the Timeline from shutting down promptly. +//! +//! # Design +//! +//! There are three user-facing data structures: +//! - `PerTimelineState`: a struct embedded into each Timeline struct. Lifetime == Timeline lifetime. +//! - `Cache`: a struct private to each connection handler; Lifetime == connection lifetime. +//! - `Handle`: a smart pointer that holds the Timeline gate open and derefs to `&Timeline`. +//! Lifetime: for a single request dispatch on the Timeline (i.e., one getpage request) +//! +//! The `Handle` is just a wrapper around an `Arc`. +//! +//! There is one long-lived `Arc`, which is stored in the `PerTimelineState`. +//! The `Cache` stores a `Weak` for each cached Timeline. +//! +//! To dispatch a request, the page service connection calls `Cache::get`. +//! +//! A cache miss means we consult the tenant manager for shard routing, +//! resulting in an `Arc`. We enter its gate _once_ and construct an +//! `Arc`. We store a `Weak` in the cache +//! and the `Arc` in the `PerTimelineState`. +//! +//! For subsequent requests, `Cache::get` will perform a "fast path" shard routing +//! and find the `Weak` in the cache. 
+//! We upgrade the `Weak` to an `Arc` and wrap it in the user-facing `Handle` type. +//! +//! The request handler dispatches the request to the right `>::$request_method`. +//! It then drops the `Handle`, which drops the `Arc`. +//! +//! # Memory Management / How The Reference Cycle Is Broken +//! +//! The attentive reader may have noticed the strong reference cycle +//! from `Arc` to `PerTimelineState` to `Arc`. +//! +//! This cycle is intentional: while it exists, the `Cache` can upgrade its +//! `Weak` to an `Arc` in a single atomic operation. +//! +//! The cycle is broken by either +//! - `PerTimelineState::shutdown` or +//! - dropping the `Cache`. +//! +//! Concurrently existing `Handle`s will extend the existence of the cycle. +//! However, since `Handle`s are short-lived and new `Handle`s are not +//! handed out after either `PerTimelineState::shutdown` or `Cache` drop, +//! that extension of the cycle is bounded. +//! +//! # Fast Path for Shard Routing +//! +//! The `Cache` has a fast path for shard routing to avoid calling into +//! the tenant manager for every request. +//! +//! The `Cache` maintains a hash map of `ShardTimelineId` to `Weak`. +//! +//! The current implementation uses the first entry in the hash map +//! to determine the `ShardParameters` and derive the correct +//! `ShardIndex` for the requested key. +//! +//! It then looks up the hash map for that `ShardTimelineId := {ShardIndex,TimelineId}`. +//! +//! If the lookup is successful and the `Weak` can be upgraded, +//! it's a hit. +//! +//! ## Cache invalidation +//! +//! The insight is that cache invalidation is sufficient and most efficiently done lazily. +//! The only reasons why an entry in the cache can become stale are: +//! 1. The `PerTimelineState` / Timeline is shutting down e.g. because the shard is +//! being detached, timeline or shard deleted, or pageserver is shutting down. +//! 2. We're doing a shard split and new traffic should be routed to the child shards. +//! +//! Regarding (1), we will eventually fail to upgrade the `Weak` once the +//! timeline has shut down, and when that happens, we remove the entry from the cache. +//! +//! Regarding (2), the insight is that it is toally fine to keep dispatching requests +//! to the parent shard during a shard split. Eventually, the shard split task will +//! shut down the parent => case (1). + +use std::collections::hash_map; +use std::collections::HashMap; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::Weak; + +use pageserver_api::shard::ShardIdentity; +use tracing::instrument; +use tracing::trace; +use utils::id::TimelineId; +use utils::shard::ShardIndex; +use utils::shard::ShardNumber; + +use crate::tenant::mgr::ShardSelector; + +/// The requirement for Debug is so that #[derive(Debug)] works in some places. +pub(crate) trait Types: Sized + std::fmt::Debug { + type TenantManagerError: Sized + std::fmt::Debug; + type TenantManager: TenantManager + Sized; + type Timeline: ArcTimeline + Sized; +} + +/// Uniquely identifies a [`Cache`] instance over the lifetime of the process. +/// Required so [`Cache::drop`] can take out the handles from the [`PerTimelineState`]. +/// Alternative to this would be to allocate [`Cache`] in a `Box` and identify it by the pointer. 
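// A stripped-down, std-only illustration of the ownership story described in the
// module comment above (gate guards, shard routing, and the tenant manager lookup
// are elided; all names here are illustrative, not the module's real API): the
// per-timeline state owns the only long-lived strong Arc, the per-connection cache
// keeps a Weak, and a cache hit is a single Weak::upgrade.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};

struct Timeline; // stand-in for the real Timeline

struct HandleInner {
    shut_down: AtomicBool,
    timeline: Arc<Timeline>,
}

struct PerTimelineState {
    // None = shut down; the strong refs stored here are what make cache hits cheap
    handles: Mutex<Option<Vec<Arc<HandleInner>>>>,
}

impl Default for PerTimelineState {
    fn default() -> Self {
        Self { handles: Mutex::new(Some(Vec::new())) }
    }
}

#[derive(Default)]
struct Cache {
    cached: Option<Weak<HandleInner>>, // the real cache is a map keyed by ShardTimelineId
}

impl Cache {
    fn get(&mut self, state: &PerTimelineState, timeline: &Arc<Timeline>) -> Option<Arc<HandleInner>> {
        if let Some(hit) = self.cached.as_ref().and_then(Weak::upgrade) {
            if !hit.shut_down.load(Ordering::Relaxed) {
                return Some(hit); // fast path: one Weak::upgrade, no locks
            }
            self.cached = None; // lazily evict the stale entry
        }
        // miss: register a new strong handle with the timeline, cache a weak ref
        let mut guard = state.handles.lock().unwrap();
        let slots = guard.as_mut()?; // None means the timeline has shut down
        let inner = Arc::new(HandleInner {
            shut_down: AtomicBool::new(false),
            timeline: Arc::clone(timeline),
        });
        slots.push(Arc::clone(&inner));
        self.cached = Some(Arc::downgrade(&inner));
        Some(inner)
    }
}

impl PerTimelineState {
    fn shutdown(&self) {
        // Taking the map drops the strong refs and breaks the cycle; caches that
        // still hold Weak refs fail to upgrade (or observe `shut_down`) and evict lazily.
        if let Some(handles) = self.handles.lock().unwrap().take() {
            for h in &handles {
                h.shut_down.store(true, Ordering::Relaxed);
            }
        }
    }
}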
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +struct CacheId(u64); + +impl CacheId { + fn next() -> Self { + static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1); + let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if id == 0 { + panic!("CacheId::new() returned 0, overflow"); + } + Self(id) + } +} + +/// See module-level comment. +pub(crate) struct Cache { + id: CacheId, + map: Map, +} + +type Map = HashMap>>; + +impl Default for Cache { + fn default() -> Self { + Self { + id: CacheId::next(), + map: Default::default(), + } + } +} + +#[derive(PartialEq, Eq, Debug, Hash, Clone, Copy)] +pub(crate) struct ShardTimelineId { + pub(crate) shard_index: ShardIndex, + pub(crate) timeline_id: TimelineId, +} + +/// See module-level comment. +pub(crate) struct Handle(Arc>); +struct HandleInner { + shut_down: AtomicBool, + timeline: T::Timeline, + // The timeline's gate held open. + _gate_guard: utils::sync::gate::GateGuard, +} + +/// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`. +/// +/// See module-level comment for details. +pub struct PerTimelineState { + // None = shutting down + handles: Mutex>>>>, +} + +impl Default for PerTimelineState { + fn default() -> Self { + Self { + handles: Mutex::new(Some(Default::default())), + } + } +} + +/// Abstract view of [`crate::tenant::mgr`], for testability. +pub(crate) trait TenantManager { + /// Invoked by [`Cache::get`] to resolve a [`ShardTimelineId`] to a [`Types::Timeline`]. + /// Errors are returned as [`GetError::TenantManager`]. + async fn resolve( + &self, + timeline_id: TimelineId, + shard_selector: ShardSelector, + ) -> Result; +} + +/// Abstract view of an [`Arc`], for testability. +pub(crate) trait ArcTimeline: Clone { + fn gate(&self) -> &utils::sync::gate::Gate; + fn shard_timeline_id(&self) -> ShardTimelineId; + fn get_shard_identity(&self) -> &ShardIdentity; + fn per_timeline_state(&self) -> &PerTimelineState; +} + +/// Errors returned by [`Cache::get`]. +#[derive(Debug)] +pub(crate) enum GetError { + TenantManager(T::TenantManagerError), + TimelineGateClosed, + PerTimelineStateShutDown, +} + +/// Internal type used in [`Cache::get`]. +enum RoutingResult { + FastPath(Handle), + SlowPath(ShardTimelineId), + NeedConsultTenantManager, +} + +impl Cache { + /// See module-level comment for details. + /// + /// Does NOT check for the shutdown state of [`Types::Timeline`]. + /// Instead, the methods of [`Types::Timeline`] that are invoked through + /// the [`Handle`] are responsible for checking these conditions + /// and if so, return an error that causes the page service to + /// close the connection. + #[instrument(level = "trace", skip_all)] + pub(crate) async fn get( + &mut self, + timeline_id: TimelineId, + shard_selector: ShardSelector, + tenant_manager: &T::TenantManager, + ) -> Result, GetError> { + // terminates because each iteration removes an element from the map + loop { + let handle = self + .get_impl(timeline_id, shard_selector, tenant_manager) + .await?; + if handle.0.shut_down.load(Ordering::Relaxed) { + let removed = self + .map + .remove(&handle.0.timeline.shard_timeline_id()) + .expect("invariant of get_impl is that the returned handle is in the map"); + assert!( + Weak::ptr_eq(&removed, &Arc::downgrade(&handle.0)), + "shard_timeline_id() incorrect?" 
+ ); + } else { + return Ok(handle); + } + } + } + + #[instrument(level = "trace", skip_all)] + async fn get_impl( + &mut self, + timeline_id: TimelineId, + shard_selector: ShardSelector, + tenant_manager: &T::TenantManager, + ) -> Result, GetError> { + let miss: ShardSelector = { + let routing_state = self.shard_routing(timeline_id, shard_selector); + match routing_state { + RoutingResult::FastPath(handle) => return Ok(handle), + RoutingResult::SlowPath(key) => match self.map.get(&key) { + Some(cached) => match cached.upgrade() { + Some(upgraded) => return Ok(Handle(upgraded)), + None => { + trace!("handle cache stale"); + self.map.remove(&key).unwrap(); + ShardSelector::Known(key.shard_index) + } + }, + None => ShardSelector::Known(key.shard_index), + }, + RoutingResult::NeedConsultTenantManager => shard_selector, + } + }; + self.get_miss(timeline_id, miss, tenant_manager).await + } + + #[inline(always)] + fn shard_routing( + &mut self, + timeline_id: TimelineId, + shard_selector: ShardSelector, + ) -> RoutingResult { + loop { + // terminates because when every iteration we remove an element from the map + let Some((first_key, first_handle)) = self.map.iter().next() else { + return RoutingResult::NeedConsultTenantManager; + }; + let Some(first_handle) = first_handle.upgrade() else { + // TODO: dedup with get() + trace!("handle cache stale"); + let first_key_owned = *first_key; + self.map.remove(&first_key_owned).unwrap(); + continue; + }; + + let first_handle_shard_identity = first_handle.timeline.get_shard_identity(); + let make_shard_index = |shard_num: ShardNumber| ShardIndex { + shard_number: shard_num, + shard_count: first_handle_shard_identity.count, + }; + + let need_idx = match shard_selector { + ShardSelector::Page(key) => { + make_shard_index(first_handle_shard_identity.get_shard_number(&key)) + } + ShardSelector::Zero => make_shard_index(ShardNumber(0)), + ShardSelector::Known(shard_idx) => shard_idx, + }; + let need_shard_timeline_id = ShardTimelineId { + shard_index: need_idx, + timeline_id, + }; + let first_handle_shard_timeline_id = ShardTimelineId { + shard_index: first_handle_shard_identity.shard_index(), + timeline_id: first_handle.timeline.shard_timeline_id().timeline_id, + }; + + if need_shard_timeline_id == first_handle_shard_timeline_id { + return RoutingResult::FastPath(Handle(first_handle)); + } else { + return RoutingResult::SlowPath(need_shard_timeline_id); + } + } + } + + #[instrument(level = "trace", skip_all)] + #[inline(always)] + async fn get_miss( + &mut self, + timeline_id: TimelineId, + shard_selector: ShardSelector, + tenant_manager: &T::TenantManager, + ) -> Result, GetError> { + match tenant_manager.resolve(timeline_id, shard_selector).await { + Ok(timeline) => { + let key = timeline.shard_timeline_id(); + match &shard_selector { + ShardSelector::Zero => assert_eq!(key.shard_index.shard_number, ShardNumber(0)), + ShardSelector::Page(_) => (), // gotta trust tenant_manager + ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index), + } + + let gate_guard = match timeline.gate().enter() { + Ok(guard) => guard, + Err(_) => { + return Err(GetError::TimelineGateClosed); + } + }; + trace!("creating new HandleInner"); + let handle = Arc::new( + // TODO: global metric that keeps track of the number of live HandlerTimeline instances + // so we can identify reference cycle bugs. 
+                    HandleInner {
+                        shut_down: AtomicBool::new(false),
+                        _gate_guard: gate_guard,
+                        timeline: timeline.clone(),
+                    },
+                );
+                let handle = {
+                    let mut lock_guard = timeline
+                        .per_timeline_state()
+                        .handles
+                        .lock()
+                        .expect("mutex poisoned");
+                    match &mut *lock_guard {
+                        Some(per_timeline_state) => {
+                            let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle));
+                            assert!(replaced.is_none(), "some earlier code left a stale handle");
+                            match self.map.entry(key) {
+                                hash_map::Entry::Occupied(_o) => {
+                                    // This cannot happen because
+                                    // 1. we're the _miss_ handle, i.e., `self.map` didn't contain an entry and
+                                    // 2. we were holding &mut self during .resolve().await above, so, no other thread can have inserted a handle
+                                    //    while we were waiting for the tenant manager.
+                                    unreachable!()
+                                }
+                                hash_map::Entry::Vacant(v) => {
+                                    v.insert(Arc::downgrade(&handle));
+                                    handle
+                                }
+                            }
+                        }
+                        None => {
+                            return Err(GetError::PerTimelineStateShutDown);
+                        }
+                    }
+                };
+                Ok(Handle(handle))
+            }
+            Err(e) => Err(GetError::TenantManager(e)),
+        }
+    }
+}
+
+impl<T: Types> PerTimelineState<T> {
+    /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
+    /// to the [`Types::Timeline`] that embeds this per-timeline state,
+    /// even if [`TenantManager::resolve`] would still resolve to it.
+    ///
+    /// Already-alive [`Handle`]s will remain open, usable, and keep the [`ArcTimeline`] alive.
+    /// That's ok because they're short-lived. See module-level comment for details.
+    #[instrument(level = "trace", skip_all)]
+    pub(super) fn shutdown(&self) {
+        let handles = self
+            .handles
+            .lock()
+            .expect("mutex poisoned")
+            // NB: this .take() sets locked to None.
+            // That's what makes future `Cache::get` misses fail.
+            // Cache hits are taken care of below.
+            .take();
+        let Some(handles) = handles else {
+            trace!("already shut down");
+            return;
+        };
+        for handle in handles.values() {
+            // Make hits fail.
+            handle.shut_down.store(true, Ordering::Relaxed);
+        }
+        drop(handles);
+    }
+}
+
+impl<T: Types> std::ops::Deref for Handle<T> {
+    type Target = T::Timeline;
+    fn deref(&self) -> &Self::Target {
+        &self.0.timeline
+    }
+}
+
+#[cfg(test)]
+impl<T: Types> Drop for HandleInner<T> {
+    fn drop(&mut self) {
+        trace!("HandleInner dropped");
+    }
+}
+
+// When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
+impl<T: Types> Drop for Cache<T> {
+    fn drop(&mut self) {
+        for (_, weak) in self.map.drain() {
+            if let Some(strong) = weak.upgrade() {
+                // handle is still being kept alive in PerTimelineState
+                let timeline = strong.timeline.per_timeline_state();
+                let mut handles = timeline.handles.lock().expect("mutex poisoned");
+                if let Some(handles) = &mut *handles {
+                    let Some(removed) = handles.remove(&self.id) else {
+                        // There could have been a shutdown in between us upgrading the weak and locking the mutex.
+                        continue;
+                    };
+                    assert!(Arc::ptr_eq(&removed, &strong));
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use pageserver_api::{
+        key::{rel_block_to_key, Key, DBDIR_KEY},
+        models::ShardParameters,
+        reltag::RelTag,
+        shard::ShardStripeSize,
+    };
+    use utils::shard::ShardCount;
+
+    use super::*;
+
+    const FOREVER: std::time::Duration = std::time::Duration::from_secs(u64::MAX);
+
+    #[derive(Debug)]
+    struct TestTypes;
+    impl Types for TestTypes {
+        type TenantManagerError = anyhow::Error;
+        type TenantManager = StubManager;
+        type Timeline = Arc<StubTimeline>;
+    }
+
+    struct StubManager {
+        shards: Vec<Arc<StubTimeline>>,
+    }
+
+    struct StubTimeline {
+        gate: utils::sync::gate::Gate,
+        id: TimelineId,
+        shard: ShardIdentity,
+        per_timeline_state: PerTimelineState<TestTypes>,
+        myself: Weak<StubTimeline>,
+    }
+
+    impl StubTimeline {
+        fn getpage(&self) {
+            // do nothing
+        }
+    }
+
+    impl ArcTimeline<TestTypes> for Arc<StubTimeline> {
+        fn gate(&self) -> &utils::sync::gate::Gate {
+            &self.gate
+        }
+
+        fn shard_timeline_id(&self) -> ShardTimelineId {
+            ShardTimelineId {
+                shard_index: self.shard.shard_index(),
+                timeline_id: self.id,
+            }
+        }
+
+        fn get_shard_identity(&self) -> &ShardIdentity {
+            &self.shard
+        }
+
+        fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
+            &self.per_timeline_state
+        }
+    }
+
+    impl TenantManager<TestTypes> for StubManager {
+        async fn resolve(
+            &self,
+            timeline_id: TimelineId,
+            shard_selector: ShardSelector,
+        ) -> anyhow::Result<Arc<StubTimeline>> {
+            for timeline in &self.shards {
+                if timeline.id == timeline_id {
+                    match &shard_selector {
+                        ShardSelector::Zero if timeline.shard.is_shard_zero() => {
+                            return Ok(Arc::clone(timeline));
+                        }
+                        ShardSelector::Zero => continue,
+                        ShardSelector::Page(key) if timeline.shard.is_key_local(key) => {
+                            return Ok(Arc::clone(timeline));
+                        }
+                        ShardSelector::Page(_) => continue,
+                        ShardSelector::Known(idx) if idx == &timeline.shard.shard_index() => {
+                            return Ok(Arc::clone(timeline));
+                        }
+                        ShardSelector::Known(_) => continue,
+                    }
+                }
+            }
+            anyhow::bail!("not found")
+        }
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timeline_shutdown() {
+        crate::tenant::harness::setup_logging();
+
+        let timeline_id = TimelineId::generate();
+        let shard0 = Arc::new_cyclic(|myself| StubTimeline {
+            gate: Default::default(),
+            id: timeline_id,
+            shard: ShardIdentity::unsharded(),
+            per_timeline_state: PerTimelineState::default(),
+            myself: myself.clone(),
+        });
+        let mgr = StubManager {
+            shards: vec![shard0.clone()],
+        };
+        let key = DBDIR_KEY;
+
+        let mut cache = Cache::<TestTypes>::default();
+
+        //
+        // fill the cache
+        //
+        assert_eq!(
+            (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
+            (2, 1),
+            "strong: shard0, mgr; weak: myself"
+        );
+
+        let handle: Handle<_> = cache
+            .get(timeline_id, ShardSelector::Page(key), &mgr)
+            .await
+            .expect("we have the timeline");
+        let handle_inner_weak = Arc::downgrade(&handle.0);
+        assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
+        assert_eq!(
+            (
+                Weak::strong_count(&handle_inner_weak),
+                Weak::weak_count(&handle_inner_weak)
+            ),
+            (2, 2),
+            "strong: handle, per_timeline_state; weak: handle_inner_weak, cache"
+        );
+        assert_eq!(cache.map.len(), 1);
+
+        assert_eq!(
+            (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
+            (3, 1),
+            "strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
+        );
+        drop(handle);
+        assert_eq!(
+            (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
+            (3, 1),
+            "strong: handleinner(per_timeline_state), shard0, mgr; weak: myself"
+        );
+
+        //
+        // demonstrate that Handle holds up gate closure
+        // but shutdown prevents new handles from being handed out
+        //
+
+        tokio::select! {
+            _ = shard0.gate.close() => {
+                panic!("cache and per-timeline handler state keep the gate open");
+            }
+            _ = tokio::time::sleep(FOREVER) => {
+                // NB: first poll of close() makes it enter closing state
+            }
+        }
+
+        let handle = cache
+            .get(timeline_id, ShardSelector::Page(key), &mgr)
+            .await
+            .expect("we have the timeline");
+        assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
+
+        // SHUTDOWN
+        shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
+
+        assert_eq!(
+            1,
+            Weak::strong_count(&handle_inner_weak),
+            "through local var handle"
+        );
+        assert_eq!(
+            cache.map.len(),
+            1,
+            "this is an implementation detail but worth pointing out: we can't clear the cache from shutdown(), it's cleared on first access after"
+        );
+        assert_eq!(
+            (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
+            (3, 1),
+            "strong: handleinner(via handle), shard0, mgr; weak: myself"
+        );
+
+        // this handle is perfectly usable
+        handle.getpage();
+
+        cache
+            .get(timeline_id, ShardSelector::Page(key), &mgr)
+            .await
+            .err()
+            .expect("documented behavior: can't get new handle after shutdown, even if there is an alive Handle");
+        assert_eq!(
+            cache.map.len(),
+            0,
+            "first access after shutdown cleans up the Weak's from the cache"
+        );
+
+        tokio::select! {
+            _ = shard0.gate.close() => {
+                panic!("handle is keeping gate open");
+            }
+            _ = tokio::time::sleep(FOREVER) => { }
+        }
+
+        drop(handle);
+        assert_eq!(
+            0,
+            Weak::strong_count(&handle_inner_weak),
+            "the HandleInner destructor already ran"
+        );
+        assert_eq!(
+            (Arc::strong_count(&shard0), Arc::weak_count(&shard0)),
+            (2, 1),
+            "strong: shard0, mgr; weak: myself"
+        );
+
+        // closing gate succeeds after dropping handle
+        tokio::select! {
+            _ = shard0.gate.close() => { }
+            _ = tokio::time::sleep(FOREVER) => {
+                panic!("handle is dropped, no other gate holders exist")
+            }
+        }
+
+        // map gets cleaned on next lookup
+        cache
+            .get(timeline_id, ShardSelector::Page(key), &mgr)
+            .await
+            .err()
+            .expect("documented behavior: can't get new handle after shutdown");
+        assert_eq!(cache.map.len(), 0);
+
+        // ensure all refs to shard0 are gone and we're not leaking anything
+        let myself = Weak::clone(&shard0.myself);
+        drop(shard0);
+        drop(mgr);
+        assert_eq!(Weak::strong_count(&myself), 0);
+    }
+
+    #[tokio::test]
+    async fn test_multiple_timelines_and_deletion() {
+        crate::tenant::harness::setup_logging();
+
+        let timeline_a = TimelineId::generate();
+        let timeline_b = TimelineId::generate();
+        assert_ne!(timeline_a, timeline_b);
+        let timeline_a = Arc::new_cyclic(|myself| StubTimeline {
+            gate: Default::default(),
+            id: timeline_a,
+            shard: ShardIdentity::unsharded(),
+            per_timeline_state: PerTimelineState::default(),
+            myself: myself.clone(),
+        });
+        let timeline_b = Arc::new_cyclic(|myself| StubTimeline {
+            gate: Default::default(),
+            id: timeline_b,
+            shard: ShardIdentity::unsharded(),
+            per_timeline_state: PerTimelineState::default(),
+            myself: myself.clone(),
+        });
+        let mut mgr = StubManager {
+            shards: vec![timeline_a.clone(), timeline_b.clone()],
+        };
+        let key = DBDIR_KEY;
+
+        let mut cache = Cache::<TestTypes>::default();
+
+        cache
+            .get(timeline_a.id, ShardSelector::Page(key), &mgr)
+            .await
+            .expect("we have it");
+        cache
+            .get(timeline_b.id, ShardSelector::Page(key), &mgr)
+            .await
+            .expect("we have it");
+        assert_eq!(cache.map.len(), 2);
+
+        // delete timeline A
+        timeline_a.per_timeline_state.shutdown();
+        mgr.shards.retain(|t| t.id != timeline_a.id);
+        assert!(
+            mgr.resolve(timeline_a.id, ShardSelector::Page(key))
+                .await
+                .is_err(),
+            "broken StubManager implementation"
+        );
+
+        assert_eq!(
+            cache.map.len(),
+            2,
+            "cache still has a Weak handle to Timeline A"
+        );
+        cache
+            .get(timeline_a.id, ShardSelector::Page(key), &mgr)
+            .await
+            .err()
+            .expect("documented behavior: can't get new handle after shutdown");
+        assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
+
+        cache
+            .get(timeline_b.id, ShardSelector::Page(key), &mgr)
+            .await
+            .expect("we still have it");
+    }
+
+    fn make_relation_key_for_shard(shard: ShardNumber, params: &ShardParameters) -> Key {
+        rel_block_to_key(
+            RelTag {
+                spcnode: 1663,
+                dbnode: 208101,
+                relnode: 2620,
+                forknum: 0,
+            },
+            shard.0 as u32 * params.stripe_size.0,
+        )
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_shard_split() {
+        crate::tenant::harness::setup_logging();
+        let timeline_id = TimelineId::generate();
+        let parent = Arc::new_cyclic(|myself| StubTimeline {
+            gate: Default::default(),
+            id: timeline_id,
+            shard: ShardIdentity::unsharded(),
+            per_timeline_state: PerTimelineState::default(),
+            myself: myself.clone(),
+        });
+        let child_params = ShardParameters {
+            count: ShardCount(2),
+            stripe_size: ShardStripeSize::default(),
+        };
+        let child0 = Arc::new_cyclic(|myself| StubTimeline {
+            gate: Default::default(),
+            id: timeline_id,
+            shard: ShardIdentity::from_params(ShardNumber(0), &child_params),
+            per_timeline_state: PerTimelineState::default(),
+            myself: myself.clone(),
+        });
+        let child1 = Arc::new_cyclic(|myself| StubTimeline {
+            gate: Default::default(),
+            id: timeline_id,
+            shard: ShardIdentity::from_params(ShardNumber(1), &child_params),
+            per_timeline_state: PerTimelineState::default(),
+            myself: myself.clone(),
+        });
+        let child_shards_by_shard_number = [child0.clone(), child1.clone()];
+
+        let mut cache = Cache::<TestTypes>::default();
+
+        // fill the cache with the parent
+        for i in 0..2 {
+            let handle = cache
+                .get(
+                    timeline_id,
+                    ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
+                    &StubManager {
+                        shards: vec![parent.clone()],
+                    },
+                )
+                .await
+                .expect("we have it");
+            assert!(
+                Weak::ptr_eq(&handle.myself, &parent.myself),
+                "mgr returns parent first"
+            );
+            drop(handle);
+        }
+
+        //
+        // SHARD SPLIT: tenant manager changes, but the cache isn't informed
+        //
+
+        // while we haven't shut down the parent, the cache will return the cached parent, even
+        // if the tenant manager returns the child
+        for i in 0..2 {
+            let handle = cache
+                .get(
+                    timeline_id,
+                    ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
+                    &StubManager {
+                        shards: vec![], // doesn't matter what's in here, the cache is fully loaded
+                    },
+                )
+                .await
+                .expect("we have it");
+            assert!(
+                Weak::ptr_eq(&handle.myself, &parent.myself),
+                "mgr returns parent"
+            );
+            drop(handle);
+        }
+
+        let parent_handle = cache
+            .get(
+                timeline_id,
+                ShardSelector::Page(make_relation_key_for_shard(ShardNumber(0), &child_params)),
+                &StubManager {
+                    shards: vec![parent.clone()],
+                },
+            )
+            .await
+            .expect("we have it");
+        assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
+
+        // invalidate the cache
+        parent.per_timeline_state.shutdown();
+
+        // the cache will now return the child, even though the parent handle still exists
+        for i in 0..2 {
+            let handle = cache
+                .get(
+                    timeline_id,
+                    ShardSelector::Page(make_relation_key_for_shard(ShardNumber(i), &child_params)),
+                    &StubManager {
+                        shards: vec![child0.clone(), child1.clone()], // <====== this changed compared to previous loop
+                    },
+                )
+                .await
+                .expect("we have it");
+            assert!(
+                Weak::ptr_eq(
+                    &handle.myself,
+                    &child_shards_by_shard_number[i as usize].myself
+                ),
+                "mgr returns child"
+            );
+            drop(handle);
+        }
+
+        // all the while the parent handle kept the parent gate open
+        tokio::select! {
+            _ = parent_handle.gate.close() => {
+                panic!("parent handle is keeping gate open");
+            }
+            _ = tokio::time::sleep(FOREVER) => { }
+        }
+        drop(parent_handle);
+        tokio::select! {
+            _ = parent.gate.close() => { }
+            _ = tokio::time::sleep(FOREVER) => {
+                panic!("parent handle is dropped, no other gate holders exist")
+            }
+        }
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_connection_handler_exit() {
+        crate::tenant::harness::setup_logging();
+        let timeline_id = TimelineId::generate();
+        let shard0 = Arc::new_cyclic(|myself| StubTimeline {
+            gate: Default::default(),
+            id: timeline_id,
+            shard: ShardIdentity::unsharded(),
+            per_timeline_state: PerTimelineState::default(),
+            myself: myself.clone(),
+        });
+        let mgr = StubManager {
+            shards: vec![shard0.clone()],
+        };
+        let key = DBDIR_KEY;
+
+        // Simulate 10 connections that are opened, used, and closed
+        let mut used_handles = vec![];
+        for _ in 0..10 {
+            let mut cache = Cache::<TestTypes>::default();
+            let handle = {
+                let handle = cache
+                    .get(timeline_id, ShardSelector::Page(key), &mgr)
+                    .await
+                    .expect("we have the timeline");
+                assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
+                handle
+            };
+            handle.getpage();
+            used_handles.push(Arc::downgrade(&handle.0));
+        }
+
+        // No handles exist anymore, so nothing is holding the gate open
+        assert!(used_handles
+            .iter()
+            .all(|weak| Weak::strong_count(weak) == 0));
+
+        // ... thus the gate should close immediately, even without shutdown
+        tokio::select! {
+            _ = shard0.gate.close() => { }
+            _ = tokio::time::sleep(FOREVER) => {
+                panic!("handle is dropped, no other gate holders exist")
+            }
+        }
+    }
+}
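Illustrative usage sketch (not part of the patch): how a per-connection caller inside the pageserver crate might hold one Cache for the lifetime of a connection and go through it on every request. `serve_connection` and the `requests` parameter are hypothetical; `Types`, `TenantManager`, `Cache`, `Handle`, `GetError`, and `ShardSelector` are the items this file introduces.

    // Hypothetical caller; assumes it lives in the same crate as the `handle` module
    // and that some `T: Types` wiring exists, like `TestTypes`/`StubManager` in the tests.
    async fn serve_connection<T: Types>(
        tenant_manager: &T::TenantManager,
        requests: Vec<(TimelineId, ShardSelector)>,
    ) -> Result<(), GetError<T>> {
        // One Cache per connection: it holds only Weak refs, so by itself it never keeps a Timeline alive.
        let mut cache = Cache::<T>::default();
        for (timeline_id, shard_selector) in requests {
            // Hits come straight from the map; misses consult the tenant manager and enter the timeline's gate.
            let handle = cache.get(timeline_id, shard_selector, tenant_manager).await?;
            // Deref target is T::Timeline; the handle keeps the gate open while the request is served.
            let _timeline = &*handle;
            // ... serve the request, then drop `handle` promptly so it doesn't hold up gate closure.
        }
        Ok(())
        // Dropping `cache` prunes its entries from each timeline's PerTimelineState.
    }

The point of the sketch is the ownership shape the module establishes: the connection owns the Cache, the Cache holds only Weak refs, PerTimelineState holds the long-lived strong ref, and Cache::drop and PerTimelineState::shutdown are the two places that break that link.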