Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Fix race condition in get_reconstruct_data #1447

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 100 additions & 98 deletions pageserver/src/layered_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1342,10 +1342,6 @@ impl LayeredTimeline {
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
) -> anyhow::Result<()> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;

let mut path: Vec<(ValueReconstructResult, Lsn, Arc<dyn Layer>)> = Vec::new();

let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
Expand All @@ -1361,119 +1357,125 @@ impl LayeredTimeline {

let mut result = ValueReconstructResult::Continue;
let mut cont_lsn = Lsn(request_lsn.0 + 1);
// Start from the current timeline.
let mut timeline = self;
let mut timeline_owned;

'outer: loop {
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(()),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
self.materialized_page_cache_hit_counter.inc_by(1);
return Ok(());
}
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
// getting stuck in the loop.

// For debugging purposes, print the path of layers that we traversed
// through.
for (r, c, l) in path {
error!(
"PATH: result {:?}, cont_lsn {}, layer: {}",
r,
c,
l.filename().display()
);
'for_all_timelines: loop {
let layers = timeline.layers.read().unwrap();

'for_all_layers: loop {
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(()),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
self.materialized_page_cache_hit_counter.inc_by(1);
return Ok(());
}
bail!("could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
// getting stuck in the loop.

// For debugging purposes, print the path of layers that we traversed
// through.
for (r, c, l) in path {
error!(
"PATH: result {:?}, cont_lsn {}, layer: {}",
r,
c,
l.filename().display()
);
}
bail!("could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
key,
Lsn(cont_lsn.0 - 1),
request_lsn,
timeline.ancestor_lsn)
}
prev_lsn = cont_lsn;
}
ValueReconstructResult::Missing => {
bail!(
"could not find data for key {} at LSN {}, for request at LSN {}",
key,
cont_lsn,
request_lsn
)
}
prev_lsn = cont_lsn;
}
ValueReconstructResult::Missing => {
bail!(
"could not find data for key {} at LSN {}, for request at LSN {}",
key,
cont_lsn,
request_lsn
)

// Recurse into ancestor if needed
if Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
trace!(
"going into ancestor {}, cont_lsn is {}",
timeline.ancestor_lsn,
cont_lsn
);
let ancestor = timeline.get_ancestor_timeline()?;
drop(layers);
timeline_owned = ancestor;
timeline = &*timeline_owned;
prev_lsn = Lsn(u64::MAX);
continue 'for_all_timelines;
}
}

// Recurse into ancestor if needed
if Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
trace!(
"going into ancestor {}, cont_lsn is {}",
timeline.ancestor_lsn,
cont_lsn
);
let ancestor = timeline.get_ancestor_timeline()?;
timeline_owned = ancestor;
timeline = &*timeline_owned;
prev_lsn = Lsn(u64::MAX);
continue;
}
// Check the open and frozen in-memory layers first
if let Some(open_layer) = &layers.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
)?;
cont_lsn = lsn_floor;
path.push((result, cont_lsn, open_layer.clone()));
continue;
}
}
for frozen_layer in layers.frozen_layers.iter() {
let start_lsn = frozen_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = frozen_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
)?;
cont_lsn = lsn_floor;
path.push((result, cont_lsn, frozen_layer.clone()));
continue 'for_all_layers;
}
}

let layers = timeline.layers.read().unwrap();
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn)? {
//info!("CHECKING for {} at {} on historic layer {}", key, cont_lsn, layer.filename().display());

// Check the open and frozen in-memory layers first
if let Some(open_layer) = &layers.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
)?;
cont_lsn = lsn_floor;
path.push((result, cont_lsn, open_layer.clone()));
continue;
}
}
for frozen_layer in layers.frozen_layers.iter() {
let start_lsn = frozen_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = frozen_layer.get_value_reconstruct_data(
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
)?;
cont_lsn = lsn_floor;
path.push((result, cont_lsn, frozen_layer.clone()));
continue 'outer;
path.push((result, cont_lsn, layer));
} else if self.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
result = ValueReconstructResult::Continue;
cont_lsn = Lsn(self.ancestor_lsn.0 + 1);
} else {
// Nothing found
result = ValueReconstructResult::Missing;
}
}

if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn)? {
//info!("CHECKING for {} at {} on historic layer {}", key, cont_lsn, layer.filename().display());

let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
)?;
cont_lsn = lsn_floor;
path.push((result, cont_lsn, layer));
} else if self.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
result = ValueReconstructResult::Continue;
cont_lsn = Lsn(self.ancestor_lsn.0 + 1);
} else {
// Nothing found
result = ValueReconstructResult::Missing;
}
}
}

Expand Down