Skip to content

Commit

Permalink
feature(offchain): Add causality_region column to entity tables
Browse files Browse the repository at this point in the history
For now this just tracks the tables that need the column and adds the
column to the DDL, but still unconditionally inserts 0. Inserting the
correct causality region is follow-up work.
  • Loading branch information
leoyvens committed Nov 11, 2022
1 parent c3e1634 commit 33e7ee7
Show file tree
Hide file tree
Showing 24 changed files with 236 additions and 60 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,11 +617,30 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
"block" => format!("{:?}", base_block.as_ref().map(|(_,ptr)| ptr.number))
);

// Entity types that may be touched by offchain data sources need a causality region column.
let needs_causality_region = manifest
.data_sources
.iter()
.filter_map(|ds| ds.as_offchain())
.map(|ds| ds.mapping.entities.iter())
.chain(
manifest
.templates
.iter()
.filter_map(|ds| ds.as_offchain())
.map(|ds| ds.mapping.entities.iter()),
)
.flatten()
.cloned()
.collect();

// Apply the subgraph versioning and deployment operations,
// creating a new subgraph deployment if one doesn't exist.
let deployment = DeploymentCreate::new(raw_string, &manifest, start_block)
.graft(base_block)
.debug(debug_fork);
.debug(debug_fork)
.has_causality_region(needs_causality_region);

deployment_store
.create_subgraph_deployment(
name,
Expand Down
26 changes: 25 additions & 1 deletion graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod traits;

pub use cache::{CachedEthereumCall, EntityCache, ModificationsAndCache};

use diesel::types::{FromSql, ToSql};
pub use err::StoreError;
use itertools::Itertools;
pub use traits::*;
Expand All @@ -12,13 +13,14 @@ use futures::stream::poll_fn;
use futures::{Async, Poll, Stream};
use graphql_parser::schema as s;
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::fmt;
use std::fmt::Display;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::{fmt, io};

use crate::blockchain::{Block, Blockchain};
use crate::data::store::scalar::Bytes;
Expand Down Expand Up @@ -71,6 +73,12 @@ impl<'a> From<&s::InterfaceType<'a, String>> for EntityType {
}
}

/// Let an `EntityType` stand in for a `&str` key, so string-keyed
/// collections (e.g. `BTreeMap`/`HashMap` with `String` keys) can be
/// queried with an `EntityType` — and vice versa via `Borrow`-based
/// lookups — without allocating an intermediate `String`.
impl Borrow<str> for EntityType {
    fn borrow(&self) -> &str {
        // `self.0` is the type's backing string; expose it as `&str`.
        &self.0
    }
}

