Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

automatic layer eviction #3552

Merged
merged 1 commit into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ comfy-table = "6.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-utils = "0.8.5"
either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"
fail = "0.5.0"
Expand Down
1 change: 1 addition & 0 deletions control_plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ postgres.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["blocking", "json"] }
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
tar.workspace = true
thiserror.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions control_plane/src/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
eviction_policy: settings
.get("eviction_policy")
.map(|x| serde_json::from_str(x))
.transpose()
.context("Failed to parse 'eviction_policy' json")?,
problame marked this conversation as resolved.
Show resolved Hide resolved
})
.send()?
.error_from_body()?;
Expand Down
1 change: 1 addition & 0 deletions libs/pageserver_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ byteorder.workspace = true
utils.workspace = true
postgres_ffi.workspace = true
enum-map.workspace = true
serde_json.workspace = true

workspace_hack.workspace = true
12 changes: 9 additions & 3 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ pub struct TenantConfigRequest {
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
// We defer the parsing of the eviction_policy field to the request handler.
// Otherwise we'd have to move the types for eviction policy into this package.
// We might do that once the eviction feature has stabilizied.
// For now, this field is not even documented in the openapi_spec.yml.
pub eviction_policy: Option<serde_json::Value>,
}

impl TenantConfigRequest {
Expand All @@ -174,6 +179,7 @@ impl TenantConfigRequest {
lagging_wal_timeout: None,
max_lsn_wal_lag: None,
trace_read_requests: None,
eviction_policy: None,
}
}
}
Expand Down Expand Up @@ -263,11 +269,11 @@ pub struct LayerResidenceEvent {
///
#[serde(rename = "timestamp_millis_since_epoch")]
#[serde_as(as = "serde_with::TimestampMilliSeconds")]
timestamp: SystemTime,
pub timestamp: SystemTime,
/// The new residence status of the layer.
status: LayerResidenceStatus,
pub status: LayerResidenceStatus,
/// The reason why we had to record this event.
reason: LayerResidenceEventReason,
pub reason: LayerResidenceEventReason,
}

/// The reason for recording a given [`ResidenceEvent`].
Expand Down
3 changes: 2 additions & 1 deletion pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const_format.workspace = true
consumption_metrics.workspace = true
crc32c.workspace = true
crossbeam-utils.workspace = true
either.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
Expand Down Expand Up @@ -51,7 +52,7 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-postgres.workspace = true
tokio-util.workspace = true
toml_edit.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tracing.workspace = true
url.workspace = true
walkdir.workspace = true
Expand Down
7 changes: 7 additions & 0 deletions pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,13 @@ impl PageServerConf {
})?);
}

if let Some(eviction_policy) = item.get("eviction_policy") {
t_conf.eviction_policy = Some(
toml_edit::de::from_item(eviction_policy.clone())
koivunej marked this conversation as resolved.
Show resolved Hide resolved
.context("parse eviction_policy")?,
);
}

Ok(t_conf)
}

Expand Down
8 changes: 8 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,14 @@ async fn update_tenant_config_handler(
);
}

if let Some(eviction_policy) = request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde_json::from_value(eviction_policy)
.context("parse field `eviction_policy`")
.map_err(ApiError::BadRequest)?,
);
}

let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
.instrument(info_span!("tenant_config", tenant = ?tenant_id))
Expand Down
3 changes: 3 additions & 0 deletions pageserver/src/task_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ pub enum TaskKind {
// Compaction. One per tenant.
Compaction,

// Eviction. One per timeline.
Eviction,

// Initial logical size calculation
InitialLogicalSizeCalculation,

Expand Down
1 change: 1 addition & 0 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2757,6 +2757,7 @@ pub mod harness {
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
trace_read_requests: Some(tenant_conf.trace_read_requests),
eviction_policy: Some(tenant_conf.eviction_policy),
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions pageserver/src/tenant/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub struct TenantConf {
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
pub trace_read_requests: bool,
pub eviction_policy: EvictionPolicy,
}

/// Same as TenantConf, but this struct preserves the information about
Expand Down Expand Up @@ -153,6 +154,34 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub trace_read_requests: Option<bool>,

#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub eviction_policy: Option<EvictionPolicy>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum EvictionPolicy {
NoEviction,
LayerAccessThreshold(EvictionPolicyLayerAccessThreshold),
}

impl EvictionPolicy {
pub fn discriminant_str(&self) -> &'static str {
match self {
EvictionPolicy::NoEviction => "NoEviction",
EvictionPolicy::LayerAccessThreshold(_) => "LayerAccessThreshold",
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct EvictionPolicyLayerAccessThreshold {
#[serde(with = "humantime_serde")]
pub period: Duration,
#[serde(with = "humantime_serde")]
pub threshold: Duration,
}

impl TenantConfOpt {
Expand Down Expand Up @@ -189,6 +218,7 @@ impl TenantConfOpt {
trace_read_requests: self
.trace_read_requests
.unwrap_or(global_conf.trace_read_requests),
eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy),
}
}

Expand Down Expand Up @@ -261,6 +291,7 @@ impl Default for TenantConf {
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
trace_read_requests: false,
eviction_policy: EvictionPolicy::NoEviction,
}
}
}
Expand Down
73 changes: 55 additions & 18 deletions pageserver/src/tenant/storage_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use bytes::Bytes;
use either::Either;
use enum_map::EnumMap;
use enumset::EnumSet;
use pageserver_api::models::LayerAccessKind;
Expand Down Expand Up @@ -92,7 +93,23 @@ pub enum ValueReconstructResult {
}

