Only support compressed reads if the compression setting is present #8238

Merged · 2 commits · Jul 3, 2024
45 changes: 27 additions & 18 deletions pageserver/src/tenant/blob_io.rs
@@ -19,6 +19,7 @@ use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tracing::warn;

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
@@ -72,14 +73,22 @@ impl<'a> BlockCursor<'a> {
len_buf.copy_from_slice(&buf[off..off + 4]);
off += 4;
}
len_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
let bit_mask = if self.read_compressed {
!LEN_COMPRESSION_BIT_MASK
} else {
0x7f
};
len_buf[0] &= bit_mask;
u32::from_be_bytes(len_buf) as usize
};
let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;

let mut tmp_buf = Vec::new();
let buf_to_write;
let compression = if compression_bits <= BYTE_UNCOMPRESSED {
let compression = if compression_bits <= BYTE_UNCOMPRESSED || !self.read_compressed {
if compression_bits > BYTE_UNCOMPRESSED {
warn!("reading key above future limit ({len} bytes)");
}
Comment on lines +89 to +91 (Member):

Doesn't this mean that we are reading a previously compressed as uncompressed?

Shouldn't we also or alternatively check if the read bytes start with the zstd fourcc/magic?

Member (@koivunej, Jul 3, 2024):

Anyway, I am having a hard time parsing this compression_bits. I get it that here it can be the u32 after anding it with 0x0f or 0x7f which means -- aha, it is never masked.

Related note: On line L332R341 the LEN_COMPRESSION_BIT_MASK is used as literal 0xf0:

-assert_eq!(len_buf[0] & 0xf0, 0);
+assert_eq!(len_buf[0] & LEN_COMPRESSION_BIT_MASK, 0);

Ok... So perhaps I now understand. Possible compression_bits are:

match compression_bits >> 4 {
  0 => /* image layer written before the compression support or small value? */,
  1..8 => /* reserved */,
  8 => /* uncompressed blob */,
  9 => /* zstd */,
  10..=15 => /* undefined or written before compression support and too large, which we warn here? */
  _ => unreachable!("u4"),
}

If this is correctly understood, then okay maybe ... The compression_bits > BYTE_UNCOMPRESSED just looks so off; in my mind a bitfield doesn't support ordered comparison. It'd be nice to have enums and matches for these. Err, nope, that cannot be correct.

Did you test that this warning is produced with some hand-crafted image file?

EDIT: rust snippet had 1..8 and 10..=15 wrong way around, possibly.

Member Author:

Doesn't this mean that we are reading a previously compressed as uncompressed?

If the compression setting is disabled, yes. This has the consequence that we can't turn off compression easily any more, but I think it's okay to have it for a while, after which point we'll (mostly) revert this PR.

It'd be nice to have enums and matches for these

match is not good for that as FOOBAR => is equivalent to a variable capture.

Member:

Is my match correct? Did you test if this is hit? Will that be used as a success criteria for the compression support? If so, what is the plan to read all image layers?

Other questions remain, magic/fourcc instead of reserving more bits?

Member Author:

Is my match correct?

Almost correct: the 1..8 range doesn't have the highest bit set, so it's an indicator for small uncompressed values.

match compression_bits >> 4 {
  0..8 => /* small, uncompressed value below 128 bytes */,
  8 => /* uncompressed blob */,
  9 => /* zstd */,
  10..=15 => /* reserved or written before compression support and too large, which we warn here */
  _ => unreachable!("u4"),
}

Will that be used as a success criteria for the compression support?

What do you mean, can you expand?

If so, what is the plan to read all image layers?

??

magic/fourcc instead of reserving more bits?

What do you mean by that? I do not think we should autodetect zstd magic here.

Member Author:

Christian wants this to be merged as-is, doing that now.

Member:

success criteria
plan to read all image layers

Christian mentioned these in #8238 (comment). I was wondering similarly what are the next steps.

magic/fourcc

What do you mean by that? I do not think we should autodetect zstd magic here.

zstd always starts the compressed bytes with the same 4 magic bytes. I was wondering whether we should use that knowledge instead of awkwardly reserving bits, as I had no idea what the plan for the next step was. But yeah, it seems there is a plan after all.

Contributor:

For future reference, the slack thread where I proposed to merge asap is https://neondb.slack.com/archives/C074JQU7NER/p1720021276202079
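The nibble layout worked out in the thread above can be sketched as a standalone decoder. The constants mirror the diff (LEN_COMPRESSION_BIT_MASK = 0xf0, BYTE_UNCOMPRESSED = 0x80, BYTE_ZSTD = 0x90); the enum and function names are illustrative, not the actual pageserver code:

```rust
// Sketch of decoding the blob length header per the layout discussed above.
// In the real format a "short" value uses only a 1-byte header; this sketch
// always takes 4 bytes for simplicity.
const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = 0x90;

#[derive(Debug, PartialEq)]
enum BlobHeader {
    /// High bit clear: small uncompressed value below 128 bytes.
    Short { len: usize },
    /// 0x8 nibble: 4-byte header, uncompressed blob.
    Uncompressed { len: usize },
    /// 0x9 nibble: 4-byte header, zstd-compressed blob.
    Zstd { len: usize },
    /// 0xa..=0xf nibbles: reserved.
    Reserved { nibble: u8 },
}

fn decode_header(buf: [u8; 4]) -> BlobHeader {
    if buf[0] & 0x80 == 0 {
        return BlobHeader::Short { len: buf[0] as usize };
    }
    let bits = buf[0] & LEN_COMPRESSION_BIT_MASK;
    // Mask off the compression nibble before interpreting the length.
    let len =
        u32::from_be_bytes([buf[0] & !LEN_COMPRESSION_BIT_MASK, buf[1], buf[2], buf[3]]) as usize;
    match bits {
        BYTE_UNCOMPRESSED => BlobHeader::Uncompressed { len },
        BYTE_ZSTD => BlobHeader::Zstd { len },
        other => BlobHeader::Reserved { nibble: other >> 4 },
    }
}

fn main() {
    assert_eq!(decode_header([0x05, 0, 0, 0]), BlobHeader::Short { len: 5 });
    assert_eq!(decode_header([0x80, 0, 0, 0x10]), BlobHeader::Uncompressed { len: 16 });
    assert_eq!(decode_header([0x90, 0, 1, 0x00]), BlobHeader::Zstd { len: 256 });
    assert_eq!(decode_header([0xa0, 0, 0, 0]), BlobHeader::Reserved { nibble: 0xa });
    println!("ok");
}
```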

buf_to_write = dstbuf;
None
} else if compression_bits == BYTE_ZSTD {
@@ -384,10 +393,10 @@ mod tests {
use rand::{Rng, SeedableRng};

async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED, 0>(blobs).await
round_trip_test_compressed::<BUFFERED, false>(blobs).await
}

async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: u8>(
async fn round_trip_test_compressed<const BUFFERED: bool, const COMPRESSION: bool>(
arpad-m marked this conversation as resolved.
blobs: &[Vec<u8>],
) -> Result<(), Error> {
let temp_dir = camino_tempfile::tempdir()?;
@@ -400,17 +409,15 @@
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = match COMPRESSION {
0 => wtr.write_blob(blob.clone(), &ctx).await,
1 => {
wtr.write_blob_maybe_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }),
)
.await
}
_ => unreachable!("Invalid compression {COMPRESSION}"),
let (_, res) = if COMPRESSION {
wtr.write_blob_maybe_compressed(
blob.clone(),
&ctx,
Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }),
)
.await
} else {
wtr.write_blob(blob.clone(), &ctx).await
};
let offs = res?;
offsets.push(offs);
@@ -425,7 +432,7 @@

let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
let rdr = BlockCursor::new_with_compression(rdr, COMPRESSION);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
let blob_read = rdr.read_blob(*offset, &ctx).await?;
assert_eq!(
@@ -459,6 +466,8 @@ mod tests {
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false, true>(blobs).await?;
round_trip_test_compressed::<true, true>(blobs).await?;
Ok(())
}

@@ -474,8 +483,8 @@
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false, 1>(blobs).await?;
round_trip_test_compressed::<true, 1>(blobs).await?;
round_trip_test_compressed::<false, true>(blobs).await?;
round_trip_test_compressed::<true, true>(blobs).await?;
Ok(())
}

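The test refactor above swaps the const u8 parameter for a const bool. The shape of that pattern can be illustrated with a tiny self-contained round trip; the 1-byte header here (high bit = "compressed") is hypothetical and much simpler than the real blob encoding:

```rust
// Tiny round-trip helper parameterized by a const bool, mirroring the
// refactored round_trip_test_compressed. Header format is a sketch only.
fn write_blob<const COMPRESSED: bool>(out: &mut Vec<u8>, blob: &[u8]) {
    assert!(blob.len() < 128, "sketch only supports short blobs");
    let tag: u8 = if COMPRESSED { 0x80 } else { 0x00 };
    out.push(tag | blob.len() as u8);
    out.extend_from_slice(blob);
}

fn read_blob(buf: &[u8], off: usize) -> (bool, &[u8]) {
    let hdr = buf[off];
    let len = (hdr & 0x7f) as usize;
    (hdr & 0x80 != 0, &buf[off + 1..off + 1 + len])
}

// Write all blobs, then read them back and check that the compression flag
// and the payload survive the round trip.
fn round_trip<const COMPRESSED: bool>(blobs: &[&[u8]]) {
    let mut out = Vec::new();
    let mut offsets = Vec::new();
    for blob in blobs {
        offsets.push(out.len());
        write_blob::<COMPRESSED>(&mut out, blob);
    }
    for (blob, off) in blobs.iter().zip(offsets) {
        let (compressed, data) = read_blob(&out, off);
        assert_eq!(compressed, COMPRESSED);
        assert_eq!(data, *blob);
    }
}

fn main() {
    let blobs: &[&[u8]] = &[b"hello", b"", b"world"];
    round_trip::<false>(blobs);
    round_trip::<true>(blobs);
    println!("ok");
}
```

Because the parameter is const, each variant is monomorphized separately, just as `round_trip_test_compressed::<BUFFERED, true>` and `::<BUFFERED, false>` are in the test above.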
31 changes: 28 additions & 3 deletions pageserver/src/tenant/block_io.rs
@@ -149,16 +149,24 @@ impl<'a> BlockReaderRef<'a> {
/// ```
///
pub struct BlockCursor<'a> {
pub(super) read_compressed: bool,
reader: BlockReaderRef<'a>,
}

impl<'a> BlockCursor<'a> {
pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
BlockCursor { reader }
Self::new_with_compression(reader, false)
}
pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
BlockCursor {
read_compressed,
reader,
}
}
// Needed by cli
pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
BlockCursor {
read_compressed: false,
reader: BlockReaderRef::FileBlockReader(reader),
}
}
@@ -188,11 +196,25 @@ pub struct FileBlockReader<'a> {

/// Unique ID of this file, used as key in the page cache.
file_id: page_cache::FileId,

compressed_reads: bool,
}

impl<'a> FileBlockReader<'a> {
pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
FileBlockReader { file_id, file }
Self::new_with_compression(file, file_id, false)
}

pub fn new_with_compression(
file: &'a VirtualFile,
file_id: FileId,
compressed_reads: bool,
) -> Self {
FileBlockReader {
file_id,
file,
compressed_reads,
}
}

/// Read a page from the underlying file into given buffer.
@@ -239,7 +261,10 @@ impl<'a> FileBlockReader<'a> {

impl BlockReader for FileBlockReader<'_> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReader(self))
BlockCursor::new_with_compression(
BlockReaderRef::FileBlockReader(self),
self.compressed_reads,
)
}
}

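The constructor layering above (`new` delegating to `new_with_compression`) keeps existing call sites compiling while threading the new flag through. A simplified standalone sketch of the pattern (the real `BlockCursor` also wraps a `BlockReaderRef`, omitted here):

```rust
// Simplified sketch of the block_io.rs constructor layering: the old
// constructor keeps its signature and delegates with the flag defaulted off.
struct BlockCursor {
    read_compressed: bool,
    // reader: BlockReaderRef<'a>,  // omitted in this sketch
}

impl BlockCursor {
    fn new() -> Self {
        Self::new_with_compression(false)
    }

    fn new_with_compression(read_compressed: bool) -> Self {
        BlockCursor { read_compressed }
    }
}

fn main() {
    // Existing callers of `new` get uncompressed reads, unchanged.
    assert!(!BlockCursor::new().read_compressed);
    // New callers opt in explicitly.
    assert!(BlockCursor::new_with_compression(true).read_compressed);
    println!("ok");
}
```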
28 changes: 19 additions & 9 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -165,6 +165,7 @@ pub struct ImageLayerInner {
file_id: FileId,

max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
compressed_reads: bool,
}

impl std::fmt::Debug for ImageLayerInner {
@@ -178,7 +179,8 @@ impl std::fmt::Debug for ImageLayerInner {

impl ImageLayerInner {
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
@@ -266,9 +268,10 @@ impl ImageLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
let path = self.path();

let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
.await
.and_then(|res| res)?;
let loaded =
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, false, ctx)
arpad-m marked this conversation as resolved.
.await
.and_then(|res| res)?;

// not production code
let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
@@ -377,6 +380,7 @@ impl ImageLayerInner {
lsn: Lsn,
summary: Option<Summary>,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
support_compressed_reads: bool,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await {
@@ -420,6 +424,7 @@ impl ImageLayerInner {
file,
file_id,
max_vectored_read_bytes,
compressed_reads: support_compressed_reads,
key_range: actual_summary.key_range,
}))
}
@@ -430,7 +435,8 @@ impl ImageLayerInner {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);

@@ -490,12 +496,14 @@ impl ImageLayerInner {
&self,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
let mut result = Vec::new();
let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let cursor = block_reader.block_cursor();
while let Some(item) = stream.next().await {
// TODO: dedup code with get_reconstruct_value
@@ -530,7 +538,8 @@ impl ImageLayerInner {
.into(),
);

let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);

@@ -691,7 +700,8 @@

#[cfg(test)]
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let block_reader =
FileBlockReader::new_with_compression(&self.file, self.file_id, self.compressed_reads);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
ImageLayerIterator {
1 change: 1 addition & 0 deletions pageserver/src/tenant/storage_layer/layer.rs
@@ -1685,6 +1685,7 @@ impl DownloadedLayer {
lsn,
summary,
Some(owner.conf.max_vectored_read_bytes),
owner.conf.image_compression.is_some(),
ctx,
)
.await