Skip to content

Commit

Permalink
Move testing pageserver libpq cmds to HTTP api (#2429)
Browse files Browse the repository at this point in the history
Closes #2422.

The APIs have been feature gated with the `testing_api!` macro so that
they return 400s when support hasn't been compiled in.
  • Loading branch information
sharnoff authored Sep 20, 2022
1 parent 4b25b96 commit 4a3b3ff
Show file tree
Hide file tree
Showing 29 changed files with 355 additions and 230 deletions.
2 changes: 1 addition & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ opt-level = 3
opt-level = 1

[alias]
build_testing = ["build", "--features", "failpoints"]
build_testing = ["build", "--features", "testing"]
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ jobs:
run: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FEATURES="--features failpoints"
CARGO_FEATURES="--features testing"
CARGO_FLAGS="--locked --timings $CARGO_FEATURES"
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_FEATURES="--features failpoints,profiling"
CARGO_FEATURES="--features testing,profiling"
CARGO_FLAGS="--locked --timings --release $CARGO_FEATURES"
fi
echo "cov_prefix=${cov_prefix}" >> $GITHUB_ENV
Expand Down
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,12 @@ Ensure your dependencies are installed as described [here](https://github.com/ne

```sh
git clone --recursive https://github.com/neondatabase/neon.git
make # builds also postgres and installs it to ./pg_install

# either:
CARGO_BUILD_FLAGS="--features=testing" make
# or:
make debug

./scripts/pytest
```

Expand Down
6 changes: 3 additions & 3 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ edition = "2021"

[features]
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]

# Feature that enables a special API, fail_point! macro (adds some runtime cost)
# to run tests on outage conditions
failpoints = ["fail/failpoints"]
profiling = ["pprof"]

[dependencies]
Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ fn main() -> anyhow::Result<()> {

if arg_matches.is_present("enabled-features") {
let features: &[&str] = &[
#[cfg(feature = "failpoints")]
"failpoints",
#[cfg(feature = "testing")]
"testing",
#[cfg(feature = "profiling")]
"profiling",
];
Expand Down
18 changes: 18 additions & 0 deletions pageserver/src/http/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,21 @@ pub struct TimelineInfo {
pub local: Option<LocalTimelineInfo>,
pub remote: Option<RemoteTimelineInfo>,
}

/// Request body for the failpoint-management endpoint: a list of failpoints
/// to (re)configure in a single call.
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;

/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
    /// Name of the fail point
    pub name: String,
    /// List of actions to take, using the format described in `fail::cfg`
    ///
    /// We also support `actions = "exit"` to cause the fail point to immediately exit.
    pub actions: String,
}

/// Request body for the manual timeline-GC endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineGcRequest {
    /// GC horizon override; when `None`, the handler falls back to the
    /// tenant's configured horizon (`get_gc_horizon`).
    pub gc_horizon: Option<u64>,
}
141 changes: 141 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ use utils::{
lsn::Lsn,
};

// Imports only used for testing APIs
#[cfg(feature = "testing")]
use super::models::{ConfigureFailpointsRequest, TimelineGcRequest};
#[cfg(feature = "testing")]
use crate::CheckpointConfig;

struct State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
Expand Down Expand Up @@ -661,6 +667,103 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
json_response(StatusCode::OK, ())
}

/// Configure (or clear) failpoints via the HTTP API.
///
/// Accepts a JSON `ConfigureFailpointsRequest` (a list of `FailpointConfig`)
/// and applies each entry with `fail::cfg`. The special action string
/// `"exit"` — not natively understood by the failpoints crate — installs a
/// callback that terminates the process.
///
/// Returns 400 if failpoint support was not compiled in, or if any entry
/// fails to configure.
//
// NOTE: gated on `testing` only — the `failpoints` cargo feature was removed
// in favor of `testing` (see Cargo.toml), so `cfg(feature = "failpoints")`
// could never be enabled and has been dropped from the gate.
#[cfg(feature = "testing")]
async fn failpoints_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    if !fail::has_failpoints() {
        return Err(ApiError::BadRequest(
            "Cannot manage failpoints because pageserver was compiled without failpoints support"
                .to_owned(),
        ));
    }

    let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
    for fp in failpoints {
        info!("cfg failpoint: {} {}", fp.name, fp.actions);

        // We recognize one extra "action" that's not natively recognized
        // by the failpoints crate: exit, to immediately kill the process
        let cfg_result = if fp.actions == "exit" {
            fail::cfg_callback(fp.name, || {
                info!("Exit requested by failpoint");
                std::process::exit(1);
            })
        } else {
            fail::cfg(fp.name, &fp.actions)
        };

        if let Err(err_msg) = cfg_result {
            return Err(ApiError::BadRequest(format!(
                "Failed to configure failpoints: {err_msg}"
            )));
        }
    }

    json_response(StatusCode::OK, ())
}

