Skip to content

Commit

Permalink
pageserver: refactor TenantId to TenantShardId in Tenant & Timeline (#…
Browse files Browse the repository at this point in the history
…5957)

(includes two preparatory commits from
#5960)

## Problem

To accommodate multiple shards in the same tenant on the same
pageserver, we must include the full TenantShardId in local paths. That
means that all code touching local storage needs to see the
TenantShardId.

## Summary of changes

- Replace `tenant_id: TenantId` with `tenant_shard_id: TenantShardId` on
Tenant, Timeline and RemoteTimelineClient.
- Use TenantShardId in helpers for building local paths.
- Update all the relevant call sites.

This doesn't update absolutely everything: things like PageCache,
TaskMgr, WalRedo are still shard-naive. The purpose of this PR is to
update the core types so that others code can be added/updated
incrementally without churning the most central shared types.
  • Loading branch information
jcsp authored Nov 29, 2023
1 parent 70b5646 commit 9e55ad4
Show file tree
Hide file tree
Showing 37 changed files with 691 additions and 613 deletions.
7 changes: 5 additions & 2 deletions control_plane/src/bin/attachment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use clap::Parser;
use hex::FromHex;
use hyper::StatusCode;
use hyper::{Body, Request, Response};
use pageserver_api::shard::TenantShardId;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::{collections::HashMap, sync::Arc};
Expand Down Expand Up @@ -173,7 +174,8 @@ async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiE
if state.pageserver == Some(reattach_req.node_id) {
state.generation += 1;
response.tenants.push(ReAttachResponseTenant {
id: *t,
// TODO(sharding): make this shard-aware
id: TenantShardId::unsharded(*t),
gen: state.generation,
});
}
Expand All @@ -196,7 +198,8 @@ async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiEr
};

