@@ -1,6 +1,6 @@
use std::{
borrow::Cow,
cmp::min,
cmp::{max, min},
fs::File,
io::{self, BufWriter, Seek, Write},
path::Path,
@@ -45,6 +45,8 @@ const MIN_VALUE_COMPRESSION_SAMPLES_SIZE: usize = 1024;
const MIN_KEY_COMPRESSION_SAMPLES_SIZE: usize = 1024;
/// The bytes that are used per key/value entry for a sample.
const COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY: usize = 100;
/// The minimum bytes that are used per key/value entry for a sample.
const MIN_COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY: usize = 16;

/// Trait for entries from which SST files can be created
pub trait Entry {
@@ -153,58 +155,75 @@ impl<'a> StaticSortedFileBuilder<'a> {
{
return Ok(());
}
let key_compression_samples_size = min(KEY_COMPRESSION_SAMPLES_SIZE, total_key_size / 10);
let key_compression_samples_size = min(KEY_COMPRESSION_SAMPLES_SIZE, total_key_size / 16);
let value_compression_samples_size =
min(VALUE_COMPRESSION_SAMPLES_SIZE, total_value_size / 10);
min(VALUE_COMPRESSION_SAMPLES_SIZE, total_value_size / 16);
let mut value_samples = Vec::with_capacity(value_compression_samples_size);
let mut value_sample_sizes = Vec::new();
let mut key_samples = Vec::with_capacity(key_compression_samples_size);
let mut key_sample_sizes = Vec::new();
let mut i = 12345678 % entries.len();
Member Author: Entries are stored by hash, so they are essentially randomly distributed. There is no need to add a random iteration order here. Instead, there is a benefit in actually visiting all entries when there aren't that many of them.
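
A minimal standalone sketch of the iteration pattern described here (illustrative only, not the PR's code): sequential indexing with wraparound visits every entry at least once whenever the iteration budget covers the whole set, and cycles through a small set repeatedly instead of jumping with a pseudo-random stride. The toy data and budget below are assumptions for the example.

fn main() {
    // Toy stand-ins; the real entries are key/value pairs ordered by hash.
    let entries = ["alpha", "beta", "gamma"];
    // Assumed budget; the PR derives it from the total key/value sizes.
    let max_iterations = 7;
    for i in 0..max_iterations {
        // Sequential with wraparound: a small set is visited repeatedly,
        // a large set is visited in order until the budget runs out.
        let entry = &entries[i % entries.len()];
        println!("iteration {i} -> {entry}");
    }
}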

let mut j = 0;
loop {
let entry = &entries[i];

// Limit the number of iterations to avoid infinite loops
let max_iterations =
max(total_key_size, total_value_size) / COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY * 2;
for i in 0..max_iterations {
let entry = &entries[i % entries.len()];
Member: Is this going to add the same entry to the dict multiple times? I guess this is only an edge case so it's fine?

Member Author: Yes, it potentially picks from the same entry twice, but it chooses only up to 100 bytes from that entry randomly, so it will likely cover a different range of the entry.
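
As a hedged illustration of that reply (the helper name, stride, and rolling offset below are assumptions for the sketch, not the PR's exact code): even when the same entry is visited twice, a different window of at most 100 bytes is usually taken, because the offset keeps moving between visits.

fn sample_window<'a>(value: &'a [u8], offset: &mut usize, sample_len: usize) -> &'a [u8] {
    if value.len() <= sample_len {
        // Short values are taken whole.
        value
    } else {
        // Advance a rolling offset with a fixed stride, wrapped into the valid
        // window starts, so repeated visits usually land on different ranges.
        *offset = (*offset + 12_345_678) % (value.len() - sample_len);
        &value[*offset..*offset + sample_len]
    }
}

fn main() {
    let value = vec![7u8; 300];
    let mut offset = 0;
    for _ in 0..3 {
        let window = sample_window(&value, &mut offset, 100);
        println!("took {} bytes starting at offset {}", window.len(), offset);
    }
}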

@vercel vercel bot (Contributor) commented on Jul 28, 2025:

The code will panic with division by zero when entries.len() == 0 due to the modulo operation i % entries.len().


Analysis

On line 170, the expression entries[i % entries.len()] will cause a panic if the entries slice is empty, since entries.len() would be 0 and the remainder operation panics on a zero divisor. The same issue occurs on line 204 in the second loop.

While the early return on lines 153-156 might catch some cases where both total_key_size and total_value_size are small, it doesn't guarantee that entries is non-empty. An empty entries slice could theoretically be passed with non-zero size parameters, or the size parameters could be inconsistent with the actual entries.

This would cause a runtime panic that could crash the application when processing certain edge case inputs.


Recommendation

Add a guard at the beginning of the function to handle empty entries:

if entries.is_empty() {
    return Ok(());
}

Add this check right after line 157 (after the existing early return) to ensure the function exits safely when there are no entries to process.
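
For context on the failure mode, a tiny self-contained sketch (not the PR's code; the function below is hypothetical): in Rust the remainder operator panics on a zero divisor, so indexing with i % slice.len() has to be preceded by an emptiness check.

fn pick<T>(items: &[T], i: usize) -> Option<&T> {
    // Guard first: with an empty slice, `i % items.len()` would panic with
    // "attempt to calculate the remainder with a divisor of zero".
    if items.is_empty() {
        return None;
    }
    Some(&items[i % items.len()])
}

fn main() {
    let empty: [u32; 0] = [];
    assert_eq!(pick(&empty, 5), None);
    assert_eq!(pick(&[10u32, 20, 30], 5), Some(&30));
}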

let value_remaining = value_compression_samples_size - value_samples.len();
let key_remaining = key_compression_samples_size - key_samples.len();
if value_remaining > 0
&& let EntryValue::Small { value } | EntryValue::Medium { value } = entry.value()
{
let value = if value.len() <= COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY {
value
} else {
j = (j + 12345678) % (value.len() - COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY);
&value[j..j + COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY]
};
if value.len() <= value_remaining {
value_sample_sizes.push(value.len());
value_samples.extend_from_slice(value);
} else {
value_sample_sizes.push(value_remaining);
value_samples.extend_from_slice(&value[..value_remaining]);
if value_remaining < MIN_COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY {
break;
}
if let EntryValue::Small { value } | EntryValue::Medium { value } = entry.value() {
let len = value.len();
if len >= MIN_COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY {
let used_len = min(value_remaining, COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY);
if len <= used_len {
value_sample_sizes.push(len);
value_samples.extend_from_slice(value);
} else {
value_sample_sizes.push(used_len);
let p = value_samples.len() % (len - used_len);
@vercel vercel bot (Contributor) commented on Jul 28, 2025:

The sampling logic uses value_samples.len() to determine the offset, which grows as samples are added, leading to inconsistent and potentially biased sampling.


Analysis

On lines 184 and 220, the code uses value_samples.len() and key_samples.len() respectively to calculate the offset p for sampling from entries that are larger than the desired sample size. However, these lengths grow as more samples are added to the vectors, which means:

  1. Inconsistent sampling: The offset calculation changes based on how many samples have already been collected, not based on the current entry being processed
  2. Potentially biased distribution: Later entries in the iteration will have larger offset values, leading to sampling from different parts of the data in a way that depends on processing order
  3. Loss of determinism: The sampling behavior depends on the accumulated state rather than being deterministic per entry

The original code used a separate counter j that was incremented pseudo-randomly to provide more consistent sampling behavior across entries.


Recommendation

Replace the sampling logic to use a deterministic offset based on the iteration index instead:

For values (line 184):

let p = i % (len - used_len);

For keys (line 220):

let p = i % (len - used_len);

This ensures that the sampling offset is deterministic and based on the current iteration, providing more consistent sampling across different entries regardless of how many samples have been collected so far.
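
A minimal sketch of the recommended offset choice (the helper below is hypothetical, not the PR's code): deriving the window start from the iteration index makes the sampled range for a given entry independent of how many sample bytes were collected before it.

/// Take at most `sample_len` bytes from `data`, starting at an offset derived
/// from the iteration index `i` rather than from accumulated sample state.
fn sample_at(data: &[u8], i: usize, sample_len: usize) -> &[u8] {
    if data.len() <= sample_len {
        data
    } else {
        let p = i % (data.len() - sample_len);
        &data[p..p + sample_len]
    }
}

fn main() {
    let data: Vec<u8> = (0..=255).collect();
    // The window depends only on `i` and the entry itself.
    assert_eq!(sample_at(&data, 3, 100).len(), 100);
    assert_eq!(sample_at(&data, 3, 100)[0], 3);
}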

value_samples.extend_from_slice(&value[p..p + used_len]);
};
}
}
if key_remaining > 0 {
}
assert!(value_samples.len() == value_sample_sizes.iter().sum::<usize>());
if value_samples.len() > MIN_VALUE_COMPRESSION_SAMPLES_SIZE && value_sample_sizes.len() > 5
{
self.value_compression_dictionary = zstd::dict::from_continuous(
&value_samples,
&value_sample_sizes,
VALUE_COMPRESSION_DICTIONARY_SIZE,
)
.context("Value dictionary creation failed")?;
} else {
self.value_compression_dictionary = Vec::new();
}

for i in 0..max_iterations {
let entry = &entries[i % entries.len()];
let key_remaining = key_compression_samples_size - key_samples.len();
if key_remaining < MIN_COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY {
break;
}
let len = entry.key_len();
if len >= MIN_COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY {
let used_len = min(key_remaining, COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY);
if entry.key_len() <= used_len {
key_sample_sizes.push(entry.key_len());
if len <= used_len {
key_sample_sizes.push(len);
entry.write_key_to(&mut key_samples);
} else {
let mut temp = Vec::with_capacity(entry.key_len());
let mut temp = Vec::with_capacity(len);
entry.write_key_to(&mut temp);
debug_assert!(temp.len() == entry.key_len());
debug_assert!(temp.len() == len);

j = (j + 12345678) % (temp.len() - used_len);
let p = key_samples.len() % (len - used_len);
key_sample_sizes.push(used_len);
key_samples.extend_from_slice(&temp[j..j + used_len]);
key_samples.extend_from_slice(&temp[p..p + used_len]);
}
}
if key_remaining == 0 && value_remaining == 0 {
break;
}
i = (i + 12345678) % entries.len();
}
assert!(key_samples.len() == key_sample_sizes.iter().sum::<usize>());
assert!(value_samples.len() == value_sample_sizes.iter().sum::<usize>());
if key_samples.len() > MIN_KEY_COMPRESSION_SAMPLES_SIZE && key_sample_sizes.len() > 5 {
self.key_compression_dictionary = zstd::dict::from_continuous(
&key_samples,
@@ -213,15 +232,6 @@ impl<'a> StaticSortedFileBuilder<'a> {
)
.context("Key dictionary creation failed")?;
}
if value_samples.len() > MIN_VALUE_COMPRESSION_SAMPLES_SIZE && value_sample_sizes.len() > 5
{
self.value_compression_dictionary = zstd::dict::from_continuous(
&value_samples,
&value_sample_sizes,
VALUE_COMPRESSION_DICTIONARY_SIZE,
)
.context("Value dictionary creation failed")?;
}
Ok(())
}
