Skip to content

Commit

Permalink
feature(execution-engine): Canon data with CID (#419)
Browse files Browse the repository at this point in the history
* Use CID values for tetraplets and `canon` vectors.

* Rename `cid_store` to `value_store`

It is consistent with the new `tetraplet_store` and `canon_store`
fields.

* Make canon data more typeful

The `CanonResult` doesn't take a JSON value anymore that is further
deserialized elsewhere, but is a struct that has all data deserialized.

* Typeful `CID` type

The `CID` type has a phantom type paramter defining its value's type.

* Group cid stores and trackers

Group cid stores into `CidInfo` struct, and trackers into `ExecutionCidState` struct.
  • Loading branch information
monoid authored Jan 9, 2023
1 parent f73e246 commit 8f587b7
Show file tree
Hide file tree
Showing 28 changed files with 726 additions and 104 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## Version 0.35.0 (2022-12-27)

[PR 419](https://github.com/fluencelabs/aquavm/pull/419):
- Rename data's `cid_store` field to `value_store`.
- Canon data is stored with CIDs. Values, tetraplets and canon elements
are stored as CIDs resolved with data's `value_store`, `tetraplet_store`
and `canon_store` fields respectively.
- Group stores in the data into `cid_info: CidInfo` field.

## Version 0.34.0 (2022-12-26)

[PR 414](https://github.com/fluencelabs/aquavm/pull/414):
Expand Down
11 changes: 7 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion air-interpreter/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "air-interpreter"
version = "0.34.0"
version = "0.35.0"
description = "Crate-wrapper for air"
authors = ["Fluence Labs"]
edition = "2018"
Expand Down
4 changes: 2 additions & 2 deletions air/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "air"
version = "0.34.0"
version = "0.35.0"
description = "Interpreter of AIR scripts intended to coordinate request flow in the Fluence network"
authors = ["Fluence Labs"]
edition = "2018"
Expand All @@ -17,7 +17,7 @@ doctest = false
[dependencies]
air-parser = { path = "../crates/air-lib/air-parser" }
air-execution-info-collector = { path = "../crates/air-lib/execution-info-collector" }
air-interpreter-cid = { version = "0.1.0", path = "../crates/air-lib/interpreter-cid" }
air-interpreter-cid = { version = "0.2.0", path = "../crates/air-lib/interpreter-cid" }
air-interpreter-data = { path = "../crates/air-lib/interpreter-data" }
air-interpreter-interface = { path = "../crates/air-lib/interpreter-interface", default-features = false }
air-log-targets = { path = "../crates/air-lib/log-targets" }
Expand Down
4 changes: 4 additions & 0 deletions air/src/execution_step/boxed_value/canon_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub struct CanonStream {
}

impl CanonStream {
pub(crate) fn new(values: Vec<ValueAggregate>, tetraplet: Rc<SecurityTetraplet>) -> Self {
Self { values, tetraplet }
}

pub(crate) fn from_stream(stream: &Stream, peer_pk: String) -> Self {
// it's always possible to iter over all generations of a stream
let values = stream.iter(Generation::Last).unwrap().cloned().collect::<Vec<_>>();
Expand Down
9 changes: 4 additions & 5 deletions air/src/execution_step/errors/uncatchable_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::JValue;
use crate::ToErrorCode;

use air_interpreter_cid::CidCalculationError;
use air_interpreter_cid::CID;
use air_interpreter_data::TracePos;
use air_interpreter_data::ValueRef;
use air_trace_handler::merger::MergerApResult;
Expand All @@ -29,8 +28,6 @@ use strum_macros::EnumDiscriminants;
use strum_macros::EnumIter;
use thiserror::Error as ThisError;

use std::rc::Rc;

/// Uncatchable errors arisen during AIR script execution. Uncatchable here means that these errors
/// couldn't be handled by a xor instruction and their error_code couldn't be used in a match
/// instruction. They are similar to JVM runtime errors and some of them could be caught only
Expand Down Expand Up @@ -98,8 +95,10 @@ pub enum UncatchableError {
#[error("failed to calculate value's CID")]
CidError(#[from] CidCalculationError),

#[error("value for CID {0:?} not found")]
ValueForCidNotFound(Rc<CID>),
/// We consider now that every CID should present in the data;
/// and not having any CID is considered a non-catching error.
#[error("{0} for CID {1:?} not found")]
ValueForCidNotFound(&'static str, String),
}

impl ToErrorCode for UncatchableError {
Expand Down
86 changes: 76 additions & 10 deletions air/src/execution_step/execution_context/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@ use super::LastError;
use super::LastErrorDescriptor;
use super::Scalars;
use super::Streams;
use crate::execution_step::RcSecurityTetraplet;
use crate::execution_step::ValueAggregate;
use crate::JValue;
use crate::UncatchableError;

use air_execution_info_collector::InstructionTracker;
use air_interpreter_cid::CID;
use air_interpreter_data::CidStore;
use air_interpreter_data::CanonCidAggregate;
use air_interpreter_data::CidInfo;
use air_interpreter_data::CidTracker;
use air_interpreter_data::GlobalStreamGens;
use air_interpreter_data::RestrictedStreamGens;
use air_interpreter_data::TracePos;
use air_interpreter_interface::*;
use polyplets::SecurityTetraplet;

use std::rc::Rc;

Expand Down Expand Up @@ -69,8 +75,8 @@ pub(crate) struct ExecutionCtx<'i> {
/// Tracks all functions that should be called from services.
pub(crate) call_requests: CallRequests,

/// Merged CID-to-value dictionaries
pub(crate) cid_tracker: CidTracker,
/// CID-to-something trackers.
pub(crate) cid_state: ExecutionCidState,
}

impl<'i> ExecutionCtx<'i> {
Expand All @@ -88,15 +94,15 @@ impl<'i> ExecutionCtx<'i> {
current_ingredients.restricted_streams,
);

let cid_tracker = CidTracker::from_cid_stores(prev_ingredients.cid_store, current_ingredients.cid_store);
let cid_state = ExecutionCidState::from_cid_info(prev_ingredients.cid_info, current_ingredients.cid_info);

Self {
run_parameters,
subgraph_completeness: true,
last_call_request_id: prev_ingredients.last_call_request_id,
call_results,
streams,
cid_tracker,
cid_state,
..<_>::default()
}
}
Expand All @@ -109,10 +115,6 @@ impl<'i> ExecutionCtx<'i> {
self.last_call_request_id += 1;
self.last_call_request_id
}

pub(crate) fn get_value_by_cid(&self, cid: &CID) -> Option<Rc<JValue>> {
self.cid_tracker.get(cid)
}
}

impl ExecutionCtx<'_> {
Expand All @@ -139,7 +141,71 @@ pub(crate) struct ExecCtxIngredients {
pub(crate) global_streams: GlobalStreamGens,
pub(crate) last_call_request_id: u32,
pub(crate) restricted_streams: RestrictedStreamGens,
pub(crate) cid_store: CidStore<JValue>,
pub(crate) cid_info: CidInfo,
}

#[derive(Debug, Default, Clone)]
pub struct ExecutionCidState {
pub(crate) value_tracker: CidTracker<JValue>,
pub(crate) tetraplet_tracker: CidTracker<SecurityTetraplet>,
pub(crate) canon_tracker: CidTracker<CanonCidAggregate>,
}

impl ExecutionCidState {
fn from_cid_info(prev_cid_info: CidInfo, current_cid_info: CidInfo) -> Self {
Self {
value_tracker: CidTracker::from_cid_stores(prev_cid_info.value_store, current_cid_info.value_store),
tetraplet_tracker: CidTracker::from_cid_stores(
prev_cid_info.tetraplet_store,
current_cid_info.tetraplet_store,
),
canon_tracker: CidTracker::from_cid_stores(prev_cid_info.canon_store, current_cid_info.canon_store),
}
}

pub(crate) fn get_value_by_cid(&self, cid: &CID<JValue>) -> Result<Rc<JValue>, UncatchableError> {
self.value_tracker
.get(cid)
.ok_or_else(|| UncatchableError::ValueForCidNotFound("value", cid.clone().into()))
}

pub(crate) fn get_tetraplet_by_cid(
&self,
cid: &CID<SecurityTetraplet>,
) -> Result<RcSecurityTetraplet, UncatchableError> {
self.tetraplet_tracker
.get(cid)
.ok_or_else(|| UncatchableError::ValueForCidNotFound("tetraplet", cid.clone().into()))
}

pub(crate) fn get_canon_value_by_cid(
&self,
cid: &CID<CanonCidAggregate>,
) -> Result<ValueAggregate, UncatchableError> {
let canon_aggregate = self
.canon_tracker
.get(cid)
.ok_or_else(|| UncatchableError::ValueForCidNotFound("canon aggregate", cid.clone().into()))?;
let result = self.get_value_by_cid(&canon_aggregate.value)?;
let tetraplet = self.get_tetraplet_by_cid(&canon_aggregate.tetraplet)?;

let fake_trace_pos = TracePos::default();
Ok(ValueAggregate {
result,
tetraplet,
trace_pos: fake_trace_pos,
})
}
}

impl From<ExecutionCidState> for CidInfo {
fn from(value: ExecutionCidState) -> Self {
Self {
value_store: value.value_tracker.into(),
tetraplet_store: value.tetraplet_tracker.into(),
canon_store: value.canon_tracker.into(),
}
}
}

use serde::Deserialize;
Expand Down
11 changes: 4 additions & 7 deletions air/src/execution_step/instructions/call/call_result_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ pub(crate) fn populate_context_from_peer_service_result<'i>(
exec_ctx: &mut ExecutionCtx<'i>,
) -> ExecutionResult<CallResult> {
let cid = exec_ctx
.cid_tracker
.cid_state
.value_tracker
.record_value(executed_result.result.clone())
.map_err(UncatchableError::from)?;

Expand Down Expand Up @@ -69,17 +70,13 @@ pub(crate) fn populate_context_from_data<'i>(
) -> ExecutionResult<ValueRef> {
match (output, value) {
(CallOutputValue::Scalar(scalar), ValueRef::Scalar(cid)) => {
let value = exec_ctx
.get_value_by_cid(&cid)
.ok_or_else(|| UncatchableError::ValueForCidNotFound(cid.clone()))?;
let value = exec_ctx.cid_state.get_value_by_cid(&cid)?;
let result = ValueAggregate::new(value, tetraplet, trace_pos);
exec_ctx.scalars.set_scalar_value(scalar.name, result)?;
Ok(ValueRef::Scalar(cid))
}
(CallOutputValue::Stream(stream), ValueRef::Stream { cid, generation }) => {
let value = exec_ctx
.get_value_by_cid(&cid)
.ok_or_else(|| UncatchableError::ValueForCidNotFound(cid.clone()))?;
let value = exec_ctx.cid_state.get_value_by_cid(&cid)?;
let result = ValueAggregate::new(value, tetraplet, trace_pos);
let value_descriptor = StreamValueDescriptor::new(
result,
Expand Down
Loading

0 comments on commit 8f587b7

Please sign in to comment.