diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index e02b13888931e..66809dba1e850 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1223,7 +1223,7 @@ async fn getpage_at_lsn_handler( .await } -async fn timeline_get_partioning( +async fn timeline_get_partitioning( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { @@ -1233,10 +1233,8 @@ async fn timeline_get_partioning( #[derive(Default)] enum Collect { - Always, - IfNone, #[default] - Never, + Always, } impl std::str::FromStr for Collect { @@ -1245,50 +1243,141 @@ async fn timeline_get_partioning( fn from_str(s: &str) -> std::result::Result { Ok(match s { "always" | "yes" | "true" => Collect::Always, - "if-none" => Collect::IfNone, - "never" | "no" | "false" => Collect::Never, + // "if-none" => Collect::IfNone, + // "never" | "no" | "false" => Collect::Never, s => return Err(anyhow::anyhow!("invalid value: {s:?}")), }) } } - #[serde_with::serde_as] - #[derive(serde::Serialize)] struct Partitioning { - keys: crate::keyspace::KeyPartitioning, + keys: crate::keyspace::KeySpace, - #[serde_as = "serde_with::DisplayFromStr"] at_lsn: Lsn, } - let collect: Option = parse_query_param(&request, "collect"); + impl serde::Serialize for Partitioning { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeMap; + let mut map = serializer.serialize_map(Some(2))?; + map.serialize_key("keys")?; + map.serialize_value(&KeySpace(&self.keys))?; + map.serialize_key("at_lsn")?; + map.serialize_value(&WithDisplay(&self.at_lsn))?; + map.end() + } + } + + struct WithDisplay<'a, T>(&'a T); + + impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + serializer.collect_str(&self.0) + } + } + + /* + struct KeySpaces<'a>(&'a [crate::keyspace::KeySpace]); + + impl<'a> serde::Serialize for KeySpaces<'a> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeSeq; + let mut seq = serializer.serialize_seq(Some(self.0.len()))?; + for ks in self.0 { + seq.serialize_element(&KeySpace(ks))?; + } + seq.end() + } + } + */ + + struct KeySpace<'a>(&'a crate::keyspace::KeySpace); + + impl<'a> serde::Serialize for KeySpace<'a> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeSeq; + let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?; + for kr in &self.0.ranges { + seq.serialize_element(&KeyRange(kr))?; + } + seq.end() + } + } + + struct KeyRange<'a>(&'a std::ops::Range); + + impl<'a> serde::Serialize for KeyRange<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + use serde::ser::SerializeTuple; + let mut t = serializer.serialize_tuple(2)?; + t.serialize_element(&WithDisplay(&self.0.start))?; + t.serialize_element(&WithDisplay(&self.0.end))?; + t.end() + } + } + + let collect: Option = parse_query_param(&request, "collect")?; let collect = collect.unwrap_or_default(); + let at_lsn: Option = parse_query_param(&request, "at_lsn")?; + async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; let partitioning = match collect { Collect::Always => { - let lsn = timeline.get_last_record_lsn(); - Some((timeline.collect_keyspace(lsn, &ctx).await, lsn)) - } - _ => { - let p = timeline.partitioning(); - - match (collect, p) { - (Collect::IfNone, None) => { - let lsn = timeline.get_last_record_lsn(); - Some((timeline.collect_keyspace(lsn, &ctx).await, lsn)) - } - (_, p) => p, - } - } + let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); + ( + timeline + .collect_keyspace(at_lsn, &ctx) + .await + .map_err(|e| ApiError::InternalServerError(e))?, + at_lsn, + ) + } /*_ => { + let p = timeline.partitioning(); + + match (collect, p) { + (Collect::IfNone, None) => { + let lsn = timeline.get_last_record_lsn(); + Some(( + timeline + .collect_keyspace(lsn, &ctx) + .await + .map_err(|e| ApiError::InternalServerError(e))?, + lsn, + )) + } + (_, p) => p.map(|p| (p.0.parts.into_iter().flatten().collect::>(), p.1)), + } + }*/ }; - todo!() + json_response( + StatusCode::OK, + Partitioning { + keys: partitioning.0, + at_lsn: partitioning.1, + }, + ) } - .instrument(info_span!("timeline_get", %tenant_id, %timeline_id)) + .instrument(info_span!("timeline_get_partitioning", %tenant_id, %timeline_id)) .await } @@ -1636,12 +1725,12 @@ pub fn make_router( .post("/v1/tracing/event", |r| { testing_api_handler("emit a tracing event", r, post_tracing_event_handler) }) - .get("/v1/tenant/:tenant_id/timeline/:timeline_id/get", |r| { - testing_api_handler( - "check if page at lsn is successful", - r, - try_timeline_get_handler, - ) + .get("/v1/tenant/:tenant_id/timeline/:timeline_id/getpage", |r| { + testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler) }) + .get( + "/v1/tenant/:tenant_id/timeline/:timeline_id/partitioning", + |r| testing_api_handler("read out the partitioning", r, timeline_get_partitioning), + ) .any(handler_404)) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index bd9633a315a47..a3778ea6f52de 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -440,12 +440,12 @@ impl Timeline { } pub(crate) fn partitioning(&self) -> Option<(KeyPartitioning, Lsn)> { - let (partitioning, at_lsn) = self.partitioning.lock().unwrap(); + let g = self.partitioning.lock().unwrap(); - if partitioning.parts.is_empty() || !at_lsn.is_valid() { + if g.0.parts.is_empty() || !g.1.is_valid() { None } else { - Some((partitioning.clone(), at_lsn)) + Some((g.0.clone(), g.1)) } }