// This conversion should only be used in tests since it makes it too
// easy to convert random strings into entity types
#[cfg(debug_assertions)]
Expand All @@ -82,6 +90,22 @@ impl From<&str> for EntityType {

impl CheapClone for EntityType {}

/// Read an `EntityType` out of a Postgres `text` column.
///
/// Uses the diesel 1.x `FromSql` signature, where `bytes` is `None` for
/// SQL NULL; NULL handling is delegated to `String`'s implementation,
/// which rejects it.
impl FromSql<diesel::sql_types::Text, diesel::pg::Pg> for EntityType {
    fn from_sql(bytes: Option<&[u8]>) -> diesel::deserialize::Result<Self> {
        // Decode as a plain String first, then wrap in the newtype.
        let s = <String as FromSql<_, diesel::pg::Pg>>::from_sql(bytes)?;
        Ok(EntityType::new(s))
    }
}

/// Write an `EntityType` into a Postgres `text` column by serializing
/// its inner string (diesel 1.x `ToSql` signature with an explicit
/// `io::Write` output).
impl ToSql<diesel::sql_types::Text, diesel::pg::Pg> for EntityType {
    fn to_sql<W: io::Write>(
        &self,
        out: &mut diesel::serialize::Output<W, diesel::pg::Pg>,
    ) -> diesel::serialize::Result {
        // Delegate to `str`'s Text serialization; no copy of the string.
        <str as ToSql<diesel::sql_types::Text, diesel::pg::Pg>>::to_sql(self.0.as_str(), out)
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntityFilterDerivative(bool);

Expand Down
4 changes: 2 additions & 2 deletions graph/src/data/graphql/object_or_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<'a> ObjectOrInterface<'a> {
ObjectOrInterface::Object(object) => Some(vec![object]),
ObjectOrInterface::Interface(interface) => schema
.types_for_interface()
.get(&interface.into())
.get(interface.name.as_str())
.map(|object_types| object_types.iter().collect()),
}
}
Expand All @@ -131,7 +131,7 @@ impl<'a> ObjectOrInterface<'a> {
) -> bool {
match self {
ObjectOrInterface::Object(o) => o.name == typename,
ObjectOrInterface::Interface(i) => types_for_interface[&i.into()]
ObjectOrInterface::Interface(i) => types_for_interface[i.name.as_str()]
.iter()
.any(|o| o.name == typename),
}
Expand Down
9 changes: 9 additions & 0 deletions graph/src/data/subgraph/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use hex;
use lazy_static::lazy_static;
use rand::rngs::OsRng;
use rand::Rng;
use std::collections::BTreeSet;
use std::str::FromStr;
use std::{fmt, fmt::Display};

Expand Down Expand Up @@ -106,6 +107,7 @@ pub struct DeploymentCreate {
pub graft_base: Option<DeploymentHash>,
pub graft_block: Option<BlockPtr>,
pub debug_fork: Option<DeploymentHash>,
pub has_causality_region: BTreeSet<EntityType>,
}

impl DeploymentCreate {
Expand All @@ -120,6 +122,7 @@ impl DeploymentCreate {
graft_base: None,
graft_block: None,
debug_fork: None,
has_causality_region: BTreeSet::new(),
}
}

Expand All @@ -135,6 +138,11 @@ impl DeploymentCreate {
self.debug_fork = fork;
self
}

pub fn has_causality_region(mut self, has_causality_region: BTreeSet<EntityType>) -> Self {
self.has_causality_region = has_causality_region;
self
}
}

/// The representation of a subgraph deployment when reading an existing
Expand All @@ -158,6 +166,7 @@ pub struct SubgraphDeploymentEntity {
pub reorg_count: i32,
pub current_reorg_depth: i32,
pub max_reorg_depth: i32,
pub has_causality_region: Vec<EntityType>,
}

#[derive(Debug)]
Expand Down
7 changes: 7 additions & 0 deletions graph/src/data_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ impl<C: Blockchain> DataSourceTemplate<C> {
}
}

/// Return a reference to the offchain template if this is an
/// `Offchain` variant, and `None` for `Onchain`.
pub fn as_offchain(&self) -> Option<&offchain::DataSourceTemplate> {
    match self {
        Self::Onchain(_) => None,
        // `t` is already `&offchain::DataSourceTemplate` when matching on
        // `&self`; the previous `Some(&t)` created a `&&_` that only
        // compiled via auto-deref coercion (clippy: needless_borrow).
        Self::Offchain(t) => Some(t),
    }
}

pub fn into_onchain(self) -> Option<C::DataSourceTemplate> {
match self {
Self::Onchain(ds) => Some(ds),
Expand Down
2 changes: 1 addition & 1 deletion graphql/src/introspection/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ fn interface_type_object(
description: interface_type.description.clone(),
fields:
field_objects(schema, type_objects, &interface_type.fields),
possibleTypes: schema.types_for_interface()[&interface_type.into()]
possibleTypes: schema.types_for_interface()[interface_type.name.as_str()]
.iter()
.map(|object_type| r::Value::String(object_type.name.to_owned()))
.collect::<Vec<_>>(),
Expand Down
1 change: 1 addition & 0 deletions store/postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ git-testament = "0.2.0"
itertools = "0.10.5"
pin-utils = "0.1"
hex = "0.4.3"
pretty_assertions = "1.3.0"

[dev-dependencies]
futures = "0.3"
Expand Down
3 changes: 2 additions & 1 deletion store/postgres/examples/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extern crate clap;
extern crate graph_store_postgres;

use clap::{arg, Command};
use std::collections::BTreeSet;
use std::process::exit;
use std::{fs, sync::Arc};

Expand Down Expand Up @@ -145,7 +146,7 @@ pub fn main() {
);
let site = Arc::new(make_dummy_site(subgraph, namespace, "anet".to_string()));
let catalog = ensure(
Catalog::for_tests(site.clone()),
Catalog::for_tests(site.clone(), BTreeSet::new()),
"Failed to construct catalog",
);
let layout = ensure(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Revert the corresponding up migration: drop the column that tracks
-- which entity types have a causality_region column.
alter table subgraphs.subgraph_deployment drop column has_causality_region;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Track, per deployment, the entity types whose tables carry an explicit
-- causality_region column. Defaults to the empty array so existing
-- deployments (which have no such tables) need no backfill.
alter table subgraphs.subgraph_deployment add column has_causality_region text[] not null default array[]::text[];
3 changes: 3 additions & 0 deletions store/postgres/src/block_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use crate::relational::Table;
/// entities
pub(crate) const BLOCK_RANGE_COLUMN: &str = "block_range";

/// The name of the column that stores the causality region of an entity.
pub(crate) const CAUSALITY_REGION_COLUMN: &str = "causality_region";

/// The SQL clause we use to check that an entity version is current;
/// that version has an unbounded block range, but checking for
/// `upper_inf(block_range)` is slow and can't use the exclusion
Expand Down
19 changes: 15 additions & 4 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use diesel::{
sql_types::{Array, Double, Nullable, Text},
ExpressionMethods, QueryDsl,
};
use graph::components::store::VersionStats;
use std::collections::{HashMap, HashSet};
use graph::components::store::{EntityType, VersionStats};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::Write;
use std::iter::FromIterator;
use std::sync::Arc;
Expand Down Expand Up @@ -77,11 +77,15 @@ const CREATE_EXCLUSION_CONSTRAINT: bool = false;
/// Information about the database objects backing one deployment's schema.
pub struct Catalog {
    pub site: Arc<Site>,
    // Populated from `get_text_columns` for the site's namespace;
    // presumably maps table name to its text column names — TODO confirm
    // against `get_text_columns`.
    text_columns: HashMap<String, HashSet<String>>,

    pub use_poi: bool,
    /// Whether `bytea` columns are indexed with just a prefix (`true`) or
    /// in their entirety. This influences both DDL generation and how
    /// queries are generated
    pub use_bytea_prefix: bool,

    /// Set of tables which have an explicit causality region column.
    pub(crate) has_causality_region: BTreeSet<EntityType>,
}

impl Catalog {
Expand All @@ -90,6 +94,7 @@ impl Catalog {
conn: &PgConnection,
site: Arc<Site>,
use_bytea_prefix: bool,
has_causality_region: Vec<EntityType>,
) -> Result<Self, StoreError> {
let text_columns = get_text_columns(conn, &site.namespace)?;
let use_poi = supports_proof_of_indexing(conn, &site.namespace)?;
Expand All @@ -98,11 +103,12 @@ impl Catalog {
text_columns,
use_poi,
use_bytea_prefix,
has_causality_region: has_causality_region.into_iter().collect(),
})
}

/// Return a new catalog suitable for creating a new subgraph
pub fn for_creation(site: Arc<Site>) -> Self {
pub fn for_creation(site: Arc<Site>, has_causality_region: BTreeSet<EntityType>) -> Self {
Catalog {
site,
text_columns: HashMap::default(),
Expand All @@ -111,18 +117,23 @@ impl Catalog {
// DDL generation creates indexes for prefixes of bytes columns
// see: attr-bytea-prefix
use_bytea_prefix: true,
has_causality_region,
}
}

/// Make a catalog as if the given `schema` did not exist in the database
/// yet. This function should only be used in situations where a database
/// connection is definitely not available, such as in unit tests
pub fn for_tests(site: Arc<Site>) -> Result<Self, StoreError> {
pub fn for_tests(
site: Arc<Site>,
has_causality_region: BTreeSet<EntityType>,
) -> Result<Self, StoreError> {
Ok(Catalog {
site,
text_columns: HashMap::default(),
use_poi: false,
use_bytea_prefix: true,
has_causality_region,
})
}

Expand Down
31 changes: 27 additions & 4 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ use diesel::{
sql_query,
sql_types::{Nullable, Text},
};
use graph::prelude::{
anyhow, bigdecimal::ToPrimitive, hex, web3::types::H256, BigDecimal, BlockNumber, BlockPtr,
DeploymentHash, DeploymentState, Schema, StoreError,
};
use graph::{blockchain::block_stream::FirehoseCursor, data::subgraph::schema::SubgraphError};
use graph::{
components::store::EntityType,
prelude::{
anyhow, bigdecimal::ToPrimitive, hex, web3::types::H256, BigDecimal, BlockNumber, BlockPtr,
DeploymentHash, DeploymentState, Schema, StoreError,
},
};
use graph::{
data::subgraph::{
schema::{DeploymentCreate, SubgraphManifestEntity},
Expand Down Expand Up @@ -84,6 +87,10 @@ table! {
current_reorg_depth -> Integer,
max_reorg_depth -> Integer,
firehose_cursor -> Nullable<Text>,

// Entity types that have a `causality_region` column.
// Names stored as present in the schema, not in snake case.
has_causality_region -> Array<Text>,
}
}

Expand Down Expand Up @@ -769,6 +776,19 @@ pub(crate) fn health(conn: &PgConnection, id: DeploymentId) -> Result<SubgraphHe
.map_err(|e| e.into())
}

pub(crate) fn has_causality_region(
conn: &PgConnection,
id: DeploymentId,
) -> Result<Vec<EntityType>, StoreError> {
use subgraph_deployment as d;

d::table
.filter(d::id.eq(id))
.select(d::has_causality_region)
.get_result(conn)
.map_err(|e| e.into())
}

/// Reverts the errors and updates the subgraph health if necessary.
pub(crate) fn revert_subgraph_errors(
conn: &PgConnection,
Expand Down Expand Up @@ -916,8 +936,10 @@ pub fn create_deployment(
graft_base,
graft_block,
debug_fork,
has_causality_region,
} = deployment;
let earliest_block_number = start_block.as_ref().map(|ptr| ptr.number).unwrap_or(0);
let has_causality_region = Vec::from_iter(has_causality_region.into_iter());

let deployment_values = (
d::id.eq(site.id),
Expand All @@ -935,6 +957,7 @@ pub fn create_deployment(
d::graft_block_hash.eq(b(&graft_block)),
d::graft_block_number.eq(n(&graft_block)),
d::debug_fork.eq(debug_fork.as_ref().map(|s| s.as_str())),
d::has_causality_region.eq(has_causality_region),
);

let graph_node_version_id = GraphNodeVersion::create_or_get(conn)?;
Expand Down
10 changes: 8 additions & 2 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl DeploymentStore {
let exists = deployment::exists(&conn, &site)?;

// Create (or update) the metadata. Update only happens in tests
let has_causality_region = deployment.has_causality_region.clone();
if replace || !exists {
deployment::create_deployment(&conn, &site, deployment, exists, replace)?;
};
Expand All @@ -184,7 +185,12 @@ impl DeploymentStore {
let query = format!("create schema {}", &site.namespace);
conn.batch_execute(&query)?;

let layout = Layout::create_relational_schema(&conn, site.clone(), schema)?;
let layout = Layout::create_relational_schema(
&conn,
site.clone(),
schema,
has_causality_region,
)?;
// See if we are grafting and check that the graft is permissible
if let Some(base) = graft_base {
let errors = layout.can_copy_from(&base);
Expand Down Expand Up @@ -280,7 +286,7 @@ impl DeploymentStore {
.interfaces_for_type(&key.entity_type)
.into_iter()
.flatten()
.map(|interface| &types_for_interface[&interface.into()])
.map(|interface| &types_for_interface[interface.name.as_str()])
.flatten()
.map(EntityType::from)
.filter(|type_name| type_name != &key.entity_type),
Expand Down
Loading

0 comments on commit 33e7ee7

Please sign in to comment.