Streaming gateway WIP #4

Open · kalabukdima wants to merge 36 commits into master from streaming
Conversation

kalabukdima (Contributor)

No description provided.

Send additional request when all ongoing ones wait for too long
- Update to the latest transport crate
- Downgrade rustls to 0.23.10 to avoid cert error
  #2
- Use subsquid-datasets crate from archive-router repo
If a worker has responded with only a subset of blocks in a chunk, send a continuation request for the rest of the chunk repeatedly until the full chunk is fetched
kalabukdima self-assigned this on Aug 16, 2024
Resolved review threads: src/controller/stream.rs, Cargo.toml

use super::stream::StreamController;

const MAX_PARALLEL_STREAMS: u8 = 5;

This should definitely be much larger than that, and I suggest removing it for now.

It is not yet clear what principle this limit should be based on. Most likely it is simply total_memory / memory_usage_per_stream.
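A minimal sketch of that formula, assuming a hypothetical per-stream memory figure and a memory budget supplied from configuration (neither is part of this PR):

```rust
/// Rough per-stream memory estimate; this figure is an assumption, not a measurement.
const MEMORY_PER_STREAM_BYTES: u64 = 64 * 1024 * 1024;

/// Derive the stream limit from a memory budget instead of a hard-coded constant.
/// `available_memory_bytes` would come from the environment or a config value.
fn max_parallel_streams(available_memory_bytes: u64) -> usize {
    // Reserve half of the budget for everything besides stream buffers.
    let budget = available_memory_bytes / 2;
    (budget / MEMORY_PER_STREAM_BYTES).max(1) as usize
}
```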

kalabukdima (Contributor, Author):

Yes, the main reason for this is to limit memory usage. If we want to host a public gateway, we certainly don't want to leave it unlimited.

pub struct ClientRequest {
    pub dataset_id: DatasetId,
    pub query: ParsedQuery,
    pub buffer_size: usize,

Everything below is definitely something that should not be exposed, even in the "secret mode".

kalabukdima (Contributor, Author):

While it's primarily designed for debugging, I don't see a reason not to allow users running their own gateway to set these parameters per request. They can set them in the config anyway.
But if we end up hosting a public gateway, these should definitely be limited or removed altogether.

> I don't see a reason to not allow users of their own gateway to set those parameters per request

It is not polite to expose low-level details to users; they could change at any time. Also, I believe these kinds of settings could be set once and for all.

kalabukdima (Contributor, Author):

Okay, I will hide it from the public API, but I don't want to hardcode it, because having to recompile and restart the gateway (more than a minute) to test a new value slows down development.
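As an illustration of keeping such a knob out of the public API without hardcoding it, here is a sketch that reads the value from an environment variable with a built-in default; the variable name and default are hypothetical, not part of this PR:

```rust
/// Hypothetical tuning knob kept out of the public API: read it from the
/// environment so changing it does not require a recompile.
fn default_buffer_size() -> usize {
    std::env::var("SQD_BUFFER_SIZE")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(10) // assumed default, for illustration only
}
```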

Resolved review threads: src/controller/stream.rs
}
}

let index = self.buffer.first_index();

This value could be returned from .push_front(), and .first_index() could be removed altogether, to make the idea behind SlidingArray clearer.
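A simplified sketch of what that could look like; this SlidingArray is a stand-in with guessed semantics, not the actual type from this PR:

```rust
use std::collections::VecDeque;

/// Stand-in for the SlidingArray discussed here: every element gets a stable
/// global index, and push_front returns the index of the element it just
/// inserted, so a separate first_index() accessor is no longer needed.
struct SlidingArray<T> {
    data: VecDeque<T>,
    next_index: usize, // global index that the next pushed element will get
}

impl<T> SlidingArray<T> {
    fn new() -> Self {
        Self { data: VecDeque::new(), next_index: 0 }
    }

    /// Insert the newest element at the front and return its global index.
    fn push_front(&mut self, value: T) -> usize {
        let index = self.next_index;
        self.next_index += 1;
        self.data.push_front(value);
        index
    }

    /// Remove the oldest element as the window slides forward.
    fn pop_back(&mut self) -> Option<T> {
        self.data.pop_back()
    }
}
```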

Resolved review thread: src/controller/timeouts.rs
if let Some(timeout) = timeout {
    tokio::select! {
        _ = tokio::time::sleep_until(start + timeout) => break,
        recv = current_timeout.changed() => {

Hmm, such pedantic tracking of the desired timeout is only useful at the start of streaming, when no timing data is available yet.

Another way to bootstrap things (a rough sketch follows the list):

  1. For the first batch of requests, issue each one with a 100 ms timeout.

  2. Once the 100 ms timeout triggers:

2.1. If there are successful responses, or it is the very first chunk in the stream, then try again.

2.2. Otherwise, wait another 100 ms and go to 2.
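A rough sketch of this bootstrap loop; the `send_query` function, the `Response` type, and the function signature are placeholders for illustration only:

```rust
use std::time::Duration;
use tokio::time::timeout;

// Placeholder response type and request function; the real ones live in the
// gateway's transport layer.
struct Response;
async fn send_query() -> Response {
    Response
}

const BOOTSTRAP_TIMEOUT: Duration = Duration::from_millis(100);

async fn bootstrap_request(is_first_chunk: bool, got_any_response: impl Fn() -> bool) -> Response {
    loop {
        // 1. Issue a request and poll it in 100 ms slices.
        let mut request = Box::pin(send_query());
        loop {
            match timeout(BOOTSTRAP_TIMEOUT, request.as_mut()).await {
                Ok(response) => return response,
                // 2. The 100 ms timeout triggered.
                Err(_elapsed) => {
                    if got_any_response() || is_first_chunk {
                        // 2.1. Give up on this attempt and try again with a fresh request.
                        break;
                    }
                    // 2.2. Otherwise keep waiting on the same request for another 100 ms.
                }
            }
        }
    }
}
```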

kalabukdima (Contributor, Author):

Fixed. I've made it read the timeout only at the moment of sending a request, without adjusting it if the value changes later.

let num_infs = self.num_infs.load(Ordering::Relaxed);

// TODO: optimize time complexity
let kth = ((durations.len() + num_infs) as f32 * self.quantile).floor() as usize;

What is the role of num_infs here? Why should "dead" workers affect the expected response times of healthy ones?

My suggestion is (a sketch follows the list):

  • Use a VecDeque to hold up to N (~50) of the last successful response times

  • Make sure that response times for chunks far behind the current buffer never sneak in

  • When there are not enough entries to trim at the desired percentile, use the longest response time as the timeout (possibly with some reserve).

  • When no timing info is available, use something like 200 ms
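A minimal sketch of this suggestion; the constants, field names, and fallback behaviour are illustrative assumptions, not the code from this PR:

```rust
use std::collections::VecDeque;
use std::time::Duration;

const MAX_SAMPLES: usize = 50;
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(200);

struct TimeoutEstimator {
    samples: VecDeque<Duration>, // last successful response times, newest at the back
    quantile: f32,               // e.g. 0.9 to retry the slowest 10%
}

impl TimeoutEstimator {
    /// Record a successful response time, keeping only the newest MAX_SAMPLES.
    fn observe(&mut self, duration: Duration) {
        if self.samples.len() == MAX_SAMPLES {
            self.samples.pop_front();
        }
        self.samples.push_back(duration);
    }

    /// Timeout to use for the next request.
    fn current_timeout(&self) -> Duration {
        if self.samples.is_empty() {
            return DEFAULT_TIMEOUT;
        }
        let mut sorted: Vec<Duration> = self.samples.iter().copied().collect();
        sorted.sort();
        if sorted.len() < MAX_SAMPLES {
            // Not enough data to trim at the percentile: fall back to the longest observation.
            *sorted.last().unwrap()
        } else {
            let k = ((sorted.len() - 1) as f32 * self.quantile).floor() as usize;
            sorted[k]
        }
    }
}
```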

kalabukdima (Contributor, Author):

This algorithm ensures that, on average, the slowest requests beyond the timeout_percentile are retried. It's a generalization of what you proposed (wait until 90% of the requests complete, then retry the rest), but in a streaming manner, where requests can start at different times. Here I track the durations of completed requests and the current number of "ongoing" requests. The goal is to find a timeout that "cuts" the 1 - timeout_quantile fraction of the remaining requests.

I totally agree that this algorithm is hard to comprehend, but I couldn't find a better one with the same performance and the same good qualities (a rough sketch of the computation follows the list), namely:

  • requests are retried as soon as we know they fall into the given fraction of the slowest ones,
  • if fewer than timeout_percentile of the requests have finished so far, we keep waiting for any of them to complete,
  • a request that has just been retried is also timed out against the current duration
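A rough sketch of the computation described above, under the assumption that the k-th smallest duration (counting ongoing requests as unbounded) is used as the cut-off; this is an illustration, not the actual code from this PR:

```rust
use std::time::Duration;

/// Given the durations of completed requests and the number of still-ongoing
/// ones, pick the timeout that leaves only the slowest `1 - quantile` fraction
/// of all requests to be retried. Returns None if we should keep waiting.
fn retry_timeout(
    mut completed: Vec<Duration>, // durations of finished requests
    ongoing: usize,               // requests still in flight, treated as "infinite" so far
    quantile: f32,                // e.g. 0.9
) -> Option<Duration> {
    completed.sort();
    let total = completed.len() + ongoing;
    let kth = (total as f32 * quantile).floor() as usize;
    // If the k-th smallest duration is among the completed ones, it gives the cut-off;
    // otherwise fewer than `quantile` of the requests have finished and we keep waiting.
    completed.get(kth).copied()
}
```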

kalabukdima (Contributor, Author):

Fixed, almost as you suggested, except that instead of using the longest observed time I always use the hard-coded timeout until there are 50 observations. Otherwise, some unexpected durations (either too fast or too long) could lead to unpredictable consequences.

    priorities: HashMap<PeerId, Priority>,
}

impl WorkersPool {

My concern with the chosen method of evaluating worker performance is that it is hard to reason about the impact of distant past behaviour.

For example, a worker could have a hiccup while still receiving quite a few queries at that moment (e.g. because it hosts some poorly replicated data chunks that see a sudden spike in demand). In that case, the worker will be significantly de-prioritised, but for how long? Possibly for a very long time, depending on the usage pattern and overall network performance...

I suggest tracking workers with a set of time-windowed counters and computing the resulting priorities at query time with some formula (see the sketch after this list).

A set of counters could be:

  • Number of "big" timeouts
  • Number of times the worker failed to respond within a retry threshold
  • Number of times the worker responded within the 50th percentile
  • Number of Busy errors
  • Number of currently pending requests made to this worker
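A sketch of the time-windowed counter idea; the window length, event set, and scoring formula are all illustrative assumptions rather than a concrete proposal from this thread:

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

// Placeholder for the real peer id type used in the gateway.
type PeerId = u64;

const WINDOW: Duration = Duration::from_secs(600);

#[derive(Clone, Copy)]
enum Event {
    BigTimeout,
    MissedRetryThreshold,
    FastResponse, // responded within the 50th percentile
    Busy,
}

#[derive(Default)]
struct WorkerStats {
    events: VecDeque<(Instant, Event)>,
    pending: usize, // requests currently in flight to this worker
}

impl WorkerStats {
    fn record(&mut self, event: Event) {
        self.events.push_back((Instant::now(), event));
    }

    /// Drop events that fell out of the time window.
    fn trim(&mut self, now: Instant) {
        while matches!(self.events.front(), Some((t, _)) if now.duration_since(*t) > WINDOW) {
            self.events.pop_front();
        }
    }

    /// Compute the priority at query time; higher is better. The weights are arbitrary.
    fn priority(&mut self) -> f64 {
        self.trim(Instant::now());
        let mut score = 0.0;
        for (_, event) in &self.events {
            score += match event {
                Event::FastResponse => 1.0,
                Event::BigTimeout => -2.0,
                Event::MissedRetryThreshold => -1.0,
                Event::Busy => -0.5,
            };
        }
        // Penalise workers that already have many requests in flight.
        score - self.pending as f64
    }
}

struct WorkersPool {
    stats: HashMap<PeerId, WorkerStats>,
}
```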

kalabukdima (Contributor, Author):

These priorities are reset every epoch, which is currently 30 minutes, but I suspect it may get shorter later. This simple algorithm has already sped up streaming by about 2-4 times.

I like what you're suggesting, but how about getting back to it after the initial release? Right now it would mean improving performance without the ability to run real-world benchmarks.

kalabukdima (Contributor, Author):

Done

- Report stream summary
- Add canonical log lines for HTTP requests
- Reorganize span fields
- Support JSON log format
Determine the request timeout at the moment of sending it.
Just return the default timeout if not enough data is available
- Remove backoff time — retry errors immediately or fail
- Don't extend the buffer if no more queries could be sent
kalabukdima force-pushed the streaming branch 2 times, most recently from a3113f8 to f3f1fe2 on September 9, 2024 at 18:09
Get rid of S3 API dependency