Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(pageserver) remove task_mgr for most global tasks #8449

Merged
merged 6 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 70 additions & 94 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_evicti
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::{secondary, TenantSharedResources};
use pageserver::{
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, LibpqEndpointListener,
};
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;

use metrics::set_build_info_metric;
Expand Down Expand Up @@ -430,8 +434,10 @@ fn start_pageserver(

// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
let background_purges = mgr::BackgroundPurges::default();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
background_purges.clone(),
TenantSharedResources {
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
Expand Down Expand Up @@ -523,7 +529,7 @@ fn start_pageserver(
}
});

let secondary_controller = secondary::spawn_tasks(
let (secondary_controller, secondary_controller_tasks) = secondary::spawn_tasks(
tenant_manager.clone(),
remote_storage.clone(),
background_jobs_barrier.clone(),
Expand All @@ -536,18 +542,19 @@ fn start_pageserver(
// been configured.
let disk_usage_eviction_state: Arc<disk_usage_eviction_task::State> = Arc::default();

launch_disk_usage_global_eviction_task(
let disk_usage_eviction_task = launch_disk_usage_global_eviction_task(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
tenant_manager.clone(),
background_jobs_barrier.clone(),
)?;
);

// Start up the service to handle HTTP mgmt API request. We created the
// listener earlier already.
{
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
let http_endpoint_listener = {
let _rt_guard = MGMT_REQUEST_RUNTIME.enter(); // for hyper
let cancel = CancellationToken::new();

let router_state = Arc::new(
http::routes::State::new(
Expand All @@ -568,77 +575,44 @@ fn start_pageserver(
let service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?
.serve(service)
.with_graceful_shutdown(task_mgr::shutdown_watcher());
.with_graceful_shutdown({
let cancel = cancel.clone();
async move { cancel.clone().cancelled().await }
});

task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::HttpEndpointListener,
None,
None,
let task = MGMT_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"http endpoint listener",
true,
async {
server.await?;
Ok(())
},
);
}

if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
// The actual size calculation does need downloads, and
// creates a child context with the right DownloadBehavior.
DownloadBehavior::Error,
);
server,
));
HttpEndpointListener(CancellableTask { task, cancel })
};

let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");

task_mgr::spawn(
crate::BACKGROUND_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
"consumption metrics collection",
true,
{
let tenant_manager = tenant_manager.clone();
async move {
// first wait until background jobs are cleared to launch.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
let cancel = task_mgr::shutdown_token();

tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => {}
};

pageserver::consumption_metrics::collect_metrics(
tenant_manager,
metric_collection_endpoint,
&conf.metric_collection_bucket,
conf.metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
local_disk_storage,
cancel,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))
.await?;
Ok(())
}
},
);
}
let consumption_metrics_tasks = {
let cancel = shutdown_pageserver.child_token();
let task = crate::BACKGROUND_RUNTIME.spawn({
let tenant_manager = tenant_manager.clone();
let cancel = cancel.clone();
async move {
// first wait until background jobs are cleared to launch.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
tokio::select! {
_ = cancel.cancelled() => { return; },
_ = background_jobs_barrier.wait() => {}
};

pageserver::consumption_metrics::run(conf, tenant_manager, cancel).await;
}
});
ConsumptionMetricsTasks(CancellableTask { task, cancel })
};

// Spawn a task to listen for libpq connections. It will spawn further tasks
// for each connection. We created the listener earlier already.
{
let libpq_listener = {
let cancel = CancellationToken::new();
let libpq_ctx = RequestContext::todo_child(
TaskKind::LibpqEndpointListener,
// listener task shouldn't need to download anything. (We will
Expand All @@ -647,29 +621,20 @@ fn start_pageserver(
// accept connections.)
DownloadBehavior::Error,
);
task_mgr::spawn(
COMPUTE_REQUEST_RUNTIME.handle(),
TaskKind::LibpqEndpointListener,
None,
None,
"libpq endpoint listener",
true,
{
let tenant_manager = tenant_manager.clone();
async move {
page_service::libpq_listener_main(
tenant_manager,
pg_auth,
pageserver_listener,
conf.pg_auth_type,
libpq_ctx,
task_mgr::shutdown_token(),
)
.await
}
},
);
}

let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"libpq listener",
page_service::libpq_listener_main(
tenant_manager.clone(),
pg_auth,
pageserver_listener,
conf.pg_auth_type,
libpq_ctx,
cancel.clone(),
),
));
LibpqEndpointListener(CancellableTask { task, cancel })
};

let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());

