
perf(downloaders): Download bodies while writing to disk to utilize bandwidth optimally #744

Closed
gakonst opened this issue Jan 6, 2023 · 6 comments
Labels: C-enhancement (New feature or request)

Comments

@gakonst
Member

gakonst commented Jan 6, 2023

Context

  1. Right now, the bodies downloader fetches a batch of block bodies for a given set of headers:

```rust
async fn fetch_bodies(
    &self,
    headers: Vec<&SealedHeader>,
) -> DownloadResult<Vec<BlockResponse>> {
```

  2. This is exposed over `bodies_stream`, which concurrently batch-polls the headers split into chunks:

```rust
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> DownloadStream<'a, BlockResponse>
where
    I: IntoIterator<Item = &'b SealedHeader>,
    <I as IntoIterator>::IntoIter: Send + 'b,
    'b: 'a,
{
    Box::pin(
        stream::iter(headers.into_iter())
            .chunks(self.batch_size)
            .map(move |headers| {
                (move || self.fetch_bodies(headers.clone()))
                    .retry(ExponentialBackoff::default().with_max_times(self.retries))
            })
            .buffered(self.concurrency)
            .map_ok(|blocks| stream::iter(blocks.into_iter()).map(Ok))
            .try_flatten(),
    )
}
```

  3. The bodies stage queries the DB over a block range to get the headers it needs:

```rust
fn bodies_to_download<DB: Database>(
    &self,
    tx: &mut <DB as DatabaseGAT<'_>>::TXMut,
    starting_block: BlockNumber,
    target: BlockNumber,
) -> Result<Vec<SealedHeader>, StageError> {
    let mut header_cursor = tx.cursor::<tables::Headers>()?;
    let mut header_hashes_cursor = tx.cursor::<tables::CanonicalHeaders>()?;
    let mut walker = header_hashes_cursor
        .walk(starting_block)?
        .take_while(|item| item.as_ref().map_or(false, |(num, _)| *num <= target));
    let mut bodies_to_download = Vec::new();
    while let Some(Ok((block_number, header_hash))) = walker.next() {
        let (_, header) = header_cursor.seek_exact((block_number, header_hash).into())?.ok_or(
            DatabaseIntegrityError::Header { number: block_number, hash: header_hash },
        )?;
        bodies_to_download.push(SealedHeader::new(header, header_hash));
    }
    Ok(bodies_to_download)
}
```

  4. Then there's the I/O-heavy component, which seems to block the stream:

```rust
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = input.stage_progress.unwrap_or_default();
debug!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, start_tx_id = current_tx_id, transition_id, "Commencing sync");
while let Some(result) = bodies_stream.next().await {
    let Ok(response) = result else {
        error!(target: "sync::stages::bodies", block = highest_block + 1, error = ?result.unwrap_err(), "Error downloading block");
        return Ok(ExecOutput {
            stage_progress: highest_block,
            done: false,
        })
    };
```

  5. As a result, the stream isn't downloading anything in the background until it's polled again, underutilizing our bandwidth.

[screenshot: bandwidth utilization]

Solution

  • Should the downloader keep downloading while we're writing blocks in the I/O part of the loop?
  • Should we be spawning the I/O loop as its own task?

idk/not sure, keen to discuss.
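A minimal std-only sketch of the first option, just to illustrate the shape of the fix: if the downloader pushes bodies into a bounded channel from its own thread/task, fetching overlaps with the writer's I/O, and backpressure caps how far ahead it runs. `download_body` and `write_to_disk` are hypothetical stand-ins, not reth APIs:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for fetching one body over the network.
fn download_body(n: u64) -> u64 {
    thread::sleep(Duration::from_millis(10)); // simulate network latency
    n
}

// Hypothetical stand-in for the I/O-heavy part of the loop.
fn write_to_disk(_body: u64) {
    thread::sleep(Duration::from_millis(10)); // simulate disk I/O
}

fn main() {
    // Bounded channel: the downloader keeps fetching while the writer is
    // busy, but backpressure caps memory use at 4 in-flight bodies.
    let (tx, rx) = sync_channel::<u64>(4);

    let downloader = thread::spawn(move || {
        for n in 0..8 {
            tx.send(download_body(n)).unwrap();
        }
        // tx is dropped here, which ends the receiver loop below.
    });

    let mut written = Vec::new();
    for body in rx {
        write_to_disk(body);
        written.push(body);
    }
    downloader.join().unwrap();
    assert_eq!(written, (0..8).collect::<Vec<_>>());
    println!("wrote {} bodies", written.len());
}
```

With both sides taking ~10ms per body, the overlapped version finishes in roughly half the serial time while preserving ordering.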

@gakonst gakonst added the C-enhancement (New feature or request) and S-needs-triage (This issue needs to be labelled) labels Jan 6, 2023
@gakonst gakonst changed the title perf(downloaders): Download while writing to disk perf(downloaders): Download bodies while writing to disk to utilize bandwidth optimally Jan 6, 2023
@rkrasiuk
Member

rkrasiuk commented Jan 6, 2023

You can't do it without breaking the current control flow (e.g. delegating commit responsibility to stages), since downloaders are not separate processes, but short-lived streams (they exist only for the duration of a single Stage::execute invocation)

@gakonst
Member Author

gakonst commented Jan 6, 2023

Maybe the downloader can download larger slices than the commit threshold in a background task? And the bodies stream simply yields these values instead of downloading + yielding them? cc @mattsse as well, given this is stream territory
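A rough std-only sketch of that shape, with made-up numbers (`SLICE_SIZE`, `COMMIT_THRESHOLD`, and `fetch_slice` are all hypothetical): a background task fetches large slices and hands them out in commit-threshold-sized batches, so the next slice downloads while the stage writes and commits the current batch:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

const SLICE_SIZE: u64 = 100;        // bodies fetched per network round (assumed)
const COMMIT_THRESHOLD: usize = 25; // bodies the stage commits at once (assumed)

// Hypothetical stand-in for downloading one large slice of bodies.
fn fetch_slice(start: u64) -> Vec<u64> {
    (start..start + SLICE_SIZE).collect()
}

fn main() {
    // Capacity 2: at most two batches buffered ahead of the stage.
    let (tx, rx) = sync_channel::<Vec<u64>>(2);

    let producer = thread::spawn(move || {
        for slice_start in (0..300).step_by(SLICE_SIZE as usize) {
            let slice = fetch_slice(slice_start);
            // Split the big slice into commit-threshold batches; the next
            // slice downloads while the stage consumes these.
            for batch in slice.chunks(COMMIT_THRESHOLD) {
                tx.send(batch.to_vec()).unwrap();
            }
        }
    });

    let mut batches = 0;
    for batch in rx {
        assert_eq!(batch.len(), COMMIT_THRESHOLD);
        batches += 1; // the stage would write + commit each batch here
    }
    producer.join().unwrap();
    println!("received {batches} batches"); // 300 bodies / 25 per batch = 12
}
```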

@gakonst
Member Author

gakonst commented Jan 6, 2023

Maybe related to the logic to be unified with Headers here #741 (comment)

@onbjerg
Collaborator

onbjerg commented Jan 7, 2023

That block in the while loop is not I/O heavy; it is not writing to disk. I think the heavy part is in the downloader itself, where it validates the tx root in-place for each body, which is blocking. Things are only written to disk when the stage is done (`tx.commit` is called). It might be blocking anyway, but I think that's just a product of the stream not being handrolled like it is for headers.

This also seems to suggest that we are downloading bodies really fast but blocked on other stuff, so we can up the commit threshold.

@mattsse
Collaborator

mattsse commented Jan 7, 2023

Downloading should be a separate task then, and the stage just controls that task: submits new batch requests, then pulls responses.
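Sketching that control flow with std channels (the `Request` protocol and the in-line fetch are hypothetical, not reth's types): the downloader is a long-lived task, and the stage drives it by sending requests and pulling responses when it's ready to write:

```rust
use std::sync::mpsc::channel;
use std::thread;

// Hypothetical message protocol between the stage and the downloader task.
enum Request {
    Fetch(std::ops::Range<u64>), // block range to download
    Shutdown,
}

fn main() {
    let (req_tx, req_rx) = channel::<Request>();
    let (resp_tx, resp_rx) = channel::<Vec<u64>>();

    // Long-lived downloader task: it outlives any single Stage::execute call.
    let task = thread::spawn(move || {
        for req in req_rx {
            match req {
                Request::Fetch(range) => {
                    // Stand-in for actually fetching the bodies in `range`.
                    resp_tx.send(range.collect()).unwrap();
                }
                Request::Shutdown => break,
            }
        }
    });

    // The stage submits batch requests up front, then pulls responses.
    req_tx.send(Request::Fetch(0..5)).unwrap();
    req_tx.send(Request::Fetch(5..10)).unwrap();
    let first = resp_rx.recv().unwrap();
    let second = resp_rx.recv().unwrap();
    assert_eq!((first.len(), second.len()), (5, 5));

    req_tx.send(Request::Shutdown).unwrap();
    task.join().unwrap();
    println!("downloader task finished");
}
```

Because requests are queued ahead of consumption, the task can fetch the second range while the stage is still writing the first.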

@onbjerg
Collaborator

onbjerg commented Jan 8, 2023

Superseded by #764

@onbjerg onbjerg closed this as completed Jan 8, 2023
@github-project-automation github-project-automation bot moved this from Todo to Done in Reth Tracker Jan 8, 2023
@onbjerg onbjerg removed this from Reth Tracker Jan 11, 2023
@DaniPopes DaniPopes removed the S-needs-triage This issue needs to be labelled label Apr 24, 2024