#[derive(Debug)]
pub struct LayerAccessStats(Mutex<LayerAccessStatsInner>);
pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);

/// This struct holds two instances of [`LayerAccessStatsInner`].
/// Accesses are recorded to both instances.
/// The `for_scraping_api`instance can be reset from the management API via [`LayerAccessStatsReset`].
/// The `for_eviction_policy` is never reset.
#[derive(Debug, Default, Clone)]
struct LayerAccessStatsLocked {
for_scraping_api: LayerAccessStatsInner,
for_eviction_policy: LayerAccessStatsInner,
}

impl LayerAccessStatsLocked {
fn iter_mut(&mut self) -> impl Iterator<Item = &mut LayerAccessStatsInner> {
[&mut self.for_scraping_api, &mut self.for_eviction_policy].into_iter()
}
}

#[derive(Debug, Default, Clone)]
struct LayerAccessStatsInner {
Expand All @@ -104,10 +121,10 @@ struct LayerAccessStatsInner {
}

#[derive(Debug, Clone, Copy)]
struct LayerAccessStatFullDetails {
when: SystemTime,
task_kind: TaskKind,
access_kind: LayerAccessKind,
pub(super) struct LayerAccessStatFullDetails {
pub(super) when: SystemTime,
pub(super) task_kind: TaskKind,
pub(super) access_kind: LayerAccessKind,
}

#[derive(Clone, Copy, strum_macros::EnumString)]
Expand Down Expand Up @@ -142,13 +159,13 @@ impl LayerAccessStatFullDetails {

impl LayerAccessStats {
pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
new
}

pub(crate) fn for_new_layer_file() -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsInner::default()));
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
Expand Down Expand Up @@ -176,37 +193,43 @@ impl LayerAccessStats {
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) {
let mut inner = self.0.lock().unwrap();
inner
.last_residence_changes
.write(LayerResidenceEvent::new(status, reason));
let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| {
inner
.last_residence_changes
.write(LayerResidenceEvent::new(status, reason))
});
}

fn record_access(&self, access_kind: LayerAccessKind, task_kind: TaskKind) {
let mut inner = self.0.lock().unwrap();
let this_access = LayerAccessStatFullDetails {
when: SystemTime::now(),
task_kind,
access_kind,
};
inner.first_access.get_or_insert(this_access);
inner.count_by_access_kind[access_kind] += 1;
inner.task_kind_flag |= task_kind;
inner.last_accesses.write(this_access);

let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| {
inner.first_access.get_or_insert(this_access);
inner.count_by_access_kind[access_kind] += 1;
inner.task_kind_flag |= task_kind;
inner.last_accesses.write(this_access);
})
}

fn as_api_model(
&self,
reset: LayerAccessStatsReset,
) -> pageserver_api::models::LayerAccessStats {
let mut inner = self.0.lock().unwrap();
let mut locked = self.0.lock().unwrap();
let inner = &mut locked.for_scraping_api;
let LayerAccessStatsInner {
first_access,
count_by_access_kind,
task_kind_flag,
last_accesses,
last_residence_changes,
} = &*inner;
} = inner;
let ret = pageserver_api::models::LayerAccessStats {
access_count_by_access_kind: count_by_access_kind
.iter()
Expand All @@ -231,6 +254,20 @@ impl LayerAccessStats {
}
ret
}

pub(super) fn most_recent_access_or_residence_event(
&self,
) -> Either<LayerAccessStatFullDetails, LayerResidenceEvent> {
let locked = self.0.lock().unwrap();
let inner = &locked.for_eviction_policy;
match inner.last_accesses.recent() {
Some(a) => Either::Left(*a),
None => match inner.last_residence_changes.recent() {
Some(e) => Either::Right(e.clone()),
None => unreachable!("constructors for LayerAccessStats ensure that there's always a residence change event"),
}
}
}
}

/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
Expand Down
Loading