Skip to content

Commit

Permalink
refactor(page_service): Timeline gate guard holding + cancellation + …
Browse files Browse the repository at this point in the history
…shutdown (#8339)

Since the introduction of sharding, the protocol handling loop in
`handle_pagerequests` cannot know anymore which concrete
`Tenant`/`Timeline` object any of the incoming `PagestreamFeMessage`
resolves to.
In fact, one message might resolve to one `Tenant`/`Timeline` while
the next one may resolve to another one.

To avoid going to tenant manager, we added the `shard_timelines` which
acted as an ever-growing cache that held timeline gate guards open for
the lifetime of the connection.
The consequence of holding the gate guards open was that we had to be
sensitive to every cached `Timeline::cancel` on each interaction with
the network connection, so that Timeline shutdown would not have to wait
for network connection interaction.

We can do better than that, meaning more efficiency & better
abstraction.
I proposed a sketch for it in

* #8286

and this PR implements an evolution of that sketch.

The main idea is is that `mod page_service` shall be solely concerned
with the following:
1. receiving requests by speaking the protocol / pagestream subprotocol
2. dispatching the request to a corresponding method on the correct
shard/`Timeline` object
3. sending response by speaking the protocol / pagestream subprotocol.

The cancellation sensitivity responsibilities are clear cut:
* while in `page_service` code, sensitivity to page_service cancellation
is sufficient
* while in `Timeline` code, sensitivity to `Timeline::cancel` is
sufficient

To enforce these responsibilities, we introduce the notion of a
`timeline::handle::Handle` to a `Timeline` object that is checked out
from a `timeline::handle::Cache` for **each request**.
The `Handle` derefs to `Timeline` and is supposed to be used for a
single async method invocation on `Timeline`.
See the lengthy doc comment in `mod handle` for details of the design.
  • Loading branch information
problame authored and arpad-m committed Aug 5, 2024
1 parent 311cc71 commit 31122ad
Show file tree
Hide file tree
Showing 8 changed files with 1,387 additions and 432 deletions.
43 changes: 11 additions & 32 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ use pageserver::config::PageserverIdentity;
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::task_mgr::{COMPUTE_REQUEST_RUNTIME, WALRECEIVER_RUNTIME};
use pageserver::tenant::{secondary, TenantSharedResources};
use pageserver::{
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, LibpqEndpointListener,
};
use pageserver::{CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener};
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
Expand All @@ -31,11 +29,9 @@ use tracing::*;
use metrics::set_build_info_metric;
use pageserver::{
config::PageServerConf,
context::{DownloadBehavior, RequestContext},
deletion_queue::DeletionQueue,
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
task_mgr::{BACKGROUND_RUNTIME, MGMT_REQUEST_RUNTIME},
tenant::mgr,
virtual_file,
};
Expand Down Expand Up @@ -594,30 +590,13 @@ fn start_pageserver(

// Spawn a task to listen for libpq connections. It will spawn further tasks
// for each connection. We created the listener earlier already.
let libpq_listener = {
let cancel = CancellationToken::new();
let libpq_ctx = RequestContext::todo_child(
TaskKind::LibpqEndpointListener,
// listener task shouldn't need to download anything. (We will
// create a separate sub-contexts for each connection, with their
// own download behavior. This context is used only to listen and
// accept connections.)
DownloadBehavior::Error,
);

let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"libpq listener",
page_service::libpq_listener_main(
tenant_manager.clone(),
pg_auth,
pageserver_listener,
conf.pg_auth_type,
libpq_ctx,
cancel.clone(),
),
));
LibpqEndpointListener(CancellableTask { task, cancel })
};
let page_service = page_service::spawn(conf, tenant_manager.clone(), pg_auth, {
let _entered = COMPUTE_REQUEST_RUNTIME.enter(); // TcpListener::from_std requires it
pageserver_listener
.set_nonblocking(true)
.context("set listener to nonblocking")?;
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
});

let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());

Expand Down Expand Up @@ -645,7 +624,7 @@ fn start_pageserver(
shutdown_pageserver.take();
pageserver::shutdown_pageserver(
http_endpoint_listener,
libpq_listener,
page_service,
consumption_metrics_tasks,
disk_usage_eviction_task,
&tenant_manager,
Expand Down
5 changes: 5 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ impl From<GetActiveTenantError> for ApiError {
GetActiveTenantError::WaitForActiveTimeout { .. } => {
ApiError::ResourceUnavailable(format!("{}", e).into())
}
GetActiveTenantError::SwitchedTenant => {
// in our HTTP handlers, this error doesn't happen
// TODO: separate error types
ApiError::ResourceUnavailable("switched tenant".into())
}
}
}
}
Expand Down
10 changes: 4 additions & 6 deletions pageserver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;

use crate::task_mgr::TaskKind;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::{
Expand Down Expand Up @@ -63,7 +62,6 @@ pub struct CancellableTask {
pub cancel: CancellationToken,
}
pub struct HttpEndpointListener(pub CancellableTask);
pub struct LibpqEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
impl CancellableTask {
Expand All @@ -77,7 +75,7 @@ impl CancellableTask {
#[allow(clippy::too_many_arguments)]
pub async fn shutdown_pageserver(
http_listener: HttpEndpointListener,
libpq_listener: LibpqEndpointListener,
page_service: page_service::Listener,
consumption_metrics_worker: ConsumptionMetricsTasks,
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
tenant_manager: &TenantManager,
Expand All @@ -89,8 +87,8 @@ pub async fn shutdown_pageserver(
use std::time::Duration;
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
timed(
libpq_listener.0.shutdown(),
let remaining_connections = timed(
page_service.stop_accepting(),
"shutdown LibpqEndpointListener",
Duration::from_secs(1),
)
Expand All @@ -108,7 +106,7 @@ pub async fn shutdown_pageserver(
// Shut down any page service tasks: any in-progress work for particular timelines or tenants
// should already have been canclled via mgr::shutdown_all_tenants
timed(
task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
remaining_connections.shutdown(),
"shutdown PageRequestHandlers",
Duration::from_secs(1),
)
Expand Down
Loading

0 comments on commit 31122ad

Please sign in to comment.