Expand All @@ -695,7 +660,18 @@ fn start_pageserver(
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
pageserver::shutdown_pageserver(&tenant_manager, deletion_queue.clone(), 0).await;
pageserver::shutdown_pageserver(
http_endpoint_listener,
libpq_listener,
consumption_metrics_tasks,
disk_usage_eviction_task,
&tenant_manager,
background_purges,
deletion_queue.clone(),
secondary_controller_tasks,
0,
)
.await;
unreachable!()
})
}
Expand Down
100 changes: 62 additions & 38 deletions pageserver/src/consumption_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Periodically collect consumption metrics for all active tenants
//! and push them to a HTTP endpoint.
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::size::CalculateSyntheticSizeError;
Expand Down Expand Up @@ -39,49 +40,74 @@ type RawMetric = (MetricsKey, (EventType, u64));
/// for deduplication, but that is no longer needed.
type Cache = HashMap<MetricsKey, (EventType, u64)>;

pub async fn run(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) {
let Some(metric_collection_endpoint) = conf.metric_collection_endpoint.as_ref() else {
return;
};

let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");

let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
// The actual size calculation does need downloads, and
// creates a child context with the right DownloadBehavior.
DownloadBehavior::Error,
);
let collect_metrics = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"consumption metrics collection",
collect_metrics(
tenant_manager.clone(),
metric_collection_endpoint,
&conf.metric_collection_bucket,
conf.metric_collection_interval,
conf.id,
local_disk_storage,
cancel.clone(),
metrics_ctx,
)
.instrument(info_span!("metrics_collection")),
));

let worker_ctx =
RequestContext::todo_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
let synthetic_size_worker = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"synthetic size calculation",
calculate_synthetic_size_worker(
tenant_manager.clone(),
conf.synthetic_size_calculation_interval,
cancel.clone(),
worker_ctx,
)
.instrument(info_span!("synthetic_size_worker")),
));

let (collect_metrics, synthetic_size_worker) =
futures::future::join(collect_metrics, synthetic_size_worker).await;
collect_metrics
.expect("unreachable: exit_on_panic_or_error would catch the panic and exit the process");
synthetic_size_worker
.expect("unreachable: exit_on_panic_or_error would catch the panic and exit the process");
}

/// Main thread that serves metrics collection
#[allow(clippy::too_many_arguments)]
pub async fn collect_metrics(
async fn collect_metrics(
tenant_manager: Arc<TenantManager>,
metric_collection_endpoint: &Url,
metric_collection_bucket: &Option<RemoteStorageConfig>,
metric_collection_interval: Duration,
synthetic_size_calculation_interval: Duration,
node_id: NodeId,
local_disk_storage: Utf8PathBuf,
cancel: CancellationToken,
ctx: RequestContext,
) -> anyhow::Result<()> {
// spin up background worker that caclulates tenant sizes
let worker_ctx =
ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::CalculateSyntheticSize,
None,
None,
"synthetic size calculation",
false,
{
let tenant_manager = tenant_manager.clone();
async move {
calculate_synthetic_size_worker(
tenant_manager,
synthetic_size_calculation_interval,
&cancel,
&worker_ctx,
)
.instrument(info_span!("synthetic_size_worker"))
.await?;
Ok(())
}
},
);

let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);

let cancel = task_mgr::shutdown_token();

let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);

let mut cached_metrics = tokio::select! {
Expand Down Expand Up @@ -168,11 +194,9 @@ pub async fn collect_metrics(
BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
);

let res = tokio::time::timeout_at(
started_at + metric_collection_interval,
task_mgr::shutdown_token().cancelled(),
)
.await;
let res =
tokio::time::timeout_at(started_at + metric_collection_interval, cancel.cancelled())
.await;
if res.is_ok() {
return Ok(());
}
Expand Down Expand Up @@ -272,8 +296,8 @@ async fn reschedule(
async fn calculate_synthetic_size_worker(
tenant_manager: Arc<TenantManager>,
synthetic_size_calculation_interval: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
cancel: CancellationToken,
ctx: RequestContext,
) -> anyhow::Result<()> {
info!("starting calculate_synthetic_size_worker");
scopeguard::defer! {
Expand Down Expand Up @@ -313,7 +337,7 @@ async fn calculate_synthetic_size_worker(
// there is never any reason to exit calculate_synthetic_size_worker following any
// return value -- we don't need to care about shutdown because no tenant is found when
// pageserver is shut down.
calculate_and_log(&tenant, cancel, ctx).await;
calculate_and_log(&tenant, &cancel, &ctx).await;
}

crate::tenant::tasks::warn_when_period_overrun(
Expand Down
Loading
Loading