diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 9d478d3a616b..667b16fb9062 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -47,6 +47,7 @@ use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{convert::TryInto, ops::AddAssign}; +use zenith_utils::lsn::{AtomicLsn, Lsn}; use zenith_utils::seqwait::SeqWait; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. @@ -62,7 +63,7 @@ pub struct PageCache { walredo_mgr: WalRedoManager, // Allows .await on the arrival of a particular LSN. - seqwait_lsn: SeqWait, + seqwait_lsn: SeqWait, // Counters, for metrics collection. pub num_entries: AtomicU64, @@ -72,9 +73,9 @@ pub struct PageCache { // copies of shared.first/last_valid_lsn fields (copied here so // that they can be read without acquiring the mutex). - pub first_valid_lsn: AtomicU64, - pub last_valid_lsn: AtomicU64, - pub last_record_lsn: AtomicU64, + first_valid_lsn: AtomicLsn, + last_valid_lsn: AtomicLsn, + last_record_lsn: AtomicLsn, } #[derive(Clone)] @@ -83,22 +84,14 @@ pub struct PageCacheStats { pub num_page_images: u64, pub num_wal_records: u64, pub num_getpage_requests: u64, - pub first_valid_lsn: u64, - pub last_valid_lsn: u64, - pub last_record_lsn: u64, } impl AddAssign for PageCacheStats { fn add_assign(&mut self, other: Self) { - *self = Self { - num_entries: self.num_entries + other.num_entries, - num_page_images: self.num_page_images + other.num_page_images, - num_wal_records: self.num_wal_records + other.num_wal_records, - num_getpage_requests: self.num_getpage_requests + other.num_getpage_requests, - first_valid_lsn: self.first_valid_lsn + other.first_valid_lsn, - last_valid_lsn: self.last_valid_lsn + other.last_valid_lsn, - last_record_lsn: self.last_record_lsn + other.last_record_lsn, - } + self.num_entries += other.num_entries; + self.num_page_images += other.num_page_images; + self.num_wal_records += 
other.num_wal_records; + self.num_getpage_requests += other.num_getpage_requests; } } @@ -120,9 +113,9 @@ struct PageCacheShared { // walreceiver.rs instead of here, but it seems convenient to keep all three // values together. // - first_valid_lsn: u64, - last_valid_lsn: u64, - last_record_lsn: u64, + first_valid_lsn: Lsn, + last_valid_lsn: Lsn, + last_record_lsn: Lsn, } lazy_static! { @@ -204,25 +197,25 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { PageCache { shared: Mutex::new(PageCacheShared { - first_valid_lsn: 0, - last_valid_lsn: 0, - last_record_lsn: 0, + first_valid_lsn: Lsn(0), + last_valid_lsn: Lsn(0), + last_record_lsn: Lsn(0), }), db: open_rocksdb(&conf, timelineid), walredo_mgr: WalRedoManager::new(conf, timelineid), - seqwait_lsn: SeqWait::new(0), + seqwait_lsn: SeqWait::new(Lsn(0)), num_entries: AtomicU64::new(0), num_page_images: AtomicU64::new(0), num_wal_records: AtomicU64::new(0), num_getpage_requests: AtomicU64::new(0), - first_valid_lsn: AtomicU64::new(0), - last_valid_lsn: AtomicU64::new(0), - last_record_lsn: AtomicU64::new(0), + first_valid_lsn: AtomicLsn::new(0), + last_valid_lsn: AtomicLsn::new(0), + last_record_lsn: AtomicLsn::new(0), } } @@ -242,18 +235,18 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] pub struct CacheKey { pub tag: BufferTag, - pub lsn: u64, + pub lsn: Lsn, } impl CacheKey { pub fn pack(&self, buf: &mut BytesMut) { self.tag.pack(buf); - buf.put_u64(self.lsn); + buf.put_u64(self.lsn.0); } pub fn unpack(buf: &mut BytesMut) -> CacheKey { CacheKey { tag: BufferTag::unpack(buf), - lsn: buf.get_u64(), + lsn: Lsn::from(buf.get_u64()), } } } @@ -343,7 +336,7 @@ impl BufferTag { #[derive(Debug, Clone)] pub struct WALRecord { - pub lsn: u64, // LSN at the *end* of the record + pub lsn: Lsn, // LSN at the *end* of the 
record pub will_init: bool, pub truncate: bool, pub rec: Bytes, @@ -355,7 +348,7 @@ pub struct WALRecord { impl WALRecord { pub fn pack(&self, buf: &mut BytesMut) { - buf.put_u64(self.lsn); + buf.put_u64(self.lsn.0); buf.put_u8(self.will_init as u8); buf.put_u8(self.truncate as u8); buf.put_u32(self.main_data_offset); @@ -363,7 +356,7 @@ impl WALRecord { buf.put_slice(&self.rec[..]); } pub fn unpack(buf: &mut BytesMut) -> WALRecord { - let lsn = buf.get_u64(); + let lsn = Lsn::from(buf.get_u64()); let will_init = buf.get_u8() != 0; let truncate = buf.get_u8() != 0; let main_data_offset = buf.get_u32(); @@ -387,7 +380,7 @@ impl PageCache { /// /// Returns an 8k page image /// - pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result { + pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> anyhow::Result { self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); let lsn = self.wait_lsn(req_lsn).await?; @@ -448,7 +441,7 @@ impl PageCache { /// /// Get size of relation at given LSN. /// - pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { + pub async fn relsize_get(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result { self.wait_lsn(lsn).await?; return self.relsize_get_nowait(rel, lsn); } @@ -456,7 +449,7 @@ impl PageCache { /// /// Does relation exist at given LSN? /// - pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { + pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: Lsn) -> anyhow::Result { let lsn = self.wait_lsn(req_lsn).await?; let key = CacheKey { @@ -497,7 +490,7 @@ impl PageCache { pub fn collect_records_for_apply( &self, tag: BufferTag, - lsn: u64, + lsn: Lsn, ) -> (Option, Vec) { let mut buf = BytesMut::new(); let key = CacheKey { tag, lsn }; @@ -576,7 +569,7 @@ impl PageCache { let mut key = CacheKey { tag, lsn: rec.lsn }; // What was the size of the relation before this record? 
- let last_lsn = self.last_valid_lsn.load(Ordering::Acquire); + let last_lsn = self.last_valid_lsn.load(); let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; let content = CacheEntryContent { @@ -606,7 +599,7 @@ impl PageCache { /// /// Memorize a full image of a page version /// - pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { + pub fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { let key = CacheKey { tag, lsn }; let content = CacheEntryContent { page_image: Some(img), @@ -629,7 +622,7 @@ impl PageCache { pub fn create_database( &self, - lsn: u64, + lsn: Lsn, db_id: Oid, tablespace_id: Oid, src_db_id: Oid, @@ -646,7 +639,7 @@ impl PageCache { }, blknum: 0, }, - lsn: 0, + lsn: Lsn(0), }; key.pack(&mut buf); let mut iter = self.db.raw_iterator(); @@ -679,22 +672,19 @@ impl PageCache { } /// Remember that WAL has been received and added to the page cache up to the given LSN - pub fn advance_last_valid_lsn(&self, lsn: u64) { + pub fn advance_last_valid_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. let oldlsn = shared.last_valid_lsn; if lsn >= oldlsn { shared.last_valid_lsn = lsn; - self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn); self.seqwait_lsn.advance(lsn); } else { warn!( - "attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})", - oldlsn >> 32, - oldlsn & 0xffffffff, - lsn >> 32, - lsn & 0xffffffff + "attempted to move last valid LSN backwards (was {}, new {})", + oldlsn, lsn ); } } @@ -704,7 +694,7 @@ impl PageCache { /// /// NOTE: this updates last_valid_lsn as well. /// - pub fn advance_last_record_lsn(&self, lsn: u64) { + pub fn advance_last_record_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. 
@@ -713,8 +703,8 @@ impl PageCache { shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn); + self.last_record_lsn.store(lsn); self.seqwait_lsn.advance(lsn); } @@ -723,7 +713,7 @@ impl PageCache { /// /// TODO: This should be called by garbage collection, so that if an older /// page is requested, we will return an error to the requestor. - pub fn _advance_first_valid_lsn(&self, lsn: u64) { + pub fn _advance_first_valid_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. @@ -731,29 +721,29 @@ impl PageCache { // Can't overtake last_valid_lsn (except when we're // initializing the system and last_valid_lsn hasn't been set yet. - assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); + assert!(shared.last_valid_lsn == Lsn(0) || lsn < shared.last_valid_lsn); shared.first_valid_lsn = lsn; - self.first_valid_lsn.store(lsn, Ordering::Relaxed); + self.first_valid_lsn.store(lsn); } - pub fn init_valid_lsn(&self, lsn: u64) { + pub fn init_valid_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); - assert!(shared.first_valid_lsn == 0); - assert!(shared.last_valid_lsn == 0); - assert!(shared.last_record_lsn == 0); + assert!(shared.first_valid_lsn == Lsn(0)); + assert!(shared.last_valid_lsn == Lsn(0)); + assert!(shared.last_record_lsn == Lsn(0)); shared.first_valid_lsn = lsn; shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.first_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); + self.first_valid_lsn.store(lsn); + self.last_valid_lsn.store(lsn); + self.last_record_lsn.store(lsn); } - pub fn get_last_valid_lsn(&self) -> u64 { + pub fn get_last_valid_lsn(&self) -> Lsn { let shared = self.shared.lock().unwrap(); shared.last_record_lsn @@ -768,9 +758,6 
@@ impl PageCache { num_page_images: self.num_page_images.load(Ordering::Relaxed), num_wal_records: self.num_wal_records.load(Ordering::Relaxed), num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), - first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), - last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), - last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), } } @@ -781,8 +768,8 @@ impl PageCache { // // The caller must ensure that WAL has been received up to 'lsn'. // - fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { - assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire)); + fn relsize_get_nowait(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result { + assert!(lsn <= self.last_valid_lsn.load()); let mut key = CacheKey { tag: BufferTag { @@ -833,8 +820,9 @@ impl PageCache { loop { thread::sleep(conf.gc_period); let last_lsn = self.get_last_valid_lsn(); - if last_lsn > conf.gc_horizon { - let horizon = last_lsn - conf.gc_horizon; + + // checked_sub() returns None on overflow. + if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) { let mut maxkey = CacheKey { tag: BufferTag { rel: RelTag { @@ -845,7 +833,7 @@ impl PageCache { }, blknum: u32::MAX, }, - lsn: u64::MAX, + lsn: Lsn::MAX, }; let now = Instant::now(); let mut reconstructed = 0u64; @@ -873,7 +861,7 @@ impl PageCache { maxkey.lsn = min(horizon, last_lsn); // do not remove last version let mut minkey = maxkey.clone(); - minkey.lsn = 0; // first version + minkey.lsn = Lsn(0); // first version // reconstruct most recent page version if (v[0] & PAGE_IMAGE_FLAG) == 0 { @@ -942,17 +930,17 @@ impl PageCache { // // Wait until WAL has been received up to the given LSN. // - async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result { - let mut lsn = req_lsn; - //When invalid LSN is requested, it means "don't wait, return latest version of the page" - //This is necessary for bootstrap. 
- if lsn == 0 { - lsn = self.last_valid_lsn.load(Ordering::Acquire); + async fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result { + // When invalid LSN is requested, it means "don't wait, return latest version of the page" + // This is necessary for bootstrap. + if lsn == Lsn(0) { + let last_valid_lsn = self.last_valid_lsn.load(); trace!( "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - self.last_valid_lsn.load(Ordering::Acquire), + last_valid_lsn, lsn ); + lsn = last_valid_lsn; } self.seqwait_lsn @@ -960,9 +948,8 @@ impl PageCache { .await .with_context(|| { format!( - "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, - lsn & 0xffff_ffff + "Timed out while waiting for WAL record at LSN {} to arrive", + lsn ) })?; @@ -983,9 +970,6 @@ pub fn get_stats() -> PageCacheStats { num_page_images: 0, num_wal_records: 0, num_getpage_requests: 0, - first_valid_lsn: 0, - last_valid_lsn: 0, - last_record_lsn: 0, }; pcaches.iter().for_each(|(_sys_id, pcache)| { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 37790b556198..72f97aaaa7c3 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -25,6 +25,7 @@ use tokio::runtime; use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task; +use zenith_utils::lsn::Lsn; use crate::basebackup; use crate::page_cache; @@ -84,7 +85,7 @@ struct ZenithRequest { relnode: u32, forknum: u8, blkno: u32, - lsn: u64, + lsn: Lsn, } #[derive(Debug)] @@ -373,7 +374,7 @@ impl FeMessage { relnode: body.get_u32(), forknum: body.get_u8(), blkno: body.get_u32(), - lsn: body.get_u64(), + lsn: Lsn::from(body.get_u64()), }; // TODO: consider using protobuf or serde bincode for less error prone diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index d090419b0f13..c38d7adaa9f5 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -34,6 +34,7 @@ use 
crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; use postgres_ffi::xlog_utils::*; +use zenith_utils::lsn::Lsn; // From pg_tablespace_d.h // @@ -60,20 +61,21 @@ pub fn restore_timeline( .join(timeline.to_string()) .join("snapshots"); - let mut last_snapshot_lsn: u64 = 0; + let mut last_snapshot_lsn: Lsn = Lsn(0); for direntry in fs::read_dir(&snapshotspath).unwrap() { let direntry = direntry?; - let filename = direntry.file_name().to_str().unwrap().to_owned(); - - let lsn = u64::from_str_radix(&filename, 16)?; + let filename = direntry.file_name(); + let lsn = Lsn::from_filename(&filename)?; last_snapshot_lsn = max(lsn, last_snapshot_lsn); - restore_snapshot(conf, pcache, timeline, &filename)?; - info!("restored snapshot at {}", filename); + // FIXME: pass filename as Path instead of str? + let filename_str = filename.into_string().unwrap(); + restore_snapshot(conf, pcache, timeline, &filename_str)?; + info!("restored snapshot at {:?}", filename_str); } - if last_snapshot_lsn == 0 { + if last_snapshot_lsn == Lsn(0) { error!( "could not find valid snapshot in {}", snapshotspath.display() @@ -183,7 +185,7 @@ fn restore_relfile( dboid: u32, path: &Path, ) -> Result<()> { - let lsn = u64::from_str_radix(snapshot, 16)?; + let lsn = Lsn::from_hex(snapshot)?; // Does it look like a relation file? 
@@ -245,15 +247,16 @@ fn restore_wal( _conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId, - startpoint: u64, + startpoint: Lsn, ) -> Result<()> { let walpath = format!("timelines/{}/wal", timeline); let mut waldecoder = WalStreamDecoder::new(startpoint); - let mut segno = XLByteToSeg(startpoint, 16 * 1024 * 1024); - let mut offset = XLogSegmentOffset(startpoint, 16 * 1024 * 1024); - let mut last_lsn = 0; + const SEG_SIZE: u64 = 16 * 1024 * 1024; + let mut segno = startpoint.segment_number(SEG_SIZE); + let mut offset = startpoint.segment_offset(SEG_SIZE); + let mut last_lsn = Lsn(0); loop { // FIXME: assume postgresql tli 1 for now let filename = XLogFileName(1, segno, 16 * 1024 * 1024); @@ -336,11 +339,7 @@ fn restore_wal( segno += 1; offset = 0; } - info!( - "reached end of WAL at {:X}/{:X}", - last_lsn >> 32, - last_lsn & 0xffffffff - ); + info!("reached end of WAL at {}", last_lsn); Ok(()) } diff --git a/pageserver/src/tui.rs b/pageserver/src/tui.rs index 46cd76b9cea5..c0e1d657b38b 100644 --- a/pageserver/src/tui.rs +++ b/pageserver/src/tui.rs @@ -240,7 +240,9 @@ fn get_metric_u64(title: &str, value: u64) -> Spans { ]) } -fn get_metric_str<'a>(title: &str, value: &'a str) -> Spans<'a> { +// This is not used since LSNs were removed from page cache stats. +// Maybe it will be used in the future? +fn _get_metric_str<'a>(title: &str, value: &'a str) -> Spans<'a> { Spans::from(vec![ Span::styled(format!("{:<20}", title), Style::default()), Span::raw(": "), @@ -248,13 +250,6 @@ fn get_metric_str<'a>(title: &str, value: &'a str) -> Spans<'a> { ]) } -// FIXME: We really should define a datatype for LSNs, with Display trait and -// helper functions. There's one in tokio-postgres, but I don't think we want -// to rely on that. 
-fn format_lsn(lsn: u64) -> String { - return format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff); -} - impl tui::widgets::Widget for MetricsWidget { fn render(self, area: Rect, buf: &mut Buffer) { let block = Block::default() @@ -268,14 +263,19 @@ impl tui::widgets::Widget for MetricsWidget { let mut lines: Vec = Vec::new(); let page_cache_stats = crate::page_cache::get_stats(); + + // This is not used since LSNs were removed from page cache stats. + // Maybe it will be used in the future? + /* let lsnrange = format!( "{} - {}", - format_lsn(page_cache_stats.first_valid_lsn), - format_lsn(page_cache_stats.last_valid_lsn) + page_cache_stats.first_valid_lsn, page_cache_stats.last_valid_lsn ); - let last_valid_recordlsn_str = format_lsn(page_cache_stats.last_record_lsn); + let last_valid_recordlsn_str = page_cache_stats.last_record_lsn.to_string(); lines.push(get_metric_str("Valid LSN range", &lsnrange)); lines.push(get_metric_str("Last record LSN", &last_valid_recordlsn_str)); + */ + lines.push(get_metric_u64( "# of cache entries", page_cache_stats.num_entries, diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 6cfd446a7f46..33e16ca336ea 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -4,8 +4,7 @@ use log::*; use std::cmp::min; use std::str; use thiserror::Error; - -const XLOG_BLCKSZ: u32 = 8192; +use zenith_utils::lsn::Lsn; // FIXME: this is configurable in PostgreSQL, 16 MB is the default const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; @@ -41,9 +40,9 @@ const SizeOfXLogLongPHD: usize = (2 + 2 + 4 + 8 + 4) + 4 + 8 + 4 + 4; #[allow(dead_code)] pub struct WalStreamDecoder { - lsn: u64, + lsn: Lsn, - startlsn: u64, // LSN where this record starts + startlsn: Lsn, // LSN where this record starts contlen: u32, padlen: u32, @@ -56,7 +55,7 @@ pub struct WalStreamDecoder { #[error("{msg} at {lsn}")] pub struct WalDecodeError { msg: String, - lsn: u64, + lsn: Lsn, } // @@ -64,11 +63,11 @@ pub struct WalDecodeError { 
// FIXME: This isn't a proper rust stream // impl WalStreamDecoder { - pub fn new(lsn: u64) -> WalStreamDecoder { + pub fn new(lsn: Lsn) -> WalStreamDecoder { WalStreamDecoder { lsn, - startlsn: 0, + startlsn: Lsn(0), contlen: 0, padlen: 0, @@ -89,10 +88,10 @@ impl WalStreamDecoder { /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function /// Err(WalDecodeError): an error occured while decoding, meaning the input was invalid. /// - pub fn poll_decode(&mut self) -> Result, WalDecodeError> { + pub fn poll_decode(&mut self) -> Result, WalDecodeError> { loop { // parse and verify page boundaries as we go - if self.lsn % WAL_SEGMENT_SIZE == 0 { + if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 { // parse long header if self.inputbuf.remaining() < SizeOfXLogLongPHD { @@ -100,7 +99,7 @@ impl WalStreamDecoder { } let hdr = self.decode_XLogLongPageHeaderData(); - if hdr.std.xlp_pageaddr != self.lsn { + if hdr.std.xlp_pageaddr != self.lsn.0 { return Err(WalDecodeError { msg: "invalid xlog segment header".into(), lsn: self.lsn, @@ -110,15 +109,13 @@ impl WalStreamDecoder { self.lsn += SizeOfXLogLongPHD as u64; continue; - } else if self.lsn % (XLOG_BLCKSZ as u64) == 0 { - // parse page header - + } else if self.lsn.block_offset() == 0 { if self.inputbuf.remaining() < SizeOfXLogShortPHD { return Ok(None); } let hdr = self.decode_XLogPageHeaderData(); - if hdr.xlp_pageaddr != self.lsn { + if hdr.xlp_pageaddr != self.lsn.0 { return Err(WalDecodeError { msg: "invalid xlog page header".into(), lsn: self.lsn, @@ -163,7 +160,7 @@ impl WalStreamDecoder { continue; } else { // we're continuing a record, possibly from previous page. - let pageleft: u32 = XLOG_BLCKSZ - (self.lsn % (XLOG_BLCKSZ as u64)) as u32; + let pageleft = self.lsn.remaining_in_block() as u32; // read the rest of the record, or as much as fits on this page. 
let n = min(self.contlen, pageleft) as usize; @@ -184,16 +181,11 @@ impl WalStreamDecoder { // XLOG_SWITCH records are special. If we see one, we need to skip // to the next WAL segment. if is_xlog_switch_record(&recordbuf) { - trace!( - "saw xlog switch record at {:X}/{:X}", - (self.lsn >> 32), - self.lsn & 0xffffffff - ); - self.padlen = (WAL_SEGMENT_SIZE - (self.lsn % WAL_SEGMENT_SIZE)) as u32; - } - - if self.lsn % 8 != 0 { - self.padlen = 8 - (self.lsn % 8) as u32; + trace!("saw xlog switch record at {}", self.lsn); + self.padlen = self.lsn.calc_padding(WAL_SEGMENT_SIZE) as u32; + } else { + // Pad to an 8-byte boundary + self.padlen = self.lsn.calc_padding(8u32) as u32; } let result = (self.lsn, recordbuf); diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 6671c13d2563..5ef5f1cf02e1 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -31,6 +31,7 @@ use tokio::time::{sleep, Duration}; use tokio_postgres::replication::{PgTimestamp, ReplicationStream}; use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow}; use tokio_stream::StreamExt; +use zenith_utils::lsn::Lsn; // // We keep one WAL Receiver active per timeline. @@ -138,7 +139,7 @@ async fn walreceiver_main( let identify = identify_system(&rclient).await?; info!("{:?}", identify); - let end_of_wal = u64::from(identify.xlogpos); + let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); let mut caught_up = false; let pcache = page_cache::get_pagecache(&conf, timelineid).unwrap(); @@ -148,7 +149,7 @@ async fn walreceiver_main( // let mut startpoint = pcache.get_last_valid_lsn(); let last_valid_lsn = pcache.get_last_valid_lsn(); - if startpoint == 0 { + if startpoint == Lsn(0) { // If we start here with identify.xlogpos we will have race condition with // postgres start: insert into postgres may request page that was modified with lsn // smaller than identify.xlogpos. 
@@ -157,37 +158,28 @@ async fn walreceiver_main( // different like having 'initdb' method on a pageserver (or importing some shared // empty database snapshot), so for now I just put start of first segment which // seems to be a valid record. - pcache.init_valid_lsn(0x_1_000_000_u64); - startpoint = 0x_1_000_000_u64; + pcache.init_valid_lsn(Lsn(0x_1_000_000)); + startpoint = Lsn(0x_1_000_000); } else { // There might be some padding after the last full record, skip it. // // FIXME: It probably would be better to always start streaming from the beginning // of the page, or the segment, so that we could check the page/segment headers // too. Just for the sake of paranoia. - if startpoint % 8 != 0 { - startpoint += 8 - (startpoint % 8); - } + startpoint += startpoint.calc_padding(8u32); } debug!( - "last_valid_lsn {:X}/{:X} starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...", - (last_valid_lsn >> 32), - (last_valid_lsn & 0xffffffff), - (startpoint >> 32), - (startpoint & 0xffffffff), - timelineid, - (end_of_wal >> 32), - (end_of_wal & 0xffffffff) + "last_valid_lsn {} starting replication from {} for timeline {}, server is at {}...", + last_valid_lsn, startpoint, timelineid, end_of_wal ); - let startpoint = PgLsn::from(startpoint); let query = format!("START_REPLICATION PHYSICAL {}", startpoint); let copy_stream = rclient.copy_both_simple::(&query).await?; let physical_stream = ReplicationStream::new(copy_stream); tokio::pin!(physical_stream); - let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint)); + let mut waldecoder = WalStreamDecoder::new(startpoint); while let Some(replication_message) = physical_stream.next().await { match replication_message? { @@ -195,7 +187,7 @@ async fn walreceiver_main( // Pass the WAL data to the decoder, and see if we can decode // more records as a result. 
let data = xlog_data.data(); - let startlsn = xlog_data.wal_start(); + let startlsn = Lsn::from(xlog_data.wal_start()); let endlsn = startlsn + data.len() as u64; write_wal_file( @@ -205,13 +197,7 @@ async fn walreceiver_main( data, )?; - trace!( - "received XLogData between {:X}/{:X} and {:X}/{:X}", - (startlsn >> 32), - (startlsn & 0xffffffff), - (endlsn >> 32), - (endlsn & 0xffffffff) - ); + trace!("received XLogData between {} and {}", startlsn, endlsn); waldecoder.feed_bytes(data); @@ -298,11 +284,7 @@ async fn walreceiver_main( pcache.advance_last_valid_lsn(endlsn); if !caught_up && endlsn >= end_of_wal { - info!( - "caught up at LSN {:X}/{:X}", - (endlsn >> 32), - (endlsn & 0xffffffff) - ); + info!("caught up at LSN {}", endlsn); caught_up = true; } } @@ -320,7 +302,7 @@ async fn walreceiver_main( ); if reply_requested { // TODO: More thought should go into what values are sent here. - let last_lsn = PgLsn::from(pcache.get_last_valid_lsn()); + let last_lsn = PgLsn::from(u64::from(pcache.get_last_valid_lsn())); let write_lsn = last_lsn; let flush_lsn = last_lsn; let apply_lsn = PgLsn::INVALID; @@ -387,7 +369,7 @@ pub async fn identify_system(client: &tokio_postgres::Client) -> Result>, } @@ -138,7 +138,7 @@ impl WalRedoManager { /// Request the WAL redo manager to apply WAL records, to reconstruct the page image /// of the given page version. /// - pub async fn request_redo(&self, tag: BufferTag, lsn: u64) -> Result { + pub async fn request_redo(&self, tag: BufferTag, lsn: Lsn) -> Result { // Create a channel where to receive the response let (tx, rx) = oneshot::channel::>(); @@ -225,18 +225,16 @@ impl WalRedoManagerInternal { } else if info == pg_constants::XLOG_XACT_ABORT { status = pg_constants::TRANSACTION_STATUS_ABORTED; } else { - trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}", + trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. 
lsn {} main_data_offset {}, rec.len {}", status, - record.lsn >> 32, - record.lsn & 0xffffffff, + record.lsn, record.main_data_offset, record.rec.len()); return; } - trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}", + trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {} main_data_offset {}, rec.len {}", status, - record.lsn >> 32, - record.lsn & 0xffffffff, + record.lsn, record.main_data_offset, record.rec.len()); let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) @@ -305,9 +303,8 @@ impl WalRedoManagerInternal { let info = xl_info & !pg_constants::XLR_INFO_MASK; if info == pg_constants::CLOG_ZEROPAGE { page.clone_from_slice(zero_page_bytes); - trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}", - record.lsn >> 32, - record.lsn & 0xffffffff, + trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {} main_data_offset {}, rec.len {}", + record.lsn, record.main_data_offset, record.rec.len()); } } else if xl_rmid == pg_constants::RM_XACT_ID { @@ -325,11 +322,10 @@ impl WalRedoManagerInternal { let result: Result; trace!( - "applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}", + "applied {} WAL records in {} ms to reconstruct page image at LSN {}", nrecords, duration.as_millis(), - lsn >> 32, - lsn & 0xffff_ffff + lsn ); if let Err(e) = apply_result { @@ -536,13 +532,13 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { buf.freeze() } -fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes { +fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes { let len = 4 + 8 + rec.len(); let mut buf = BytesMut::with_capacity(1 + len); buf.put_u8(b'A'); buf.put_u32(len as u32); - buf.put_u64(endlsn); + buf.put_u64(endlsn.0); buf.put(rec); assert!(buf.len() == 1 + len); diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index 
9de98202c67a..fb4d415f22b1 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -1,5 +1,7 @@ //! zenith_utils is intended to be a place to put code that is shared //! between other crates in this repository. +/// `Lsn` type implements common tasks on Log Sequence Numbers +pub mod lsn; /// SeqWait allows waiting for a future sequence number to arrive pub mod seqwait; diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs new file mode 100644 index 000000000000..fa8bc6a54542 --- /dev/null +++ b/zenith_utils/src/lsn.rs @@ -0,0 +1,239 @@ +#![warn(missing_docs)] + +use std::fmt; +use std::ops::{Add, AddAssign}; +use std::path::Path; +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Transaction log block size in bytes +pub const XLOG_BLCKSZ: u32 = 8192; + +/// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr +#[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] +pub struct Lsn(pub u64); + +/// We tried to parse an LSN from a string, but failed +#[derive(Debug, PartialEq, thiserror::Error)] +#[error("LsnParseError")] +pub struct LsnParseError; + +impl Lsn { + /// Maximum possible value for an LSN + pub const MAX: Lsn = Lsn(u64::MAX); + + /// Subtract a number, returning None on overflow. 
+    pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
+        let other: u64 = other.into();
+        self.0.checked_sub(other).map(Lsn)
+    }
+
+    /// Parse an LSN from a filename in the form `0000000000000000`
+    pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
+    where
+        F: AsRef<Path>,
+    {
+        let filename: &Path = filename.as_ref();
+        let filename = filename.to_str().ok_or(LsnParseError)?;
+        Lsn::from_hex(filename)
+    }
+
+    /// Parse an LSN from a string in the form `0000000000000000`
+    pub fn from_hex<S>(s: S) -> Result<Self, LsnParseError>
+    where
+        S: AsRef<str>,
+    {
+        let s: &str = s.as_ref();
+        let n = u64::from_str_radix(s, 16).or(Err(LsnParseError))?;
+        Ok(Lsn(n))
+    }
+
+    /// Compute the offset into a segment
+    pub fn segment_offset(self, seg_sz: u64) -> u64 {
+        self.0 % seg_sz
+    }
+
+    /// Compute the segment number
+    pub fn segment_number(self, seg_sz: u64) -> u64 {
+        self.0 / seg_sz
+    }
+
+    /// Compute the offset into a block
+    pub fn block_offset(self) -> u64 {
+        const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
+        self.0 % BLCKSZ
+    }
+
+    /// Compute the bytes remaining in this block
+    ///
+    /// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
+    pub fn remaining_in_block(self) -> u64 {
+        const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
+        BLCKSZ - (self.0 % BLCKSZ)
+    }
+
+    /// Compute the bytes remaining to fill a chunk of some size
+    ///
+    /// If the LSN is already at the chunk boundary, it will return 0.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `sz` is zero (remainder by zero).
+    pub fn calc_padding<T: Into<u64>>(self, sz: T) -> u64 {
+        let sz: u64 = sz.into();
+        // Take the remainder first and subtract second, so the result is
+        // correct for any non-zero chunk size. A wrapping subtraction
+        // followed by `% sz` only gives the right answer when `sz` divides
+        // 2^64, i.e. when it is a power of two.
+        match self.0 % sz {
+            // Already on a chunk boundary: no padding needed.
+            0 => 0,
+            rem => sz - rem,
+        }
+    }
+}
+
+impl From<u64> for Lsn {
+    fn from(n: u64) -> Self {
+        Lsn(n)
+    }
+}
+
+impl From<Lsn> for u64 {
+    fn from(lsn: Lsn) -> u64 {
+        lsn.0
+    }
+}
+
+impl FromStr for Lsn {
+    type Err = LsnParseError;
+
+    /// Parse an LSN from a string in the form `00000000/00000000`
+    ///
+    /// If the input string is missing the '/' character, then use `Lsn::from_hex`
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let mut splitter = s.split('/');
+        if let (Some(left), Some(right), None) = (splitter.next(), splitter.next(), splitter.next())
+        {
+            let left_num = u32::from_str_radix(left, 16).map_err(|_| LsnParseError)?;
+            let right_num = u32::from_str_radix(right, 16).map_err(|_| LsnParseError)?;
+            Ok(Lsn((left_num as u64) << 32 | right_num as u64))
+        } else {
+            Err(LsnParseError)
+        }
+    }
+}
+
+impl fmt::Display for Lsn {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffffffff)
+    }
+}
+
+impl Add<u64> for Lsn {
+    type Output = Lsn;
+
+    fn add(self, other: u64) -> Self::Output {
+        // panic if the addition overflows.
+        Lsn(self.0.checked_add(other).unwrap())
+    }
+}
+
+impl AddAssign<u64> for Lsn {
+    fn add_assign(&mut self, other: u64) {
+        // panic if the addition overflows.
+        self.0 = self.0.checked_add(other).unwrap();
+    }
+}
+
+/// An [`Lsn`] that can be accessed atomically.
+pub struct AtomicLsn {
+    inner: AtomicU64,
+}
+
+impl AtomicLsn {
+    /// Creates a new atomic `Lsn`.
+    pub fn new(val: u64) -> Self {
+        AtomicLsn {
+            inner: AtomicU64::new(val),
+        }
+    }
+
+    /// Atomically retrieve the `Lsn` value from memory.
+    pub fn load(&self) -> Lsn {
+        Lsn(self.inner.load(Ordering::Acquire))
+    }
+
+    /// Atomically store a new `Lsn` value to memory.
+    pub fn store(&self, lsn: Lsn) {
+        self.inner.store(lsn.0, Ordering::Release);
+    }
+
+    /// Adds to the current value, returning the previous value.
+    ///
+    /// This operation will panic on overflow.
+ pub fn fetch_add(&self, val: u64) -> Lsn { + let prev = self.inner.fetch_add(val, Ordering::AcqRel); + if prev.checked_add(val).is_none() { + panic!("AtomicLsn overflow"); + } + Lsn(prev) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lsn_strings() { + assert_eq!("12345678/AAAA5555".parse(), Ok(Lsn(0x12345678AAAA5555))); + assert_eq!("aaaa/bbbb".parse(), Ok(Lsn(0x0000AAAA0000BBBB))); + assert_eq!("1/A".parse(), Ok(Lsn(0x000000010000000A))); + assert_eq!("0/0".parse(), Ok(Lsn(0))); + "ABCDEFG/12345678".parse::().unwrap_err(); + "123456789/AAAA5555".parse::().unwrap_err(); + "12345678/AAAA55550".parse::().unwrap_err(); + "-1/0".parse::().unwrap_err(); + "1/-1".parse::().unwrap_err(); + + assert_eq!(format!("{}", Lsn(0x12345678AAAA5555)), "12345678/AAAA5555"); + assert_eq!(format!("{}", Lsn(0x000000010000000A)), "1/A"); + + assert_eq!(Lsn::from_hex("12345678AAAA5555"), Ok(Lsn(0x12345678AAAA5555))); + assert_eq!(Lsn::from_hex("0"), Ok(Lsn(0))); + assert_eq!(Lsn::from_hex("F12345678AAAA5555"), Err(LsnParseError)); + } + + #[test] + fn test_lsn_math() { + assert_eq!(Lsn(1234) + 11u64, Lsn(1245)); + + assert_eq!( + { + let mut lsn = Lsn(1234); + lsn += 11u64; + lsn + }, + Lsn(1245) + ); + + assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1))); + assert_eq!(Lsn(1234).checked_sub(1235u64), None); + + let seg_sz = 16u64 * 1024 * 1024; + assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7u64); + assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64); + + assert_eq!(Lsn(0x4007).block_offset(), 7u64); + assert_eq!(Lsn(0x4000).block_offset(), 0u64); + assert_eq!(Lsn(0x4007).remaining_in_block(), 8185u64); + assert_eq!(Lsn(0x4000).remaining_in_block(), 8192u64); + + assert_eq!(Lsn(0xffff01).calc_padding(seg_sz), 255u64); + assert_eq!(Lsn(0x2000000).calc_padding(seg_sz), 0u64); + assert_eq!(Lsn(0xffff01).calc_padding(8u32), 7u64); + assert_eq!(Lsn(0xffff00).calc_padding(8u32), 0u64); + } + + #[test] + fn test_atomic_lsn() { + let lsn = 
AtomicLsn::new(0); + assert_eq!(lsn.fetch_add(1234), Lsn(0)); + assert_eq!(lsn.load(), Lsn(1234)); + lsn.store(Lsn(5678)); + assert_eq!(lsn.load(), Lsn(5678)); + } +} diff --git a/zenith_utils/src/seqwait.rs b/zenith_utils/src/seqwait.rs index c30304ab6ab4..b4f3cdd45424 100644 --- a/zenith_utils/src/seqwait.rs +++ b/zenith_utils/src/seqwait.rs @@ -1,6 +1,7 @@ #![warn(missing_docs)] use std::collections::BTreeMap; +use std::fmt::Debug; use std::mem; use std::sync::Mutex; use std::time::Duration; @@ -18,9 +19,12 @@ pub enum SeqWaitError { } /// Internal components of a `SeqWait` -struct SeqWaitInt { - waiters: BTreeMap, Receiver<()>)>, - current: u64, +struct SeqWaitInt +where + T: Ord, +{ + waiters: BTreeMap, Receiver<()>)>, + current: T, shutdown: bool, } @@ -38,13 +42,19 @@ struct SeqWaitInt { /// [`wait_for`]: SeqWait::wait_for /// [`advance`]: SeqWait::advance /// -pub struct SeqWait { - internal: Mutex, +pub struct SeqWait +where + T: Ord, +{ + internal: Mutex>, } -impl SeqWait { +impl SeqWait +where + T: Ord + Debug + Copy, +{ /// Create a new `SeqWait`, initialized to a particular number - pub fn new(starting_num: u64) -> Self { + pub fn new(starting_num: T) -> Self { let internal = SeqWaitInt { waiters: BTreeMap::new(), current: starting_num, @@ -82,7 +92,7 @@ impl SeqWait { /// /// This call won't complete until someone has called `advance` /// with a number greater than or equal to the one we're waiting for. - pub async fn wait_for(&self, num: u64) -> Result<(), SeqWaitError> { + pub async fn wait_for(&self, num: T) -> Result<(), SeqWaitError> { let mut rx = { let mut internal = self.internal.lock().unwrap(); if internal.current >= num { @@ -116,7 +126,7 @@ impl SeqWait { /// [`SeqWaitError::Timeout`] will be returned. 
pub async fn wait_for_timeout( &self, - num: u64, + num: T, timeout_duration: Duration, ) -> Result<(), SeqWaitError> { timeout(timeout_duration, self.wait_for(num)) @@ -130,23 +140,30 @@ impl SeqWait { /// /// `advance` will panic if you send it a lower number than /// a previous call. - pub fn advance(&self, num: u64) { + pub fn advance(&self, num: T) { let wake_these = { let mut internal = self.internal.lock().unwrap(); if internal.current > num { panic!( - "tried to advance backwards, from {} to {}", + "tried to advance backwards, from {:?} to {:?}", internal.current, num ); } internal.current = num; // split_off will give me all the high-numbered waiters, - // so split and then swap. Everything at or above (num + 1) - // gets to stay. - let mut split = internal.waiters.split_off(&(num + 1)); + // so split and then swap. Everything at or above `num` + // stays. + let mut split = internal.waiters.split_off(&num); std::mem::swap(&mut split, &mut internal.waiters); + + // `split_off` didn't get the value at `num`; if it's + // there take that too. + if let Some(sleeper) = internal.waiters.remove(&num) { + split.insert(num, sleeper); + } + split };