Skip to content

Commit

Permalink
[feature] #3468: return iterator as query execution result
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
  • Loading branch information
mversic committed Jun 20, 2023
1 parent db75181 commit 133f667
Show file tree
Hide file tree
Showing 27 changed files with 810 additions and 604 deletions.
47 changes: 18 additions & 29 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// FIXME: This can't be fixed, because one trait in `warp` is private.
#![allow(opaque_hidden_inferred_bound)]

use std::{cmp::Ordering, num::TryFromIntError};
use std::cmp::Ordering;

use eyre::WrapErr;
use futures::TryStreamExt;
Expand All @@ -15,7 +15,10 @@ use iroha_config::{
torii::uri,
GetConfiguration, PostConfiguration,
};
use iroha_core::{smartcontracts::isi::query::ValidQueryRequest, sumeragi::SumeragiHandle};
use iroha_core::{
smartcontracts::{isi::query::ValidQueryRequest, query::LazyValue},
sumeragi::SumeragiHandle,
};
use iroha_data_model::{
block::{
stream::{
Expand All @@ -25,7 +28,6 @@ use iroha_data_model::{
VersionedCommittedBlock,
},
prelude::*,
query::error::QueryExecutionFail,
};
use iroha_logger::prelude::*;
#[cfg(feature = "telemetry")]
Expand Down Expand Up @@ -71,43 +73,34 @@ pub(crate) async fn handle_queries(
pagination: Pagination,
sorting: Sorting,
request: VersionedSignedQuery,
) -> Result<Scale<VersionedPaginatedQueryResult>> {
let result = {
let mut wsv = sumeragi.wsv_clone();
let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
valid_request.execute(&wsv).map_err(ValidationFail::from)?
};
) -> Result<Scale<VersionedQueryResult>> {
let mut wsv = sumeragi.wsv_clone();

let (total, result) = if let Value::Vec(vec_of_val) = result {
let len = vec_of_val.len();
let vec_of_val = apply_sorting_and_pagination(vec_of_val.into_iter(), &sorting, pagination);
let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
let result = valid_request.execute(&wsv).map_err(ValidationFail::from)?;

(len, Value::Vec(vec_of_val))
} else {
(1, result)
let result = match result {
LazyValue::Value(value) => value,
LazyValue::Iter(iter) => {
Value::Vec(apply_sorting_and_pagination(iter, &sorting, pagination))
}
};

let total = total
.try_into()
.map_err(|e: TryFromIntError| QueryExecutionFail::Conversion(e.to_string()))
.map_err(ValidationFail::from)?;
let result = QueryResult(result);
let paginated_result = PaginatedQueryResult {
let paginated_result = QueryResult {
result,
pagination,
sorting,
total,
};
Ok(Scale(paginated_result.into()))
}

fn apply_sorting_and_pagination(
vec_of_val: impl Iterator<Item = Value>,
iter: impl Iterator<Item = Value>,
sorting: &Sorting,
pagination: Pagination,
) -> Vec<Value> {
if let Some(key) = &sorting.sort_by_metadata_key {
let mut pairs: Vec<(Option<Value>, Value)> = vec_of_val
let mut pairs: Vec<(Option<Value>, Value)> = iter
.map(|value| {
let key = match &value {
Value::Identifiable(IdentifiableBox::Asset(asset)) => match asset.value() {
Expand Down Expand Up @@ -137,7 +130,7 @@ fn apply_sorting_and_pagination(
.paginate(pagination)
.collect()
} else {
vec_of_val.paginate(pagination).collect()
iter.paginate(pagination).collect()
}
}

Expand Down Expand Up @@ -167,7 +160,6 @@ async fn handle_pending_transactions(
Ok(Scale(
queue
.all_transactions(&wsv)
.into_iter()
.map(Into::into)
.paginate(pagination)
.collect::<Vec<_>>(),
Expand Down Expand Up @@ -348,7 +340,6 @@ mod subscription {
async fn handle_version(sumeragi: SumeragiHandle) -> Json {
use iroha_version::Version;

#[allow(clippy::expect_used)]
let string = sumeragi
.apply_wsv(WorldStateView::latest_block_ref)
.expect("Genesis not applied. Nothing we can do. Solve the issue and rerun.")
Expand Down Expand Up @@ -422,7 +413,6 @@ impl Torii {
}
}

#[allow(opaque_hidden_inferred_bound)]
#[cfg(feature = "telemetry")]
/// Helper function to create router. This router can tested without starting up an HTTP server
fn create_telemetry_router(
Expand Down Expand Up @@ -458,7 +448,6 @@ impl Torii {
}

/// Helper function to create router. This router can tested without starting up an HTTP server
#[allow(opaque_hidden_inferred_bound)]
pub(crate) fn create_api_router(
&self,
) -> impl warp::Filter<Extract = impl warp::Reply> + Clone + Send {
Expand Down
24 changes: 10 additions & 14 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ where
// Separate-compilation friendly response handling
fn _handle_query_response_base(
resp: &Response<Vec<u8>>,
) -> QueryHandlerResult<VersionedPaginatedQueryResult> {
) -> QueryHandlerResult<VersionedQueryResult> {
match resp.status() {
StatusCode::OK => {
let res = VersionedPaginatedQueryResult::decode_all_versioned(resp.body());
let res = VersionedQueryResult::decode_all_versioned(resp.body());
res.wrap_err(
"Failed to decode response from Iroha. \
You are likely using a version of the client library \
Expand Down Expand Up @@ -143,7 +143,7 @@ where
}
}

_handle_query_response_base(&resp).and_then(|VersionedPaginatedQueryResult::V1(result)| {
_handle_query_response_base(&resp).and_then(|VersionedQueryResult::V1(result)| {
ClientQueryRequest::try_from(result).map_err(Into::into)
})
}
Expand Down Expand Up @@ -238,7 +238,7 @@ impl From<ResponseReport> for eyre::Report {
}
}

/// More convenient version of [`iroha_data_model::prelude::PaginatedQueryResult`].
/// More convenient version of [`iroha_data_model::prelude::QueryResult`].
/// The only difference is that this struct has `output` field extracted from the result
/// accordingly to the source query.
#[derive(Clone, Debug)]
Expand All @@ -249,12 +249,10 @@ where
{
/// Query output
pub output: R::Output,
/// See [`iroha_data_model::prelude::PaginatedQueryResult`]
/// See [`iroha_data_model::prelude::QueryResult`]
pub pagination: Pagination,
/// See [`iroha_data_model::prelude::PaginatedQueryResult`]
/// See [`iroha_data_model::prelude::QueryResult`]
pub sorting: Sorting,
/// See [`iroha_data_model::prelude::PaginatedQueryResult`]
pub total: u64,
}

impl<R> ClientQueryRequest<R>
Expand All @@ -268,30 +266,28 @@ where
}
}

impl<R> TryFrom<PaginatedQueryResult> for ClientQueryRequest<R>
impl<R> TryFrom<QueryResult> for ClientQueryRequest<R>
where
R: Query + Debug,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
type Error = eyre::Report;

fn try_from(
PaginatedQueryResult {
QueryResult {
result,
pagination,
sorting,
total,
}: PaginatedQueryResult,
}: QueryResult,
) -> Result<Self> {
let output = R::Output::try_from(result.into())
let output = R::Output::try_from(result)
.map_err(Into::into)
.wrap_err("Unexpected type")?;

Ok(Self {
output,
pagination,
sorting,
total,
})
}
}
Expand Down
10 changes: 6 additions & 4 deletions client/tests/integration/triggers/time_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,12 @@ fn change_asset_metadata_after_1_sec() -> Result<()> {
usize::try_from(PERIOD_MS / DEFAULT_CONSENSUS_ESTIMATION_MS + 1)?,
)?;

let value = test_client.request(FindAssetDefinitionKeyValueByIdAndKey {
id: asset_definition_id.into(),
key: key.into(),
})?;
let value = test_client
.request(FindAssetDefinitionKeyValueByIdAndKey {
id: asset_definition_id.into(),
key: key.into(),
})
.map(Into::into)?;
assert!(matches!(value, Value::Numeric(NumericValue::U32(3_u32))));

Ok(())
Expand Down
Binary file modified configs/peer/validator.wasm
Binary file not shown.
10 changes: 10 additions & 0 deletions core/quang
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
1. repositories:
- cbindgen(git@github.com:mozilla/cbindgen.git),
- iroha2(git@github.com:hyperledger/iroha.git) iroha2-stable branch
- iroha2-java(git@github.com:hyperledger/iroha-java.git)

2. run iroha nodes and connect to it via iroha_client_cli and http app
3. run cbindgen on a small example:
- export a function from Rust
- generate C bindings with cbindgen
- link this C lib from C and Java
12 changes: 6 additions & 6 deletions core/src/kura.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,17 +319,17 @@ impl Kura {
}

/// Put a block in kura's in memory block store.
pub fn store_block(&self, block: impl Into<Arc<VersionedCommittedBlock>>) {
let block = block.into();
self.block_data.lock().push((block.hash(), Some(block)));
pub fn store_block(&self, block: VersionedCommittedBlock) {
self.block_data
.lock()
.push((block.hash(), Some(Arc::new(block))));
}

/// Replace the block in `Kura`'s in memory block store.
pub fn replace_top_block(&self, block: impl Into<Arc<VersionedCommittedBlock>>) {
let block = block.into();
pub fn replace_top_block(&self, block: VersionedCommittedBlock) {
let mut data = self.block_data.lock();
data.pop();
data.push((block.hash(), Some(block)));
data.push((block.hash(), Some(Arc::new(block))));
}
}

Expand Down
6 changes: 4 additions & 2 deletions core/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,14 @@ impl Queue {
}

/// Returns all pending transactions.
pub fn all_transactions(&self, wsv: &WorldStateView) -> Vec<AcceptedTransaction> {
pub fn all_transactions<'wsv>(
&'wsv self,
wsv: &'wsv WorldStateView,
) -> impl Iterator<Item = AcceptedTransaction> + 'wsv {
self.txs
.iter()
.filter(|e| self.is_pending(e.value(), wsv))
.map(|e| e.value().clone())
.collect()
}

/// Returns `n` randomly selected transaction from the queue.
Expand Down
Loading

0 comments on commit 133f667

Please sign in to comment.