Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] #3468: Server-side cursor #3621

Merged
merged 4 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
898 changes: 506 additions & 392 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ ursa = "0.3.7"
aead = "0.3.2"

rand = "0.8.5"
warp = { version = "0.3.3", default-features = false }
warp = { version = "0.3.5", default-features = false }
wasmtime = "0.39.1"

tracing = "0.1.37"
Expand Down
9 changes: 7 additions & 2 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,17 @@ thiserror = { workspace = true }
displaydoc = { workspace = true }
tokio = { workspace = true, features = ["sync", "time", "rt", "io-util", "rt-multi-thread", "macros", "fs", "signal"] }
warp = { workspace = true, features = ["multipart", "websocket"] }
serial_test = "0.8.0"
once_cell = { workspace = true }
owo-colors = { workspace = true, features = ["supports-colors"] }
supports-color = { workspace = true }
thread-local-panic-hook = { version = "0.1.0", optional = true }
tempfile = { workspace = true }
dashmap = { workspace = true }

thread-local-panic-hook = { version = "0.1.0", optional = true }
uuid = { version = "1.4.1", features = ["v4"] }

[dev-dependencies]
serial_test = "0.8.0"

[build-dependencies]
iroha_wasm_builder = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion cli/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const TIMEOUT: Duration = Duration::from_millis(10_000);
const TIMEOUT: Duration = Duration::from_millis(1000);

/// Error type with generic for actual Stream/Sink error type
#[derive(thiserror::Error, displaydoc::Display, Debug)]
#[derive(Debug, displaydoc::Display, thiserror::Error)]
#[ignore_extra_doc_attributes]
pub enum Error<InternalStreamError>
where
Expand Down
78 changes: 78 additions & 0 deletions cli/src/torii/cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::num::{NonZeroU64, NonZeroUsize};

use crate::torii::{Error, Result};

pub trait Batch: IntoIterator + Sized {
fn batched(self, fetch_size: NonZeroUsize) -> Batched<Self>;
}

impl<I: IntoIterator> Batch for I {
fn batched(self, batch_size: NonZeroUsize) -> Batched<Self> {
Batched {
iter: self.into_iter(),
batch_size,
cursor: Some(0),
}
}
}

/// Paginated [`Iterator`].
/// Not recommended to use directly, only use in iterator chains.
#[derive(Debug)]
pub struct Batched<I: IntoIterator> {
iter: I::IntoIter,
batch_size: NonZeroUsize,
cursor: Option<u64>,
}

impl<I: IntoIterator + FromIterator<I::Item>> Batched<I> {
pub(crate) fn next_batch(&mut self, cursor: Option<u64>) -> Result<(I, Option<NonZeroU64>)> {
if cursor != self.cursor {
return Err(Error::UnknownCursor);
}

let mut batch_size = 0;
let batch: I = self
.iter
.by_ref()
.inspect(|_| batch_size += 1)
.take(self.batch_size.get())
.collect();

self.cursor = if let Some(cursor) = self.cursor {
if batch_size >= self.batch_size.get() {
let batch_size = self
.batch_size
.get()
.try_into()
.expect("usize should fit in u64");
Some(
cursor
.checked_add(batch_size)
.expect("Cursor size should never reach the platform limit"),
)
} else {
None
}
} else if batch_size >= self.batch_size.get() {
Some(
self.batch_size
.get()
.try_into()
.expect("usize should fit in u64"),
)
} else {
None
};

Ok((
batch,
self.cursor
.map(|cursor| NonZeroU64::new(cursor).expect("Cursor is never 0")),
))
}

pub fn is_depleted(&self) -> bool {
self.cursor.is_none()
}
}
55 changes: 49 additions & 6 deletions cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use std::{
fmt::{Debug, Write as _},
net::ToSocketAddrs,
sync::Arc,
time::{Duration, Instant},
};

use dashmap::DashMap;
use futures::{stream::FuturesUnordered, StreamExt};
use iroha_core::{
kura::Kura,
Expand All @@ -17,8 +19,9 @@ use iroha_core::{
sumeragi::SumeragiHandle,
EventsSender,
};
use thiserror::Error;
use tokio::sync::Notify;
use iroha_data_model::Value;
use parity_scale_codec::Encode;
use tokio::{sync::Notify, time::sleep};
use utils::*;
use warp::{
http::StatusCode,
Expand All @@ -27,10 +30,45 @@ use warp::{
Filter as _, Reply,
};

use self::cursor::Batched;

#[macro_use]
pub(crate) mod utils;
mod cursor;
mod pagination;
pub mod routing;
mod routing;

type LiveQuery = Batched<Vec<Value>>;

#[derive(Default)]
struct LiveQueryStore {
queries: DashMap<(String, Vec<u8>), (LiveQuery, Instant)>,
}

impl LiveQueryStore {
fn insert<T: Encode>(&self, query_id: String, request: T, live_query: LiveQuery) {
self.queries
.insert((query_id, request.encode()), (live_query, Instant::now()));
}

fn remove<T: Encode>(&self, query_id: &str, request: &T) -> Option<LiveQuery> {
self.queries
.remove(&(query_id.to_string(), request.encode()))
.map(|(_, (output, _))| output)
}

// TODO: Add notifier channel to enable graceful shutdown
fn expired_query_cleanup(self: Arc<Self>, idle_time: Duration) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
loop {
sleep(idle_time).await;

self.queries
.retain(|_, (_, last_access_time)| last_access_time.elapsed() <= idle_time);
Arjentix marked this conversation as resolved.
Show resolved Hide resolved
}
})
}
}