for req_tenant in validate_req.tenants {
if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
// TODO(sharding): make this shard-aware
if let Some(tenant_state) = locked.tenants.get(&req_tenant.id.tenant_id) {
let valid = tenant_state.generation == req_tenant.gen;
response.tenants.push(ValidateResponseTenant {
id: req_tenant.id,
Expand Down
10 changes: 6 additions & 4 deletions libs/pageserver_api/src/control_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
//! See docs/rfcs/025-generation-numbers.md
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId};
use utils::id::NodeId;

use crate::shard::TenantShardId;

#[derive(Serialize, Deserialize)]
pub struct ReAttachRequest {
Expand All @@ -13,7 +15,7 @@ pub struct ReAttachRequest {

#[derive(Serialize, Deserialize)]
pub struct ReAttachResponseTenant {
pub id: TenantId,
pub id: TenantShardId,
pub gen: u32,
}

Expand All @@ -24,7 +26,7 @@ pub struct ReAttachResponse {

#[derive(Serialize, Deserialize)]
pub struct ValidateRequestTenant {
pub id: TenantId,
pub id: TenantShardId,
pub gen: u32,
}

Expand All @@ -40,6 +42,6 @@ pub struct ValidateResponse {

#[derive(Serialize, Deserialize)]
pub struct ValidateResponseTenant {
pub id: TenantId,
pub id: TenantShardId,
pub valid: bool,
}
6 changes: 3 additions & 3 deletions libs/pageserver_api/src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use serde::{Deserialize, Serialize};
use thiserror;
use utils::id::TenantId;

#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardNumber(pub u8);

#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(pub u8);

impl ShardCount {
Expand Down Expand Up @@ -39,7 +39,7 @@ impl ShardNumber {
/// Note that the binary encoding is _not_ backward compatible, because
/// at the time sharding is introduced, there are no existing binary structures
/// containing TenantId that we need to handle.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct TenantShardId {
pub tenant_id: TenantId,
pub shard_number: ShardNumber,
Expand Down
3 changes: 2 additions & 1 deletion pageserver/benches/bench_layer_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::storage_layer::PersistentLayerDesc;
use pageserver_api::shard::TenantShardId;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min};
use std::fs::File;
Expand Down Expand Up @@ -211,7 +212,7 @@ fn bench_sequential(c: &mut Criterion) {
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = PersistentLayerDesc::new_img(
TenantId::generate(),
TenantShardId::unsharded(TenantId::generate()),
TimelineId::generate(),
zero.add(10 * i32)..zero.add(10 * i32 + 1),
Lsn(i),
Expand Down
58 changes: 35 additions & 23 deletions pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{anyhow, bail, ensure, Context, Result};
use pageserver_api::shard::TenantShardId;
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde::de::IntoDeserializer;
use std::env;
Expand All @@ -25,7 +26,7 @@ use toml_edit::{Document, Item};
use camino::{Utf8Path, Utf8PathBuf};
use postgres_backend::AuthType;
use utils::{
id::{NodeId, TenantId, TimelineId},
id::{NodeId, TimelineId},
logging::LogFormat,
};

Expand Down Expand Up @@ -628,60 +629,67 @@ impl PageServerConf {
self.deletion_prefix().join(format!("header-{VERSION:02x}"))
}

pub fn tenant_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenants_path().join(tenant_id.to_string())
pub fn tenant_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
self.tenants_path().join(tenant_shard_id.to_string())
}

pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(IGNORED_TENANT_FILE_NAME)
pub fn tenant_ignore_mark_file_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
self.tenant_path(tenant_shard_id)
.join(IGNORED_TENANT_FILE_NAME)
}

/// Points to a place in pageserver's local directory,
/// where certain tenant's tenantconf file should be located.
///
/// Legacy: superseded by tenant_location_config_path. Eventually
/// remove this function.
pub fn tenant_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(TENANT_CONFIG_NAME)
pub fn tenant_config_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
self.tenant_path(tenant_shard_id).join(TENANT_CONFIG_NAME)
}

pub fn tenant_location_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id)
pub fn tenant_location_config_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
self.tenant_path(tenant_shard_id)
.join(TENANT_LOCATION_CONFIG_NAME)
}

pub fn timelines_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(TIMELINES_SEGMENT_NAME)
pub fn timelines_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
self.tenant_path(tenant_shard_id)
.join(TIMELINES_SEGMENT_NAME)
}

pub fn timeline_path(&self, tenant_id: &TenantId, timeline_id: &TimelineId) -> Utf8PathBuf {
self.timelines_path(tenant_id).join(timeline_id.to_string())
pub fn timeline_path(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Utf8PathBuf {
self.timelines_path(tenant_shard_id)
.join(timeline_id.to_string())
}

pub fn timeline_uninit_mark_file_path(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Utf8PathBuf {
path_with_suffix_extension(
self.timeline_path(&tenant_id, &timeline_id),
self.timeline_path(&tenant_shard_id, &timeline_id),
TIMELINE_UNINIT_MARK_SUFFIX,
)
}

pub fn timeline_delete_mark_file_path(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Utf8PathBuf {
path_with_suffix_extension(
self.timeline_path(&tenant_id, &timeline_id),
self.timeline_path(&tenant_shard_id, &timeline_id),
TIMELINE_DELETE_MARK_SUFFIX,
)
}

pub fn tenant_deleted_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id)
pub fn tenant_deleted_mark_file_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
self.tenant_path(tenant_shard_id)
.join(TENANT_DELETED_MARKER_FILE_NAME)
}

Expand All @@ -691,20 +699,24 @@ impl PageServerConf {

pub fn trace_path(
&self,
tenant_id: &TenantId,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
connection_id: &ConnectionId,
) -> Utf8PathBuf {
self.traces_path()
.join(tenant_id.to_string())
.join(tenant_shard_id.to_string())
.join(timeline_id.to_string())
.join(connection_id.to_string())
}

/// Points to a place in pageserver's local directory,
/// where certain timeline's metadata file should be located.
pub fn metadata_path(&self, tenant_id: &TenantId, timeline_id: &TimelineId) -> Utf8PathBuf {
self.timeline_path(tenant_id, timeline_id)
pub fn metadata_path(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Utf8PathBuf {
self.timeline_path(tenant_shard_id, timeline_id)
.join(METADATA_FILE_NAME)
}

Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/consumption_metrics/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ impl TimelineSnapshot {
let last_record_lsn = t.get_last_record_lsn();

let current_exact_logical_size = {
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
let res = span
.in_scope(|| t.get_current_logical_size(ctx))
.context("get_current_logical_size");
Expand Down
25 changes: 12 additions & 13 deletions pageserver/src/control_plane_client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::collections::HashMap;

use pageserver_api::control_api::{
ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
use pageserver_api::{
control_api::{
ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
},
shard::TenantShardId,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio_util::sync::CancellationToken;
use url::Url;
use utils::{
backoff,
generation::Generation,
id::{NodeId, TenantId},
};
use utils::{backoff, generation::Generation, id::NodeId};

use crate::config::PageServerConf;

Expand All @@ -31,11 +30,11 @@ pub enum RetryForeverError {

#[async_trait::async_trait]
pub trait ControlPlaneGenerationsApi {
async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError>;
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError>;
async fn validate(
&self,
tenants: Vec<(TenantId, Generation)>,
) -> Result<HashMap<TenantId, bool>, RetryForeverError>;
tenants: Vec<(TenantShardId, Generation)>,
) -> Result<HashMap<TenantShardId, bool>, RetryForeverError>;
}

impl ControlPlaneClient {
Expand Down Expand Up @@ -127,7 +126,7 @@ impl ControlPlaneClient {
#[async_trait::async_trait]
impl ControlPlaneGenerationsApi for ControlPlaneClient {
/// Block until we get a successful response, or error out if we are shut down
async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError> {
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
let re_attach_path = self
.base_url
.join("re-attach")
Expand All @@ -154,8 +153,8 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
/// Block until we get a successful response, or error out if we are shut down
async fn validate(
&self,
tenants: Vec<(TenantId, Generation)>,
) -> Result<HashMap<TenantId, bool>, RetryForeverError> {
tenants: Vec<(TenantShardId, Generation)>,
) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
let re_attach_path = self
.base_url
.join("validate")
Expand Down
Loading

1 comment on commit 9e55ad4

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2468 tests run: 2367 passed, 0 failed, 101 skipped (full report)


Flaky tests (6)

Postgres 16

  • test_crafted_wal_end[last_wal_record_xlog_switch_ends_on_page_boundary]: debug
  • test_crafted_wal_end[last_wal_record_crossing_segment]: debug
  • test_ondemand_download_timetravel[local_fs]: debug
  • test_ondemand_download_timetravel[mock_s3]: debug

Postgres 14

  • test_competing_branchings_from_loading_race_to_ok_or_err: release
  • test_timeline_deletion_with_files_stuck_in_upload_queue: debug

Code coverage (full report)

  • functions: 54.5% (9176 of 16844 functions)
  • lines: 82.1% (53523 of 65174 lines)

The comment gets automatically updated with the latest test results
9e55ad4 at 2023-11-29T15:35:09.579Z :recycle:

Please sign in to comment.