Skip to content

Commit

Permalink
light-client: improve supervisor ergonomics (#403)
Browse files Browse the repository at this point in the history
* light-client: turn Handle into a trait

As we start to depend on the surface of the `Handle` we benefit from it
being a trait that can be implemented on a per need basis. This will
result in less overhead constructing object graphs in places where we
wannt to assert behaviour of other types in remote places, i.e. the
light-node rpc server. Overall we hope for an increased ease in writing
tests on module level.

Ref #219

* Add changes entry

* Remove async trait methods

* Make supervisor more CSP

* Improve crate imports

* Relax mutability on Handle

* Provide noop default implementations

* Update adr status

* Handle crossbeam errors honestly

* Add changes entry

* Mention ADR in change entry

* Correct PR link

* Simplify example
  • Loading branch information
xla authored Jul 3, 2020
1 parent 9adad9a commit 78ff0c3
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 137 deletions.
7 changes: 5 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
## pending
# pending

Light Client:

- Expose latest_trusted from Supervisor Handle ([#394])
- Turn `Handle` into a trait for ease of integration and testability ([#401])
- Improve `Supervisor` ergonomics according to [ADR-007] ([#403])

[#394]: https://github.com/informalsystems/tendermint-rs/pull/394
[#401]: https://github.com/informalsystems/tendermint-rs/pull/401
[#403]: https://github.com/informalsystems/tendermint-rs/pull/403
[ADR-007]: https://github.com/informalsystems/tendermint-rs/blob/master/docs/architecture/adr-007-light-client-supervisor-ergonomics.md

## [0.14.1] (2020-06-23)

Expand Down Expand Up @@ -274,7 +277,7 @@ This release is compatible with [tendermint v0.28]
[#205]: https://github.com/tendermint/kms/pull/219
[#181]: https://github.com/tendermint/kms/pull/181
[tendermint v0.29]: https://github.com/tendermint/tendermint/blob/master/CHANGELOG.md#v0290
[tendermint v0.28]: https://github.com/tendermint/tendermint/blob/master/CHANGELOG.md#v0280
[tendermint v0.28]: https://github.com/tendermint/tendermint/blob/master/CHANGELOG.md#v0280
[#30]: https://github.com/interchainio/tendermint-rs/pull/30
[#16]: https://github.com/interchainio/tendermint-rs/pull/16
[#15]: https://github.com/interchainio/tendermint-rs/pull/15
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Changelog

* 2020-06-30: Initial draft
* 2020-07-02: approved with corrections

## Context

Expand Down Expand Up @@ -32,7 +33,7 @@ server and ibc relayer.

## Status

Proposed
Approved

## Consequences

Expand Down
12 changes: 6 additions & 6 deletions light-client/examples/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,19 @@ fn sync_cmd(opts: SyncOpts) {
ProdEvidenceReporter::new(peer_addr),
);

let mut handle = supervisor.handle();
let handle = supervisor.handle();

std::thread::spawn(|| supervisor.run());

loop {
handle.verify_to_highest_async(|result| match result {
match handle.verify_to_highest() {
Ok(light_block) => {
println!("[ info ] synced to block {}", light_block.height());
println!("[info] synced to block {}", light_block.height());
}
Err(e) => {
println!("[ error ] sync failed: {}", e);
Err(err) => {
println!("[error] sync failed: {}", err);
}
});
}

std::thread::sleep(Duration::from_millis(800));
}
Expand Down
26 changes: 0 additions & 26 deletions light-client/src/callback.rs

This file was deleted.

18 changes: 18 additions & 0 deletions light-client/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Toplevel errors raised by the light client.

use std::fmt::Debug;

use anomaly::{BoxError, Context};
use crossbeam_channel as crossbeam;
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand Down Expand Up @@ -57,6 +60,9 @@ pub enum ErrorKind {

#[error("invalid light block: {0}")]
InvalidLightBlock(#[source] VerificationError),

#[error("internal channel disconnected")]
ChannelDisconnected,
}

impl ErrorKind {
Expand Down Expand Up @@ -107,3 +113,15 @@ impl ErrorExt for ErrorKind {
}
}
}

impl<T: Debug + Send + Sync + 'static> From<crossbeam::SendError<T>> for ErrorKind {
fn from(_err: crossbeam::SendError<T>) -> Self {
Self::ChannelDisconnected
}
}

impl From<crossbeam::RecvError> for ErrorKind {
fn from(_err: crossbeam::RecvError) -> Self {
Self::ChannelDisconnected
}
}
1 change: 0 additions & 1 deletion light-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

//! See the `light_client` module for the main documentation.

pub mod callback;
pub mod components;
pub mod contracts;
pub mod errors;
Expand Down
159 changes: 58 additions & 101 deletions light-client/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,49 @@ use crossbeam_channel as channel;

use tendermint::evidence::{ConflictingHeadersEvidence, Evidence};

use crate::{
bail,
callback::Callback,
errors::{Error, ErrorKind},
evidence::EvidenceReporter,
fork_detector::{Fork, ForkDetection, ForkDetector},
light_client::LightClient,
peer_list::PeerList,
state::State,
types::{Height, LightBlock, PeerId, Status},
};
use crate::bail;
use crate::errors::{Error, ErrorKind};
use crate::evidence::EvidenceReporter;
use crate::fork_detector::{Fork, ForkDetection, ForkDetector};
use crate::light_client::LightClient;
use crate::peer_list::PeerList;
use crate::state::State;
use crate::types::{Height, LightBlock, PeerId, Status};

pub trait Handle {
/// Get latest trusted block from the [`Supervisor`].
fn latest_trusted(&mut self) -> Result<Option<LightBlock>, Error>;
fn latest_trusted(&self) -> Result<Option<LightBlock>, Error> {
todo!()
}

/// Verify to the highest block.
fn verify_to_highest(&mut self) -> Result<LightBlock, Error>;
fn verify_to_highest(&self) -> Result<LightBlock, Error> {
todo!()
}

/// Verify to the block at the given height.
fn verify_to_target(&mut self, height: Height) -> Result<LightBlock, Error>;

/// Async version of `verify_to_highest`.
///
/// The given `callback` will be called asynchronously with the
/// verification result.
fn verify_to_highest_async(
&mut self,
callback: impl FnOnce(Result<LightBlock, Error>) -> () + Send + 'static,
);

/// Async version of `verify_to_target`.
///
/// The given `callback` will be called asynchronously with the
/// verification result.
fn verify_to_target_async(
&mut self,
height: Height,
callback: impl FnOnce(Result<LightBlock, Error>) -> () + Send + 'static,
);
fn verify_to_target(&self, _height: Height) -> Result<LightBlock, Error> {
todo!()
}

/// Terminate the underlying [`Supervisor`].
fn terminate(&mut self);
fn terminate(&self) -> Result<(), Error> {
todo!()
}
}

/// Input events sent by the [`Handle`]s to the [`Supervisor`]. They carry a [`Callback`] which is
/// used to communicate back the responses of the requests.
#[derive(Debug)]
enum HandleInput {
/// Terminate the supervisor process
Terminate(Callback<()>),
Terminate(channel::Sender<()>),
/// Verify to the highest height, call the provided callback with result
VerifyToHighest(Callback<Result<LightBlock, Error>>),
VerifyToHighest(channel::Sender<Result<LightBlock, Error>>),
/// Verify to the given height, call the provided callback with result
VerifyToTarget(Height, Callback<Result<LightBlock, Error>>),
VerifyToTarget(Height, channel::Sender<Result<LightBlock, Error>>),
/// Get the latest trusted block.
LatestTrusted(Callback<Result<Option<LightBlock>, Error>>),
LatestTrusted(channel::Sender<Option<LightBlock>>),
}

/// A light client `Instance` packages a `LightClient` together with its `State`.
Expand Down Expand Up @@ -98,7 +84,7 @@ impl Instance {
/// removed.
///
/// The supervisor is intended to be ran in its own thread, and queried
/// via a `Handle`, sync- or asynchronously.
/// via a `Handle`.
///
/// ## Example
///
Expand All @@ -111,12 +97,13 @@ impl Instance {
///
/// loop {
/// // Asynchronously query the supervisor via a handle
/// handle.verify_to_highest_async(|result| match result {
/// let maybe_block = handle.verify_to_highest();
/// match maybe_block {
/// Ok(light_block) => {
/// println!("[ info ] synced to block {}", light_block.height());
/// println!("[info] synced to block {}", light_block.height());
/// }
/// Err(e) => {
/// println!("[ error ] sync failed: {}", e);
/// println!("[error] sync failed: {}", e);
/// }
/// });
///
Expand Down Expand Up @@ -314,26 +301,26 @@ impl Supervisor {
/// Run the supervisor event loop in the same thread.
///
/// This method should typically be called within a new thread with `std::thread::spawn`.
pub fn run(mut self) {
pub fn run(mut self) -> Result<(), Error> {
loop {
let event = self.receiver.recv().unwrap();
let event = self.receiver.recv().map_err(ErrorKind::from)?;

match event {
HandleInput::LatestTrusted(callback) => {
HandleInput::LatestTrusted(sender) => {
let outcome = self.latest_trusted();
callback.call(Ok(outcome));
sender.send(outcome).map_err(ErrorKind::from)?;
}
HandleInput::Terminate(callback) => {
callback.call(());
return;
HandleInput::Terminate(sender) => {
sender.send(()).map_err(ErrorKind::from)?;
return Ok(());
}
HandleInput::VerifyToTarget(height, callback) => {
HandleInput::VerifyToTarget(height, sender) => {
let outcome = self.verify_to_target(height);
callback.call(outcome);
sender.send(outcome).map_err(ErrorKind::from)?;
}
HandleInput::VerifyToHighest(callback) => {
HandleInput::VerifyToHighest(sender) => {
let outcome = self.verify_to_highest();
callback.call(outcome);
sender.send(outcome).map_err(ErrorKind::from)?;
}
}
}
Expand All @@ -354,73 +341,43 @@ impl SupervisorHandle {
}

fn verify(
&mut self,
make_event: impl FnOnce(Callback<Result<LightBlock, Error>>) -> HandleInput,
&self,
make_event: impl FnOnce(channel::Sender<Result<LightBlock, Error>>) -> HandleInput,
) -> Result<LightBlock, Error> {
let (sender, receiver) = channel::bounded::<Result<LightBlock, Error>>(1);

let callback = Callback::new(move |result| {
sender.send(result).unwrap();
});
let event = make_event(sender);
self.sender.send(event).map_err(ErrorKind::from)?;

let event = make_event(callback);
self.sender.send(event).unwrap();

receiver.recv().unwrap()
receiver.recv().map_err(ErrorKind::from)?
}
}

impl Handle for SupervisorHandle {
fn latest_trusted(&mut self) -> Result<Option<LightBlock>, Error> {
let (sender, receiver) = channel::bounded::<Result<Option<LightBlock>, Error>>(1);

let callback = Callback::new(move |result| {
sender.send(result).unwrap();
});
fn latest_trusted(&self) -> Result<Option<LightBlock>, Error> {
let (sender, receiver) = channel::bounded::<Option<LightBlock>>(1);

// TODO(xla): Transform crossbeam errors into proper domain errors.
self.sender
.send(HandleInput::LatestTrusted(callback))
.unwrap();
.send(HandleInput::LatestTrusted(sender))
.map_err(ErrorKind::from)?;

// TODO(xla): Transform crossbeam errors into proper domain errors.
receiver.recv().unwrap()
Ok(receiver.recv().map_err(ErrorKind::from)?)
}

fn verify_to_highest(&mut self) -> Result<LightBlock, Error> {
fn verify_to_highest(&self) -> Result<LightBlock, Error> {
self.verify(HandleInput::VerifyToHighest)
}

fn verify_to_target(&mut self, height: Height) -> Result<LightBlock, Error> {
self.verify(|callback| HandleInput::VerifyToTarget(height, callback))
}

fn verify_to_highest_async(
&mut self,
callback: impl FnOnce(Result<LightBlock, Error>) -> () + Send + 'static,
) {
let event = HandleInput::VerifyToHighest(Callback::new(callback));
self.sender.send(event).unwrap();
fn verify_to_target(&self, height: Height) -> Result<LightBlock, Error> {
self.verify(|sender| HandleInput::VerifyToTarget(height, sender))
}

fn verify_to_target_async(
&mut self,
height: Height,
callback: impl FnOnce(Result<LightBlock, Error>) -> () + Send + 'static,
) {
let event = HandleInput::VerifyToTarget(height, Callback::new(callback));
self.sender.send(event).unwrap();
}

fn terminate(&mut self) {
fn terminate(&self) -> Result<(), Error> {
let (sender, receiver) = channel::bounded::<()>(1);

let callback = Callback::new(move |_| {
sender.send(()).unwrap();
});

self.sender.send(HandleInput::Terminate(callback)).unwrap();
self.sender
.send(HandleInput::Terminate(sender))
.map_err(ErrorKind::from)?;

receiver.recv().unwrap()
Ok(receiver.recv().map_err(ErrorKind::from)?)
}
}

0 comments on commit 78ff0c3

Please sign in to comment.