Remove old WAL on safekeepers. #1550

Merged 1 commit on Apr 26, 2022
13 changes: 11 additions & 2 deletions safekeeper/src/bin/safekeeper.rs
@@ -16,11 +16,11 @@ use url::{ParseError, Url};

use safekeeper::control_file::{self};
use safekeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
-use safekeeper::http;
-use safekeeper::s3_offload;
+use safekeeper::remove_wal;
use safekeeper::wal_service;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, callmemaybe};
+use safekeeper::{http, s3_offload};
use utils::{
http::endpoint, logging, shutdown::exit_now, signals, tcp_listener, zid::ZNodeId, GIT_VERSION,
};
@@ -292,6 +292,15 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
);
}

let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("WAL removal thread".into())
.spawn(|| {
remove_wal::thread_main(conf_);
})?,
);

// TODO: put more thoughts into handling of failed threads
// We probably should restart them.

7 changes: 6 additions & 1 deletion safekeeper/src/broker.rs
@@ -32,23 +32,28 @@ const ZENITH_PREFIX: &str = "zenith";

/// Published data about safekeeper. Fields made optional for easy migrations.
#[serde_as]
-#[derive(Deserialize, Serialize)]
+#[derive(Debug, Deserialize, Serialize)]
pub struct SafekeeperInfo {
/// Term of the last entry.
pub last_log_term: Option<Term>,
/// LSN of the last record.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub flush_lsn: Option<Lsn>,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub commit_lsn: Option<Lsn>,
/// LSN up to which safekeeper offloaded WAL to s3.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub s3_wal_lsn: Option<Lsn>,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub remote_consistent_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
}

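The DisplayFromStr annotations above mean every LSN crosses the broker (and the test endpoint added below) as its textual "hi/lo" form rather than a number. A minimal sketch of the resulting JSON, not part of the diff; it assumes Lsn is the pub u64 newtype at utils::lsn::Lsn and that serde_json is available:

use safekeeper::broker::SafekeeperInfo;
use utils::lsn::Lsn; // assumed path for the Lsn newtype

fn main() -> anyhow::Result<()> {
    let info = SafekeeperInfo {
        last_log_term: Some(1),
        flush_lsn: Some(Lsn(0x0200_0000)),
        commit_lsn: Some(Lsn(0x0200_0000)),
        s3_wal_lsn: None,
        remote_consistent_lsn: Some(Lsn(0x0100_0000)),
        peer_horizon_lsn: None,
    };
    // DisplayFromStr serializes LSNs as strings, absent fields as null:
    // {"last_log_term":1,"flush_lsn":"0/2000000","commit_lsn":"0/2000000",
    //  "s3_wal_lsn":null,"remote_consistent_lsn":"0/1000000","peer_horizon_lsn":null}
    println!("{}", serde_json::to_string(&info)?);
    Ok(())
}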
20 changes: 20 additions & 0 deletions safekeeper/src/http/routes.rs
@@ -5,6 +5,7 @@ use serde::Serializer;
use std::fmt::Display;
use std::sync::Arc;

use crate::broker::SafekeeperInfo;
use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
use crate::timeline::GlobalTimelines;
@@ -123,6 +124,20 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
json_response(StatusCode::CREATED, ())
}

/// Used only in tests to hand-craft the required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let zttid = ZTenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let safekeeper_info: SafekeeperInfo = json_request(&mut request).await?;

let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
tli.record_safekeeper_info(&safekeeper_info, ZNodeId(1))?;

json_response(StatusCode::OK, ())
}

/// Safekeeper http router.
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
let router = endpoint::make_router();
@@ -134,4 +149,9 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
timeline_status_handler,
)
.post("/v1/timeline", timeline_create_handler)
// for tests
.post(
"/v1/record_safekeeper_info/:tenant_id/:timeline_id",
record_safekeeper_info,
)
}
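For completeness, a hypothetical way to hit the new test-only endpoint from Rust; the in-tree tests go through the Python SafekeeperHttpClient added below instead. reqwest (with its json feature), serde_json, and the 7676 port (the default safekeeper HTTP listen address, if unchanged) are all assumptions here:

fn main() -> anyhow::Result<()> {
    // Hypothetical placeholder IDs; real tests read them from the compute node.
    let (tenant_id, timeline_id) = ("<tenant hex id>", "<timeline hex id>");
    let resp = reqwest::blocking::Client::new()
        .post(format!(
            "http://127.0.0.1:7676/v1/record_safekeeper_info/{}/{}",
            tenant_id, timeline_id
        ))
        // Pretend WAL is offloaded to s3, as test_wal_removal below does.
        .json(&serde_json::json!({ "s3_wal_lsn": "FFFFFFFF/FEFFFFFF" }))
        .send()?;
    assert!(resp.status().is_success());
    Ok(())
}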
1 change: 1 addition & 0 deletions safekeeper/src/lib.rs
@@ -13,6 +13,7 @@ pub mod handler;
pub mod http;
pub mod json_ctrl;
pub mod receive_wal;
pub mod remove_wal;
pub mod s3_offload;
pub mod safekeeper;
pub mod send_wal;
25 changes: 25 additions & 0 deletions safekeeper/src/remove_wal.rs
@@ -0,0 +1,25 @@
//! Thread removing old WAL.

use std::{thread, time::Duration};

use tracing::*;

use crate::{timeline::GlobalTimelines, SafeKeeperConf};

pub fn thread_main(conf: SafeKeeperConf) {
let wal_removal_interval = Duration::from_millis(5000);
loop {
let active_tlis = GlobalTimelines::get_active_timelines();
for zttid in &active_tlis {
if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
if let Err(e) = tli.remove_old_wal() {
warn!(
"failed to remove WAL for tenant {} timeline {}: {}",
tli.zttid.tenant_id, tli.zttid.timeline_id, e
);
}
}
}
thread::sleep(wal_removal_interval)
}
}
24 changes: 24 additions & 0 deletions safekeeper/src/safekeeper.rs
@@ -5,6 +5,8 @@ use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};

use postgres_ffi::xlog_utils::TimeLineID;

use postgres_ffi::xlog_utils::XLogSegNo;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
@@ -880,6 +882,24 @@ where
}
Ok(())
}

/// Get the oldest segno we still need to keep. We hold WAL until it is
/// consumed by all of 1) the pageserver (remote_consistent_lsn), 2) peers,
/// and 3) s3 offloading.
/// While it is safe to use in-memory values for determining the horizon,
/// we use persistent ones to make the set of possible states less surprising.
pub fn get_horizon_segno(&self) -> XLogSegNo {
let horizon_lsn = min(
min(
self.state.remote_consistent_lsn,
self.state.peer_horizon_lsn,
),
self.state.s3_wal_lsn,
);
let res = horizon_lsn.segment_number(self.state.server.wal_seg_size as usize);
info!("horizon is {}, res {}", horizon_lsn, res);
res
}
}

#[cfg(test)]
@@ -935,6 +955,10 @@ mod tests {
fn flush_wal(&mut self) -> Result<()> {
Ok(())
}

fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
Box::new(move |_segno_up_to: XLogSegNo| Ok(()))
}
}

#[test]
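To make the horizon arithmetic above concrete, a standalone sketch using plain u64 LSNs in place of the crate's Lsn type; segment_number is just integer division by the segment size:

// Sketch of get_horizon_segno: the horizon is the slowest of the three
// consumers, and its segment number is lsn / wal_seg_size.
fn horizon_segno(remote_consistent_lsn: u64, peer_horizon_lsn: u64,
                 s3_wal_lsn: u64, wal_seg_size: u64) -> u64 {
    let horizon_lsn = remote_consistent_lsn.min(peer_horizon_lsn).min(s3_wal_lsn);
    horizon_lsn / wal_seg_size
}

fn main() {
    let seg = 16 * 1024 * 1024; // default 16 MiB segments
    // Pageserver at 48 MiB, a peer at 64 MiB, s3 "infinitely" far ahead:
    // the horizon lands in segment 3, so segments <= 2 may be removed
    // (remove_old_wal passes horizon_segno - 1 to the remover).
    assert_eq!(horizon_segno(0x0300_0000, 0x0400_0000, u64::MAX, seg), 3);
}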
24 changes: 24 additions & 0 deletions safekeeper/src/timeline.rs
@@ -4,6 +4,7 @@
use anyhow::{bail, Context, Result};

use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::XLogSegNo;

use std::cmp::{max, min};
use std::collections::HashMap;
@@ -88,6 +89,7 @@ struct SharedState {
active: bool,
num_computes: u32,
pageserver_connstr: Option<String>,
last_removed_segno: XLogSegNo,
}

impl SharedState {
@@ -109,6 +111,7 @@ impl SharedState {
active: false,
num_computes: 0,
pageserver_connstr: None,
last_removed_segno: 0,
})
}

@@ -127,6 +130,7 @@
active: false,
num_computes: 0,
pageserver_connstr: None,
last_removed_segno: 0,
})
}

@@ -459,6 +463,26 @@ impl Timeline {
let shared_state = self.mutex.lock().unwrap();
shared_state.sk.wal_store.flush_lsn()
}

pub fn remove_old_wal(&self) -> Result<()> {
let horizon_segno: XLogSegNo;
let remover: Box<dyn Fn(XLogSegNo) -> Result<()>>;
{
let shared_state = self.mutex.lock().unwrap();
horizon_segno = shared_state.sk.get_horizon_segno();
remover = shared_state.sk.wal_store.remove_up_to();
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(());
}
// release the lock before removing
}
let _enter =
info_span!("", tenant = %self.zttid.tenant_id, timeline = %self.zttid.timeline_id)
.entered();
remover(horizon_segno - 1)?;
self.mutex.lock().unwrap().last_removed_segno = horizon_segno;
Ok(())
}
}

// Utilities needed by various Connection-like objects
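remove_old_wal above follows a pattern worth noting: everything needed is computed and captured under the mutex, then the guard is dropped before the slow filesystem work runs. A distilled sketch with invented names (State, make_remover, the fields), assuming anyhow is available:

use std::sync::Mutex;

struct State {
    horizon_segno: u64,
    last_removed_segno: u64,
}

// Stand-in for wal_store.remove_up_to(): the closure owns copies of what it
// needs, so it stays valid after the lock guard is gone.
fn make_remover() -> Box<dyn Fn(u64) -> anyhow::Result<()>> {
    Box::new(|_segno_up_to| Ok(())) // the real closure deletes segment files
}

fn remove_old(state: &Mutex<State>) -> anyhow::Result<()> {
    let (horizon, remover) = {
        let guard = state.lock().unwrap();
        if guard.horizon_segno <= 1 || guard.horizon_segno <= guard.last_removed_segno {
            return Ok(()); // nothing new to remove
        }
        (guard.horizon_segno, make_remover())
    }; // guard dropped here: disk I/O below does not block WAL writers
    remover(horizon - 1)?;
    state.lock().unwrap().last_removed_segno = horizon;
    Ok(())
}

fn main() {
    let state = Mutex::new(State { horizon_segno: 5, last_removed_segno: 0 });
    remove_old(&state).unwrap();
    assert_eq!(state.lock().unwrap().last_removed_segno, 5);
}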
48 changes: 46 additions & 2 deletions safekeeper/src/wal_storage.rs
@@ -11,10 +11,12 @@ use anyhow::{anyhow, bail, Context, Result};
use std::io::{Read, Seek, SeekFrom};

use lazy_static::lazy_static;
-use postgres_ffi::xlog_utils::{find_end_of_wal, XLogSegNo, PG_TLI};
+use postgres_ffi::xlog_utils::{
+    find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFileName, XLogFromFileName,
+    XLogSegNo, PG_TLI,
+};
use std::cmp::min;

-use std::fs::{self, File, OpenOptions};
+use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};

@@ -101,6 +103,10 @@ pub trait Storage {

/// Durably store WAL on disk, up to the last written WAL record.
fn flush_wal(&mut self) -> Result<()>;

/// Remove all segments <= given segno. Returns a closure so the removal
/// can be performed without holding the timeline lock.
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>>;
}

/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
@@ -466,6 +472,44 @@ impl Storage for PhysicalStorage {
self.update_flush_lsn();
Ok(())
}

fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
let timeline_dir = self.timeline_dir.clone();
let wal_seg_size = self.wal_seg_size.unwrap();
Box::new(move |segno_up_to: XLogSegNo| {
remove_up_to(&timeline_dir, wal_seg_size, segno_up_to)
})
}
}

/// Remove all WAL segments in timeline_dir <= given segno.
fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo) -> Result<()> {
let mut n_removed = 0;
for entry in fs::read_dir(&timeline_dir)? {
let entry = entry?;
let entry_path = entry.path();
let fname = entry_path.file_name().unwrap();

if let Some(fname_str) = fname.to_str() {
/* Ignore files that are not XLOG segments */
if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
continue;
}
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
if segno <= segno_up_to {
remove_file(entry_path)?;
n_removed += 1;
}
}
}
if n_removed > 0 {
let segno_from = segno_up_to - n_removed + 1;
info!(
"removed {} WAL segments [{}; {}]",
n_removed,
XLogFileName(PG_TLI, segno_from, wal_seg_size),
XLogFileName(PG_TLI, segno_up_to, wal_seg_size)
);
}
Ok(())
}

pub struct WalReader {
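The segment numbers that remove_up_to compares come straight from the file names. A sketch of the naming arithmetic, mirroring PostgreSQL's XLogFileName (the real code takes it from postgres_ffi):

// WAL segment names: 8 hex digits each for the timeline, the "log id",
// and the segment within that log id.
fn xlog_file_name(tli: u32, segno: u64, wal_seg_size: u64) -> String {
    let segs_per_xlogid = 0x1_0000_0000u64 / wal_seg_size;
    format!("{:08X}{:08X}{:08X}", tli, segno / segs_per_xlogid, segno % segs_per_xlogid)
}

fn main() {
    // With 16 MiB segments, segment 1 on timeline 1 is exactly the
    // "000000010000000000000001" file the test below waits to disappear.
    assert_eq!(xlog_file_name(1, 1, 16 * 1024 * 1024),
               "000000010000000000000001");
}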
49 changes: 49 additions & 0 deletions test_runner/batch_others/test_wal_acceptor.py
@@ -370,6 +370,55 @@ def test_broker(zenith_env_builder: ZenithEnvBuilder):
time.sleep(0.5)


# Test that old WAL consumed by peers and pageserver is removed from safekeepers.
@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH")
def test_wal_removal(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 2
zenith_env_builder.broker = True
# to advance remote_consistent_lsn
zenith_env_builder.enable_local_fs_remote_storage()
env = zenith_env_builder.init_start()

env.zenith_cli.create_branch('test_safekeepers_wal_removal')
pg = env.postgres.create_start('test_safekeepers_wal_removal')

with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")

tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]

# force checkpoint to advance remote_consistent_lsn
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")

# We will wait for the first segment to be removed. Make sure it exists on each safekeeper to begin with.
first_segments = [
os.path.join(sk.data_dir(), tenant_id, timeline_id, '000000010000000000000001')
for sk in env.safekeepers
]
assert all(os.path.exists(p) for p in first_segments)

http_cli = env.safekeepers[0].http_client()
# Pretend WAL is offloaded to s3.
http_cli.record_safekeeper_info(tenant_id, timeline_id, {'s3_wal_lsn': 'FFFFFFFF/FEFFFFFF'})

# wait till first segment is removed on all safekeepers
started_at = time.time()
while True:
if all(not os.path.exists(p) for p in first_segments):
break
elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for the first segment to be removed")
time.sleep(0.5)


class ProposerPostgres(PgProtocol):
"""Object for running postgres without ZenithEnv"""
def __init__(self,
9 changes: 9 additions & 0 deletions test_runner/fixtures/zenith_fixtures.py
@@ -1738,6 +1738,9 @@ def append_logical_message(self,
def http_client(self) -> SafekeeperHttpClient:
return SafekeeperHttpClient(port=self.port.http)

def data_dir(self) -> str:
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")


@dataclass
class SafekeeperTimelineStatus:
@@ -1770,6 +1773,12 @@ def timeline_status(self, tenant_id: str, timeline_id: str) -> SafekeeperTimelin
flush_lsn=resj['flush_lsn'],
remote_consistent_lsn=resj['remote_consistent_lsn'])

def record_safekeeper_info(self, tenant_id: str, timeline_id: str, body):
res = self.post(
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
json=body)
res.raise_for_status()

def get_metrics(self) -> SafekeeperMetrics:
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result.raise_for_status()