Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(drive): state transition observability #1846

Merged
merged 21 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/dashmate/templates/platform/gateway/envoy.yaml.dot
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,13 @@
path: "/org.dash.platform.dapi.v0.Platform/waitForStateTransitionResult"
route:
cluster: dapi_api
idle_timeout: 80s
idle_timeout: 85s
# Upstream response timeout
timeout: 80s
timeout: 85s
max_stream_duration:
# Entire stream/request timeout
max_stream_duration: 80s
grpc_timeout_header_max: 80s
max_stream_duration: 85s
grpc_timeout_header_max: 85s
# DAPI getConsensusParams endpoint
- match:
path: "/org.dash.platform.dapi.v0.Platform/getConsensusParams"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use platform_serialization_derive::{PlatformDeserialize, PlatformSerialize};
use thiserror::Error;

#[derive(Error, Debug, Clone, Encode, Decode, PlatformSerialize, PlatformDeserialize)]
#[error("Parsing of serialized object failed due to: {parsing_error}")]
#[error("State transition decoding failed: {parsing_error}")]
QuantumExplorer marked this conversation as resolved.
Show resolved Hide resolved
#[platform_serialize(unversioned)]
pub struct SerializedObjectParsingError {
/*
Expand Down
86 changes: 76 additions & 10 deletions packages/rs-drive-abci/src/abci/handler/check_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ use crate::abci::handler::error::consensus::AbciResponseInfoGetter;
use crate::abci::handler::error::HandlerError;
use crate::error::execution::ExecutionError;
use crate::error::Error;
use crate::metrics::{LABEL_ABCI_RESPONSE_CODE, LABEL_CHECK_TX_MODE, LABEL_STATE_TRANSITION_NAME};
use crate::platform_types::platform::Platform;
use crate::rpc::core::CoreRPCLike;
use dpp::consensus::codes::ErrorWithCode;
use dpp::fee::SignedCredits;
use dpp::util::hash::hash_single;
use metrics::Label;
use tenderdash_abci::proto::abci as proto;

pub fn check_tx<C>(
Expand All @@ -15,7 +18,7 @@ pub fn check_tx<C>(
where
C: CoreRPCLike,
{
let _timer = crate::metrics::abci_request_duration("check_tx");
let mut timer = crate::metrics::abci_request_duration("check_tx");

let platform_state = platform.state.load();
let platform_version = platform_state.current_platform_version()?;
Expand All @@ -31,7 +34,14 @@ where

validation_result
.and_then(|validation_result| {
let first_consensus_error = validation_result.errors.first();
let (check_tx_result, errors) =
validation_result.into_data_and_errors().map_err(|_| {
Error::Execution(ExecutionError::CorruptedCodeExecution(
"validation result should contain check tx result",
))
})?;

let first_consensus_error = errors.first();

let (code, info) = if let Some(consensus_error) = first_consensus_error {
(
Expand All @@ -43,14 +53,9 @@ where
(0, "".to_string())
};

let check_tx_result = validation_result.into_data().map_err(|_| {
Error::Execution(ExecutionError::CorruptedCodeExecution(
"validation result should contain check tx result",
))
})?;

let gas_wanted = check_tx_result
.fee_result
.as_ref()
.map(|fee_result| fee_result.total_base_fee())
.unwrap_or_default();

Expand All @@ -61,20 +66,81 @@ where
.cloned()
.unwrap_or_default();

let state_transition_name =
if let Some(ref name) = check_tx_result.state_transition_name {
name.to_owned()
} else {
"Unknown".to_string()
};
QuantumExplorer marked this conversation as resolved.
Show resolved Hide resolved

let priority = check_tx_result.priority as i64;

if tracing::enabled!(tracing::Level::TRACE) {
let message = match (r#type, code) {
(0, 0) => "added to mempool".to_string(),
(1, 0) => "kept in mempool after re-check".to_string(),
(0, _) => format!(
"rejected with code {code} due to error: {}",
first_consensus_error
.expect("consensus error must be present with non-zero error code")
QuantumExplorer marked this conversation as resolved.
Show resolved Hide resolved
),
(1, _) => format!(
"removed from mempool with code {code} after re-check due to error: {}",
first_consensus_error
.expect("consensus error must be present with non-zero error code")
QuantumExplorer marked this conversation as resolved.
Show resolved Hide resolved
),
_ => unreachable!("we have only 2 modes of check tx"),
QuantumExplorer marked this conversation as resolved.
Show resolved Hide resolved
};

let state_transition_hash = check_tx_result
.state_transition_hash
.expect("state transition hash must be present if trace level is enabled");
QuantumExplorer marked this conversation as resolved.
Show resolved Hide resolved

let st_hash = hex::encode(state_transition_hash);

tracing::trace!(
?check_tx_result,
error = ?first_consensus_error,
st_hash,
"{} state transition ({}) {}",
state_transition_name,
st_hash,
message
);
}

timer.add_label(Label::new(
LABEL_STATE_TRANSITION_NAME,
state_transition_name,
));
timer.add_label(Label::new(LABEL_CHECK_TX_MODE, r#type.to_string()));
timer.add_label(Label::new(LABEL_ABCI_RESPONSE_CODE, code.to_string()));

Ok(proto::ResponseCheckTx {
code,
data: vec![],
info,
gas_wanted: gas_wanted as SignedCredits,
codespace: "".to_string(),
sender: first_unique_identifier,
priority: check_tx_result.priority as i64,
priority,
})
})
.or_else(|error| {
let handler_error = HandlerError::Internal(error.to_string());

tracing::error!(?error, "check_tx failed");
if tracing::enabled!(tracing::Level::ERROR) {
let st_hash = hex::encode(hash_single(tx));

tracing::error!(
?error,
st_hash,
check_tx_mode = r#type,
"Failed to check state transition ({}): {}",
st_hash,
error
);
}

Ok(proto::ResponseCheckTx {
code: handler_error.code(),
Expand Down
10 changes: 7 additions & 3 deletions packages/rs-drive-abci/src/execution/check_tx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ mod v0;
#[repr(u8)]
#[derive(Copy, Clone, Debug)]
pub enum CheckTxLevel {
FirstTimeCheck,
Recheck,
FirstTimeCheck = 0,
Recheck = 1,
}

impl TryFrom<u8> for CheckTxLevel {
Expand Down Expand Up @@ -51,7 +51,7 @@ impl TryFrom<i32> for CheckTxLevel {
}

/// The result of a check tx
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct CheckTxResult {
/// The level used when checking the transaction
pub level: CheckTxLevel,
Expand All @@ -65,6 +65,10 @@ pub struct CheckTxResult {
/// Priority to return to tenderdash. State Transitions with higher priority take precedence
/// over state transitions with lower priority
pub priority: u32,
/// State transition type name. Using for logging
pub state_transition_name: Option<String>,
/// State transition ID. Using for logging
pub state_transition_hash: Option<[u8; 32]>,
}

impl<C> Platform<C>
Expand Down
71 changes: 40 additions & 31 deletions packages/rs-drive-abci/src/execution/check_tx/v0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ use crate::rpc::core::CoreRPCLike;
use dpp::consensus::ConsensusError;

use crate::error::execution::ExecutionError;
use crate::execution::types::state_transition_container::v0::StateTransitionContainerGettersV0;
use crate::execution::types::state_transition_container::v0::{
DecodedStateTransition, FaultyStateTransition, InvalidEncodedStateTransition,
SuccessfullyDecodedStateTransition,
};
#[cfg(test)]
use crate::execution::validation::state_transition::processor::process_state_transition;
use crate::platform_types::platform_state::PlatformState;
#[cfg(test)]
use dpp::serialization::PlatformDeserializable;
#[cfg(test)]
use dpp::state_transition::StateTransition;
use dpp::util::hash::hash_single;
use dpp::validation::ValidationResult;
use dpp::version::PlatformVersion;
#[cfg(test)]
Expand Down Expand Up @@ -77,7 +81,8 @@ where
/// Checks a state transition to determine if it should be added to the mempool.
///
/// This function performs a few checks, including validating the state transition and ensuring that the
/// user can pay for it. It may be inaccurate in rare cases, so the proposer needs to re-check transactions
/// user can pay for it. From the time a state transition is added to the mempool to the time it is included in a proposed block,
/// a previously valid state transition may have become invalid, so the proposer needs to re-check transactions
/// before proposing a block.
///
/// # Arguments
Expand All @@ -95,47 +100,49 @@ where
platform_state: &PlatformState,
platform_version: &PlatformVersion,
) -> Result<ValidationResult<CheckTxResult, ConsensusError>, Error> {
let mut state_transition_hash = None;
if tracing::enabled!(tracing::Level::TRACE) {
state_transition_hash = Some(hash_single(raw_tx));
}

let mut check_tx_result = CheckTxResult {
level: check_tx_level,
fee_result: None,
unique_identifiers: vec![],
priority: 0,
state_transition_name: None,
state_transition_hash,
};

let raw_state_transitions = vec![raw_tx];
let state_transition_container =
self.decode_raw_state_transitions(&raw_state_transitions, platform_version)?;

let (
mut valid_state_transitions,
mut invalid_state_transitions,
mut invalid_state_transitions_with_protocol_error,
) = state_transition_container.destructure();

// Return internal error if happened
if !invalid_state_transitions_with_protocol_error.is_empty() {
let (_, protocol_error) = invalid_state_transitions_with_protocol_error.remove(0);

return Err(protocol_error.into());
}

// Return consensus error if happened
if !invalid_state_transitions.is_empty() {
let (_, consensus_error) = invalid_state_transitions.remove(0);

return Ok(ValidationResult::new_with_data_and_errors(
check_tx_result,
vec![consensus_error],
));
}
let mut decoded_state_transitions: Vec<DecodedStateTransition> = self
.decode_raw_state_transitions(&raw_state_transitions, platform_version)?
.into();

// If there are no errors, then state transition is valid
if valid_state_transitions.is_empty() {
if decoded_state_transitions.len() != 1 {
return Err(Error::Execution(ExecutionError::CorruptedCodeExecution(
"valid state transition must be present",
"expected exactly one decoded state transition",
)));
}
let (_, state_transition) = valid_state_transitions.remove(0);

let state_transition = match decoded_state_transitions.remove(0) {
DecodedStateTransition::SuccessfullyDecoded(SuccessfullyDecodedStateTransition {
decoded,
..
}) => decoded,
DecodedStateTransition::InvalidEncoding(InvalidEncodedStateTransition {
error,
..
}) => {
return Ok(ValidationResult::new_with_data_and_errors(
check_tx_result,
vec![error],
));
}
DecodedStateTransition::FailedToDecode(FaultyStateTransition { error, .. }) => {
return Err(error.into());
}
};

let platform_ref = PlatformRef {
drive: &self.drive,
Expand All @@ -149,6 +156,8 @@ where
check_tx_result.priority =
user_fee_increase.saturating_mul(PRIORITY_USER_FEE_INCREASE_MULTIPLIER);

check_tx_result.state_transition_name = Some(state_transition.name().to_string());

check_tx_result.unique_identifiers = state_transition.unique_identifiers();

let validation_result = state_transition_to_execution_event_for_check_tx(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ where
/// state transitions, processing state transitions, or executing events.
pub(in crate::execution) fn decode_raw_state_transitions<'a>(
&self,
raw_state_transitions: &'a Vec<impl AsRef<[u8]>>,
raw_state_transitions: &'a [impl AsRef<[u8]>],
platform_version: &PlatformVersion,
) -> Result<StateTransitionContainer<'a>, Error> {
match platform_version
Expand Down
Loading
Loading