Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move testing pageserver libpq cmds to HTTP api #2429

Merged
merged 6 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ opt-level = 3
opt-level = 1

[alias]
build_testing = ["build", "--features", "failpoints"]
build_testing = ["build", "--features", "testing"]
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ jobs:
run: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FEATURES="--features failpoints"
CARGO_FEATURES="--features testing"
CARGO_FLAGS="--locked --timings $CARGO_FEATURES"
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_FEATURES="--features failpoints,profiling"
CARGO_FEATURES="--features testing,profiling"
CARGO_FLAGS="--locked --timings --release $CARGO_FEATURES"
fi
echo "cov_prefix=${cov_prefix}" >> $GITHUB_ENV
Expand Down
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,12 @@ Ensure your dependencies are installed as described [here](https://github.com/ne

```sh
git clone --recursive https://github.com/neondatabase/neon.git
make # builds also postgres and installs it to ./pg_install

# either:
CARGO_BUILD_FLAGS="--features=testing" make
# or:
make debug

./scripts/pytest
```

Expand Down
6 changes: 3 additions & 3 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ edition = "2021"

[features]
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]

# Feature that enables a special API, fail_point! macro (adds some runtime cost)
# to run tests on outage conditions
failpoints = ["fail/failpoints"]
profiling = ["pprof"]

[dependencies]
Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ fn main() -> anyhow::Result<()> {

if arg_matches.is_present("enabled-features") {
let features: &[&str] = &[
#[cfg(feature = "failpoints")]
"failpoints",
#[cfg(feature = "testing")]
"testing",
#[cfg(feature = "profiling")]
"profiling",
];
Expand Down
18 changes: 18 additions & 0 deletions pageserver/src/http/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,21 @@ pub struct TimelineInfo {
pub local: Option<LocalTimelineInfo>,
pub remote: Option<RemoteTimelineInfo>,
}

/// JSON request body of the `PUT /v1/failpoints` testing endpoint: the list of
/// fail points to configure. The handler processes the entries in order.
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;

/// Information for configuring a single fail point.
///
/// One element of a [`ConfigureFailpointsRequest`].
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
/// This value is matched exactly and is not passed to `fail::cfg`; instead the
/// fail point is given a callback that logs and calls `std::process::exit(1)`.
pub actions: String,
}

/// JSON request body of the timeline `do_gc` testing endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineGcRequest {
/// GC horizon to use for this run. When `None`, the handler falls back to the
/// value returned by the tenant's `get_gc_horizon()`.
pub gc_horizon: Option<u64>,
}
141 changes: 141 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ use utils::{
lsn::Lsn,
};

// Imports only used for testing APIs
#[cfg(feature = "testing")]
use super::models::{ConfigureFailpointsRequest, TimelineGcRequest};
#[cfg(feature = "testing")]
use crate::CheckpointConfig;

struct State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
Expand Down Expand Up @@ -661,6 +667,103 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
json_response(StatusCode::OK, ())
}

// Configure fail points at runtime from a JSON list of `{name, actions}` entries.
//
// This is a test-only API. It is gated on the `testing` feature alone: the
// `ConfigureFailpointsRequest` import and the router's `testing_api!` dispatch are
// both gated on `testing`, and `testing` is the feature that turns on
// `fail/failpoints`. Gating on any other feature would compile this handler in a
// configuration where its request type is not imported and no route refers to it.
//
// Entries are applied in order. On the first entry that fails to configure, a
// 400 is returned; entries applied before it stay configured.
//
// NOTE(review): unlike the timeline testing handlers, this one does not call
// `check_permission` -- confirm that is intended.
#[cfg(feature = "testing")]
async fn failpoints_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    if !fail::has_failpoints() {
        return Err(ApiError::BadRequest(
            "Cannot manage failpoints because pageserver was compiled without failpoints support"
                .to_owned(),
        ));
    }

    let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
    for fp in failpoints {
        info!("cfg failpoint: {} {}", fp.name, fp.actions);

        // We recognize one extra "action" that's not natively recognized
        // by the failpoints crate: exit, to immediately kill the process
        let cfg_result = if fp.actions == "exit" {
            fail::cfg_callback(fp.name, || {
                info!("Exit requested by failpoint");
                std::process::exit(1);
            })
        } else {
            fail::cfg(fp.name, &fp.actions)
        };

        // A malformed action string is a client error, not a server fault.
        cfg_result.map_err(|err_msg| {
            ApiError::BadRequest(format!("Failed to configure failpoints: {err_msg}"))
        })?;
    }

    json_response(StatusCode::OK, ())
}

