Skip to content

Commit

Permalink
fix: Allow grafts to add data sources (#3989)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoyvens authored Oct 3, 2022
1 parent e747946 commit 3b3b458
Show file tree
Hide file tree
Showing 23 changed files with 312 additions and 104 deletions.
76 changes: 39 additions & 37 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use graph::blockchain::Blockchain;
use graph::blockchain::NodeCapabilities;
use graph::blockchain::{BlockchainKind, TriggerFilter};
use graph::components::subgraph::ProofOfIndexingVersion;
use graph::data::subgraph::SPEC_VERSION_0_0_6;
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator};
use graph_runtime_wasm::module::ToAscPtr;
Expand Down Expand Up @@ -176,47 +176,49 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
.writable(logger.clone(), deployment.id)
.await?;

let raw_yaml = serde_yaml::to_string(&manifest).unwrap();
let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?;

// Make sure the `raw_yaml` is present on both this subgraph and the graft base.
self.subgraph_store
.set_manifest_raw_yaml(&deployment.hash, raw_yaml)
.await?;
if let Some(graft) = &manifest.graft {
let file_bytes = self
.link_resolver
.cat(&logger, &graft.base.to_ipfs_link())
.await?;
let yaml = String::from_utf8(file_bytes)?;
self.subgraph_store
.set_manifest_raw_yaml(&graft.base, yaml)
.await?;
}

info!(logger, "Resolve subgraph files using IPFS");

// Allow for infinite retries for subgraph definition files.
let link_resolver = Arc::from(self.link_resolver.with_retries());
let mut manifest = manifest
.resolve(&link_resolver, &logger, ENV_VARS.max_spec_version.clone())
.await?;

info!(logger, "Successfully resolved subgraph files using IPFS");

let manifest_idx_and_name: Vec<(u32, String)> = manifest.template_idx_and_name().collect();

// Start the subgraph deployment before reading dynamic data
// sources; if the subgraph is a graft or a copy, starting it will
// do the copying and dynamic data sources won't show up until after
// that is done
store.start_subgraph_deployment(&logger).await?;

let (manifest, manifest_idx_and_name, static_data_sources) = {
info!(logger, "Resolve subgraph files using IPFS");

let mut manifest = SubgraphManifest::resolve_from_raw(
deployment.hash.cheap_clone(),
manifest,
// Allow for infinite retries for subgraph definition files.
&Arc::from(self.link_resolver.with_retries()),
&logger,
ENV_VARS.max_spec_version.clone(),
)
.await
.context("Failed to resolve subgraph from IPFS")?;

// We cannot include static data sources in the map because a static data source and a
// template may have the same name in the manifest.
let ds_len = manifest.data_sources.len() as u32;
let manifest_idx_and_name: Vec<(u32, String)> = manifest
.templates
.iter()
.map(|t| t.name().to_owned())
.enumerate()
.map(|(idx, name)| (ds_len + idx as u32, name))
.collect();

let data_sources = load_dynamic_data_sources(
store.clone(),
logger.clone(),
&manifest,
manifest_idx_and_name.clone(),
)
.await
.context("Failed to load dynamic data sources")?;

info!(logger, "Successfully resolved subgraph files using IPFS");
// Dynamic data sources are loaded by appending them to the manifest.
//
// Refactor: Preferably we'd avoid any mutation of the manifest.
let (manifest, static_data_sources) = {
let data_sources = load_dynamic_data_sources(store.clone(), logger.clone(), &manifest)
.await
.context("Failed to load dynamic data sources")?;

let static_data_sources = manifest.data_sources.clone();

Expand All @@ -229,7 +231,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
manifest.data_sources.len()
);

(manifest, manifest_idx_and_name, static_data_sources)
(manifest, static_data_sources)
};