/// Main network handler and the only entrypoint of the Iroha.
pub struct Torii {
Expand All @@ -39,11 +77,12 @@ pub struct Torii {
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: SumeragiHandle,
query_store: Arc<LiveQueryStore>,
kura: Arc<Kura>,
}

/// Torii errors.
#[derive(Debug, Error, displaydoc::Display)]
#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum Error {
/// Failed to execute or validate query
Query(#[from] iroha_data_model::ValidationFail),
Expand All @@ -58,10 +97,12 @@ pub enum Error {
#[cfg(feature = "telemetry")]
/// Error while getting Prometheus metrics
Prometheus(#[source] eyre::Report),
/// Error while resuming cursor
UnknownCursor,
}

/// Status code for query error response.
pub(crate) fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
use iroha_data_model::{
isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*,
ValidationFail::*,
Expand Down Expand Up @@ -104,7 +145,9 @@ impl Error {
use Error::*;
match self {
Query(e) => query_status_code(e),
AcceptTransaction(_) | ConfigurationReload(_) => StatusCode::BAD_REQUEST,
AcceptTransaction(_) | ConfigurationReload(_) | UnknownCursor => {
StatusCode::BAD_REQUEST
}
Config(_) => StatusCode::NOT_FOUND,
PushIntoQueue(err) => match **err {
queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR,
Expand Down
97 changes: 47 additions & 50 deletions cli/src/torii/pagination.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use iroha_data_model::prelude::*;
use iroha_data_model::query::Pagination;

/// Describes a collection to which pagination can be applied.
/// Implemented for the [`Iterator`] implementors.
Expand All @@ -7,61 +7,57 @@ pub trait Paginate: Iterator + Sized {
fn paginate(self, pagination: Pagination) -> Paginated<Self>;
}

impl<I: Iterator + Sized> Paginate for I {
impl<I: Iterator> Paginate for I {
fn paginate(self, pagination: Pagination) -> Paginated<Self> {
Paginated {
pagination,
iter: self,
}
Paginated::new(pagination, self)
}
}

/// Paginated [`Iterator`].
/// Not recommended to use directly, only use in iterator chains.
#[derive(Debug)]
pub struct Paginated<I: Iterator> {
pagination: Pagination,
iter: I,
pub struct Paginated<I: Iterator>(core::iter::Take<core::iter::Skip<I>>);

impl<I: Iterator> Paginated<I> {
fn new(pagination: Pagination, iter: I) -> Self {
Self(
iter.skip(pagination.start.map_or_else(
|| 0,
|start| start.get().try_into().expect("U64 should fit into usize"),
))
.take(pagination.limit.map_or_else(
|| usize::MAX,
|limit| limit.get().try_into().expect("U32 should fit into usize"),
)),
)
}
}

impl<I: Iterator> Iterator for Paginated<I> {
type Item = I::Item;

fn next(&mut self) -> Option<Self::Item> {
if let Some(limit) = self.pagination.limit.as_mut() {
if *limit == 0 {
return None;
}

*limit -= 1
}

#[allow(clippy::option_if_let_else)]
// Required because of E0524. 2 closures with unique refs to self
if let Some(start) = self.pagination.start.take() {
self.iter
.nth(start.try_into().expect("u32 should always fit in usize"))
} else {
self.iter.next()
}
self.0.next()
}
}

/// Filter for warp which extracts pagination
pub fn paginate() -> impl warp::Filter<Extract = (Pagination,), Error = warp::Rejection> + Copy {
warp::query()
}

#[cfg(test)]
mod tests {
use std::num::{NonZeroU32, NonZeroU64};

use iroha_data_model::query::pagination::Pagination;

use super::*;

#[test]
fn empty() {
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(None, None))
.paginate(Pagination {
limit: None,
start: None
})
.collect::<Vec<_>>(),
vec![1_i32, 2_i32, 3_i32]
)
Expand All @@ -72,21 +68,20 @@ mod tests {
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(Some(0), None))
.collect::<Vec<_>>(),
vec![1_i32, 2_i32, 3_i32]
);
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(Some(1), None))
.paginate(Pagination {
limit: None,
start: NonZeroU64::new(1)
})
.collect::<Vec<_>>(),
vec![2_i32, 3_i32]
);
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(Some(3), None))
.paginate(Pagination {
limit: None,
start: NonZeroU64::new(3)
})
.collect::<Vec<_>>(),
Vec::<i32>::new()
);
Expand All @@ -97,21 +92,20 @@ mod tests {
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(None, Some(0)))
.collect::<Vec<_>>(),
Vec::<i32>::new()
);
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(None, Some(2)))
.paginate(Pagination {
limit: NonZeroU32::new(2),
start: None
})
.collect::<Vec<_>>(),
vec![1_i32, 2_i32]
);
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(None, Some(4)))
.paginate(Pagination {
limit: NonZeroU32::new(4),
start: None
})
.collect::<Vec<_>>(),
vec![1_i32, 2_i32, 3_i32]
);
Expand All @@ -122,7 +116,10 @@ mod tests {
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(Some(1), Some(1)))
.paginate(Pagination {
limit: NonZeroU32::new(1),
start: NonZeroU64::new(1),
})
.collect::<Vec<_>>(),
vec![2_i32]
)
Expand Down
Loading