// Test-only API: run a GC iteration right now on the given timeline.
// FIXME: This is just for tests. See test_runner/regress/test_gc.py.
// This probably should require special authentication or a global flag to
// enable, I don't think we want to or need to allow regular clients to invoke
// GC.
// @hllinnaka in commits ec44f4b29, 3aca717f3
//
// The JSON body is a `TimelineGcRequest`; the GC result is returned as JSON.
#[cfg(feature = "testing")]
async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_id))?;

    // FIXME: currently this will return a 500 error on bad tenant id; it should be 4XX
    let tenant = tenant_mgr::get_tenant(tenant_id, false)?;
    let body: TimelineGcRequest = json_request(&mut request).await?;

    let _span_guard =
        info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id).entered();

    // An explicit horizon in the request wins; otherwise use the tenant's own.
    let horizon = match body.gc_horizon {
        Some(requested) => requested,
        None => tenant.get_gc_horizon(),
    };
    // The PITR interval always comes from the tenant's settings.
    let pitr_interval = tenant.get_pitr_interval();

    let gc_result = tenant.gc_iteration(Some(timeline_id), horizon, pitr_interval, true)?;
    json_response(StatusCode::OK, gc_result)
}

// Test-only API: compact the given timeline right now.
// FIXME This is just for tests. Don't expect this to be exposed to
// the users or the api.
// @dhammika in commit a0781f229
#[cfg(feature = "testing")]
async fn timeline_compact_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_id))?;

    let tenant = tenant_mgr::get_tenant(tenant_id, true)?;

    // Error context is built lazily, only if the timeline lookup fails.
    let missing_timeline =
        || format!("No timeline {timeline_id} in repository for tenant {tenant_id}");
    // FIXME: currently this will return a 500 error on bad timeline id; it should be 4XX
    let target = tenant
        .get_timeline(timeline_id)
        .with_context(missing_timeline)?;
    target.compact()?;

    json_response(StatusCode::OK, ())
}

// Test-only API: run a forced checkpoint (`CheckpointConfig::Forced`) on the
// given timeline right now.
#[cfg(feature = "testing")]
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_id))?;

    let tenant = tenant_mgr::get_tenant(tenant_id, true)?;

    // Error context is built lazily, only if the timeline lookup fails.
    let missing_timeline =
        || format!("No timeline {timeline_id} in repository for tenant {tenant_id}");
    // FIXME: currently this will return a 500 error on bad timeline id; it should be 4XX
    let target = tenant
        .get_timeline(timeline_id)
        .with_context(missing_timeline)?;
    target.checkpoint(CheckpointConfig::Forced)?;

    json_response(StatusCode::OK, ())
}

async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(
StatusCode::NOT_FOUND,
Expand All @@ -687,12 +790,38 @@ pub fn make_router(
}))
}

// Selects the handler to register for a testing-only route.
//
// With the `testing` feature enabled this expands to `$handler` itself. Without
// it, `$handler` is never referenced (so it need not exist) and the expansion is
// a stub that rejects every request with a 400 explaining that the operation
// named by `$handler_desc` is unavailable in this build.
macro_rules! testing_api {
    ($handler_desc:literal, $handler:path $(,)?) => {{
        #[cfg(feature = "testing")]
        let handler = $handler;

        #[cfg(not(feature = "testing"))]
        let handler = {
            async fn cfg_disabled(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
                Err(ApiError::BadRequest(
                    concat!(
                        "Cannot ",
                        $handler_desc,
                        " because pageserver was compiled without testing APIs",
                    )
                    .to_owned(),
                ))
            }
            cfg_disabled
        };

        handler
    }};
}

