Skip to content

Commit

Permalink
GrpcStore retry first
Browse files Browse the repository at this point in the history
We see the first request in a GrpcStore request failing but this bypasses
the retry logic because it's already passed out of the retry function.

Read the first message from the upstream GrpcStore within the retry logic,
only exit from the retry logic after the first message was recieved.
  • Loading branch information
chrisstaite-menlo committed Jan 17, 2024
1 parent 6a379b3 commit 4a514df
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 17 deletions.
2 changes: 1 addition & 1 deletion nativelink-service/src/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl ByteStreamServer {
let any_store = store.clone().inner_store(Some(digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let stream = grpc_store.read(Request::new(read_request)).await?.into_inner();
let stream = grpc_store.read(Request::new(read_request)).await?;
return Ok(Response::new(Box::pin(stream)));
}

Expand Down
56 changes: 40 additions & 16 deletions nativelink-store/src/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use rand::rngs::OsRng;
use rand::Rng;
use tokio::time::sleep;
use tonic::transport::Channel;
use tonic::{transport, IntoRequest, Request, Response, Streaming};
use tonic::{transport, IntoRequest, Request, Response, Status, Streaming};
use tracing::error;
use uuid::Uuid;

Expand All @@ -65,6 +65,22 @@ pub struct GrpcStore {
retrier: Retrier,
}

struct FirstStream {
first_response: Option<Option<ReadResponse>>,
stream: Streaming<ReadResponse>,
}

impl Stream for FirstStream {
type Item = Result<ReadResponse, Status>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
if let Some(first_response) = self.first_response.take() {
return std::task::Poll::Ready(first_response.map(Ok));
}
Pin::new(&mut self.stream).poll_next(cx)
}
}

impl GrpcStore {
pub async fn new(config: &nativelink_config::stores::GrpcStore) -> Result<Self, Error> {
let jitter_amt = config.retry.jitter;
Expand Down Expand Up @@ -233,7 +249,7 @@ impl GrpcStore {
pub async fn read(
&self,
grpc_request: impl IntoRequest<ReadRequest>,
) -> Result<Response<Streaming<ReadResponse>>, Error> {
) -> Result<impl Stream<Item = Result<ReadResponse, Status>>, Error> {
error_if!(
matches!(self.store_type, nativelink_config::stores::StoreType::ac),
"CAS operation on AC store"
Expand All @@ -253,11 +269,21 @@ impl GrpcStore {
);

self.perform_request(request, |request| async move {
self.bytestream_client
let mut response = self
.bytestream_client
.clone()
.read(Request::new(request))
.await
.err_tip(|| "in GrpcStore::read")
.err_tip(|| "in GrpcStore::read")?
.into_inner();
let first_response = response
.message()
.await
.err_tip(|| "Fetching first chunk in GrpcStore::read()")?;
Ok(Box::new(FirstStream {
first_response: Some(first_response),
stream: response,
}))
})
.await
}
Expand Down Expand Up @@ -640,20 +666,18 @@ impl Store for GrpcStore {
read_limit: length.unwrap_or(0) as i64,
}))
.await
.err_tip(|| "in GrpcStore::get_part()")?
.into_inner();
.err_tip(|| "in GrpcStore::get_part()")?;

loop {
let maybe_message = stream
.message()
.await
.err_tip(|| "While fetching message in GrpcStore::get_part()")?;
let Some(message) = maybe_message else {
writer
.send_eof()
.await
.err_tip(|| "Could not send eof in GrpcStore::get_part()")?;
break; // EOF.
let message = match future::poll_fn(|cx| Pin::new(&mut stream).poll_next(cx)).await {
Some(message) => message.err_tip(|| "While fetching message in GrpcStore::get_part()")?,
None => {
writer
.send_eof()
.await
.err_tip(|| "Could not send eof in GrpcStore::get_part()")?;
break; // EOF.
}
};
if message.data.is_empty() {
writer
Expand Down

0 comments on commit 4a514df

Please sign in to comment.