// Trigger an immediate GC pass on the given timeline.
// FIXME: This is just for tests. See test_runner/regress/test_gc.py.
// This probably should require special authentication or a global flag to
// enable, I don't think we want to or need to allow regular clients to invoke
// GC.
// @hllinnaka in commits ec44f4b29, 3aca717f3
#[cfg(feature = "testing")]
async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_id))?;

    // FIXME: currently this will return a 500 error on bad tenant id; it should be 4XX
    let tenant = tenant_mgr::get_tenant(tenant_id, false)?;
    let body: TimelineGcRequest = json_request(&mut request).await?;

    let _span_guard =
        info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id).entered();

    // Fall back to the tenant's configured horizon when the request omits one.
    let horizon = match body.gc_horizon {
        Some(h) => h,
        None => tenant.get_gc_horizon(),
    };

    // Use tenant's pitr setting
    let pitr = tenant.get_pitr_interval();
    let gc_result = tenant.gc_iteration(Some(timeline_id), horizon, pitr, true)?;
    json_response(StatusCode::OK, gc_result)
}

// Kick off compaction of the given timeline immediately.
// FIXME This is just for tests. Don't expect this to be exposed to
// the users or the api.
// @dhammika in commit a0781f229
#[cfg(feature = "testing")]
async fn timeline_compact_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_id))?;

    // FIXME: currently this will return a 500 error on bad timeline id; it should be 4XX
    let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
    let timeline = tenant.get_timeline(timeline_id).with_context(|| {
        format!("No timeline {timeline_id} in repository for tenant {tenant_id}")
    })?;

    timeline.compact()?;
    json_response(StatusCode::OK, ())
}

// Force an immediate checkpoint of the given timeline; `CheckpointConfig::Forced`
// also compacts it.
#[cfg(feature = "testing")]
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_id))?;

    // FIXME: currently this will return a 500 error on bad timeline id; it should be 4XX
    let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
    let timeline = tenant.get_timeline(timeline_id).with_context(|| {
        format!("No timeline {timeline_id} in repository for tenant {tenant_id}")
    })?;

    timeline.checkpoint(CheckpointConfig::Forced)?;
    json_response(StatusCode::OK, ())
}

async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(
StatusCode::NOT_FOUND,
Expand All @@ -687,12 +790,38 @@ pub fn make_router(
}))
}

// Wraps a testing-only handler so the route is always registered: with the
// "testing" feature enabled it expands to the real handler; without it, to a
// stub that returns 400 Bad Request explaining the API was compiled out.
macro_rules! testing_api {
    ($handler_desc:literal, $handler:path $(,)?) => {{
        // Stub compiled in when testing APIs are disabled; the error body
        // names the unavailable operation via $handler_desc.
        #[cfg(not(feature = "testing"))]
        async fn cfg_disabled(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
            Err(ApiError::BadRequest(
                concat!(
                    "Cannot ",
                    $handler_desc,
                    " because pageserver was compiled without testing APIs",
                )
                .to_owned(),
            ))
        }

        // Exactly one of these two bindings exists in any given build.
        #[cfg(feature = "testing")]
        let handler = $handler;
        #[cfg(not(feature = "testing"))]
        let handler = cfg_disabled;
        handler
    }};
}

Ok(router
.data(Arc::new(
State::new(conf, auth, remote_index, remote_storage)
.context("Failed to initialize router state")?,
))
.get("/v1/status", status_handler)
.put(
"/v1/failpoints",
testing_api!("manage failpoints", failpoints_handler),
)
.get("/v1/tenant", tenant_list_handler)
.post("/v1/tenant", tenant_create_handler)
.get("/v1/tenant/:tenant_id", tenant_status)
Expand All @@ -705,6 +834,18 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc",
testing_api!("run timeline GC", timeline_gc_handler),
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/compact",
testing_api!("run timeline compaction", timeline_compact_handler),
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
testing_api!("run timeline checkpoint", timeline_checkpoint_handler),
)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_delete_handler,
Expand Down
115 changes: 1 addition & 114 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use utils::{
lsn::Lsn,
postgres_backend::AuthType,
postgres_backend_async::{self, PostgresBackend},
pq_proto::{BeMessage, FeMessage, RowDescriptor, SINGLE_COL_ROWDESC},
pq_proto::{BeMessage, FeMessage, RowDescriptor},
simple_rcu::RcuReadGuard,
};

