Skip to content

Commit

Permalink
Add causality region column and implement isolation rules (#4162)
Browse files Browse the repository at this point in the history
* feature(offchain): Add `causality_region` column to entity tables

For now this just tracks the tables that need the column and adds the
column to the DDL, but still unconditionally inserts 0. Inserting the
correct causality region is follow-up work.

* store: Move `has_causality_region` to manifest, rename to `entities_with_causality_region`

* *: Add `causality_region` to EntityKey

The tricky part was changing `get_many` to return the entity key.

* store: Insert the causality region

* store: Read isolation between causality regions

It was just necessary to make sure that `find` and `find_many` use
the causality region in their where clause.

* fix: Fix release build

* provider: Make stop idempotent

Callers wanted that anyway, and it helps tests.

* tests: Refactor file ds test to use events

* tests: Test conflict between onchain and offchain

* tests: Test conflict between offchain and offchain

* test: Fix unit test

* tests: Improve tests and comments to address review

* fix: Change migration to add column 'if not exists'
  • Loading branch information
leoyvens authored Jan 25, 2023
1 parent 0628b2c commit 04fbd9d
Show file tree
Hide file tree
Showing 46 changed files with 925 additions and 356 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use anyhow::Error;
use graph::{
blockchain::{self, block_stream::BlockWithTriggers, BlockPtr, EmptyNodeCapabilities},
components::{
store::{DeploymentLocator, EntityKey, SubgraphFork},
store::{DeploymentLocator, EntityKey, EntityType, SubgraphFork},
subgraph::{MappingError, ProofOfIndexingEvent, SharedProofOfIndexing},
},
data::store::scalar::Bytes,
data_source,
data_source::{self, CausalityRegion},
prelude::{
anyhow, async_trait, BigDecimal, BigInt, BlockHash, BlockNumber, BlockState, Entity,
RuntimeHostBuilder, Value,
Expand Down Expand Up @@ -183,7 +183,11 @@ where
Operation::Create | Operation::Update => {
let entity_type: &str = &entity_change.entity;
let entity_id: String = entity_change.id.clone();
let key = EntityKey::data(entity_type.to_string(), entity_id.clone());
let key = EntityKey {
entity_type: EntityType::new(entity_type.to_string()),
entity_id: entity_id.clone().into(),
causality_region: CausalityRegion::ONCHAIN, // Substreams don't currently support offchain data
};
let mut data: HashMap<String, Value> = HashMap::from_iter(vec![]);

for field in entity_change.fields.iter() {
Expand Down Expand Up @@ -214,7 +218,11 @@ where
Operation::Delete => {
let entity_type: &str = &entity_change.entity;
let entity_id: String = entity_change.id.clone();
let key = EntityKey::data(entity_type.to_string(), entity_id.clone());
let key = EntityKey {
entity_type: EntityType::new(entity_type.to_string()),
entity_id: entity_id.clone().into(),
causality_region: CausalityRegion::ONCHAIN, // Substreams don't currently support offchain data
};

state.entity_cache.remove(key);

Expand Down
4 changes: 1 addition & 3 deletions core/src/subgraph/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss
{
// Shut down subgraph processing
self.instance_manager.stop_subgraph(deployment).await;
Ok(())
} else {
Err(SubgraphAssignmentProviderError::NotRunning(deployment))
}
Ok(())
}
}
25 changes: 21 additions & 4 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ where
// - The event stream sees a Remove event for subgraph B, but the table query finds that
// subgraph B has already been removed.
// The `handle_assignment_events` function handles these cases by ignoring AlreadyRunning
// (on subgraph start) or NotRunning (on subgraph stop) error types, which makes the
// operations idempotent.
// (on subgraph start) which makes the operations idempotent. Subgraph stop is already idempotent.

// Start event stream
let assignment_event_stream = self.assignment_events();
Expand Down Expand Up @@ -455,7 +454,6 @@ async fn handle_assignment_event(
node_id: _,
} => match provider.stop(deployment).await {
Ok(()) => Ok(()),
Err(SubgraphAssignmentProviderError::NotRunning(_)) => Ok(()),
Err(e) => Err(CancelableError::Error(e)),
},
}
Expand Down Expand Up @@ -620,11 +618,30 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
"block" => format!("{:?}", base_block.as_ref().map(|(_,ptr)| ptr.number))
);

// Entity types that may be touched by offchain data sources need a causality region column.
let needs_causality_region = manifest
.data_sources
.iter()
.filter_map(|ds| ds.as_offchain())
.map(|ds| ds.mapping.entities.iter())
.chain(
manifest
.templates
.iter()
.filter_map(|ds| ds.as_offchain())
.map(|ds| ds.mapping.entities.iter()),
)
.flatten()
.cloned()
.collect();

// Apply the subgraph versioning and deployment operations,
// creating a new subgraph deployment if one doesn't exist.
let deployment = DeploymentCreate::new(raw_string, &manifest, start_block)
.graft(base_block)
.debug(debug_fork);
.debug(debug_fork)
.entities_with_causality_region(needs_causality_region);

deployment_store
.create_subgraph_deployment(
name,
Expand Down
10 changes: 9 additions & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use graph::data::subgraph::{
SubgraphFeature,
};
use graph::data_source::{
offchain, DataSource, DataSourceCreationError, DataSourceTemplate, TriggerData,
offchain, CausalityRegion, DataSource, DataSourceCreationError, DataSourceTemplate, TriggerData,
};
use graph::env::EnvVars;
use graph::prelude::*;
Expand Down Expand Up @@ -653,6 +653,7 @@ where
let mut block_state = BlockState::<C>::new(EmptyStore::new(schema), LfuCache::new());

// PoI ignores offchain events.
// See also: poi-ignores-offchain
let proof_of_indexing = None;
let causality_region = "";

Expand Down Expand Up @@ -998,7 +999,14 @@ async fn update_proof_of_indexing(
// Create the special POI entity key specific to this causality_region
let entity_key = EntityKey {
entity_type: POI_OBJECT.to_owned(),

// There are two things called causality regions here, one is the causality region for
// the poi which is a string and the PoI entity id. The other is the data source
// causality region to which the PoI belongs as an entity. Currently offchain events do
// not affect PoI so it is assumed to be `ONCHAIN`.
// See also: poi-ignores-offchain
entity_id: causality_region.into(),
causality_region: CausalityRegion::ONCHAIN,
};

// Grab the current digest attribute on this entity
Expand Down
28 changes: 8 additions & 20 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use anyhow::anyhow;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::sync::Arc;

use crate::components::store::{
self as s, Entity, EntityKey, EntityOp, EntityOperation, EntityType,
};
use crate::components::store::{self as s, Entity, EntityKey, EntityOp, EntityOperation};
use crate::prelude::{Schema, ENV_VARS};
use crate::util::lfu_cache::LfuCache;

Expand Down Expand Up @@ -102,6 +100,10 @@ impl EntityCache {
// Get the current entity, apply any updates from `updates`, then
// from `handler_updates`.
let mut entity = self.current.get_entity(&*self.store, eref)?;

// Always test the cache consistency in debug mode.
debug_assert!(entity == self.store.get(&eref).unwrap());

if let Some(op) = self.updates.get(eref).cloned() {
entity = op.apply_to(entity)
}
Expand Down Expand Up @@ -233,22 +235,8 @@ impl EntityCache {
// violation in the database, ensuring correctness
let missing = missing.filter(|key| !self.schema.is_immutable(&key.entity_type));

let mut missing_by_type: BTreeMap<&EntityType, Vec<&str>> = BTreeMap::new();
for key in missing {
missing_by_type
.entry(&key.entity_type)
.or_default()
.push(&key.entity_id);
}

for (entity_type, entities) in self.store.get_many(missing_by_type)? {
for entity in entities {
let key = EntityKey {
entity_type: entity_type.clone(),
entity_id: entity.id().unwrap().into(),
};
self.current.insert(key, Some(entity));
}
for (entity_key, entity) in self.store.get_many(missing.cloned().collect())? {
self.current.insert(entity_key, Some(entity));
}

let mut mods = Vec::new();
Expand Down
47 changes: 39 additions & 8 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod traits;

pub use entity_cache::{EntityCache, ModificationsAndCache};

use diesel::types::{FromSql, ToSql};
pub use err::StoreError;
use itertools::Itertools;
pub use traits::*;
Expand All @@ -12,13 +13,14 @@ use futures::stream::poll_fn;
use futures::{Async, Poll, Stream};
use graphql_parser::schema as s;
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::fmt;
use std::fmt::Display;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::{fmt, io};

use crate::blockchain::Block;
use crate::data::store::scalar::Bytes;
Expand Down Expand Up @@ -71,6 +73,12 @@ impl<'a> From<&s::InterfaceType<'a, String>> for EntityType {
}
}

impl Borrow<str> for EntityType {
fn borrow(&self) -> &str {
&self.0
}
}

// This conversion should only be used in tests since it makes it too
// easy to convert random strings into entity types
#[cfg(debug_assertions)]
Expand All @@ -82,6 +90,22 @@ impl From<&str> for EntityType {

impl CheapClone for EntityType {}

impl FromSql<diesel::sql_types::Text, diesel::pg::Pg> for EntityType {
fn from_sql(bytes: Option<&[u8]>) -> diesel::deserialize::Result<Self> {
let s = <String as FromSql<_, diesel::pg::Pg>>::from_sql(bytes)?;
Ok(EntityType::new(s))
}
}

impl ToSql<diesel::sql_types::Text, diesel::pg::Pg> for EntityType {
fn to_sql<W: io::Write>(
&self,
out: &mut diesel::serialize::Output<W, diesel::pg::Pg>,
) -> diesel::serialize::Result {
<str as ToSql<diesel::sql_types::Text, diesel::pg::Pg>>::to_sql(self.0.as_str(), out)
}
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntityFilterDerivative(bool);

Expand All @@ -104,13 +128,23 @@ pub struct EntityKey {

/// ID of the individual entity.
pub entity_id: Word,

/// This is the causality region of the data source that created the entity.
///
/// In the case of an entity lookup, this is the causality region of the data source that is
/// doing the lookup. So if the entity exists but was created on a different causality region,
/// the lookup will return empty.
pub causality_region: CausalityRegion,
}

impl EntityKey {
pub fn data(entity_type: String, entity_id: String) -> Self {
// For use in tests only
#[cfg(debug_assertions)]
pub fn data(entity_type: impl Into<String>, entity_id: impl Into<String>) -> Self {
Self {
entity_type: EntityType::new(entity_type),
entity_id: entity_id.into(),
entity_type: EntityType::new(entity_type.into()),
entity_id: entity_id.into().into(),
causality_region: CausalityRegion::ONCHAIN,
}
}
}
Expand Down Expand Up @@ -1071,10 +1105,7 @@ impl ReadStore for EmptyStore {
Ok(None)
}

fn get_many(
&self,
_ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
fn get_many(&self, _: BTreeSet<EntityKey>) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
Ok(BTreeMap::new())
}

Expand Down
13 changes: 6 additions & 7 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,11 @@ pub trait ReadStore: Send + Sync + 'static {
/// Looks up an entity using the given store key at the latest block.
fn get(&self, key: &EntityKey) -> Result<Option<Entity>, StoreError>;

/// Look up multiple entities as of the latest block. Returns a map of
/// entities by type.
/// Look up multiple entities as of the latest block.
fn get_many(
&self,
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError>;
keys: BTreeSet<EntityKey>,
) -> Result<BTreeMap<EntityKey, Entity>, StoreError>;

fn input_schema(&self) -> Arc<Schema>;
}
Expand All @@ -189,9 +188,9 @@ impl<T: ?Sized + ReadStore> ReadStore for Arc<T> {

fn get_many(
&self,
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
(**self).get_many(ids_for_type)
keys: BTreeSet<EntityKey>,
) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
(**self).get_many(keys)
}

fn input_schema(&self) -> Arc<Schema> {
Expand Down
4 changes: 2 additions & 2 deletions graph/src/data/graphql/object_or_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<'a> ObjectOrInterface<'a> {
ObjectOrInterface::Object(object) => Some(vec![object]),
ObjectOrInterface::Interface(interface) => schema
.types_for_interface()
.get(&interface.into())
.get(interface.name.as_str())
.map(|object_types| object_types.iter().collect()),
}
}
Expand All @@ -131,7 +131,7 @@ impl<'a> ObjectOrInterface<'a> {
) -> bool {
match self {
ObjectOrInterface::Object(o) => o.name == typename,
ObjectOrInterface::Interface(i) => types_for_interface[&i.into()]
ObjectOrInterface::Interface(i) => types_for_interface[i.name.as_str()]
.iter()
.any(|o| o.name == typename),
}
Expand Down
4 changes: 1 addition & 3 deletions graph/src/data/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
blockchain::{BlockPtr, Blockchain, DataSource as _},
components::{
link_resolver::LinkResolver,
store::{DeploymentLocator, StoreError, SubgraphStore},
store::{StoreError, SubgraphStore},
},
data::{
graphql::TryFromValue,
Expand Down Expand Up @@ -304,8 +304,6 @@ pub enum SubgraphAssignmentProviderError {
/// Occurs when attempting to remove a subgraph that's not hosted.
#[error("Subgraph with ID {0} already running")]
AlreadyRunning(DeploymentHash),
#[error("Subgraph with ID {0} is not running")]
NotRunning(DeploymentLocator),
#[error("Subgraph provider error: {0}")]
Unknown(#[from] anyhow::Error),
}
Expand Down
Loading

0 comments on commit 04fbd9d

Please sign in to comment.