let static_filters =
Expand Down
2 changes: 1 addition & 1 deletion core/src/subgraph/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ pub async fn load_dynamic_data_sources<C: Blockchain>(
store: Arc<dyn WritableStore>,
logger: Logger,
manifest: &SubgraphManifest<C>,
manifest_idx_and_name: Vec<(u32, String)>,
) -> Result<Vec<DataSource<C>>, Error> {
let manifest_idx_and_name = manifest.template_idx_and_name().collect();
let start_time = Instant::now();

let mut data_sources: Vec<DataSource<C>> = vec![];
Expand Down
3 changes: 2 additions & 1 deletion core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
version_switching_mode: SubgraphVersionSwitchingMode,
resolver: &Arc<dyn LinkResolver>,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
let raw_string = serde_yaml::to_string(&raw).unwrap();
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
deployment,
raw,
Expand Down Expand Up @@ -618,7 +619,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(

// Apply the subgraph versioning and deployment operations,
// creating a new subgraph deployment if one doesn't exist.
let deployment = DeploymentCreate::new(&manifest, start_block)
let deployment = DeploymentCreate::new(raw_string, &manifest, start_block)
.graft(base_block)
.debug(debug_fork);
deployment_store
Expand Down
7 changes: 7 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ pub trait SubgraphStore: Send + Sync + 'static {

/// Find the deployment locators for the subgraph with the given hash
fn locators(&self, hash: &str) -> Result<Vec<DeploymentLocator>, StoreError>;

/// Records `raw_yaml` as the raw manifest text of the deployment identified
/// by `hash` (going by the method name; no implementation is visible here).
///
/// This migrates subgraphs that existed before the raw_yaml column was added.
///
/// NOTE(review): whether an already stored value is overwritten or kept is
/// decided by the implementing store -- confirm before relying on either.
async fn set_manifest_raw_yaml(
&self,
hash: &DeploymentHash,
raw_yaml: String,
) -> Result<(), StoreError>;
}

pub trait ReadStore: Send + Sync + 'static {
Expand Down
77 changes: 46 additions & 31 deletions graph/src/data/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ pub mod status;

pub use features::{SubgraphFeature, SubgraphFeatureValidationError};

use anyhow::ensure;
use anyhow::{anyhow, Error};
use futures03::{future::try_join3, stream::FuturesOrdered, TryStreamExt as _};
use semver::Version;
use serde::{de, ser};
use serde_yaml;
use slog::{debug, info, Logger};
use slog::{info, Logger};
use stable_hash::{FieldAddress, StableHash};
use stable_hash_legacy::SequenceNumber;
use std::{collections::BTreeSet, marker::PhantomData};
Expand All @@ -25,6 +24,7 @@ use wasmparser;
use web3::types::Address;

use crate::{
bail,
blockchain::{BlockPtr, Blockchain, DataSource as _},
components::{
link_resolver::LinkResolver,
Expand All @@ -41,6 +41,7 @@ use crate::{
offchain::OFFCHAIN_KINDS, DataSource, DataSourceTemplate, UnresolvedDataSource,
UnresolvedDataSourceTemplate,
},
ensure,
prelude::{r, CheapClone, ENV_VARS},
};

Expand Down Expand Up @@ -356,7 +357,7 @@ pub enum SubgraphManifestResolveError {
#[error("subgraph is not valid YAML")]
InvalidFormat,
#[error("resolve error: {0}")]
ResolveError(anyhow::Error),
ResolveError(#[from] anyhow::Error),
}

/// Data source contexts are conveniently represented as entities.
Expand Down Expand Up @@ -496,7 +497,7 @@ pub struct BaseSubgraphManifest<C, S, D, T> {
}

/// SubgraphManifest with IPFS links unresolved
type UnresolvedSubgraphManifest<C> = BaseSubgraphManifest<
pub type UnresolvedSubgraphManifest<C> = BaseSubgraphManifest<
C,
UnresolvedSchema,
UnresolvedDataSource<C>,
Expand Down Expand Up @@ -614,35 +615,16 @@ impl<C: Blockchain> SubgraphManifest<C> {
/// Entry point for resolving a subgraph definition.
pub async fn resolve_from_raw(
id: DeploymentHash,
mut raw: serde_yaml::Mapping,
raw: serde_yaml::Mapping,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
max_spec_version: semver::Version,
) -> Result<Self, SubgraphManifestResolveError> {
// Inject the IPFS hash as the ID of the subgraph into the definition.
raw.insert("id".into(), id.to_string().into());

// Parse the YAML data into an UnresolvedSubgraphManifest
let unresolved: UnresolvedSubgraphManifest<C> = serde_yaml::from_value(raw.into())?;

debug!(logger, "Features {:?}", unresolved.features);
let unresolved = UnresolvedSubgraphManifest::parse(id, raw)?;

let resolved = unresolved
.resolve(resolver, logger, max_spec_version)
.await
.map_err(SubgraphManifestResolveError::ResolveError)?;

if (resolved.spec_version < SPEC_VERSION_0_0_7)
&& resolved
.data_sources
.iter()
.any(|ds| OFFCHAIN_KINDS.contains(&ds.kind()))
{
return Err(SubgraphManifestResolveError::ResolveError(anyhow!(
"Offchain data sources not supported prior to {}",
SPEC_VERSION_0_0_7
)));
}
.await?;

Ok(resolved)
}
Expand Down Expand Up @@ -685,15 +667,37 @@ impl<C: Blockchain> SubgraphManifest<C> {
) -> Result<UnifiedMappingApiVersion, DifferentMappingApiVersions> {
UnifiedMappingApiVersion::try_from_versions(self.api_versions())
}

/// Lazily pairs each data source template with its manifest index and name.
///
/// Template indices continue right after the static data sources: the
/// first template gets index `data_sources.len()`, the next one
/// `data_sources.len() + 1`, and so on.
pub fn template_idx_and_name(&self) -> impl Iterator<Item = (u32, String)> + '_ {
    // Static data sources are deliberately left out of the result: a static
    // data source and a template may share the same name in the manifest.
    // The same numbering is duplicated elsewhere (the original comment names
    // `UnresolvedSubgraphManifest::template_idx_and_name`); keep them in sync.
    let first_template_idx = self.data_sources.len() as u32;

    self.templates
        .iter()
        .enumerate()
        .map(move |(offset, template)| {
            let manifest_idx = first_template_idx + offset as u32;
            (manifest_idx, template.name().to_owned())
        })
}
}

impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
pub fn parse(
id: DeploymentHash,
mut raw: serde_yaml::Mapping,
) -> Result<Self, SubgraphManifestResolveError> {
// Inject the IPFS hash as the ID of the subgraph into the definition.
raw.insert("id".into(), id.to_string().into());

serde_yaml::from_value(raw.into()).map_err(Into::into)
}

pub async fn resolve(
self,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
max_spec_version: semver::Version,
) -> Result<SubgraphManifest<C>, anyhow::Error> {
) -> Result<SubgraphManifest<C>, SubgraphManifestResolveError> {
let UnresolvedSubgraphManifest {
id,
spec_version,
Expand All @@ -714,14 +718,14 @@ impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
max_spec_version,
id,
spec_version
));
).into());
}

