
Feature request: timeout argument for client.sample #18

Open
gehring opened this issue Sep 23, 2020 · 15 comments · May be fixed by #133
Labels: enhancement (New feature or request)
gehring commented Sep 23, 2020

Similarly to #4, it would be useful to be able to back out of sampling without needing to wrap things in a thread or use an executor. I agree that in many cases you'd want to sample asynchronously to maximize throughput, but there are cases where predictability and simplicity are preferable even if it comes at the expense of efficiency (e.g., in research). A timeout argument would simplify the synchronous setting without sacrificing safeguards against indefinite waiting.
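To make the semantics I'm asking for concrete, here is a minimal stdlib sketch; `SampleSource` is a hypothetical stand-in for a replay table, not reverb's actual API:

```python
import queue

class SampleSource:
    """Toy stand-in for a replay table: sample() blocks until data arrives."""

    def __init__(self):
        self._q = queue.Queue()

    def insert(self, item):
        self._q.put(item)

    def sample(self, timeout=None):
        # timeout=None blocks indefinitely, mirroring the current behaviour;
        # a finite timeout raises queue.Empty so the caller can back out.
        return self._q.get(timeout=timeout)

source = SampleSource()
source.insert({"obs": 1})
print(source.sample(timeout=0.1))   # returns the stored item
try:
    source.sample(timeout=0.1)      # no data left, so this times out
except queue.Empty:
    print("timed out, skipping update")
```

The point is just the second call: instead of hanging forever, the caller gets control back after the timeout and can decide what to do.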

@tfboyd tfboyd added review Tag to review at the next standup. enhancement New feature or request and removed review Tag to review at the next standup. labels Nov 6, 2020
tfboyd (Collaborator) commented Nov 11, 2020

We have worked on this a couple of times and we want to resolve it. We do not have an ETA.

ebrevdo (Collaborator) commented Apr 12, 2021

The Reverb datasets do allow a rate_limiter_timeout_ms argument; is that sufficient for your needs?

gehring (Author) commented Apr 13, 2021

I had to move away from reverb for now since I had trouble working with states of variable shape (I'm not sure whether that's supported, but it seemed possible at the time), but I'll try to recall the context for this as best I can.

If you're referring to ReplayDataset, then I don't believe it would have worked for me due to the explicit shape argument. Also, what I was hoping to do was avoid explicitly checking whether there are enough samples, using the timeout as a sort of hack. The context is one where several problem/env instances generate samples that are stored in separate buffers. Batches would be sampled from a random buffer, but some buffers might not have seen enough samples yet. Skipping updates when a batch times out would have kept things simple regardless of whether updates were done synchronously or asynchronously. Dealing with the iterator closing every time would likely have been a bit awkward for that use case, though it probably could have been made to work.
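In toy form, the loop I had in mind looks something like this (`queue.Queue` stands in for the per-env buffers, and the `timeout` is the hypothetical argument requested here):

```python
import queue

# Three per-env buffers; only env 0 has produced a batch so far.
buffers = [queue.Queue() for _ in range(3)]
buffers[0].put("batch-from-env-0")

updates = 0
for buf in buffers:
    try:
        batch = buf.get(timeout=0.01)  # hypothetical per-sample timeout
    except queue.Empty:
        continue  # this buffer hasn't seen enough samples yet: skip it
    updates += 1  # ...run one learner step on `batch` here...

print(updates)  # 1: only the single pre-filled batch was consumed
```

Buffers that can't produce a batch in time are simply skipped rather than blocking the whole update loop.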

That being said, I want to acknowledge that it's very possible that I was trying to use reverb to do things it wasn't meant to support! I wouldn't be offended if that was the gist of your reply ;)

ebrevdo (Collaborator) commented May 25, 2021

The new Trajectory{Writer,Dataset} do provide more flexibility; it's also possible to use variable-shaped tensors as long as you're not concatenating them into time trajectories.

That said, the python client sampler lacks a timeout argument - only the Datasets allow it. So I'll leave this open until we propagate the rate_limiter_timeout_ms into the python API for the python sampler.

@ebrevdo ebrevdo self-assigned this May 25, 2021
@varshneydevansh

Is this still pending? Can I look into this?

@acassirer (Contributor)

The FR is still open, so feel free to take a look! Just a warning though: the implementation is a bit involved, so it isn't quite as simple as just propagating an argument.

@varshneydevansh

Hi @acassirer,

I sat down last night and tried to understand the problem. The issue encountered in #4 is a common one in distributed systems and client-server architectures: how to handle timeouts gracefully when the server goes offline or becomes unresponsive. As the example there shows, a Reverb client can indeed get blocked indefinitely if the server crashes or is terminated.

The current workaround of using the server_info method to check the server's status before calling create_item makes sense, but, as noted, it is not completely foolproof.

The suggestion of using concurrent.futures to offload the create_item call to a separate thread is a valid approach. The main advantage is that we can set a timeout on the thread's execution; if the server goes down, the timeout triggers and the client can handle the situation gracefully.
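For example, a self-contained sketch of that thread-based workaround (`create_item` here is just a toy stand-in that sleeps, not the real reverb call):

```python
import concurrent.futures
import time

def create_item():
    """Stand-in for a blocking reverb call against an unresponsive server."""
    time.sleep(1.0)  # simulate a call that does not return in time
    return "item"

timed_out = False
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(create_item)
    try:
        result = future.result(timeout=0.1)  # give up after 100 ms
    except concurrent.futures.TimeoutError:
        timed_out = True  # back out instead of blocking forever

print(timed_out)  # True
```

Note the cost this FR would remove: the worker thread keeps running after the timeout, and the executor still has to join it eventually.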

The updated Reverb API provides a flush method on TrajectoryWriter with a built-in timeout: you can set one via the timeout_ms argument, and if the operation doesn't complete within it, a DeadlineExceededError is raised. This is ideal because the timeout functionality is natively supported by the library, which is much cleaner and more efficient than spawning a separate thread for each call.


For this FR, would we have to modify sample in https://github.com/deepmind/reverb/blob/58f5f018082860caa4057d24d75d725709dcd2bb/reverb/client.py#L345 along these lines,

    def sample(
        timeout: Optional[float] = None,

and then perhaps also modify the RPC call behind this method?

@acassirer (Contributor)

Hey,

The sampler in the C++ layer has to be updated, and the pybind layer has to be modified accordingly. The RPC call might have to change in two ways:

  1. You would have to make use of the rate limiter timeout (ms). This only applies when the server is up and you are blocked on a lack of data, rather than the server being offline.
  2. The gRPC context probably has to be configured so that the call is terminated if the server cannot be reached within the deadline.
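As a rough sketch of the eventual Python-side plumbing (all names here, such as `_FakeCppSampler` and `fetch_samples`, are hypothetical stand-ins rather than reverb's actual internals):

```python
from typing import Optional

class _FakeCppSampler:
    """Stand-in for the pybind-wrapped C++ sampler (hypothetical)."""

    def fetch_samples(self, num_samples: int, rate_limiter_timeout_ms: int):
        # The real implementation would block on the gRPC stream; here we
        # just echo the arguments to show how the timeout is forwarded.
        return [f"sample-{i}" for i in range(num_samples)], rate_limiter_timeout_ms

class Client:
    def __init__(self):
        self._sampler = _FakeCppSampler()

    def sample(self, num_samples: int = 1, timeout: Optional[float] = None):
        # Convert the Python-level seconds timeout to the milliseconds value
        # the C++ layer would expect; -1 conventionally means "wait forever".
        timeout_ms = -1 if timeout is None else int(timeout * 1000)
        samples, _ = self._sampler.fetch_samples(num_samples, timeout_ms)
        return samples

client = Client()
print(client.sample(num_samples=2, timeout=0.5))  # ['sample-0', 'sample-1']
```

The actual work is of course in the C++ sampler and the pybind binding; this only shows where the new argument would surface.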

@varshneydevansh

https://github.com/deepmind/reverb/blob/58f5f018082860caa4057d24d75d725709dcd2bb/reverb/cc/sampler.cc#L198

https://github.com/deepmind/reverb/blob/58f5f018082860caa4057d24d75d725709dcd2bb/reverb/cc/sampler.h#L127

The FetchSamples signature already takes an absl::Duration rate_limiter_timeout:

      std::pair<int64_t, absl::Status> FetchSamples(
          internal::Queue<std::unique_ptr<Sample>>* queue, int64_t num_samples,
          absl::Duration rate_limiter_timeout) override {

and that timeout is already being set on the request:

      request.mutable_rate_limiter_timeout()->set_milliseconds(
          NonnegativeDurationToInt64Millis(rate_limiter_timeout));

This is what I modified in sampler.cc:

  std::pair<int64_t, absl::Status> FetchSamples(
      internal::Queue<std::unique_ptr<Sample>>* queue, int64_t num_samples,
      absl::Duration rate_limiter_timeout) override {
    std::unique_ptr<grpc::ClientReaderWriterInterface<SampleStreamRequest,
                                                      SampleStreamResponse>>
        stream;
    {
      absl::MutexLock lock(&mu_);
      if (closed_) {
        return {0, absl::CancelledError("`Close` called on Sampler.")};
      }
      context_ = std::make_unique<grpc::ClientContext>();
      context_->set_wait_for_ready(false);

      // Setting the deadline for the gRPC context
      context_->set_deadline(absl::ToChronoTime(absl::Now() + rate_limiter_timeout));

      stream = stub_->SampleStream(context_.get());
    }

@acassirer (Contributor)

That does indeed look sensible, with the exception that the time it takes to connect is not zero. There is therefore a potential issue where you establish a connection only to have the gRPC deadline expire before the rate limiter returns a valid sample. This would result in data loss: the item is successfully sampled from the table but never returned to the caller.
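In toy form (plain Python queues standing in for the gRPC stream and the table):

```python
import queue
import time

table = queue.Queue()
table.put("sample-0")

# The deadline is fixed before the connection is established...
deadline = time.monotonic() + 0.05
time.sleep(0.1)  # ...and connection setup eats the whole budget.

# Server side: the rate limiter releases the item regardless of the deadline.
item = table.get_nowait()

# Client side: the stream is already past its deadline, so the response is
# dropped and the item never reaches the caller, yet it has left the table.
delivered = time.monotonic() < deadline
print(delivered, table.qsize())  # False 0
```

The sample is gone from the table but was never delivered, which is exactly the loss scenario to avoid.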

@varshneydevansh

So should I do something like this?

      // Buffer time to account for connection overhead; using absl::Milliseconds
      // keeps the arithmetic in absl::Duration throughout.
      const absl::Duration kConnectionBufferTime = absl::Milliseconds(50);  // Or some other suitable value

      // Setting the deadline for the gRPC context
      context_->set_deadline(absl::ToChronoTime(
          absl::Now() + rate_limiter_timeout + kConnectionBufferTime));

Also, I may need a little more help, as I am pretty much a junior developer, but I'm picking up new things as I go.

@acassirer (Contributor)

Yes, I think something like that would be reasonable. The important thing will be to add test coverage in both the C++ and Python layers.

@varshneydevansh

So, besides the test coverage, are there any other changes I have to make?

I am also opening the PR so that you can guide me better. =)

@varshneydevansh varshneydevansh linked a pull request Aug 23, 2023 that will close this issue
@acassirer (Contributor)

You would have to change the pybind layer as well, of course, in order to expose it to Python. Then there is the question of the datasets, for which this MUST NOT be enabled.

The test coverage will then show whether or not this solution works.