Expand Down Expand Up @@ -1005,31 +1005,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("failpoints ") {
ensure!(fail::has_failpoints(), "Cannot manage failpoints because pageserver was compiled without failpoints support");

let (_, failpoints) = query_string.split_at("failpoints ".len());

for failpoint in failpoints.split(';') {
if let Some((name, actions)) = failpoint.split_once('=') {
info!("cfg failpoint: {} {}", name, actions);

// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
if actions == "exit" {
fail::cfg_callback(name, || {
info!("Exit requested by failpoint");
std::process::exit(1);
})
.unwrap();
} else {
fail::cfg(name, actions).unwrap();
}
} else {
bail!("Invalid failpoints format");
}
}
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("show ") {
// show <tenant_id>
let (_, params_raw) = query_string.split_at("show ".len());
Expand Down Expand Up @@ -1072,94 +1047,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("do_gc ") {
// Run GC immediately on given timeline.
// FIXME: This is just for tests. See test_runner/regress/test_gc.py.
// This probably should require special authentication or a global flag to
// enable, I don't think we want to or need to allow regular clients to invoke
// GC.

// do_gc <tenant_id> <timeline_id> <gc_horizon>
let re = Regex::new(r"^do_gc ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)([[:digit:]]+)?")
.unwrap();

let caps = re
.captures(query_string)
.with_context(|| format!("invalid do_gc: '{}'", query_string))?;

let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;

let _span_guard =
info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id).entered();

let tenant = tenant_mgr::get_tenant(tenant_id, true)?;

let gc_horizon: u64 = caps
.get(4)
.map(|h| h.as_str().parse())
.unwrap_or_else(|| Ok(tenant.get_gc_horizon()))?;

// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
let result = tenant.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)?;
pgb.write_message(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layers_total"),
RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
RowDescriptor::int8_col(b"layers_needed_by_pitr"),
RowDescriptor::int8_col(b"layers_needed_by_branches"),
RowDescriptor::int8_col(b"layers_not_updated"),
RowDescriptor::int8_col(b"layers_removed"),
RowDescriptor::int8_col(b"elapsed"),
]))?
.write_message(&BeMessage::DataRow(&[
Some(result.layers_total.to_string().as_bytes()),
Some(result.layers_needed_by_cutoff.to_string().as_bytes()),
Some(result.layers_needed_by_pitr.to_string().as_bytes()),
Some(result.layers_needed_by_branches.to_string().as_bytes()),
Some(result.layers_not_updated.to_string().as_bytes()),
Some(result.layers_removed.to_string().as_bytes()),
Some(result.elapsed.as_millis().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("compact ") {
// Run compaction immediately on given timeline.
// FIXME This is just for tests. Don't expect this to be exposed to
// the users or the api.

// compact <tenant_id> <timeline_id>
let re = Regex::new(r"^compact ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)?").unwrap();

let caps = re
.captures(query_string)
.with_context(|| format!("Invalid compact: '{}'", query_string))?;

let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
timeline.compact()?;

pgb.write_message(&SINGLE_COL_ROWDESC)?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("checkpoint ") {
// Run checkpoint immediately on given timeline.

// checkpoint <tenant_id> <timeline_id>
let re = Regex::new(r"^checkpoint ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)?").unwrap();

let caps = re
.captures(query_string)
.with_context(|| format!("invalid checkpoint command: '{}'", query_string))?;

let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;

// Checkpoint the timeline and also compact it (due to `CheckpointConfig::Forced`).
timeline.checkpoint(CheckpointConfig::Forced)?;

pgb.write_message(&SINGLE_COL_ROWDESC)?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("get_lsn_by_timestamp ") {
// Locate LSN of last transaction with timestamp less than or equal to the specified one
// TODO lazy static
Expand Down
Loading

0 comments on commit 4a3b3ff

Please sign in to comment.