Skip to content

Commit

Permalink
implement recursive auto sync for grafts
Browse files Browse the repository at this point in the history
  • Loading branch information
dwerner committed Sep 11, 2024
1 parent b72621e commit a83082a
Show file tree
Hide file tree
Showing 21 changed files with 686 additions and 40 deletions.
12 changes: 12 additions & 0 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use graph::prelude::{
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
};
use graph::schema::InputSchema;
use graph::slog::info;
use graph::substreams::Clock;
use graph::{
blockchain::{
Expand Down Expand Up @@ -451,6 +452,17 @@ impl Blockchain for Chain {
logger: &Logger,
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
// temporary hack until we can refactor this to use the block stream
if self.block_stream_builder.can_directly_resolve_blocks() {
info!(&logger, "block_pointer_from_number - can_directly_resolve_blocks"; "number" => number);
if let Some(block_ptr) = self
.block_stream_builder
.directly_resolve_block_from_number(number)
{
return Ok(block_ptr);
}
}
info!(&logger, "block_pointer_from_number"; "number" => number);
match self.client.as_ref() {
ChainClient::Firehose(endpoints) => endpoints
.endpoint()
Expand Down
218 changes: 185 additions & 33 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use graph::tokio_retry::Retry;
use graph::util::futures::retry_strategy;
use graph::util::futures::RETRY_DEFAULT_LIMIT;

/// Upper bound on how many nested graft bases auto-graft-sync will follow.
/// Once the current depth reaches this value, `create_subgraph_version` logs a
/// warning and skips syncing the graft base instead of recursing further.
const MAX_AUTO_GRAFT_SYNC_DEPTH: u32 = 42;

pub struct SubgraphRegistrar<P, S, SM> {
logger: Logger,
logger_factory: LoggerFactory,
Expand Down Expand Up @@ -303,20 +305,8 @@ where
.logger_factory
.subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone()));

let raw: serde_yaml::Mapping = {
let file_bytes = self
.resolver
.cat(&logger, &hash.to_ipfs_link())
.await
.map_err(|e| {
SubgraphRegistrarError::ResolveError(
SubgraphManifestResolveError::ResolveError(e),
)
})?;

serde_yaml::from_slice(&file_bytes)
.map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))?
};
let raw: serde_yaml::Mapping =
resolve_raw_manifest(&self.resolver, &self.logger, &hash).await?;

let kind = BlockchainKind::from_manifest(&raw).map_err(|e| {
SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e))
Expand All @@ -326,9 +316,15 @@ where
let history_blocks =
history_blocks.or(self.settings.for_name(&name).map(|c| c.history_blocks));

let auto_graft_sync_depth = if self.store.auto_graft_sync() {
Some(0)
} else {
None
};

let deployment_locator = match kind {
BlockchainKind::Arweave => {
create_subgraph_version::<graph_chain_arweave::Chain, _>(
create_subgraph_version::<graph_chain_arweave::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -342,11 +338,13 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
BlockchainKind::Ethereum => {
create_subgraph_version::<graph_chain_ethereum::Chain, _>(
create_subgraph_version::<graph_chain_ethereum::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -360,11 +358,13 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
BlockchainKind::Near => {
create_subgraph_version::<graph_chain_near::Chain, _>(
create_subgraph_version::<graph_chain_near::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -378,11 +378,13 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
BlockchainKind::Cosmos => {
create_subgraph_version::<graph_chain_cosmos::Chain, _>(
create_subgraph_version::<graph_chain_cosmos::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -396,11 +398,13 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
BlockchainKind::Substreams => {
create_subgraph_version::<graph_chain_substreams::Chain, _>(
create_subgraph_version::<graph_chain_substreams::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -414,11 +418,13 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
BlockchainKind::Starknet => {
create_subgraph_version::<graph_chain_starknet::Chain, _>(
create_subgraph_version::<graph_chain_starknet::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -432,6 +438,8 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
Expand Down Expand Up @@ -555,9 +563,9 @@ async fn start_subgraph(
}

/// Resolves the subgraph's earliest block
async fn resolve_start_block(
manifest: &SubgraphManifest<impl Blockchain>,
chain: &impl Blockchain,
async fn resolve_start_block<C: Blockchain>(
manifest: &SubgraphManifest<C>,
chain: &C,
logger: &Logger,
) -> Result<Option<BlockPtr>, SubgraphRegistrarError> {
// If the minimum start block is 0 (i.e. the genesis block),
Expand Down Expand Up @@ -591,20 +599,26 @@ async fn resolve_graft_block(
chain: &impl Blockchain,
logger: &Logger,
) -> Result<BlockPtr, SubgraphRegistrarError> {
debug!(&logger, "Resolve graft block"; "base" => base.base.to_string(), "block" => base.block);
chain
.block_pointer_from_number(logger, base.block)
.await
.map_err(|_| {
.map_err(|err| {
error!(&logger, "Failed to resolve graft block"; "error" => err.to_string());
SubgraphRegistrarError::ManifestValidationError(vec![
SubgraphManifestValidationError::BlockNotFound(format!(
"graft base block {} not found",
base.block
"graft base {} block {} not found",
base.base, base.block
)),
])
})
}

async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
async fn create_subgraph_version<
C: Blockchain,
S: SubgraphStore,
P: SubgraphAssignmentProviderTrait,
>(
logger: &Logger,
store: Arc<S>,
chains: Arc<BlockchainMap>,
Expand All @@ -618,9 +632,13 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
version_switching_mode: SubgraphVersionSwitchingMode,
resolver: &Arc<dyn LinkResolver>,
history_blocks_override: Option<i32>,
depth: Option<u32>,
provider: Arc<P>,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
let raw_string = serde_yaml::to_string(&raw).unwrap();
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(

// We need to defer validation of the manifest until after we have synced the base subgraph.
let unvalidated_manifest = UnvalidatedSubgraphManifest::<C>::resolve(
deployment.clone(),
raw,
resolver,
Expand All @@ -630,16 +648,38 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
.map_err(SubgraphRegistrarError::ResolveError)
.await?;

// Determine if the graft_base should be validated.
// Validate the graft_base if there is a pending graft, ensuring its presence.
// If the subgraph is new (indicated by DeploymentNotFound), the graft_base should be validated.
// If the subgraph already exists and there is no pending graft, graft_base validation is not required.
if let (Some(depth), Some(graft)) = (depth, unvalidated_manifest.unvalidated_graft()) {
if depth < MAX_AUTO_GRAFT_SYNC_DEPTH {
Box::pin(auto_sync_graft::<C, S, P>(
graft,
resolver,
logger,
&store,
&chains,
&name,
&node_id,
&debug_fork,
version_switching_mode,
history_blocks_override,
depth,
provider,
))
.await?;
} else {
warn!(
logger,
"auto-graft-sync: subgraph grafts depth limit reached";
"depth" => depth
);
}
}

let should_validate = match store.graft_pending(&deployment) {
Ok(graft_pending) => graft_pending,
Err(StoreError::DeploymentNotFound(_)) => true,
Err(e) => return Err(SubgraphRegistrarError::StoreError(e)),
};
let manifest = unvalidated
let manifest = unvalidated_manifest
.validate(store.cheap_clone(), should_validate)
.await
.map_err(SubgraphRegistrarError::ManifestValidationError)?;
Expand Down Expand Up @@ -732,3 +772,115 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
)
.map_err(SubgraphRegistrarError::SubgraphDeploymentError)
}

/// Automatically syncs the base subgraph of a graft and waits for it to finish.
///
/// The base deployment (`graft.base`) is registered under a synthetic name of
/// the form `auto-graft-sync/<last 10 characters of the base hash>`, a version
/// is created for it through `create_subgraph_version` (called with
/// `depth + 1`, so grafts of the base are handled recursively up to
/// `MAX_AUTO_GRAFT_SYNC_DEPTH`), the deployment is started through `provider`,
/// and finally `graft.await_sync` is awaited before returning.
///
/// Note that the `name` parameter (the subgraph that requested the graft) is
/// only used for the first log line; every later log line reports the
/// synthetic name of the base subgraph.
///
/// # Errors
///
/// Propagates failures from resolving the base manifest, building the
/// synthetic name (`NameNotValid`), creating the subgraph in the store,
/// creating the subgraph version, starting the deployment
/// (`AutoGraftSubgraphAssignmentError`) and awaiting the sync.
async fn auto_sync_graft<C: Blockchain, S: SubgraphStore, P: SubgraphAssignmentProviderTrait>(
    graft: &Graft,
    resolver: &Arc<dyn LinkResolver>,
    logger: &Logger,
    store: &Arc<S>,
    chains: &Arc<BlockchainMap>,
    name: &SubgraphName,
    node_id: &NodeId,
    debug_fork: &Option<DeploymentHash>,
    version_switching_mode: SubgraphVersionSwitchingMode,
    history_blocks_override: Option<i32>,
    depth: u32,
    provider: Arc<P>,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
    info!(
        logger,
        "auto-graft-sync: begin graft sync";
        "subgraph" => name.to_string(),
        "hash" => graft.base.to_string(),
        "depth" => depth,
        "block" => graft.block
    );
    let base_raw_manifest = resolve_raw_manifest(resolver, logger, &graft.base).await?;

    // Borrow the base hash instead of cloning it; an owned copy is only needed
    // for the `create_subgraph_version` call below.
    let base_hash = &graft.base;

    // Derive the synthetic subgraph name from the tail of the base hash.
    // NOTE(review): byte-index slicing assumes the hash is ASCII — confirm
    // against `DeploymentHash` validation.
    let hash_tail = &base_hash[base_hash.len().saturating_sub(10)..];
    let graft_name = format!("auto-graft-sync/{}", hash_tail);
    let graft_name = SubgraphName::new(graft_name.clone())
        .map_err(|_| SubgraphRegistrarError::NameNotValid(graft_name))?;

    info!(
        logger,
        "auto-graft-sync: create subgraph";
        "subgraph" => graft_name.to_string(),
        "hash" => graft.base.to_string()
    );

    // NOTE(review): any error from the store is propagated as-is, presumably
    // including the case where this synthetic name already exists — verify
    // how re-deployments of the same graft base should behave.
    store.create_subgraph(graft_name.clone())?;
    info!(
        logger,
        "Created subgraph";
        "subgraph_name" => graft_name.to_string(),
        "id" => base_hash.to_string()
    );

    let locator = create_subgraph_version::<C, S, P>(
        logger,
        store.clone(),
        chains.clone(),
        graft_name.clone(),
        base_hash.clone(),
        None,
        None,
        // The manifest is not used again, so it is moved rather than cloned.
        base_raw_manifest,
        node_id.clone(),
        debug_fork.clone(),
        version_switching_mode,
        resolver,
        history_blocks_override,
        Some(depth + 1),
        provider.clone(),
    )
    .await?;

    info!(
        logger,
        "auto-graft-sync: awaiting subgraph sync";
        "subgraph" => graft_name.to_string(),
        "hash" => graft.base.to_string()
    );

    info!(
        logger,
        "auto-graft-sync: starting graft sync";
        "subgraph" => graft_name.to_string(),
        "hash" => graft.base.to_string()
    );
    // The graft block is passed as the second argument to `start`
    // (presumably the block at which the base deployment stops — confirm
    // against `SubgraphAssignmentProviderTrait::start`).
    provider
        .start(locator.clone(), Some(graft.block))
        .await
        .map_err(SubgraphRegistrarError::AutoGraftSubgraphAssignmentError)?;

    info!(
        logger,
        "auto-graft-sync: waiting for graft sync";
        "subgraph" => graft_name.to_string(),
        "hash" => graft.base.to_string()
    );
    // Wait until the graft reports the base as synced; the one-second
    // duration is presumably the polling interval — confirm in `await_sync`.
    graft
        .await_sync(store.clone(), Duration::from_secs(1))
        .await?;

    info!(
        logger,
        "auto-graft-sync: sync complete";
        "subgraph" => graft_name.to_string(),
        "graft-hash" => graft.base.to_string(),
        "depth" => depth,
        "hash" => graft.base.to_string()
    );
    Ok(locator)
}

/// Fetches the manifest file for `deployment_hash` through `resolver` and
/// parses it as a raw YAML mapping, without any validation.
///
/// # Errors
///
/// Returns `SubgraphRegistrarError::ResolveError` when the file cannot be
/// fetched or when its bytes cannot be deserialized into a YAML mapping.
async fn resolve_raw_manifest(
    resolver: &Arc<dyn LinkResolver>,
    logger: &Logger,
    deployment_hash: &DeploymentHash,
) -> Result<serde_yaml::Mapping, SubgraphRegistrarError> {
    let manifest_link = deployment_hash.to_ipfs_link();

    // Fetch the raw manifest bytes; a failed fetch is reported as a resolve error.
    let manifest_bytes = resolver
        .cat(logger, &manifest_link)
        .await
        .map_err(SubgraphManifestResolveError::ResolveError)
        .map_err(SubgraphRegistrarError::ResolveError)?;

    // The target type is inferred from the function's return type.
    serde_yaml::from_slice(&manifest_bytes)
        .map_err(|parse_err| SubgraphRegistrarError::ResolveError(parse_err.into()))
}
8 changes: 8 additions & 0 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ pub trait BlockRefetcher<C: Blockchain>: Send + Sync {
/// BlockStreamBuilder is an abstraction that would separate the logic for building streams from the blockchain trait
#[async_trait]
pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
/// Reports whether this builder is able to resolve block pointers by number
/// on its own. The default implementation returns `false`; implementors that
/// override it are expected to also override
/// `directly_resolve_block_from_number`.
fn can_directly_resolve_blocks(&self) -> bool {
false
}

/// Resolves the block pointer for block `_number` directly from this builder.
/// The default implementation resolves nothing and always returns `None`.
fn directly_resolve_block_from_number(&self, _number: BlockNumber) -> Option<BlockPtr> {
None
}

async fn build_firehose(
&self,
chain: &C,
Expand Down
Loading

0 comments on commit a83082a

Please sign in to comment.