Ok(router
.data(Arc::new(
State::new(conf, auth, remote_index, remote_storage)
.context("Failed to initialize router state")?,
))
.get("/v1/status", status_handler)
.put(
"/v1/failpoints",
testing_api!("manage failpoints", failpoints_handler),
)
.get("/v1/tenant", tenant_list_handler)
.post("/v1/tenant", tenant_create_handler)
.get("/v1/tenant/:tenant_id", tenant_status)
Expand All @@ -705,6 +834,18 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc",
testing_api!("run timeline GC", timeline_gc_handler),
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/compact",
testing_api!("run timeline compaction", timeline_compact_handler),
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
testing_api!("run timeline checkpoint", timeline_checkpoint_handler),
)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_delete_handler,
Expand Down
115 changes: 1 addition & 114 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use utils::{
lsn::Lsn,
postgres_backend::AuthType,
postgres_backend_async::{self, PostgresBackend},
pq_proto::{BeMessage, FeMessage, RowDescriptor, SINGLE_COL_ROWDESC},
pq_proto::{BeMessage, FeMessage, RowDescriptor},
simple_rcu::RcuReadGuard,
};

Expand Down Expand Up @@ -1005,31 +1005,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("failpoints ") {
ensure!(fail::has_failpoints(), "Cannot manage failpoints because pageserver was compiled without failpoints support");

let (_, failpoints) = query_string.split_at("failpoints ".len());

for failpoint in failpoints.split(';') {
if let Some((name, actions)) = failpoint.split_once('=') {
info!("cfg failpoint: {} {}", name, actions);

// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
if actions == "exit" {
fail::cfg_callback(name, || {
info!("Exit requested by failpoint");
std::process::exit(1);
})
.unwrap();
} else {
fail::cfg(name, actions).unwrap();
}
} else {
bail!("Invalid failpoints format");
}
}
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("show ") {
// show <tenant_id>
let (_, params_raw) = query_string.split_at("show ".len());
Expand Down Expand Up @@ -1072,94 +1047,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("do_gc ") {
// Run GC immediately on given timeline.
// FIXME: This is just for tests. See test_runner/regress/test_gc.py.
// This probably should require special authentication or a global flag to
// enable, I don't think we want to or need to allow regular clients to invoke
// GC.

// do_gc <tenant_id> <timeline_id> <gc_horizon>
let re = Regex::new(r"^do_gc ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)([[:digit:]]+)?")
.unwrap();

let caps = re
.captures(query_string)
.with_context(|| format!("invalid do_gc: '{}'", query_string))?;

let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;

let _span_guard =
info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id).entered();

let tenant = tenant_mgr::get_tenant(tenant_id, true)?;

let gc_horizon: u64 = caps
.get(4)
.map(|h| h.as_str().parse())
.unwrap_or_else(|| Ok(tenant.get_gc_horizon()))?;

// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
let result = tenant.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)?;
pgb.write_message(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layers_total"),
RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
RowDescriptor::int8_col(b"layers_needed_by_pitr"),
RowDescriptor::int8_col(b"layers_needed_by_branches"),
RowDescriptor::int8_col(b"layers_not_updated"),
RowDescriptor::int8_col(b"layers_removed"),
RowDescriptor::int8_col(b"elapsed"),
]))?
.write_message(&BeMessage::DataRow(&[
Some(result.layers_total.to_string().as_bytes()),
Some(result.layers_needed_by_cutoff.to_string().as_bytes()),
Some(result.layers_needed_by_pitr.to_string().as_bytes()),
Some(result.layers_needed_by_branches.to_string().as_bytes()),
Some(result.layers_not_updated.to_string().as_bytes()),
Some(result.layers_removed.to_string().as_bytes()),
Some(result.elapsed.as_millis().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("compact ") {
// Run compaction immediately on given timeline.
// FIXME This is just for tests. Don't expect this to be exposed to
// the users or the api.

// compact <tenant_id> <timeline_id>
let re = Regex::new(r"^compact ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)?").unwrap();

let caps = re
.captures(query_string)
.with_context(|| format!("Invalid compact: '{}'", query_string))?;

let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
timeline.compact()?;

pgb.write_message(&SINGLE_COL_ROWDESC)?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("checkpoint ") {
// Run checkpoint immediately on given timeline.

// checkpoint <tenant_id> <timeline_id>
let re = Regex::new(r"^checkpoint ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)?").unwrap();

let caps = re
.captures(query_string)
.with_context(|| format!("invalid checkpoint command: '{}'", query_string))?;

let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;

// Checkpoint the timeline and also compact it (due to `CheckpointConfig::Forced`).
timeline.checkpoint(CheckpointConfig::Forced)?;

pgb.write_message(&SINGLE_COL_ROWDESC)?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("get_lsn_by_timestamp ") {
// Locate the LSN of the last transaction with a timestamp less than or equal to the specified one
// TODO lazy static
Expand Down
Loading