From ddfb4508426c11a09a2390b8235f28eec6cf8834 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 12 Jul 2024 18:55:19 +0000
Subject: [PATCH] no more task_mgr for page_service

---
 pageserver/src/bin/pageserver.rs |  40 ++------
 pageserver/src/lib.rs            |   9 +-
 pageserver/src/page_service.rs   | 156 ++++++++++++++++++++++---------
 3 files changed, 123 insertions(+), 82 deletions(-)

diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index a05d216d19716..b80fcf0f36c6d 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -18,9 +18,7 @@ use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_evicti
 use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
 use pageserver::task_mgr::WALRECEIVER_RUNTIME;
 use pageserver::tenant::{secondary, TenantSharedResources};
-use pageserver::{
-    CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, LibpqEndpointListener,
-};
+use pageserver::{CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener};
 use remote_storage::GenericRemoteStorage;
 use tokio::signal::unix::SignalKind;
 use tokio::time::Instant;
@@ -30,11 +28,9 @@ use tracing::*;
 use metrics::set_build_info_metric;
 use pageserver::{
     config::{defaults::*, PageServerConf},
-    context::{DownloadBehavior, RequestContext},
     deletion_queue::DeletionQueue,
     http, page_cache, page_service, task_mgr,
-    task_mgr::TaskKind,
-    task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
+    task_mgr::{BACKGROUND_RUNTIME, MGMT_REQUEST_RUNTIME},
     tenant::mgr,
     virtual_file,
 };
@@ -609,30 +605,12 @@ fn start_pageserver(
     // Spawn a task to listen for libpq connections. It will spawn further tasks
     // for each connection. We created the listener earlier already.
-    let libpq_listener = {
-        let cancel = CancellationToken::new();
-        let libpq_ctx = RequestContext::todo_child(
-            TaskKind::LibpqEndpointListener,
-            // listener task shouldn't need to download anything. (We will
-            // create a separate sub-contexts for each connection, with their
-            // own download behavior. This context is used only to listen and
-            // accept connections.)
-            DownloadBehavior::Error,
-        );
-
-        let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
-            "libpq listener",
-            page_service::libpq_listener_main(
-                tenant_manager.clone(),
-                pg_auth,
-                pageserver_listener,
-                conf.pg_auth_type,
-                libpq_ctx,
-                cancel.clone(),
-            ),
-        ));
-        LibpqEndpointListener(CancellableTask { task, cancel })
-    };
+    let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
+        pageserver_listener
+            .set_nonblocking(true)
+            .context("set listener to nonblocking")?;
+        tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
+    });

     let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());

@@ -660,7 +638,7 @@
         shutdown_pageserver.take();
         pageserver::shutdown_pageserver(
             http_endpoint_listener,
-            libpq_listener,
+            page_service,
             consumption_metrics_tasks,
             disk_usage_eviction_task,
             &tenant_manager,
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 5501d66579e66..a4ce693ea79f4 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -30,7 +30,6 @@ pub mod walingest;
 pub mod walrecord;
 pub mod walredo;

-use crate::task_mgr::TaskKind;
 use camino::Utf8Path;
 use deletion_queue::DeletionQueue;
 use tenant::mgr::TenantManager;
@@ -73,7 +72,7 @@ impl CancellableTask {
 #[tracing::instrument(skip_all, fields(%exit_code))]
 pub async fn shutdown_pageserver(
     http_listener: HttpEndpointListener,
-    libpq_listener: LibpqEndpointListener,
+    page_service: page_service::Listener,
     consumption_metrics_worker: ConsumptionMetricsTasks,
     disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
     tenant_manager: &TenantManager,
@@ -83,8 +82,8 @@
     use std::time::Duration;
     // Shut down the libpq endpoint task. This prevents new connections from
     // being accepted.
-    timed(
-        libpq_listener.0.shutdown(),
+    let remaining_connections = timed(
+        page_service.stop_accepting(),
         "shutdown LibpqEndpointListener",
         Duration::from_secs(1),
     )
@@ -102,7 +101,7 @@
     // Shut down any page service tasks: any in-progress work for particular timelines or tenants
     // should already have been cancelled via mgr::shutdown_all_tenants
     timed(
-        task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
+        remaining_connections.shutdown(),
         "shutdown PageRequestHandlers",
         Duration::from_secs(1),
     )
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index fd6260fb8a33a..1d08909188289 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -5,7 +5,7 @@ use anyhow::Context;
 use async_compression::tokio::write::GzipEncoder;
 use bytes::Buf;
 use futures::stream::FuturesUnordered;
-use futures::StreamExt;
+use futures::{FutureExt, StreamExt};
 use pageserver_api::key::Key;
 use pageserver_api::models::TenantState;
 use pageserver_api::models::{
@@ -25,7 +25,6 @@ use pq_proto::{BeMessage, FeMessage, RowDescriptor};
 use std::borrow::Cow;
 use std::collections::HashMap;
 use std::io;
-use std::net::TcpListener;
 use std::str;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -34,6 +33,7 @@ use std::time::Instant;
 use std::time::SystemTime;
 use tokio::io::AsyncWriteExt;
 use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::sync::gate::GateGuard;
@@ -47,14 +47,15 @@ use utils::{
 use crate::auth::check_permission;
 use crate::basebackup;
 use crate::basebackup::BasebackupError;
+use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::metrics;
 use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
 use crate::pgdatadir_mapping::Version;
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
-use crate::task_mgr;
 use crate::task_mgr::TaskKind;
+use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
 use crate::tenant::mgr::GetActiveTenantError;
 use crate::tenant::mgr::GetTenantError;
 use crate::tenant::mgr::ShardResolveResult;
@@ -76,32 +77,101 @@ const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);

 ///////////////////////////////////////////////////////////////////////////////

+pub struct Listener {
+    cancel: CancellationToken,
+    /// Cancel the listener task through `cancel` to shut down the listener
+    /// and get a handle on the existing connections.
+    task: JoinHandle<Connections>,
+}
+
+pub struct Connections {
+    cancel: CancellationToken,
+    tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
+}
+
+pub fn spawn(
+    conf: &'static PageServerConf,
+    tenant_manager: Arc<TenantManager>,
+    pg_auth: Option<Arc<SwappableJwtAuth>>,
+    tcp_listener: tokio::net::TcpListener,
+) -> Listener {
+    let cancel = CancellationToken::new();
+    let libpq_ctx = RequestContext::todo_child(
+        TaskKind::LibpqEndpointListener,
+        // listener task shouldn't need to download anything. (We will
+        // create a separate sub-context for each connection, with its
+        // own download behavior. This context is used only to listen and
+        // accept connections.)
+        DownloadBehavior::Error,
+    );
+    let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
+        "libpq listener",
+        libpq_listener_main(
+            tenant_manager,
+            pg_auth,
+            tcp_listener,
+            conf.pg_auth_type,
+            libpq_ctx,
+            cancel.clone(),
+        )
+        .map(anyhow::Ok),
+    ));
+
+    Listener { cancel, task }
+}
+
+impl Listener {
+    pub async fn stop_accepting(self) -> Connections {
+        self.cancel.cancel();
+        self.task
+            .await
+            .expect("unreachable: we wrap the listener task in task_mgr::exit_on_panic_or_error")
+    }
+}
+impl Connections {
+    pub async fn shutdown(self) {
+        let Self { cancel, mut tasks } = self;
+        cancel.cancel();
+        while let Some(res) = tasks.join_next().await {
+            // the logging done here mimics what was formerly done by task_mgr
+            match res {
+                Ok(Ok(())) => {}
+                Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
+                Err(e) => error!("page_service connection task panicked: {:?}", e),
+            }
+        }
+    }
+}
+
 ///
 /// Main loop of the page service.
 ///
 /// Listens for connections, and launches a new handler task for each.
 ///
+/// Returns upon cancellation via `listener_cancel`, yielding the set of
+/// open connections.
+///
 pub async fn libpq_listener_main(
     tenant_manager: Arc<TenantManager>,
     auth: Option<Arc<SwappableJwtAuth>>,
-    listener: TcpListener,
+    listener: tokio::net::TcpListener,
     auth_type: AuthType,
     listener_ctx: RequestContext,
-    cancel: CancellationToken,
-) -> anyhow::Result<()> {
-    listener.set_nonblocking(true)?;
-    let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
+    listener_cancel: CancellationToken,
+) -> Connections {
+    let connections_cancel = CancellationToken::new();
+    let mut connection_handler_tasks = tokio::task::JoinSet::default();

     // Wait for a new connection to arrive, or for server shutdown.
     while let Some(res) = tokio::select! {
         biased;

-        _ = cancel.cancelled() => {
+        _ = listener_cancel.cancelled() => {
             // We were requested to shut down.
             None
         }

-        res = tokio_listener.accept() => {
+        res = listener.accept() => {
             Some(res)
         }
     } {
@@ -110,28 +180,16 @@
             // Connection established. Spawn a new task to handle it.
             debug!("accepted connection from {}", peer_addr);
             let local_auth = auth.clone();
-
             let connection_ctx = listener_ctx
                 .detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
-
-            // PageRequestHandler tasks are not associated with any particular
-            // timeline in the task manager. In practice most connections will
-            // only deal with a particular timeline, but we don't know which one
-            // yet.
-            task_mgr::spawn(
-                &tokio::runtime::Handle::current(),
-                TaskKind::PageRequestHandler,
-                None,
-                None,
-                "serving compute connection task",
-                page_service_conn_main(
-                    tenant_manager.clone(),
-                    local_auth,
-                    socket,
-                    auth_type,
-                    connection_ctx,
-                ),
-            );
+            connection_handler_tasks.spawn(page_service_conn_main(
+                tenant_manager.clone(),
+                local_auth,
+                socket,
+                auth_type,
+                connection_ctx,
+                connections_cancel.child_token(),
+            ));
         }
         Err(err) => {
             // accept() failed. Log the error, and loop back to retry on next connection.
@@ -140,11 +198,16 @@
         }
     }

-    debug!("page_service loop terminated");
-    Ok(())
+    debug!("page_service listener loop terminated");
+    Connections {
+        cancel: connections_cancel,
+        tasks: connection_handler_tasks,
+    }
 }

+type ConnectionHandlerResult = anyhow::Result<()>;
+
 #[instrument(skip_all, fields(peer_addr))]
 async fn page_service_conn_main(
     tenant_manager: Arc<TenantManager>,
     auth: Option<Arc<SwappableJwtAuth>>,
     socket: tokio::net::TcpStream,
     auth_type: AuthType,
     connection_ctx: RequestContext,
-) -> anyhow::Result<()> {
+    cancel: CancellationToken,
+) -> ConnectionHandlerResult {
     let _guard = LIVE_CONNECTIONS
         .with_label_values(&["page_service"])
         .guard();
@@ -200,13 +264,11 @@
     // and create a child per-query context when it invokes process_query.
     // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
     // and create the per-query context in process_query ourselves.
-    let mut conn_handler = PageServerHandler::new(tenant_manager, auth, connection_ctx);
+    let mut conn_handler =
+        PageServerHandler::new(tenant_manager, auth, connection_ctx, cancel.clone());
     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;

-    match pgbackend
-        .run(&mut conn_handler, &task_mgr::shutdown_token())
-        .await
-    {
+    match pgbackend.run(&mut conn_handler, &cancel).await {
         Ok(()) => {
             // we've been requested to shut down
             Ok(())
@@ -249,6 +311,8 @@ struct PageServerHandler {
     /// or the ratio used when splitting shards (i.e. how many children created from one)
     /// parent shard, where a "large" number might be ~8.
     shard_timelines: HashMap<ShardIndex, HandlerTimeline>,
+
+    cancel: CancellationToken,
 }

 #[derive(thiserror::Error, Debug)]
@@ -324,6 +388,7 @@ impl PageServerHandler {
         tenant_manager: Arc<TenantManager>,
         auth: Option<Arc<SwappableJwtAuth>>,
         connection_ctx: RequestContext,
+        cancel: CancellationToken,
     ) -> Self {
         PageServerHandler {
             tenant_manager,
@@ -331,6 +396,7 @@
             claims: None,
             connection_ctx,
             shard_timelines: HashMap::new(),
+            cancel,
         }
     }

@@ -338,7 +404,7 @@
     ///
     /// We currently need to shut down when any of the following happens:
     /// 1. any of the timelines we hold GateGuards for in `shard_timelines` is cancelled
-    /// 2. task_mgr requests shutdown of the connection
+    /// 2. connection shutdown is requested via `Self::cancel`
     ///
     /// NB on (1): the connection's lifecycle is not actually tied to any of the
     /// `shard_timelines`s' lifecycles. But it's _necessary_ in the current
@@ -362,7 +428,7 @@ impl PageServerHandler {
         let mut cancellation_sources = Vec::with_capacity(1 + self.shard_timelines.len());
         use futures::future::Either;
-        cancellation_sources.push(Either::Left(task_mgr::shutdown_watcher()));
+        cancellation_sources.push(Either::Left(self.cancel.cancelled()));
         cancellation_sources.extend(
             self.shard_timelines
                 .values()
@@ -407,7 +473,7 @@ impl PageServerHandler {
     /// # Coding Discipline
     ///
     /// Coding discipline within this function: all interaction with the `pgb` connection
-    /// needs to be sensitive to page_service shutdown, currently signalled via [`task_mgr::shutdown_token`].
+    /// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
     /// This is so that we can shutdown page_service quickly.
     #[instrument(skip_all)]
     async fn handle_pagerequests(
@@ -423,13 +489,11 @@
     {
         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();

-        let cancel = task_mgr::shutdown_token();
-
         // switch client to COPYBOTH
         pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
         tokio::select! {
             biased;
-            _ = cancel.cancelled() => {
+            _ = self.cancel.cancelled() => {
                 return Err(QueryError::Shutdown)
             }
             res = pgb.flush() => {
@@ -441,7 +505,7 @@
             // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
             let msg = tokio::select! {
                 biased;
-                _ = cancel.cancelled() => {
+                _ = self.cancel.cancelled() => {
                     return Err(QueryError::Shutdown)
                 }
                 msg = pgb.read_message() => { msg }
@@ -558,7 +622,7 @@
             pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
             tokio::select! {
                 biased;
-                _ = cancel.cancelled() => {
+                _ = self.cancel.cancelled() => {
                     // We were requested to shut down.
                     info!("shutdown request received in page handler");
                     return Err(QueryError::Shutdown)
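
Note on the resulting shutdown protocol: this patch makes page service shutdown
two-phase. Phase 1, Listener::stop_accepting, cancels only the accept loop and
hands back the still-open Connections; phase 2, Connections::shutdown, cancels
every handler through its child token and drains the JoinSet, logging failures
the way task_mgr used to. Below is a minimal self-contained sketch of the same
pattern, not the pageserver's actual code: all names are illustrative, and it
assumes tokio (net/rt/macros features), tokio-util, and anyhow as dependencies.

use tokio::task::{JoinHandle, JoinSet};
use tokio_util::sync::CancellationToken;

struct Listener {
    cancel: CancellationToken,
    task: JoinHandle<Connections>,
}

struct Connections {
    cancel: CancellationToken,
    tasks: JoinSet<anyhow::Result<()>>,
}

fn spawn(tcp: tokio::net::TcpListener) -> Listener {
    let cancel = CancellationToken::new();
    let task = tokio::spawn(accept_loop(tcp, cancel.clone()));
    Listener { cancel, task }
}

async fn accept_loop(
    tcp: tokio::net::TcpListener,
    listener_cancel: CancellationToken,
) -> Connections {
    let connections_cancel = CancellationToken::new();
    let mut tasks = JoinSet::new();
    loop {
        tokio::select! {
            biased;
            // Phase 1 stops only this loop; existing handlers keep running.
            _ = listener_cancel.cancelled() => break,
            res = tcp.accept() => match res {
                Ok((socket, _peer)) => {
                    // Each handler gets a child token so that phase 2 can
                    // cancel all of them at once via connections_cancel.
                    tasks.spawn(handle_conn(socket, connections_cancel.child_token()));
                }
                Err(e) => eprintln!("accept() failed: {e:?}"),
            },
        }
    }
    Connections { cancel: connections_cancel, tasks }
}

async fn handle_conn(
    _socket: tokio::net::TcpStream,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    // A real handler would serve requests until its token is cancelled.
    cancel.cancelled().await;
    Ok(())
}

impl Listener {
    async fn stop_accepting(self) -> Connections {
        self.cancel.cancel();
        self.task.await.expect("accept loop does not panic")
    }
}

impl Connections {
    async fn shutdown(mut self) {
        self.cancel.cancel();
        // Drain every handler, logging failures like task_mgr used to.
        while let Some(res) = self.tasks.join_next().await {
            match res {
                Ok(Ok(())) => {}
                Ok(Err(e)) => eprintln!("connection task error: {e:?}"),
                Err(e) => eprintln!("connection task panicked: {e:?}"),
            }
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let listener = spawn(tokio::net::TcpListener::bind("127.0.0.1:0").await?);
    // ... serve traffic ...
    let connections = listener.stop_accepting().await; // phase 1
    // (shutdown_pageserver shuts down tenants between the two phases)
    connections.shutdown().await; // phase 2
    Ok(())
}

Splitting the two phases is what lets shutdown_pageserver in pageserver/src/lib.rs
stop new connections first, shut down tenants, and only then cancel and drain the
remaining connections.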