Skip to content

Commit

Permalink
http api for getting the partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
koivunej committed Oct 3, 2023
1 parent a36f1bc commit 3f34d48
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 36 deletions.
155 changes: 122 additions & 33 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ async fn getpage_at_lsn_handler(
.await
}

async fn timeline_get_partioning(
async fn timeline_get_partitioning(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
Expand All @@ -1233,10 +1233,8 @@ async fn timeline_get_partioning(

#[derive(Default)]
enum Collect {
Always,
IfNone,
#[default]
Never,
Always,
}

impl std::str::FromStr for Collect {
Expand All @@ -1245,50 +1243,141 @@ async fn timeline_get_partioning(
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
Ok(match s {
"always" | "yes" | "true" => Collect::Always,
"if-none" => Collect::IfNone,
"never" | "no" | "false" => Collect::Never,
// "if-none" => Collect::IfNone,
// "never" | "no" | "false" => Collect::Never,
s => return Err(anyhow::anyhow!("invalid value: {s:?}")),
})
}
}

#[serde_with::serde_as]
#[derive(serde::Serialize)]
struct Partitioning {
keys: crate::keyspace::KeyPartitioning,
keys: crate::keyspace::KeySpace,

#[serde_as = "serde_with::DisplayFromStr"]
at_lsn: Lsn,
}

let collect: Option<Collect> = parse_query_param(&request, "collect");
impl serde::Serialize for Partitioning {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_key("keys")?;
map.serialize_value(&KeySpace(&self.keys))?;
map.serialize_key("at_lsn")?;
map.serialize_value(&WithDisplay(&self.at_lsn))?;
map.end()
}
}

struct WithDisplay<'a, T>(&'a T);

impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(&self.0)
}
}

/*
struct KeySpaces<'a>(&'a [crate::keyspace::KeySpace]);
impl<'a> serde::Serialize for KeySpaces<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
for ks in self.0 {
seq.serialize_element(&KeySpace(ks))?;
}
seq.end()
}
}
*/

struct KeySpace<'a>(&'a crate::keyspace::KeySpace);

impl<'a> serde::Serialize for KeySpace<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
for kr in &self.0.ranges {
seq.serialize_element(&KeyRange(kr))?;
}
seq.end()
}
}

struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);

impl<'a> serde::Serialize for KeyRange<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeTuple;
let mut t = serializer.serialize_tuple(2)?;
t.serialize_element(&WithDisplay(&self.0.start))?;
t.serialize_element(&WithDisplay(&self.0.end))?;
t.end()
}
}

let collect: Option<Collect> = parse_query_param(&request, "collect")?;
let collect = collect.unwrap_or_default();

let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;

async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;

let partitioning = match collect {
Collect::Always => {
let lsn = timeline.get_last_record_lsn();
Some((timeline.collect_keyspace(lsn, &ctx).await, lsn))
}
_ => {
let p = timeline.partitioning();

match (collect, p) {
(Collect::IfNone, None) => {
let lsn = timeline.get_last_record_lsn();
Some((timeline.collect_keyspace(lsn, &ctx).await, lsn))
}
(_, p) => p,
}
}
let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
(
timeline
.collect_keyspace(at_lsn, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e))?,
at_lsn,
)
} /*_ => {
let p = timeline.partitioning();
match (collect, p) {
(Collect::IfNone, None) => {
let lsn = timeline.get_last_record_lsn();
Some((
timeline
.collect_keyspace(lsn, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e))?,
lsn,
))
}
(_, p) => p.map(|p| (p.0.parts.into_iter().flatten().collect::<Vec<_>>(), p.1)),
}
}*/
};

todo!()
json_response(
StatusCode::OK,
Partitioning {
keys: partitioning.0,
at_lsn: partitioning.1,
},
)
}
.instrument(info_span!("timeline_get", %tenant_id, %timeline_id))
.instrument(info_span!("timeline_get_partitioning", %tenant_id, %timeline_id))
.await
}

Expand Down Expand Up @@ -1636,12 +1725,12 @@ pub fn make_router(
.post("/v1/tracing/event", |r| {
testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/get", |r| {
testing_api_handler(
"check if page at lsn is successful",
r,
try_timeline_get_handler,
)
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/getpage", |r| {
testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/partitioning",
|r| testing_api_handler("read out the partitioning", r, timeline_get_partitioning),
)
.any(handler_404))
}
6 changes: 3 additions & 3 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,12 @@ impl Timeline {
}

pub(crate) fn partitioning(&self) -> Option<(KeyPartitioning, Lsn)> {
let (partitioning, at_lsn) = self.partitioning.lock().unwrap();
let g = self.partitioning.lock().unwrap();

if partitioning.parts.is_empty() || !at_lsn.is_valid() {
if g.0.parts.is_empty() || !g.1.is_valid() {
None
} else {
Some((partitioning.clone(), at_lsn))
Some((g.0.clone(), g.1))
}
}

Expand Down

0 comments on commit 3f34d48

Please sign in to comment.