Skip to content

Commit

Permalink
no more task_mgr for page_service
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Jul 12, 2024
1 parent 4114efc commit ddfb450
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 82 deletions.
40 changes: 9 additions & 31 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_evicti
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::{secondary, TenantSharedResources};
use pageserver::{
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, LibpqEndpointListener,
};
use pageserver::{CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener};
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
Expand All @@ -30,11 +28,9 @@ use tracing::*;
use metrics::set_build_info_metric;
use pageserver::{
config::{defaults::*, PageServerConf},
context::{DownloadBehavior, RequestContext},
deletion_queue::DeletionQueue,
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
task_mgr::{BACKGROUND_RUNTIME, MGMT_REQUEST_RUNTIME},
tenant::mgr,
virtual_file,
};
Expand Down Expand Up @@ -609,30 +605,12 @@ fn start_pageserver(

// Spawn a task to listen for libpq connections. It will spawn further tasks
// for each connection. We created the listener earlier already.
let libpq_listener = {
let cancel = CancellationToken::new();
let libpq_ctx = RequestContext::todo_child(
TaskKind::LibpqEndpointListener,
// listener task shouldn't need to download anything. (We will
// create a separate sub-contexts for each connection, with their
// own download behavior. This context is used only to listen and
// accept connections.)
DownloadBehavior::Error,
);

let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"libpq listener",
page_service::libpq_listener_main(
tenant_manager.clone(),
pg_auth,
pageserver_listener,
conf.pg_auth_type,
libpq_ctx,
cancel.clone(),
),
));
LibpqEndpointListener(CancellableTask { task, cancel })
};
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
pageserver_listener
.set_nonblocking(true)
.context("set listener to nonblocking")?;
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
});

let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());

Expand Down Expand Up @@ -660,7 +638,7 @@ fn start_pageserver(
shutdown_pageserver.take();
pageserver::shutdown_pageserver(
http_endpoint_listener,
libpq_listener,
page_service,
consumption_metrics_tasks,
disk_usage_eviction_task,
&tenant_manager,
Expand Down
9 changes: 4 additions & 5 deletions pageserver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;

use crate::task_mgr::TaskKind;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::mgr::TenantManager;
Expand Down Expand Up @@ -73,7 +72,7 @@ impl CancellableTask {
#[tracing::instrument(skip_all, fields(%exit_code))]
pub async fn shutdown_pageserver(
http_listener: HttpEndpointListener,
libpq_listener: LibpqEndpointListener,
page_service: page_service::Listener,
consumption_metrics_worker: ConsumptionMetricsTasks,
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
tenant_manager: &TenantManager,
Expand All @@ -83,8 +82,8 @@ pub async fn shutdown_pageserver(
use std::time::Duration;
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
timed(
libpq_listener.0.shutdown(),
let remaining_connections = timed(
page_service.stop_accepting(),
"shutdown LibpqEndpointListener",
Duration::from_secs(1),
)
Expand All @@ -102,7 +101,7 @@ pub async fn shutdown_pageserver(
// Shut down any page service tasks: any in-progress work for particular timelines or tenants
// should already have been canclled via mgr::shutdown_all_tenants
timed(
task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
remaining_connections.shutdown(),
"shutdown PageRequestHandlers",
Duration::from_secs(1),
)
Expand Down
Loading

0 comments on commit ddfb450

Please sign in to comment.