diff --git a/core/src/subgraph/error.rs b/core/src/subgraph/error.rs
index b3131255aed..c50712c08db 100644
--- a/core/src/subgraph/error.rs
+++ b/core/src/subgraph/error.rs
@@ -1,28 +1,100 @@
 use graph::data::subgraph::schema::SubgraphError;
-use graph::prelude::{thiserror, Error, StoreError};
+use graph::env::ENV_VARS;
+use graph::prelude::{anyhow, thiserror, Error, StoreError};
 
+pub trait DeterministicError: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {}
+
+impl DeterministicError for SubgraphError {}
+
+impl DeterministicError for StoreError {}
+
+impl DeterministicError for anyhow::Error {}
+
+/// An error happened during processing and we need to classify errors into
+/// deterministic and non-deterministic errors. This enum holds the result
+/// of that classification
 #[derive(thiserror::Error, Debug)]
-pub enum BlockProcessingError {
+pub enum ProcessingError {
     #[error("{0:#}")]
-    Unknown(#[from] Error),
+    Unknown(Error),
 
     // The error had a deterministic cause but, for a possibly non-deterministic reason, we chose to
     // halt processing due to the error.
     #[error("{0}")]
-    Deterministic(SubgraphError),
+    Deterministic(Box<dyn DeterministicError>),
 
     #[error("subgraph stopped while processing triggers")]
     Canceled,
 }
 
-impl BlockProcessingError {
+impl ProcessingError {
     pub fn is_deterministic(&self) -> bool {
-        matches!(self, BlockProcessingError::Deterministic(_))
+        matches!(self, ProcessingError::Deterministic(_))
+    }
+
+    pub fn detail(self, ctx: &str) -> ProcessingError {
+        match self {
+            ProcessingError::Unknown(e) => {
+                let x = e.context(ctx.to_string());
+                ProcessingError::Unknown(x)
+            }
+            ProcessingError::Deterministic(e) => {
+                ProcessingError::Deterministic(Box::new(anyhow!("{e}").context(ctx.to_string())))
+            }
+            ProcessingError::Canceled => ProcessingError::Canceled,
+        }
+    }
+}
+
+/// Similar to `anyhow::Context`, but for `Result<T, ProcessingError>`. We
+/// call the method `detail` to avoid ambiguity with anyhow's `context`
+/// method
+pub trait DetailHelper<T> {
+    fn detail(self: Self, ctx: &str) -> Result<T, ProcessingError>;
+}
+
+impl<T> DetailHelper<T> for Result<T, ProcessingError> {
+    fn detail(self, ctx: &str) -> Result<T, ProcessingError> {
+        self.map_err(|e| e.detail(ctx))
     }
 }
 
-impl From<StoreError> for BlockProcessingError {
-    fn from(e: StoreError) -> Self {
-        BlockProcessingError::Unknown(e.into())
+/// Implement this for errors that are always non-deterministic.
+pub(crate) trait NonDeterministicErrorHelper<T> {
+    fn non_deterministic(self: Self) -> Result<T, ProcessingError>;
+}
+
+impl<T> NonDeterministicErrorHelper<T> for Result<T, anyhow::Error> {
+    fn non_deterministic(self) -> Result<T, ProcessingError> {
+        self.map_err(|e| ProcessingError::Unknown(e))
+    }
+}
+
+impl<T> NonDeterministicErrorHelper<T> for Result<T, StoreError> {
+    fn non_deterministic(self) -> Result<T, ProcessingError> {
+        self.map_err(|e| ProcessingError::Unknown(Error::from(e)))
     }
 }
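
A minimal sketch (not part of the diff) of how these helper traits compose at a call site; `head_ptr`, `load_ptr`, and the surrounding type are hypothetical:

    // Hypothetical call site: a store lookup whose failures are always
    // non-deterministic is lifted into ProcessingError, then annotated
    // with `detail` (the `anyhow::Context` analogue defined above).
    fn head_ptr(&self) -> Result<BlockPtr, ProcessingError> {
        self.load_ptr() // Result<BlockPtr, StoreError>, hypothetical
            .non_deterministic() // -> Result<BlockPtr, ProcessingError>
            .detail("failed to load head pointer")
    }
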
+/// Implement this for errors where it depends on the details whether they
+/// are deterministic or not.
+pub(crate) trait ClassifyErrorHelper<T> {
+    fn classify(self: Self) -> Result<T, ProcessingError>;
+}
+
+impl<T> ClassifyErrorHelper<T> for Result<T, StoreError> {
+    fn classify(self) -> Result<T, ProcessingError> {
+        self.map_err(|e| {
+            if ENV_VARS.mappings.store_errors_are_nondeterministic {
+                // Old behavior, just in case the new behavior causes issues
+                ProcessingError::Unknown(Error::from(e))
+            } else {
+                if e.is_deterministic() {
+                    ProcessingError::Deterministic(Box::new(e))
+                } else {
+                    ProcessingError::Unknown(Error::from(e))
+                }
+            }
+        })
+    }
+}
diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs
index 922c7a4003c..71c36886d2e 100644
--- a/core/src/subgraph/runner.rs
+++ b/core/src/subgraph/runner.rs
@@ -1,8 +1,12 @@
 use crate::subgraph::context::IndexingContext;
-use crate::subgraph::error::BlockProcessingError;
+use crate::subgraph::error::{
+    ClassifyErrorHelper as _, DetailHelper as _, NonDeterministicErrorHelper as _, ProcessingError,
+};
 use crate::subgraph::inputs::IndexingInputs;
 use crate::subgraph::state::IndexingState;
 use crate::subgraph::stream::new_block_stream;
+use anyhow::Context as _;
+use async_trait::async_trait;
 use graph::blockchain::block_stream::{
     BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor,
 };
@@ -27,8 +31,14 @@ use graph::data_source::{
 use graph::env::EnvVars;
 use graph::futures03::stream::StreamExt;
 use graph::futures03::TryStreamExt;
-use graph::prelude::*;
+use graph::prelude::{
+    anyhow, hex, retry, thiserror, BlockNumber, BlockPtr, BlockState, CancelGuard, CancelHandle,
+    CancelToken as _, CancelableError, CheapClone as _, EntityCache, EntityModification, Error,
+    InstanceDSTemplateInfo, LogCode, RunnerMetrics, RuntimeHostBuilder, StopwatchMetrics,
+    StoreError, StreamExtension, UnfailOutcome, Value, ENV_VARS,
+};
 use graph::schema::EntityKey;
+use graph::slog::{debug, error, info, o, trace, warn, Logger};
 use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -353,7 +363,7 @@ where
         block_stream_cancel_handle: &CancelHandle,
         block: BlockWithTriggers<C>,
         firehose_cursor: FirehoseCursor,
-    ) -> Result<Action, BlockProcessingError> {
+    ) -> Result<Action, ProcessingError> {
         let triggers = block.trigger_data;
         let block = Arc::new(block.block);
         let block_ptr = block.ptr();
@@ -441,7 +451,7 @@ where
                 Ok(state) => block_state = state,
 
                 // Some form of unknown or non-deterministic error occurred.
-                Err(MappingError::Unknown(e)) => return Err(BlockProcessingError::Unknown(e)),
+                Err(MappingError::Unknown(e)) => return Err(ProcessingError::Unknown(e)),
                 Err(MappingError::PossibleReorg(e)) => {
                     info!(logger, "Possible reorg detected, retrying";
@@ -516,7 +526,8 @@ where
                         let chain = chain.cheap_clone();
                         async move { chain.refetch_firehose_block(&log, cur).await }
                     })
-                    .await?,
+                    .await
+                    .non_deterministic()?,
                 )
             } else {
                 block.cheap_clone()
@@ -527,7 +538,8 @@ where
             .inputs
             .triggers_adapter
             .triggers_in_block(&logger, block.as_ref().clone(), filter)
-            .await?;
+            .await
+            .non_deterministic()?;
 
         let triggers = block_with_triggers.trigger_data;
@@ -606,7 +618,7 @@ where
                 // clean context as in b21fa73b-6453-4340-99fb-1a78ec62efb1.
match e { MappingError::PossibleReorg(e) | MappingError::Unknown(e) => { - BlockProcessingError::Unknown(e) + ProcessingError::Unknown(e) } } })?; @@ -623,7 +635,7 @@ where // Avoid writing to store if block stream has been canceled if block_stream_cancel_handle.is_canceled() { - return Err(BlockProcessingError::Canceled); + return Err(ProcessingError::Canceled); } if let Some(proof_of_indexing) = proof_of_indexing.into_inner() { @@ -633,7 +645,8 @@ where &self.metrics.host.stopwatch, &mut block_state.entity_cache, ) - .await?; + .await + .non_deterministic()?; } let section = self @@ -648,7 +661,7 @@ where } = block_state .entity_cache .as_modifications(block.number()) - .map_err(|e| BlockProcessingError::Unknown(e.into()))?; + .map_err(|e| ProcessingError::Unknown(e.into()))?; section.end(); trace!(self.logger, "Entity cache statistics"; @@ -663,10 +676,15 @@ where // Check for offchain events and process them, including their entity modifications in the // set to be transacted. - let offchain_events = self.ctx.offchain_monitor.ready_offchain_events()?; + let offchain_events = self + .ctx + .offchain_monitor + .ready_offchain_events() + .non_deterministic()?; let (offchain_mods, processed_offchain_data_sources, persisted_off_chain_data_sources) = self.handle_offchain_triggers(offchain_events, &block) - .await?; + .await + .non_deterministic()?; mods.extend(offchain_mods); // Put the cache back in the state, asserting that the placeholder cache was not used. @@ -712,7 +730,7 @@ where let first_error = deterministic_errors.first().cloned(); - let is_caught_up = self.is_caught_up(&block_ptr).await?; + let is_caught_up = self.is_caught_up(&block_ptr).await.non_deterministic()?; persisted_data_sources.extend(persisted_off_chain_data_sources); self.inputs @@ -730,7 +748,8 @@ where is_caught_up, ) .await - .context("Failed to transact block operations")?; + .classify() + .detail("Failed to transact block operations")?; // For subgraphs with `nonFatalErrors` feature disabled, we consider // any error as fatal. @@ -741,7 +760,9 @@ where // all of the others are discarded. if has_errors && !is_non_fatal_errors_active { // Only the first error is reported. - return Err(BlockProcessingError::Deterministic(first_error.unwrap())); + return Err(ProcessingError::Deterministic(Box::new( + first_error.unwrap(), + ))); } let elapsed = start.elapsed().as_secs_f64(); @@ -750,11 +771,9 @@ where .block_ops_transaction_duration .observe(elapsed); - block_state_metrics.flush_metrics_to_store( - &logger, - block_ptr, - self.inputs.deployment.id, - )?; + block_state_metrics + .flush_metrics_to_store(&logger, block_ptr, self.inputs.deployment.id) + .non_deterministic()?; // To prevent a buggy pending version from replacing a current version, if errors are // present the subgraph will be unassigned. @@ -762,11 +781,11 @@ where if has_errors && !ENV_VARS.disable_fail_fast && !store.is_deployment_synced() { store .unassign_subgraph() - .map_err(|e| BlockProcessingError::Unknown(e.into()))?; + .map_err(|e| ProcessingError::Unknown(e.into()))?; // Use `Canceled` to avoiding setting the subgraph health to failed, an error was // just transacted so it will be already be set to unhealthy. 
- return Err(BlockProcessingError::Canceled); + return Err(ProcessingError::Canceled); } match needs_restart { @@ -809,7 +828,7 @@ where fn create_dynamic_data_sources( &mut self, created_data_sources: Vec, - ) -> Result<(Vec>, Vec>), Error> { + ) -> Result<(Vec>, Vec>), ProcessingError> { let mut data_sources = vec![]; let mut runtime_hosts = vec![]; @@ -817,15 +836,15 @@ where let manifest_idx = info .template .manifest_idx() - .ok_or_else(|| anyhow!("Expected template to have an idx"))?; + .ok_or_else(|| anyhow!("Expected template to have an idx")) + .non_deterministic()?; let created_ds_template = self .inputs .templates .iter() .find(|t| t.manifest_idx() == manifest_idx) - .ok_or_else(|| { - anyhow!("Expected to find a template for this dynamic data source") - })?; + .ok_or_else(|| anyhow!("Expected to find a template for this dynamic data source")) + .non_deterministic()?; // Try to instantiate a data source from the template let data_source = { @@ -847,14 +866,15 @@ where warn!(self.logger, "{}", e.to_string()); continue; } - Err(DataSourceCreationError::Unknown(e)) => return Err(e), + Err(DataSourceCreationError::Unknown(e)) => return Err(e).non_deterministic(), } }; // Try to create a runtime host for the data source let host = self .ctx - .add_dynamic_data_source(&self.logger, data_source.clone())?; + .add_dynamic_data_source(&self.logger, data_source.clone()) + .non_deterministic()?; match host { Some(host) => { @@ -882,7 +902,7 @@ where &mut self, start: Instant, block_ptr: BlockPtr, - action: Result, + action: Result, ) -> Result { self.state.skip_ptr_updates_timer = Instant::now(); @@ -934,7 +954,7 @@ where return Ok(action); } - Err(BlockProcessingError::Canceled) => { + Err(ProcessingError::Canceled) => { debug!(self.logger, "Subgraph block stream shut down cleanly"); return Ok(Action::Stop); } @@ -1263,7 +1283,7 @@ trait StreamEventHandler { handler: String, cursor: FirehoseCursor, cancel_handle: &CancelHandle, - ) -> Result; + ) -> Result; async fn handle_process_block( &mut self, block: BlockWithTriggers, @@ -1297,7 +1317,7 @@ where handler: String, cursor: FirehoseCursor, cancel_handle: &CancelHandle, - ) -> Result { + ) -> Result { let logger = self.logger.new(o!( "block_number" => format!("{:?}", block_ptr.number), "block_hash" => format!("{}", block_ptr.hash) @@ -1332,9 +1352,7 @@ where Ok(block_state) => block_state, // Some form of unknown or non-deterministic error ocurred. 
- Err(MappingError::Unknown(e)) => { - return Err(BlockProcessingError::Unknown(e).into()) - } + Err(MappingError::Unknown(e)) => return Err(ProcessingError::Unknown(e).into()), Err(MappingError::PossibleReorg(e)) => { info!(logger, "Possible reorg detected, retrying"; @@ -1363,7 +1381,7 @@ where // Avoid writing to store if block stream has been canceled if cancel_handle.is_canceled() { - return Err(BlockProcessingError::Canceled.into()); + return Err(ProcessingError::Canceled.into()); } if let Some(proof_of_indexing) = proof_of_indexing.into_inner() { @@ -1373,7 +1391,8 @@ where &self.metrics.host.stopwatch, &mut block_state.entity_cache, ) - .await?; + .await + .non_deterministic()?; } let section = self @@ -1388,7 +1407,7 @@ where } = block_state .entity_cache .as_modifications(block_ptr.number) - .map_err(|e| BlockProcessingError::Unknown(e.into()))?; + .map_err(|e| ProcessingError::Unknown(e.into()))?; section.end(); trace!(self.logger, "Entity cache statistics"; @@ -1443,7 +1462,7 @@ where let first_error = deterministic_errors.first().cloned(); // We consider a subgraph caught up when it's at most 1 blocks behind the chain head. - let is_caught_up = self.is_caught_up(&block_ptr).await?; + let is_caught_up = self.is_caught_up(&block_ptr).await.non_deterministic()?; self.inputs .store @@ -1460,7 +1479,8 @@ where is_caught_up, ) .await - .context("Failed to transact block operations")?; + .classify() + .detail("Failed to transact block operations")?; // For subgraphs with `nonFatalErrors` feature disabled, we consider // any error as fatal. @@ -1471,7 +1491,9 @@ where // all of the others are discarded. if has_errors && !is_non_fatal_errors_active { // Only the first error is reported. - return Err(BlockProcessingError::Deterministic(first_error.unwrap()).into()); + return Err(ProcessingError::Deterministic(Box::new( + first_error.unwrap(), + ))); } let elapsed = start.elapsed().as_secs_f64(); @@ -1486,11 +1508,11 @@ where if has_errors && !ENV_VARS.disable_fail_fast && !store.is_deployment_synced() { store .unassign_subgraph() - .map_err(|e| BlockProcessingError::Unknown(e.into()))?; + .map_err(|e| ProcessingError::Unknown(e.into()))?; // Use `Canceled` to avoiding setting the subgraph health to failed, an error was // just transacted so it will be already be set to unhealthy. 
-            return Err(BlockProcessingError::Canceled.into());
+            return Err(ProcessingError::Canceled.into());
         };
 
         Ok(Action::Continue)
diff --git a/graph/src/components/server/query.rs b/graph/src/components/server/query.rs
index 6bf83ffbf76..4a9fe1557c2 100644
--- a/graph/src/components/server/query.rs
+++ b/graph/src/components/server/query.rs
@@ -28,7 +28,7 @@ impl From for ServerError {
 impl From<StoreError> for ServerError {
     fn from(e: StoreError) -> Self {
         match e {
-            StoreError::ConstraintViolation(s) => ServerError::InternalError(s),
+            StoreError::InternalError(s) => ServerError::InternalError(s),
             _ => ServerError::ClientError(e.to_string()),
         }
     }
 }
diff --git a/graph/src/components/store/err.rs b/graph/src/components/store/err.rs
index 76be7c311ce..264c1b80df2 100644
--- a/graph/src/components/store/err.rs
+++ b/graph/src/components/store/err.rs
@@ -1,6 +1,6 @@
 use super::{BlockNumber, DeploymentSchemaVersion};
+use crate::prelude::DeploymentHash;
 use crate::prelude::QueryExecutionError;
-use crate::{data::store::EntityValidationError, prelude::DeploymentHash};
 use anyhow::{anyhow, Error};
 use diesel::result::Error as DieselError;
@@ -11,8 +11,6 @@ use tokio::task::JoinError;
 pub enum StoreError {
     #[error("store error: {0:#}")]
     Unknown(Error),
-    #[error("Entity validation failed: {0}")]
-    EntityValidationError(EntityValidationError),
     #[error(
         "tried to set entity of type `{0}` with ID \"{1}\" but an entity of type `{2}`, \
          which has an interface in common with `{0}`, exists with the same ID"
     )]
@@ -24,8 +22,6 @@ pub enum StoreError {
     UnknownTable(String),
     #[error("entity type '{0}' does not have an attribute '{0}'")]
     UnknownAttribute(String, String),
-    #[error("malformed directive '{0}'")]
-    MalformedDirective(String),
     #[error("query execution failed: {0}")]
     QueryExecutionError(String),
     #[error("Child filter nesting not supported by value `{0}`: `{1}`")]
@@ -40,8 +36,8 @@ pub enum StoreError {
     /// An internal error where we expected the application logic to enforce
     /// some constraint, e.g., that subgraph names are unique, but found that
     /// constraint to not hold
-    #[error("internal constraint violated: {0}")]
-    ConstraintViolation(String),
+    #[error("internal error: {0}")]
+    InternalError(String),
     #[error("deployment not found: {0}")]
     DeploymentNotFound(String),
     #[error("shard not found: {0} (this usually indicates a misconfiguration)")]
@@ -54,8 +50,6 @@ pub enum StoreError {
     Canceled,
     #[error("database unavailable")]
     DatabaseUnavailable,
-    #[error("database disabled")]
-    DatabaseDisabled,
     #[error("subgraph forking failed: {0}")]
     ForkFailure(String),
     #[error("subgraph writer poisoned by previous error")]
@@ -76,16 +70,18 @@ pub enum StoreError {
     WriteFailure(String, BlockNumber, String, String),
     #[error("database query timed out")]
     StatementTimeout,
+    #[error("database constraint violated: {0}")]
+    ConstraintViolation(String),
 }
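
The renamed variants split two formerly conflated cases: `InternalError` now covers violated application invariants, while the new `ConstraintViolation` is reserved for actual database constraint violations. A minimal sketch (not part of the diff) of how `is_deterministic`, defined further down in this file's diff, classifies the two; the message strings are illustrative only:

    use graph::prelude::StoreError;

    // Replaying the same block hits the same unique-key conflict, so a
    // database constraint violation is classified as deterministic.
    assert!(StoreError::ConstraintViolation("duplicate key value".to_string()).is_deterministic());

    // A broken application invariant says nothing about the chain data
    // being processed, so it stays non-deterministic.
    assert!(!StoreError::InternalError("no pool for shard".to_string()).is_deterministic());
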
internal_error { ($msg:expr) => {{ - $crate::prelude::StoreError::ConstraintViolation(format!("{}", $msg)) + $crate::prelude::StoreError::InternalError(format!("{}", $msg)) }}; ($fmt:expr, $($arg:tt)*) => {{ - $crate::prelude::StoreError::ConstraintViolation(format!($fmt, $($arg)*)) + $crate::prelude::StoreError::InternalError(format!($fmt, $($arg)*)) }} } @@ -96,7 +92,6 @@ impl Clone for StoreError { fn clone(&self) -> Self { match self { Self::Unknown(arg0) => Self::Unknown(anyhow!("{}", arg0)), - Self::EntityValidationError(arg0) => Self::EntityValidationError(arg0.clone()), Self::ConflictingId(arg0, arg1, arg2) => { Self::ConflictingId(arg0.clone(), arg1.clone(), arg2.clone()) } @@ -105,7 +100,6 @@ impl Clone for StoreError { Self::UnknownAttribute(arg0, arg1) => { Self::UnknownAttribute(arg0.clone(), arg1.clone()) } - Self::MalformedDirective(arg0) => Self::MalformedDirective(arg0.clone()), Self::QueryExecutionError(arg0) => Self::QueryExecutionError(arg0.clone()), Self::ChildFilterNestingNotSupportedError(arg0, arg1) => { Self::ChildFilterNestingNotSupportedError(arg0.clone(), arg1.clone()) @@ -114,14 +108,13 @@ impl Clone for StoreError { Self::DuplicateBlockProcessing(arg0, arg1) => { Self::DuplicateBlockProcessing(arg0.clone(), arg1.clone()) } - Self::ConstraintViolation(arg0) => Self::ConstraintViolation(arg0.clone()), + Self::InternalError(arg0) => Self::InternalError(arg0.clone()), Self::DeploymentNotFound(arg0) => Self::DeploymentNotFound(arg0.clone()), Self::UnknownShard(arg0) => Self::UnknownShard(arg0.clone()), Self::FulltextSearchNonDeterministic => Self::FulltextSearchNonDeterministic, Self::FulltextColumnMissingConfig => Self::FulltextColumnMissingConfig, Self::Canceled => Self::Canceled, Self::DatabaseUnavailable => Self::DatabaseUnavailable, - Self::DatabaseDisabled => Self::DatabaseDisabled, Self::ForkFailure(arg0) => Self::ForkFailure(arg0.clone()), Self::Poisoned => Self::Poisoned, Self::WriterPanic(arg0) => Self::Unknown(anyhow!("writer panic: {}", arg0)), @@ -136,6 +129,7 @@ impl Clone for StoreError { Self::WriteFailure(arg0.clone(), arg1.clone(), arg2.clone(), arg3.clone()) } Self::StatementTimeout => Self::StatementTimeout, + Self::ConstraintViolation(arg0) => Self::ConstraintViolation(arg0.clone()), } } } @@ -144,6 +138,7 @@ impl StoreError { pub fn from_diesel_error(e: &DieselError) -> Option { const CONN_CLOSE: &str = "server closed the connection unexpectedly"; const STMT_TIMEOUT: &str = "canceling statement due to statement timeout"; + const UNIQUE_CONSTR: &str = "duplicate key value violates unique constraint"; let DieselError::DatabaseError(_, info) = e else { return None; }; @@ -155,6 +150,12 @@ impl StoreError { Some(StoreError::DatabaseUnavailable) } else if info.message().contains(STMT_TIMEOUT) { Some(StoreError::StatementTimeout) + } else if info.message().contains(UNIQUE_CONSTR) { + let msg = match info.details() { + Some(details) => format!("{}: {}", info.message(), details.replace('\n', " ")), + None => info.message().to_string(), + }; + Some(StoreError::ConstraintViolation(msg)) } else { None } @@ -170,6 +171,43 @@ impl StoreError { StoreError::WriteFailure(entity.to_string(), block, error.to_string(), query) }) } + + pub fn is_deterministic(&self) -> bool { + use StoreError::*; + + // This classification tries to err on the side of caution. If in doubt, + // assume the error is non-deterministic. 
+ match self { + // deterministic errors + ConflictingId(_, _, _) + | UnknownField(_, _) + | UnknownTable(_) + | UnknownAttribute(_, _) + | InvalidIdentifier(_) + | UnsupportedFilter(_, _) + | ConstraintViolation(_) => true, + + // non-deterministic errors + Unknown(_) + | QueryExecutionError(_) + | ChildFilterNestingNotSupportedError(_, _) + | DuplicateBlockProcessing(_, _) + | InternalError(_) + | DeploymentNotFound(_) + | UnknownShard(_) + | FulltextSearchNonDeterministic + | FulltextColumnMissingConfig + | Canceled + | DatabaseUnavailable + | ForkFailure(_) + | Poisoned + | WriterPanic(_) + | UnsupportedDeploymentSchemaVersion(_) + | PruneFailure(_) + | WriteFailure(_, _, _, _) + | StatementTimeout => false, + } + } } impl From for StoreError { diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index b64f8b35964..efe16c90ee6 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -26,13 +26,13 @@ use std::time::Duration; use crate::blockchain::{Block, BlockHash, BlockPtr}; use crate::cheap_clone::CheapClone; use crate::components::store::write::EntityModification; -use crate::constraint_violation; use crate::data::store::scalar::Bytes; use crate::data::store::{Id, IdList, Value}; use crate::data::value::Word; use crate::data_source::CausalityRegion; use crate::derive::CheapClone; use crate::env::ENV_VARS; +use crate::internal_error; use crate::prelude::{s, Attribute, DeploymentHash, ValueType}; use crate::schema::{ast as sast, EntityKey, EntityType, InputSchema}; use crate::util::stats::MovingStats; @@ -1000,17 +1000,17 @@ impl PruneRequest { let rebuild_threshold = ENV_VARS.store.rebuild_threshold; let delete_threshold = ENV_VARS.store.delete_threshold; if rebuild_threshold < 0.0 || rebuild_threshold > 1.0 { - return Err(constraint_violation!( + return Err(internal_error!( "the copy threshold must be between 0 and 1 but is {rebuild_threshold}" )); } if delete_threshold < 0.0 || delete_threshold > 1.0 { - return Err(constraint_violation!( + return Err(internal_error!( "the delete threshold must be between 0 and 1 but is {delete_threshold}" )); } if history_blocks <= reorg_threshold { - return Err(constraint_violation!( + return Err(internal_error!( "the deployment {} needs to keep at least {} blocks \ of history and can't be pruned to only {} blocks of history", deployment, @@ -1019,7 +1019,7 @@ impl PruneRequest { )); } if first_block >= latest_block { - return Err(constraint_violation!( + return Err(internal_error!( "the earliest block {} must be before the latest block {}", first_block, latest_block diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs index 6f899633bd8..2c470fd32be 100644 --- a/graph/src/components/store/write.rs +++ b/graph/src/components/store/write.rs @@ -5,10 +5,10 @@ use crate::{ blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime}, cheap_clone::CheapClone, components::subgraph::Entity, - constraint_violation, data::{store::Id, subgraph::schema::SubgraphError}, data_source::CausalityRegion, derive::CacheWeight, + internal_error, util::cache_weight::CacheWeight, }; @@ -182,7 +182,7 @@ impl EntityModification { match self { Insert { end, .. } | Overwrite { end, .. } => { if end.is_some() { - return Err(constraint_violation!( + return Err(internal_error!( "can not clamp {:?} to block {}", self, block @@ -191,7 +191,7 @@ impl EntityModification { *end = Some(block); } Remove { .. 
} => { - return Err(constraint_violation!( + return Err(internal_error!( "can not clamp block range for removal of {:?} to {}", self, block @@ -219,7 +219,7 @@ impl EntityModification { end, }), Remove { key, .. } => { - return Err(constraint_violation!( + return Err(internal_error!( "a remove for {}[{}] can not be converted into an insert", entity_type, key.entity_id @@ -330,7 +330,7 @@ impl RowGroup { if !is_forward { // unwrap: we only get here when `last()` is `Some` let last_block = self.rows.last().map(|emod| emod.block()).unwrap(); - return Err(constraint_violation!( + return Err(internal_error!( "we already have a modification for block {}, can not append {:?}", last_block, emod @@ -412,7 +412,7 @@ impl RowGroup { self.rows.push(row); } EntityModification::Overwrite { .. } | EntityModification::Remove { .. } => { - return Err(constraint_violation!( + return Err(internal_error!( "immutable entity type {} only allows inserts, not {:?}", self.entity_type, row @@ -426,7 +426,7 @@ impl RowGroup { use EntityModification::*; if row.block() <= prev_row.block() { - return Err(constraint_violation!( + return Err(internal_error!( "can not append operations that go backwards from {:?} to {:?}", prev_row, row @@ -444,7 +444,7 @@ impl RowGroup { Insert { end: Some(_), .. } | Overwrite { end: Some(_), .. }, Overwrite { .. } | Remove { .. }, ) => { - return Err(constraint_violation!( + return Err(internal_error!( "impossible combination of entity operations: {:?} and then {:?}", prev_row, row @@ -481,7 +481,7 @@ impl RowGroup { fn append(&mut self, group: RowGroup) -> Result<(), StoreError> { if self.entity_type != group.entity_type { - return Err(constraint_violation!( + return Err(internal_error!( "Can not append a row group for {} to a row group for {}", group.entity_type, self.entity_type @@ -710,7 +710,7 @@ impl Batch { fn append_inner(&mut self, mut batch: Batch) -> Result<(), StoreError> { if batch.block_ptr.number <= self.block_ptr.number { - return Err(constraint_violation!("Batches must go forward. Can't append a batch with block pointer {} to one with block pointer {}", batch.block_ptr, self.block_ptr)); + return Err(internal_error!("Batches must go forward. 
Can't append a batch with block pointer {} to one with block pointer {}", batch.block_ptr, self.block_ptr)); } self.block_ptr = batch.block_ptr; diff --git a/graph/src/data/query/error.rs b/graph/src/data/query/error.rs index 65fc1bcd259..d02b1c9c4bd 100644 --- a/graph/src/data/query/error.rs +++ b/graph/src/data/query/error.rs @@ -74,7 +74,7 @@ pub enum QueryExecutionError { DeploymentNotFound(String), IdMissing, IdNotString, - ConstraintViolation(String), + InternalError(String), } impl QueryExecutionError { @@ -132,7 +132,7 @@ impl QueryExecutionError { | DeploymentNotFound(_) | IdMissing | IdNotString - | ConstraintViolation(_) => false, + | InternalError(_) => false, } } } @@ -274,7 +274,7 @@ impl fmt::Display for QueryExecutionError { DeploymentNotFound(id_or_name) => write!(f, "deployment `{}` does not exist", id_or_name), IdMissing => write!(f, "entity is missing an `id` attribute"), IdNotString => write!(f, "entity `id` attribute is not a string"), - ConstraintViolation(msg) => write!(f, "internal constraint violated: {}", msg), + InternalError(msg) => write!(f, "internal error: {}", msg), } } } @@ -306,7 +306,7 @@ impl From for QueryExecutionError { StoreError::ChildFilterNestingNotSupportedError(attr, filter) => { QueryExecutionError::ChildFilterNestingNotSupportedError(attr, filter) } - StoreError::ConstraintViolation(msg) => QueryExecutionError::ConstraintViolation(msg), + StoreError::InternalError(msg) => QueryExecutionError::InternalError(msg), _ => QueryExecutionError::StoreError(CloneableAnyhowError(Arc::new(e.into()))), } } diff --git a/graph/src/data/store/id.rs b/graph/src/data/store/id.rs index 64be7545621..9726141e2d6 100644 --- a/graph/src/data/store/id.rs +++ b/graph/src/data/store/id.rs @@ -20,9 +20,9 @@ use crate::{ use crate::{ components::store::StoreError, - constraint_violation, data::value::Word, derive::CacheWeight, + internal_error, prelude::QueryExecutionError, runtime::gas::{Gas, GasSizeOf}, }; @@ -367,7 +367,7 @@ impl IdList { ids.push(id); Ok(ids) } - _ => Err(constraint_violation!( + _ => Err(internal_error!( "expected string id, got {}: {}", id.id_type(), id, @@ -381,7 +381,7 @@ impl IdList { ids.push(id); Ok(ids) } - _ => Err(constraint_violation!( + _ => Err(internal_error!( "expected bytes id, got {}: {}", id.id_type(), id, @@ -395,7 +395,7 @@ impl IdList { ids.push(id); Ok(ids) } - _ => Err(constraint_violation!( + _ => Err(internal_error!( "expected int8 id, got {}: {}", id.id_type(), id, @@ -423,7 +423,7 @@ impl IdList { ids.push(Word::from(id)); Ok(ids) } - _ => Err(constraint_violation!( + _ => Err(internal_error!( "expected string id, got {}: 0x{}", id.id_type(), id, @@ -438,7 +438,7 @@ impl IdList { ids.push(scalar::Bytes::from(id)); Ok(ids) } - _ => Err(constraint_violation!( + _ => Err(internal_error!( "expected bytes id, got {}: {}", id.id_type(), id, @@ -452,7 +452,7 @@ impl IdList { ids.push(id); Ok(ids) } - _ => Err(constraint_violation!( + _ => Err(internal_error!( "expected int8 id, got {}: {}", id.id_type(), id, @@ -533,7 +533,7 @@ impl IdList { ids.push(id); Ok(()) } - (list, id) => Err(constraint_violation!( + (list, id) => Err(internal_error!( "expected id of type {}, but got {}[{}]", list.id_type(), id.id_type(), diff --git a/graph/src/env/mappings.rs b/graph/src/env/mappings.rs index 41499056b5b..c1bbb8565e5 100644 --- a/graph/src/env/mappings.rs +++ b/graph/src/env/mappings.rs @@ -62,6 +62,13 @@ pub struct EnvVarsMapping { /// eth calls before running triggers; instead eth calls happen when /// mappings call `ethereum.call`. 
Off by default. pub disable_declared_calls: bool, + + /// Set by the flag `GRAPH_STORE_ERRORS_ARE_NON_DETERMINISTIC`. Off by + /// default. Setting this to `true` will revert to the old behavior of + /// treating all store errors as nondeterministic. This is a temporary + /// measure and can be removed after 2025-07-01, once we are sure the + /// new behavior works as intended. + pub store_errors_are_nondeterministic: bool, } // This does not print any values avoid accidentally leaking any sensitive env vars @@ -89,6 +96,7 @@ impl From for EnvVarsMapping { ipfs_request_limit: x.ipfs_request_limit, allow_non_deterministic_ipfs: x.allow_non_deterministic_ipfs.0, disable_declared_calls: x.disable_declared_calls.0, + store_errors_are_nondeterministic: x.store_errors_are_nondeterministic.0, } } } @@ -123,4 +131,6 @@ pub struct InnerMappingHandlers { allow_non_deterministic_ipfs: EnvVarBoolean, #[envconfig(from = "GRAPH_DISABLE_DECLARED_CALLS", default = "false")] disable_declared_calls: EnvVarBoolean, + #[envconfig(from = "GRAPH_STORE_ERRORS_ARE_NON_DETERMINISTIC", default = "false")] + store_errors_are_nondeterministic: EnvVarBoolean, } diff --git a/graph/src/util/ogive.rs b/graph/src/util/ogive.rs index 476bfd76ce8..38300e088e6 100644 --- a/graph/src/util/ogive.rs +++ b/graph/src/util/ogive.rs @@ -1,6 +1,6 @@ use std::ops::RangeInclusive; -use crate::{constraint_violation, prelude::StoreError}; +use crate::{internal_error, prelude::StoreError}; /// A helper to deal with cumulative histograms, also known as ogives. This /// implementation is restricted to histograms where each bin has the same @@ -37,9 +37,7 @@ impl Ogive { /// and deduplicated, i.e., they don't have to be in ascending order. pub fn from_equi_histogram(mut points: Vec, total: usize) -> Result { if points.is_empty() { - return Err(constraint_violation!( - "histogram must have at least one point" - )); + return Err(internal_error!("histogram must have at least one point")); } points.sort_unstable(); @@ -124,7 +122,7 @@ impl Ogive { fn inverse(&self, value: i64) -> Result { let value = value as f64; if value < 0.0 { - return Err(constraint_violation!("value {} can not be negative", value)); + return Err(internal_error!("value {} can not be negative", value)); } let idx = (value / self.bin_size) as usize; if idx >= self.points.len() - 1 { @@ -138,7 +136,7 @@ impl Ogive { fn check_in_range(&self, point: i64) -> Result<(), StoreError> { if !self.range.contains(&point) { - return Err(constraint_violation!( + return Err(internal_error!( "point {} is outside of the range [{}, {}]", point, self.range.start(), diff --git a/graphql/src/store/prefetch.rs b/graphql/src/store/prefetch.rs index 33f0b67452b..95f51d51944 100644 --- a/graphql/src/store/prefetch.rs +++ b/graphql/src/store/prefetch.rs @@ -632,7 +632,7 @@ impl<'a> Loader<'a> { let object_type = input_schema .object_or_aggregation(&object_type.name, parent_interval) .ok_or_else(|| { - vec![QueryExecutionError::ConstraintViolation(format!( + vec![QueryExecutionError::InternalError(format!( "the type `{}`(interval {}) is not an object type", object_type.name, parent_interval diff --git a/graphql/src/store/resolver.rs b/graphql/src/store/resolver.rs index d7032740768..8f5eaaccbd1 100644 --- a/graphql/src/store/resolver.rs +++ b/graphql/src/store/resolver.rs @@ -327,7 +327,7 @@ impl Resolver for StoreResolver { None => { let child0_id = child_id(&children[0]); let child1_id = child_id(&children[1]); - QueryExecutionError::ConstraintViolation(format!( + 
QueryExecutionError::InternalError(format!( "expected only one child for {}.{} but got {}. One child has id {}, another has id {}", object_type.name(), field.name, children.len(), child0_id, child1_id diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index 762a2642524..84a19b601e5 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -17,7 +17,7 @@ use graph::{ prelude::{error, info, BlockNumber, BlockPtr, Logger, ENV_VARS}, slog::o, }; -use graph::{constraint_violation, prelude::CheapClone}; +use graph::{internal_error, prelude::CheapClone}; use graph::{prelude::StoreError, util::timed_cache::TimedCache}; use crate::{ @@ -55,7 +55,7 @@ pub mod primary { }; use graph::{ blockchain::{BlockHash, ChainIdentifier}, - constraint_violation, + internal_error, prelude::StoreError, }; @@ -92,7 +92,7 @@ pub mod primary { net_version: self.net_version.clone(), genesis_block_hash: BlockHash::try_from(self.genesis_block.as_str()).map_err( |e| { - constraint_violation!( + internal_error!( "the genesis block hash `{}` for chain `{}` is not a valid hash: {}", self.genesis_block, self.name, @@ -366,7 +366,7 @@ impl BlockStore { let pool = self .pools .get(&chain.shard) - .ok_or_else(|| constraint_violation!("there is no pool for shard {}", chain.shard))? + .ok_or_else(|| internal_error!("there is no pool for shard {}", chain.shard))? .clone(); let sender = ChainHeadUpdateSender::new( self.mirror.primary().clone(), @@ -427,7 +427,7 @@ impl BlockStore { pub fn chain_head_block(&self, chain: &str) -> Result, StoreError> { let store = self .store(chain) - .ok_or_else(|| constraint_violation!("unknown network `{}`", chain))?; + .ok_or_else(|| internal_error!("unknown network `{}`", chain))?; store.chain_head_block(chain) } @@ -466,7 +466,7 @@ impl BlockStore { pub fn drop_chain(&self, chain: &str) -> Result<(), StoreError> { let chain_store = self .store(chain) - .ok_or_else(|| constraint_violation!("unknown chain {}", chain))?; + .ok_or_else(|| internal_error!("unknown chain {}", chain))?; // Delete from the primary first since that's where // deployment_schemas has a fk constraint on chains diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 097aa799eff..0ec347d2bd5 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -30,7 +30,7 @@ use graph::prelude::{ BlockPtr, CachedEthereumCall, CancelableError, ChainStore as ChainStoreTrait, Error, EthereumCallCache, StoreError, }; -use graph::{constraint_violation, ensure}; +use graph::{ensure, internal_error}; use self::recent_blocks_cache::RecentBlocksCache; use crate::{ @@ -98,8 +98,8 @@ mod data { update, }; use graph::blockchain::{Block, BlockHash}; - use graph::constraint_violation; use graph::data::store::scalar::Bytes; + use graph::internal_error; use graph::prelude::ethabi::ethereum_types::H160; use graph::prelude::transaction_receipt::LightTransactionReceipt; use graph::prelude::web3::types::H256; @@ -176,7 +176,7 @@ mod data { if bytes.len() == H256::len_bytes() { Ok(H256::from_slice(bytes)) } else { - Err(constraint_violation!( + Err(internal_error!( "invalid H256 value `{}` has {} bytes instead of {}", graph::prelude::hex::encode(bytes), bytes.len(), @@ -1840,7 +1840,7 @@ impl ChainStore { number.map(|number| number.try_into()).transpose().map_err( |e: std::num::TryFromIntError| { - constraint_violation!( + internal_error!( "head block number for {} is {:?} which does not fit into a u32: {}", chain, number, @@ 
-2792,7 +2792,7 @@ impl EthereumCallCache for ChainStore { let mut resps = Vec::new(); for (id, retval, _) in rows { let idx = ids.iter().position(|i| i.as_ref() == id).ok_or_else(|| { - constraint_violation!( + internal_error!( "get_calls returned a call id that was not requested: {}", hex::encode(id) ) diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index 6ff46649494..abe9109e1d6 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -9,10 +9,10 @@ use diesel::{sql_query, RunQueryDsl}; use diesel_migrations::{EmbeddedMigrations, HarnessWithOutput}; use graph::cheap_clone::CheapClone; use graph::components::store::QueryPermit; -use graph::constraint_violation; use graph::derive::CheapClone; use graph::futures03::future::join_all; use graph::futures03::FutureExt as _; +use graph::internal_error; use graph::prelude::tokio::time::Instant; use graph::prelude::{tokio, MetricsRegistry}; use graph::slog::warn; @@ -1076,7 +1076,7 @@ impl PoolInner { const MSG: &str = "internal error: trying to get fdw connection on a pool that doesn't have any"; error!(logger, "{}", MSG); - return Err(constraint_violation!(MSG)); + return Err(internal_error!(MSG)); } }; Ok(pool) @@ -1501,13 +1501,13 @@ impl PoolCoordinator { self.servers .iter() .find(|server| &server.shard == shard) - .ok_or_else(|| constraint_violation!("unknown shard {shard}")) + .ok_or_else(|| internal_error!("unknown shard {shard}")) } fn primary(&self) -> Result, StoreError> { let map = self.pools.lock().unwrap(); let pool_state = map.get(&*&PRIMARY_SHARD).ok_or_else(|| { - constraint_violation!("internal error: primary shard not found in pool coordinator") + internal_error!("internal error: primary shard not found in pool coordinator") })?; Ok(pool_state.get_unready()) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 2e8807b2aa8..75cc80fb3f6 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -31,8 +31,8 @@ use diesel::{ QueryDsl, RunQueryDsl, }; use graph::{ - constraint_violation, futures03::{future::select_all, FutureExt as _}, + internal_error, prelude::{ info, lazy_static, o, warn, BlockNumber, BlockPtr, CheapClone, Logger, StoreError, ENV_VARS, }, @@ -140,7 +140,7 @@ impl CopyState { Some((src_id, hash, number)) => { let stored_target_block = BlockPtr::from((hash, number)); if stored_target_block != target_block { - return Err(constraint_violation!( + return Err(internal_error!( "CopyState {} for copying {} to {} has incompatible block pointer {} instead of {}", dst.site.id, src.site.deployment, @@ -149,7 +149,7 @@ impl CopyState { target_block)); } if src_id != src.site.id { - return Err(constraint_violation!( + return Err(internal_error!( "CopyState {} for copying {} to {} has incompatible source {} instead of {}", dst.site.id, src.site.deployment, @@ -275,7 +275,7 @@ impl CopyState { // drop_foreign_schema does), see that we do not have // metadata for `src` if crate::deployment::exists(conn, &self.src.site)? 
{ - return Err(constraint_violation!( + return Err(internal_error!( "we think we are copying {}[{}] across shards from {} to {}, but the \ source subgraph is actually in this shard", self.src.site.deployment, @@ -368,7 +368,7 @@ impl TableState { layout .table_for_entity(entity_type) .map_err(|e| { - constraint_violation!( + internal_error!( "invalid {} table {} in CopyState {} (table {}): {}", kind, entity_type, @@ -750,7 +750,7 @@ impl CopyTableWorker { self }) .await - .map_err(|e| constraint_violation!("copy worker for {} panicked: {}", object, e)) + .map_err(|e| internal_error!("copy worker for {} panicked: {}", object, e)) .into() } @@ -944,7 +944,7 @@ impl Connection { let logger = logger.new(o!("dst" => dst.site.namespace.to_string())); if src.site.schema_version != dst.site.schema_version { - return Err(StoreError::ConstraintViolation(format!( + return Err(StoreError::InternalError(format!( "attempted to copy between different schema versions, \ source version is {} but destination version is {}", src.site.schema_version, dst.site.schema_version @@ -981,7 +981,7 @@ impl Connection { F: FnOnce(&mut PgConnection) -> Result, { let Some(conn) = self.conn.as_mut() else { - return Err(constraint_violation!( + return Err(internal_error!( "copy connection has been handed to background task but not returned yet (transaction)" )); }; @@ -1066,13 +1066,11 @@ impl Connection { // Something bad happened. We should have at least one // worker if there are still tables to copy if self.conn.is_none() { - return Err(constraint_violation!( + return Err(internal_error!( "copy connection has been handed to background task but not returned yet (copy_data_internal)" )); } else { - return Err(constraint_violation!( - "no workers left but still tables to copy" - )); + return Err(internal_error!("no workers left but still tables to copy")); } } Ok(()) @@ -1268,9 +1266,7 @@ impl Connection { let dst_site = self.dst.site.cheap_clone(); let Some(conn) = self.conn.as_mut() else { - return Err(constraint_violation!( - "copy connection went missing (copy_data)" - )); + return Err(internal_error!("copy connection went missing (copy_data)")); }; conn.lock(&self.logger, &dst_site)?; diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 5d83a563181..d58b26370c8 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -42,7 +42,7 @@ use std::{str::FromStr, sync::Arc}; use crate::connection_pool::ForeignServer; use crate::{block_range::BLOCK_RANGE_COLUMN, primary::Site}; -use graph::constraint_violation; +use graph::internal_error; #[derive(DbEnum, Debug, Clone, Copy)] #[PgType = "text"] @@ -92,7 +92,7 @@ impl TryFrom> for OnSync { None => Ok(OnSync::None), Some("activate") => Ok(OnSync::Activate), Some("replace") => Ok(OnSync::Replace), - _ => Err(constraint_violation!("illegal value for on_sync: {value}")), + _ => Err(internal_error!("illegal value for on_sync: {value}")), } } } @@ -466,7 +466,7 @@ pub fn transact_block( ))), // More than one matching row was found. - _ => Err(StoreError::ConstraintViolation( + _ => Err(StoreError::InternalError( "duplicate deployments in shard".to_owned(), )), } @@ -515,7 +515,7 @@ pub fn forward_block_ptr( }, // More than one matching row was found. 
- _ => Err(StoreError::ConstraintViolation( + _ => Err(StoreError::InternalError( "duplicate deployments in shard".to_owned(), )), } @@ -612,7 +612,7 @@ pub fn initialize_block_ptr(conn: &mut PgConnection, site: &Site) -> Result<(), .select(d::latest_ethereum_block_hash) .first::>>(conn) .map_err(|e| { - constraint_violation!( + internal_error!( "deployment sgd{} must have been created before calling initialize_block_ptr but we got {}", site.id, e ) @@ -645,10 +645,10 @@ pub fn initialize_block_ptr(conn: &mut PgConnection, site: &Site) -> Result<(), fn convert_to_u32(number: Option, field: &str, subgraph: &str) -> Result { number - .ok_or_else(|| constraint_violation!("missing {} for subgraph `{}`", field, subgraph)) + .ok_or_else(|| internal_error!("missing {} for subgraph `{}`", field, subgraph)) .and_then(|number| { u32::try_from(number).map_err(|_| { - constraint_violation!( + internal_error!( "invalid value {:?} for {} in subgraph {}", number, field, @@ -1330,7 +1330,7 @@ pub fn set_on_sync( match n { 0 => Err(StoreError::DeploymentNotFound(site.to_string())), 1 => Ok(()), - _ => Err(constraint_violation!( + _ => Err(internal_error!( "multiple manifests for deployment {}", site.to_string() )), diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 92de85f316e..948b6e94410 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -37,8 +37,8 @@ use std::time::{Duration, Instant}; use graph::components::store::EntityCollection; use graph::components::subgraph::{ProofOfIndexingFinisher, ProofOfIndexingVersion}; -use graph::constraint_violation; use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError}; +use graph::internal_error; use graph::prelude::{ anyhow, debug, info, o, warn, web3, AttributeNames, BlockNumber, BlockPtr, CheapClone, DeploymentHash, DeploymentState, Entity, EntityQuery, Error, Logger, QueryExecutionError, @@ -806,7 +806,7 @@ impl DeploymentStore { reorg_threshold: BlockNumber, ) -> Result<(), StoreError> { if history_blocks <= reorg_threshold { - return Err(constraint_violation!( + return Err(internal_error!( "the amount of history to keep for sgd{} can not be set to \ {history_blocks} since it must be more than the \ reorg threshold {reorg_threshold}", @@ -1208,9 +1208,7 @@ impl DeploymentStore { Some(Ok(Ok(()))) => Ok(false), Some(Ok(Err(err))) => Err(StoreError::PruneFailure(err.to_string())), Some(Err(join_err)) => Err(StoreError::PruneFailure(join_err.to_string())), - None => Err(constraint_violation!( - "prune handle is finished but not ready" - )), + None => Err(internal_error!("prune handle is finished but not ready")), } } Some(false) => { @@ -1324,7 +1322,7 @@ impl DeploymentStore { // Sanity check on block numbers let from_number = block_ptr_from.map(|ptr| ptr.number); if from_number <= Some(block_ptr_to.number) { - constraint_violation!( + internal_error!( "truncate must go backwards, but would go from block {} to block {}", from_number.unwrap_or(0), block_ptr_to.number @@ -1350,7 +1348,7 @@ impl DeploymentStore { // Sanity check on block numbers let from_number = block_ptr_from.map(|ptr| ptr.number); if from_number <= Some(block_ptr_to.number) { - constraint_violation!( + internal_error!( "rewind must go backwards, but would go from block {} to block {}", from_number.unwrap_or(0), block_ptr_to.number @@ -1387,7 +1385,7 @@ impl DeploymentStore { let info = self.subgraph_info_with_conn(&mut conn, site.cheap_clone())?; if let Some(graft_block) = info.graft_block 
{ if graft_block > block_ptr_to.number { - return Err(constraint_violation!( + return Err(internal_error!( "Can not revert subgraph `{}` to block {} as it was \ grafted at block {} and reverting past a graft point \ is not possible", diff --git a/store/postgres/src/detail.rs b/store/postgres/src/detail.rs index 807e238f4fe..168af5b5d51 100644 --- a/store/postgres/src/detail.rs +++ b/store/postgres/src/detail.rs @@ -17,7 +17,7 @@ use graph::prelude::{ BigDecimal, BlockPtr, DeploymentHash, StoreError, SubgraphDeploymentEntity, }; use graph::schema::InputSchema; -use graph::{constraint_violation, data::subgraph::status, prelude::web3::types::H256}; +use graph::{data::subgraph::status, internal_error, prelude::web3::types::H256}; use itertools::Itertools; use std::collections::HashMap; use std::convert::TryFrom; @@ -134,7 +134,7 @@ impl TryFrom for SubgraphError { _ => None, }; let subgraph_id = DeploymentHash::new(subgraph_id).map_err(|id| { - StoreError::ConstraintViolation(format!("invalid subgraph id `{}` in fatal error", id)) + StoreError::InternalError(format!("invalid subgraph id `{}` in fatal error", id)) })?; Ok(SubgraphError { subgraph_id, @@ -155,7 +155,7 @@ pub(crate) fn block( match (hash, number) { (Some(hash), Some(number)) => { let number = number.to_i32().ok_or_else(|| { - constraint_violation!( + internal_error!( "the block number {} for {} in {} is not representable as an i32", number, name, @@ -168,7 +168,7 @@ pub(crate) fn block( ))) } (None, None) => Ok(None), - (hash, number) => Err(constraint_violation!( + (hash, number) => Err(internal_error!( "the hash and number \ of a block pointer must either both be null or both have a \ value, but for `{}` the hash of {} is `{:?}` and the number is `{:?}`", @@ -208,7 +208,7 @@ pub(crate) fn info_from_details( let site = sites .iter() .find(|site| site.deployment.as_str() == deployment) - .ok_or_else(|| constraint_violation!("missing site for subgraph `{}`", deployment))?; + .ok_or_else(|| internal_error!("missing site for subgraph `{}`", deployment))?; // This needs to be filled in later since it lives in a // different shard @@ -227,7 +227,7 @@ pub(crate) fn info_from_details( latest_block, }; let entity_count = entity_count.to_u64().ok_or_else(|| { - constraint_violation!( + internal_error!( "the entityCount for {} is not representable as a u64", deployment ) @@ -438,13 +438,13 @@ impl StoredDeploymentEntity { .graft_base .map(DeploymentHash::new) .transpose() - .map_err(|b| constraint_violation!("invalid graft base `{}`", b))?; + .map_err(|b| internal_error!("invalid graft base `{}`", b))?; let debug_fork = detail .debug_fork .map(DeploymentHash::new) .transpose() - .map_err(|b| constraint_violation!("invalid debug fork `{}`", b))?; + .map_err(|b| internal_error!("invalid debug fork `{}`", b))?; Ok(SubgraphDeploymentEntity { manifest: manifest.as_manifest(schema), diff --git a/store/postgres/src/dynds/mod.rs b/store/postgres/src/dynds/mod.rs index 09385fb8a7d..27ab4e78a10 100644 --- a/store/postgres/src/dynds/mod.rs +++ b/store/postgres/src/dynds/mod.rs @@ -7,8 +7,8 @@ use crate::primary::Site; use diesel::PgConnection; use graph::{ components::store::{write, StoredDynamicDataSource}, - constraint_violation, data_source::CausalityRegion, + internal_error, prelude::{BlockNumber, StoreError}, }; @@ -60,7 +60,7 @@ pub(crate) fn update_offchain_status( true => { DataSourcesTable::new(site.namespace.clone()).update_offchain_status(conn, data_sources) } - false => Err(constraint_violation!( + false => Err(internal_error!( "shared 
schema does not support data source offchain_found", )), } diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index 243a7dc5a57..d4d21ad39c1 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -12,8 +12,8 @@ use diesel::{ use graph::{ anyhow::{anyhow, Context}, components::store::{write, StoredDynamicDataSource}, - constraint_violation, data_source::CausalityRegion, + internal_error, prelude::{serde_json, BlockNumber, StoreError}, }; @@ -164,7 +164,7 @@ impl DataSourcesTable { // Nested offchain data sources might not pass this check, as their `creation_block` // will be their parent's `creation_block`, not necessarily `block`. if causality_region == &CausalityRegion::ONCHAIN && creation_block != &Some(block) { - return Err(constraint_violation!( + return Err(internal_error!( "mismatching creation blocks `{:?}` and `{}`", creation_block, block @@ -293,7 +293,7 @@ impl DataSourcesTable { .execute(conn)?; if count > 1 { - return Err(constraint_violation!( + return Err(internal_error!( "expected to remove at most one offchain data source but would remove {}, causality region: {}", count, ds.causality_region diff --git a/store/postgres/src/dynds/shared.rs b/store/postgres/src/dynds/shared.rs index 34615a720e3..5a2af316fcf 100644 --- a/store/postgres/src/dynds/shared.rs +++ b/store/postgres/src/dynds/shared.rs @@ -11,9 +11,9 @@ use diesel::{insert_into, pg::PgConnection}; use graph::{ components::store::{write, StoredDynamicDataSource}, - constraint_violation, data::store::scalar::ToPrimitive, data_source::CausalityRegion, + internal_error, prelude::{serde_json, BigDecimal, BlockNumber, DeploymentHash, StoreError}, }; @@ -62,7 +62,7 @@ pub(super) fn load( let mut data_sources: Vec = Vec::new(); for (vid, name, context, address, creation_block) in dds.into_iter() { if address.len() != 20 { - return Err(constraint_violation!( + return Err(internal_error!( "Data source address `0x{:?}` for dynamic data source {} should be 20 bytes long but is {} bytes long", address, vid, address.len() @@ -72,7 +72,7 @@ pub(super) fn load( let manifest_idx = manifest_idx_and_name .iter() .find(|(_, manifest_name)| manifest_name == &name) - .ok_or_else(|| constraint_violation!("data source name {} not found", name))? + .ok_or_else(|| internal_error!("data source name {} not found", name))? .0; let creation_block = creation_block.to_i32(); let data_source = StoredDynamicDataSource { @@ -88,7 +88,7 @@ pub(super) fn load( }; if data_sources.last().and_then(|d| d.creation_block) > data_source.creation_block { - return Err(StoreError::ConstraintViolation( + return Err(StoreError::InternalError( "data sources not ordered by creation block".to_string(), )); } @@ -126,7 +126,7 @@ pub(super) fn insert( } = ds; if causality_region != &CausalityRegion::ONCHAIN { - return Err(constraint_violation!( + return Err(internal_error!( "using shared data source schema with file data sources" )); } @@ -134,17 +134,13 @@ pub(super) fn insert( let address = match param { Some(param) => param, None => { - return Err(constraint_violation!( - "dynamic data sources must have an address", - )); + return Err(internal_error!("dynamic data sources must have an address",)); } }; let name = manifest_idx_and_name .iter() .find(|(idx, _)| *idx == ds.manifest_idx) - .ok_or_else(|| { - constraint_violation!("manifest idx {} not found", ds.manifest_idx) - })? + .ok_or_else(|| internal_error!("manifest idx {} not found", ds.manifest_idx))? 
.1 .clone(); Ok(( diff --git a/store/postgres/src/fork.rs b/store/postgres/src/fork.rs index 1a8e7a7c4ec..40457fb1739 100644 --- a/store/postgres/src/fork.rs +++ b/store/postgres/src/fork.rs @@ -7,10 +7,10 @@ use std::{ use graph::{ block_on, components::store::SubgraphFork as SubgraphForkTrait, - constraint_violation, + internal_error, prelude::{ - info, r::Value as RValue, reqwest, serde_json, DeploymentHash, Entity, Logger, Serialize, - StoreError, Value, ValueType, + anyhow, info, r::Value as RValue, reqwest, serde_json, DeploymentHash, Entity, Logger, + Serialize, StoreError, Value, ValueType, }, schema::Field, url::Url, @@ -69,9 +69,7 @@ impl SubgraphForkTrait for SubgraphFork { let entity_type = self.schema.entity_type(&entity_type_name)?; let fields = &entity_type .object_type() - .map_err(|_| { - constraint_violation!("no object type called `{}` found", entity_type_name) - })? + .map_err(|_| internal_error!("no object type called `{}` found", entity_type_name))? .fields; let query = Query { @@ -211,11 +209,9 @@ query Query ($id: String) {{ map }; - Ok(Some( - schema - .make_entity(map) - .map_err(|e| StoreError::EntityValidationError(e))?, - )) + Ok(Some(schema.make_entity(map).map_err(|e| { + StoreError::Unknown(anyhow!("entity validation failed: {e}")) + })?)) } } diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 39df898ba32..2d4b2624289 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -31,12 +31,12 @@ use diesel::{ }; use graph::{ components::store::DeploymentLocator, - constraint_violation, data::{ store::scalar::ToPrimitive, subgraph::{status, DeploymentFeatures}, }, derive::CheapClone, + internal_error, prelude::{ anyhow, chrono::{DateTime, Utc}, @@ -384,9 +384,9 @@ impl TryFrom for Site { fn try_from(schema: Schema) -> Result { let deployment = DeploymentHash::new(&schema.subgraph) - .map_err(|s| constraint_violation!("Invalid deployment id {}", s))?; + .map_err(|s| internal_error!("Invalid deployment id {}", s))?; let namespace = Namespace::new(schema.name.clone()).map_err(|nsp| { - constraint_violation!( + internal_error!( "Invalid schema name {} for deployment {}", nsp, &schema.subgraph @@ -450,8 +450,8 @@ mod queries { use diesel::sql_types::Text; use graph::prelude::NodeId; use graph::{ - constraint_violation, data::subgraph::status, + internal_error, prelude::{DeploymentHash, StoreError, SubgraphName}, }; use std::{collections::HashMap, convert::TryFrom, convert::TryInto}; @@ -510,7 +510,7 @@ mod queries { .optional()?; match id { Some(id) => DeploymentHash::new(id) - .map_err(|id| constraint_violation!("illegal deployment id: {}", id)), + .map_err(|id| internal_error!("illegal deployment id: {}", id)), None => Err(StoreError::DeploymentNotFound(name.to_string())), } } @@ -673,7 +673,7 @@ mod queries { .optional()? .map(|node| { NodeId::new(&node).map_err(|()| { - constraint_violation!( + internal_error!( "invalid node id `{}` in assignment for `{}`", node, site.deployment @@ -698,7 +698,7 @@ mod queries { .optional()? 
.map(|(node, ts)| { let node_id = NodeId::new(&node).map_err(|()| { - constraint_violation!( + internal_error!( "invalid node id `{}` in assignment for `{}`", node, site.deployment @@ -837,7 +837,7 @@ impl<'a> Connection<'a> { DeploymentHash::new(hash) .map(|hash| AssignmentChange::removed(DeploymentLocator::new(id.into(), hash))) .map_err(|id| { - StoreError::ConstraintViolation(format!( + StoreError::InternalError(format!( "invalid id `{}` for deployment assignment", id )) @@ -1318,7 +1318,7 @@ impl<'a> Connection<'a> { .cloned() .ok_or_else(|| anyhow!("failed to read schema name for {} back", deployment))?; let namespace = Namespace::new(namespace).map_err(|name| { - constraint_violation!("Generated database schema name {} is invalid", name) + internal_error!("Generated database schema name {} is invalid", name) })?; Ok(Site { @@ -1522,7 +1522,7 @@ impl<'a> Connection<'a> { .transpose() // This can't really happen since we filtered by valid NodeId's .map_err(|node| { - constraint_violation!("database has assignment for illegal node name {:?}", node) + internal_error!("database has assignment for illegal node name {:?}", node) }) } @@ -1559,7 +1559,7 @@ impl<'a> Connection<'a> { .map(|(shard, _)| Shard::new(shard.to_string())) .transpose() // This can't really happen since we filtered by valid shards - .map_err(|e| constraint_violation!("database has illegal shard name: {}", e)) + .map_err(|e| internal_error!("database has illegal shard name: {}", e)) } #[cfg(debug_assertions)] @@ -1729,10 +1729,7 @@ impl<'a> Connection<'a> { let ts = chrono::offset::Local::now() .checked_sub_signed(duration) .ok_or_else(|| { - StoreError::ConstraintViolation(format!( - "duration {} is too large", - duration - )) + StoreError::InternalError(format!("duration {} is too large", duration)) })?; Ok(u::table .filter(u::removed_at.is_null()) diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index fb181b7e74d..6bf8759a202 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -32,11 +32,11 @@ use graph::blockchain::block_stream::{EntityOperationKind, EntitySourceOperation use graph::blockchain::BlockTime; use graph::cheap_clone::CheapClone; use graph::components::store::write::{RowGroup, WriteChunk}; -use graph::constraint_violation; use graph::data::graphql::TypeExt as _; use graph::data::query::Trace; use graph::data::value::Word; use graph::data_source::CausalityRegion; +use graph::internal_error; use graph::prelude::{q, EntityQuery, StopwatchMetrics, ENV_VARS}; use graph::schema::{ EntityKey, EntityType, Field, FulltextConfig, FulltextDefinition, InputSchema, @@ -503,7 +503,7 @@ impl Layout { let key = entity_type.key_in(entity_data.id(), CausalityRegion::from_entity(&entity_data)); if entities.contains_key(&key) { - return Err(constraint_violation!( + return Err(internal_error!( "duplicate entity {}[{}] in result set, block = {}", key.entity_type, key.entity_id, @@ -910,7 +910,7 @@ impl Layout { .map(|id| id.to_string()) .collect::>() .join(", "); - return Err(constraint_violation!( + return Err(internal_error!( "entities of type `{}` can not be updated since they are immutable. Entity ids are [{}]", group.entity_type, ids @@ -968,7 +968,7 @@ impl Layout { let table = self.table_for_entity(&group.entity_type)?; if table.immutable { - return Err(constraint_violation!( + return Err(internal_error!( "entities of type `{}` can not be deleted since they are immutable. 
Entity ids are [{}]", table.object, group.ids().join(", ") )); @@ -1138,11 +1138,11 @@ impl Layout { let source_type = mapping.source_type(schema); let source_table = tables .get(&source_type) - .ok_or_else(|| constraint_violation!("Table for {source_type} is missing"))?; + .ok_or_else(|| internal_error!("Table for {source_type} is missing"))?; let agg_type = mapping.agg_type(schema); let agg_table = tables .get(&agg_type) - .ok_or_else(|| constraint_violation!("Table for {agg_type} is missing"))?; + .ok_or_else(|| internal_error!("Table for {agg_type} is missing"))?; let aggregation = mapping.aggregation(schema); let rollup = Rollup::new( mapping.interval, @@ -1612,9 +1612,9 @@ impl Table { ) -> Result { SqlName::check_valid_identifier(defn.as_str(), "object")?; - let object_type = defn.object_type().map_err(|_| { - constraint_violation!("The type `{}` is not an object type", defn.as_str()) - })?; + let object_type = defn + .object_type() + .map_err(|_| internal_error!("The type `{}` is not an object type", defn.as_str()))?; let table_name = SqlName::from(defn.as_str()); let columns = object_type diff --git a/store/postgres/src/relational/rollup.rs b/store/postgres/src/relational/rollup.rs index b9177a0052b..9a9830f6b5a 100644 --- a/store/postgres/src/relational/rollup.rs +++ b/store/postgres/src/relational/rollup.rs @@ -63,8 +63,8 @@ use diesel::{sql_query, PgConnection, RunQueryDsl as _}; use diesel::sql_types::{Integer, Nullable, Timestamptz}; use graph::blockchain::BlockTime; use graph::components::store::{BlockNumber, StoreError}; -use graph::constraint_violation; use graph::data::store::IdType; +use graph::internal_error; use graph::schema::{ Aggregate, AggregateFn, Aggregation, AggregationInterval, ExprVisitor, VisitExpr, }; @@ -111,7 +111,7 @@ fn rewrite<'a>(table: &'a Table, expr: &str) -> Result<(String, Vec<&'a str>), S fn not_supported(&mut self, msg: String) { if self.error.is_none() { - self.error = Some(constraint_violation!( + self.error = Some(internal_error!( "Schema validation should have found expression errors: {}", msg )); diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 028f6044c34..533990c42b9 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -94,9 +94,9 @@ impl From for diesel::result::Error { } } -// Similar to graph::prelude::constraint_violation, but returns a Diesel +// Similar to graph::prelude::internal_error, but returns a Diesel // error for use in the guts of query generation -macro_rules! constraint_violation { +macro_rules! 
     ($msg:expr) => {{
         diesel::result::Error::QueryBuilderError(anyhow!("{}", $msg).into())
     }};
@@ -431,7 +431,7 @@ pub fn parse_id(id_type: IdType, json: serde_json::Value) -> Result SqlValue<'a> {
         String(s) => match column_type {
             ColumnType::String|ColumnType::Enum(_)|ColumnType::TSVector(_) => S::Text(s),
             ColumnType::Int8 => S::Int8(s.parse::<i64>().map_err(|e| {
-                constraint_violation!("failed to convert `{}` to an Int8: {}", s, e.to_string())
+                internal_error!("failed to convert `{}` to an Int8: {}", s, e.to_string())
             })?),
             ColumnType::Bytes => {
                 let bytes = scalar::Bytes::from_str(s)
@@ -913,7 +913,7 @@ impl PrefixType {
         match column.column_type() {
             ColumnType::String => Ok(PrefixType::String),
             ColumnType::Bytes => Ok(PrefixType::Bytes),
-            _ => Err(constraint_violation!(
+            _ => Err(internal_error!(
                 "cannot setup prefix comparison for column {} of type {}",
                 column,
                 column.column_type().sql_type()
@@ -1086,7 +1086,7 @@ impl<'a> QueryFragment<Pg> for PrefixComparison<'a> {
         // For `op` either `<=` or `>=`, we can write (using '<=' as an example)
         //   uv <= st <=> u < s || u = s && uv <= st
         let large = self.kind.is_large(&self.value).map_err(|()| {
-            constraint_violation!(
+            internal_error!(
                 "column {} has type {} and can't be compared with the value `{}` using {}",
                 self.column,
                 self.column.column_type().sql_type(),
@@ -2237,7 +2237,7 @@ impl<'a> InsertRow<'a> {
             .filter_map(|field| row.entity.get(field))
             .map(|value| match value {
                 Value::String(s) => Ok(s),
-                _ => Err(constraint_violation!(
+                _ => Err(internal_error!(
                     "fulltext fields must be strings but got {:?}",
                     value
                 )),
@@ -3178,7 +3178,7 @@ impl<'a> FilterCollection<'a> {
             if windows.iter().map(FilterWindow::parent_type).all_equal() {
                 Ok(Some(windows[0].parent_type()?))
             } else {
-                Err(graph::constraint_violation!(
+                Err(graph::internal_error!(
                     "all implementors of an interface must use the same type for their `id`"
                 ))
             }
@@ -3448,7 +3448,7 @@ impl<'a> SortKey<'a> {
             true => (
                 parent_table.primary_key(),
                 child_table.column_for_field(&join_attribute).map_err(|_| {
-                    graph::constraint_violation!(
+                    graph::internal_error!(
                         "Column for a join attribute `{}` of `{}` table not found",
                         join_attribute,
                         child_table.name()
@@ -3459,7 +3459,7 @@ impl<'a> SortKey<'a> {
                 parent_table
                     .column_for_field(&join_attribute)
                     .map_err(|_| {
-                        graph::constraint_violation!(
+                        graph::internal_error!(
                             "Column for a join attribute `{}` of `{}` table not found",
                             join_attribute,
                             parent_table.name()
@@ -3535,7 +3535,7 @@ impl<'a> SortKey<'a> {
                 child_table
                     .column_for_field(&child.join_attribute)
                     .map_err(|_| {
-                        graph::constraint_violation!(
+                        graph::internal_error!(
                             "Column for a join attribute `{}` of `{}` table not found",
                             child.join_attribute,
                             child_table.name()
@@ -3546,7 +3546,7 @@ impl<'a> SortKey<'a> {
                 parent_table
                     .column_for_field(&child.join_attribute)
                     .map_err(|_| {
-                        graph::constraint_violation!(
+                        graph::internal_error!(
                             "Column for a join attribute `{}` of `{}` table not found",
                             child.join_attribute,
                             parent_table.name()
@@ -3586,7 +3586,7 @@ impl<'a> SortKey<'a> {
         direction: SortDirection,
     ) -> Result<SortKey<'a>, QueryExecutionError> {
         if entity_types.is_empty() {
-            return Err(QueryExecutionError::ConstraintViolation(
+            return Err(QueryExecutionError::InternalError(
                 "Cannot order by child interface with no implementing entity types".to_string(),
             ));
         }
@@ -3744,7 +3744,7 @@ impl<'a> SortKey<'a> {
                     direction: _,
                 } => {
                     if column.is_primary_key() {
-                        return Err(constraint_violation!("SortKey::Key never uses 'id'"));
+                        return Err(internal_error!("SortKey::Key never uses 'id'"));
uses 'id'")); } match select_statement_level { @@ -3764,7 +3764,7 @@ impl<'a> SortKey<'a> { match nested { ChildKey::Single(child) => { if child.sort_by_column.is_primary_key() { - return Err(constraint_violation!("SortKey::Key never uses 'id'")); + return Err(internal_error!("SortKey::Key never uses 'id'")); } match select_statement_level { @@ -3781,7 +3781,7 @@ impl<'a> SortKey<'a> { ChildKey::Many(_, children) => { for child in children.iter() { if child.sort_by_column.is_primary_key() { - return Err(constraint_violation!("SortKey::Key never uses 'id'")); + return Err(internal_error!("SortKey::Key never uses 'id'")); } out.push_sql(", "); child.sort_by_column.walk_ast(out.reborrow())?; @@ -3930,9 +3930,7 @@ impl<'a> SortKey<'a> { ) -> QueryResult<()> { if column.is_primary_key() { // This shouldn't happen since we'd use SortKey::IdAsc/Desc - return Err(constraint_violation!( - "sort_expr called with primary key column" - )); + return Err(internal_error!("sort_expr called with primary key column")); } fn push_prefix(prefix: Option<&str>, out: &mut AstPass) { @@ -3990,14 +3988,14 @@ impl<'a> SortKey<'a> { let sort_by = &child.sort_by_column; if sort_by.is_primary_key() { // This shouldn't happen since we'd use SortKey::ManyIdAsc/ManyDesc - return Err(constraint_violation!( + return Err(internal_error!( "multi_sort_expr called with primary key column" )); } match sort_by.column_type() { ColumnType::TSVector(_) => { - return Err(constraint_violation!("TSVector is not supported")); + return Err(internal_error!("TSVector is not supported")); } _ => {} } @@ -4565,7 +4563,7 @@ impl<'a> ClampRangeQuery<'a> { block: BlockNumber, ) -> Result { if table.immutable { - Err(graph::constraint_violation!( + Err(graph::internal_error!( "immutable entities can not be deleted or updated (table `{}`)", table.qualified_name )) @@ -4674,7 +4672,7 @@ pub struct RevertClampQuery<'a> { impl<'a> RevertClampQuery<'a> { pub(crate) fn new(table: &'a Table, block: BlockNumber) -> Result { if table.immutable { - Err(graph::constraint_violation!( + Err(graph::internal_error!( "can not revert clamping in immutable table `{}`", table.qualified_name )) @@ -4894,7 +4892,7 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { out.push_sql(", 0"); } (true, false) => { - return Err(constraint_violation!( + return Err(internal_error!( "can not copy entity type {} to {} because the src has a causality region but the dst does not", self.src.object.as_str(), self.dst.object.as_str() diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs index 7eb428a5058..bda5b2da136 100644 --- a/store/postgres/src/store.rs +++ b/store/postgres/src/store.rs @@ -9,8 +9,8 @@ use graph::{ StatusStore, Store as StoreTrait, }, }, - constraint_violation, data::subgraph::status, + internal_error, prelude::{ web3::types::Address, BlockNumber, BlockPtr, CheapClone, DeploymentHash, PartialBlockPtr, QueryExecutionError, StoreError, @@ -87,7 +87,7 @@ impl QueryStoreManager for Store { .and_then(|x| x)?; let chain_store = self.block_store.chain_store(&site.network).ok_or_else(|| { - constraint_violation!( + internal_error!( "Subgraphs index a known network, but {} indexes `{}` which we do not know about. 
This is most likely a configuration error.", site.deployment, site.network diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 339c66cee3f..5564a7d1726 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -21,9 +21,9 @@ use graph::{ PruneReporter, PruneRequest, SubgraphFork, }, }, - constraint_violation, data::query::QueryTarget, data::subgraph::{schema::DeploymentCreate, status, DeploymentFeatures}, + internal_error, prelude::{ anyhow, lazy_static, o, web3::types::Address, ApiVersion, BlockNumber, BlockPtr, ChainStore, DeploymentHash, EntityOperation, Logger, MetricsRegistry, NodeId, @@ -443,7 +443,7 @@ impl SubgraphStoreInner { fn evict(&self, id: &DeploymentHash) -> Result<(), StoreError> { if let Some((site, _)) = self.sites.remove(id) { let store = self.stores.get(&site.shard).ok_or_else(|| { - constraint_violation!( + internal_error!( "shard {} for deployment sgd{} not found when evicting", site.shard, site.id @@ -540,9 +540,7 @@ impl SubgraphStoreInner { let placement = self .placer .place(name.as_str(), network_name) - .map_err(|msg| { - constraint_violation!("illegal indexer name in deployment rule: {}", msg) - })?; + .map_err(|msg| internal_error!("illegal indexer name in deployment rule: {}", msg))?; match placement { None => Ok((PRIMARY_SHARD.clone(), default_node)), @@ -985,7 +983,7 @@ impl SubgraphStoreInner { pub(crate) fn version_info(&self, version: &str) -> Result { if let Some((deployment_id, created_at)) = self.mirror.version_info(version)? { let id = DeploymentHash::new(deployment_id.clone()) - .map_err(|id| constraint_violation!("illegal deployment id {}", id))?; + .map_err(|id| internal_error!("illegal deployment id {}", id))?; let (store, site) = self.store(&id)?; let statuses = store.deployment_statuses(&[site.clone()])?; let status = statuses @@ -994,7 +992,7 @@ impl SubgraphStoreInner { let chain = status .chains .first() - .ok_or_else(|| constraint_violation!("no chain info for {}", deployment_id))?; + .ok_or_else(|| internal_error!("no chain info for {}", deployment_id))?; let latest_ethereum_block_number = chain.latest_block.as_ref().map(|block| block.number()); let subgraph_info = store.subgraph_info(site.cheap_clone())?; @@ -1601,7 +1599,7 @@ impl SubgraphStoreTrait for SubgraphStore { fn active_locator(&self, hash: &str) -> Result, StoreError> { let sites = self.mirror.find_sites(&[hash.to_string()], true)?; if sites.len() > 1 { - return Err(constraint_violation!( + return Err(internal_error!( "There are {} active deployments for {hash}, there should only be one", sites.len() )); diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 3d85042d07c..628b1741e24 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -9,10 +9,10 @@ use async_trait::async_trait; use graph::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor}; use graph::blockchain::BlockTime; use graph::components::store::{Batch, DeploymentCursorTracker, DerivedEntityQuery, ReadStore}; -use graph::constraint_violation; use graph::data::store::IdList; use graph::data::subgraph::schema; use graph::data_source::CausalityRegion; +use graph::internal_error; use graph::prelude::{ BlockNumber, CacheWeight, Entity, MetricsRegistry, SubgraphDeploymentEntity, SubgraphStore as _, BLOCK_NUMBER_MAX, @@ -133,7 +133,7 @@ impl LastRollupTracker { *last = LastRollup::Some(block_time); } (LastRollup::Some(_) | LastRollup::Unknown, None) => { - 
constraint_violation!("block time cannot be unset"); + internal_error!("block time cannot be unset"); } } @@ -684,8 +684,8 @@ impl Request { let batch = batch.read().unwrap(); if let Some(err) = &batch.error { // This can happen when appending to the batch failed - // because of a constraint violation. Returning an `Err` - // here will poison and shut down the queue + // because of an internal error. Returning an `Err` here + // will poison and shut down the queue return Err(err.clone()); } let res = store @@ -1342,7 +1342,7 @@ impl Writer { // If there was an error, report that instead of a naked 'writer not running' queue.check_err()?; if join_handle.is_finished() { - Err(constraint_violation!( + Err(internal_error!( "Subgraph writer for {} is not running", queue.store.site )) @@ -1679,7 +1679,7 @@ impl WritableStoreTrait for WritableStore { if let Some(block_ptr) = self.block_ptr.lock().unwrap().as_ref() { if block_ptr_to.number <= block_ptr.number { - return Err(constraint_violation!( + return Err(internal_error!( "transact_block_operations called for block {} but its head is already at {}", block_ptr_to, block_ptr diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index 2e3e138d567..d83ec8cbf48 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -449,6 +449,7 @@ fn read_range_pool_created_test() { let pool_created_type = TEST_SUBGRAPH_SCHEMA.entity_type("PoolCreated").unwrap(); let entity_types = vec![pool_created_type.clone()]; + let mut last_op: Option = None; for count in (1..=2).map(|x| x as i64) { let id = if count == 1 { "0xff80818283848586" @@ -478,6 +479,7 @@ fn read_range_pool_created_test() { data, }; + last_op = Some(op.clone()); transact_entity_operations( &subgraph_store, &deployment, @@ -500,5 +502,21 @@ fn read_range_pool_created_test() { let a = result_entities[index as usize].clone(); assert_eq!(a, format!("{:?}", en)); } + + // Make sure we get a constraint violation + let op = last_op.take().unwrap(); + + transact_entity_operations(&subgraph_store, &deployment, block_pointer(3), vec![op]) + .await + .unwrap(); + let res = writable.flush().await; + let exp = "duplicate key value violates unique constraint \"pool_created_pkey\": Key (vid)=(2) already exists."; + match res { + Ok(_) => panic!("Expected error, but got success"), + Err(StoreError::ConstraintViolation(msg)) => { + assert_eq!(msg, exp); + } + Err(e) => panic!("Expected constraint violation, but got {:?}", e), + } }) }