let ds_count = data_sources.len();
if ds_count as u64 + templates.len() as u64 > u32::MAX as u64 {
return Err(anyhow!(
"Subgraph has too many declared data sources and templates",
));
return Err(
anyhow!("Subgraph has too many declared data sources and templates",).into(),
);
}

let (schema, data_sources, templates) = try_join3(
Expand Down Expand Up @@ -754,6 +758,17 @@ impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
);
}

if spec_version < SPEC_VERSION_0_0_7
&& data_sources
.iter()
.any(|ds| OFFCHAIN_KINDS.contains(&ds.kind()))
{
bail!(
"Offchain data sources not supported prior to {}",
SPEC_VERSION_0_0_7
);
}

Ok(SubgraphManifest {
id,
spec_version,
Expand Down
43 changes: 39 additions & 4 deletions graph/src/data/subgraph/schema.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Entity types that contain the graph-node state.

use anyhow::{anyhow, Error};
use anyhow::{anyhow, bail, Error};
use hex;
use lazy_static::lazy_static;
use rand::rngs::OsRng;
Expand Down Expand Up @@ -110,11 +110,12 @@ pub struct DeploymentCreate {

impl DeploymentCreate {
pub fn new(
raw_manifest: String,
source_manifest: &SubgraphManifest<impl Blockchain>,
earliest_block: Option<BlockPtr>,
) -> Self {
Self {
manifest: SubgraphManifestEntity::from(source_manifest),
manifest: SubgraphManifestEntity::new(raw_manifest, source_manifest),
earliest_block: earliest_block.cheap_clone(),
graft_base: None,
graft_block: None,
Expand Down Expand Up @@ -163,18 +164,52 @@ pub struct SubgraphManifestEntity {
pub repository: Option<String>,
pub features: Vec<String>,
pub schema: String,
pub raw_yaml: Option<String>,
}

impl<'a, C: Blockchain> From<&'a super::SubgraphManifest<C>> for SubgraphManifestEntity {
fn from(manifest: &'a super::SubgraphManifest<C>) -> Self {
impl SubgraphManifestEntity {
pub fn new(raw_yaml: String, manifest: &super::SubgraphManifest<impl Blockchain>) -> Self {
Self {
spec_version: manifest.spec_version.to_string(),
description: manifest.description.clone(),
repository: manifest.repository.clone(),
features: manifest.features.iter().map(|f| f.to_string()).collect(),
schema: manifest.schema.document.clone().to_string(),
raw_yaml: Some(raw_yaml),
}
}

pub fn template_idx_and_name(&self) -> Result<Vec<(i32, String)>, Error> {
#[derive(Debug, Deserialize)]
struct MinimalDs {
name: String,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct MinimalManifest {
data_sources: Vec<MinimalDs>,
#[serde(default)]
templates: Vec<MinimalDs>,
}

let raw_yaml = match &self.raw_yaml {
Some(raw_yaml) => raw_yaml,
None => bail!("raw_yaml not present"),
};

let manifest: MinimalManifest = serde_yaml::from_str(raw_yaml)?;

let ds_len = manifest.data_sources.len() as i32;
let template_idx_and_name = manifest
.templates
.iter()
.map(|t| t.name.to_owned())
.enumerate()
.map(move |(idx, name)| (ds_len + idx as i32, name))
.collect();

Ok(template_idx_and_name)
}
}

#[derive(Clone, Debug)]
Expand Down
9 changes: 9 additions & 0 deletions graph/src/util/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,12 @@ macro_rules! ensure {
}
};
}

/// `bail!` from `anyhow`, but converting the error with `.into()` so it can
/// be used in functions whose error type is not `anyhow::Error` itself, as
/// long as that type can be built from an `anyhow::Error`.
///
/// The expansion deliberately has no trailing semicolon so the macro works in
/// expression position as well (e.g. `None => bail!("..."),`), just like
/// `anyhow::bail!`.
// For context see https://github.com/dtolnay/anyhow/issues/112#issuecomment-704549251.
#[macro_export]
macro_rules! bail {
    ($($err:tt)*) => {
        return Err(anyhow::anyhow!($($err)*).into())
    };
}
Loading

0 comments on commit 3b3b458

Please sign in to comment.