Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: graft auto sync #5633

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use graph::prelude::{
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
};
use graph::schema::InputSchema;
use graph::slog::info;
use graph::substreams::Clock;
use graph::{
blockchain::{
Expand Down Expand Up @@ -451,6 +452,17 @@ impl Blockchain for Chain {
logger: &Logger,
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
// temporary hack until we can refactor this to use the block stream
if self.block_stream_builder.can_directly_resolve_blocks() {
info!(&logger, "block_pointer_from_number - can_directly_resolve_blocks"; "number" => number);
if let Some(block_ptr) = self
.block_stream_builder
.directly_resolve_block_from_number(number)
{
return Ok(block_ptr);
}
}
info!(&logger, "block_pointer_from_number"; "number" => number);
match self.client.as_ref() {
ChainClient::Firehose(endpoints) => endpoints
.endpoint()
Expand Down
218 changes: 185 additions & 33 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use graph::tokio_retry::Retry;
use graph::util::futures::retry_strategy;
use graph::util::futures::RETRY_DEFAULT_LIMIT;

const MAX_AUTO_GRAFT_SYNC_DEPTH: u32 = 42;

pub struct SubgraphRegistrar<P, S, SM> {
logger: Logger,
logger_factory: LoggerFactory,
Expand Down Expand Up @@ -303,20 +305,8 @@ where
.logger_factory
.subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone()));

let raw: serde_yaml::Mapping = {
let file_bytes = self
.resolver
.cat(&logger, &hash.to_ipfs_link())
.await
.map_err(|e| {
SubgraphRegistrarError::ResolveError(
SubgraphManifestResolveError::ResolveError(e),
)
})?;

serde_yaml::from_slice(&file_bytes)
.map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))?
};
let raw: serde_yaml::Mapping =
resolve_raw_manifest(&self.resolver, &self.logger, &hash).await?;

let kind = BlockchainKind::from_manifest(&raw).map_err(|e| {
SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e))
Expand All @@ -326,9 +316,15 @@ where
let history_blocks =
history_blocks.or(self.settings.for_name(&name).map(|c| c.history_blocks));

let auto_graft_sync_depth = if self.store.auto_graft_sync() {
Some(0)
} else {
None
};

let deployment_locator = match kind {
BlockchainKind::Arweave => {
create_subgraph_version::<graph_chain_arweave::Chain, _>(
create_subgraph_version::<graph_chain_arweave::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -342,11 +338,13 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
BlockchainKind::Ethereum => {
create_subgraph_version::<graph_chain_ethereum::Chain, _>(
create_subgraph_version::<graph_chain_ethereum::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -360,11 +358,13 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
BlockchainKind::Near => {
create_subgraph_version::<graph_chain_near::Chain, _>(
create_subgraph_version::<graph_chain_near::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -378,11 +378,13 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
BlockchainKind::Cosmos => {
create_subgraph_version::<graph_chain_cosmos::Chain, _>(
create_subgraph_version::<graph_chain_cosmos::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -396,11 +398,13 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
BlockchainKind::Substreams => {
create_subgraph_version::<graph_chain_substreams::Chain, _>(
create_subgraph_version::<graph_chain_substreams::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -414,11 +418,13 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
BlockchainKind::Starknet => {
create_subgraph_version::<graph_chain_starknet::Chain, _>(
create_subgraph_version::<graph_chain_starknet::Chain, _, _>(
&logger,
self.store.clone(),
self.chains.cheap_clone(),
Expand All @@ -432,6 +438,8 @@ where
self.version_switching_mode,
&self.resolver,
history_blocks,
auto_graft_sync_depth,
self.provider.clone(),
)
.await?
}
Expand Down Expand Up @@ -555,9 +563,9 @@ async fn start_subgraph(
}

/// Resolves the subgraph's earliest block
async fn resolve_start_block(
manifest: &SubgraphManifest<impl Blockchain>,
chain: &impl Blockchain,
async fn resolve_start_block<C: Blockchain>(
manifest: &SubgraphManifest<C>,
chain: &C,
logger: &Logger,
) -> Result<Option<BlockPtr>, SubgraphRegistrarError> {
// If the minimum start block is 0 (i.e. the genesis block),
Expand Down Expand Up @@ -591,20 +599,26 @@ async fn resolve_graft_block(
chain: &impl Blockchain,
logger: &Logger,
) -> Result<BlockPtr, SubgraphRegistrarError> {
debug!(&logger, "Resolve graft block"; "base" => base.base.to_string(), "block" => base.block);
chain
.block_pointer_from_number(logger, base.block)
.await
.map_err(|_| {
.map_err(|err| {
error!(&logger, "Failed to resolve graft block"; "error" => err.to_string());
SubgraphRegistrarError::ManifestValidationError(vec![
SubgraphManifestValidationError::BlockNotFound(format!(
"graft base block {} not found",
base.block
"graft base {} block {} not found",
base.base, base.block
)),
])
})
}

async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
async fn create_subgraph_version<
C: Blockchain,
S: SubgraphStore,
P: SubgraphAssignmentProviderTrait,
>(
logger: &Logger,
store: Arc<S>,
chains: Arc<BlockchainMap>,
Expand All @@ -618,9 +632,13 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
version_switching_mode: SubgraphVersionSwitchingMode,
resolver: &Arc<dyn LinkResolver>,
history_blocks_override: Option<i32>,
depth: Option<u32>,
provider: Arc<P>,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
let raw_string = serde_yaml::to_string(&raw).unwrap();
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(

// We need to defer validation of the manifest until after we have synced the base subgraph.
let unvalidated_manifest = UnvalidatedSubgraphManifest::<C>::resolve(
deployment.clone(),
raw,
resolver,
Expand All @@ -630,16 +648,38 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
.map_err(SubgraphRegistrarError::ResolveError)
.await?;

// Determine if the graft_base should be validated.
// Validate the graft_base if there is a pending graft, ensuring its presence.
// If the subgraph is new (indicated by DeploymentNotFound), the graft_base should be validated.
// If the subgraph already exists and there is no pending graft, graft_base validation is not required.
if let (Some(depth), Some(graft)) = (depth, unvalidated_manifest.unvalidated_graft()) {
if depth < MAX_AUTO_GRAFT_SYNC_DEPTH {
Box::pin(auto_sync_graft::<C, S, P>(
graft,
resolver,
logger,
&store,
&chains,
&name,
&node_id,
&debug_fork,
version_switching_mode,
history_blocks_override,
depth,
provider,
))
.await?;
} else {
warn!(
logger,
"auto-graft-sync: subgraph grafts depth limit reached";
"depth" => depth
);
}
}

let should_validate = match store.graft_pending(&deployment) {
Ok(graft_pending) => graft_pending,
Err(StoreError::DeploymentNotFound(_)) => true,
Err(e) => return Err(SubgraphRegistrarError::StoreError(e)),
};
let manifest = unvalidated
let manifest = unvalidated_manifest
.validate(store.cheap_clone(), should_validate)
.await
.map_err(SubgraphRegistrarError::ManifestValidationError)?;
Expand Down Expand Up @@ -732,3 +772,115 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
)
.map_err(SubgraphRegistrarError::SubgraphDeploymentError)
}

/// Automatically syncs a subgraph graft from the base subgraph.
/// This will await the syncing of the base subgraph before proceeding.
/// Recursively calls `create_subgraph_version` to create any grafts of
/// this graft up to `MAX_AUTO_GRAFT_SYNC_DEPTH`.`
async fn auto_sync_graft<C: Blockchain, S: SubgraphStore, P: SubgraphAssignmentProviderTrait>(
graft: &Graft,
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
store: &Arc<S>,
chains: &Arc<BlockchainMap>,
name: &SubgraphName,
node_id: &NodeId,
debug_fork: &Option<DeploymentHash>,
version_switching_mode: SubgraphVersionSwitchingMode,
history_blocks_override: Option<i32>,
depth: u32,
provider: Arc<P>,
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
info!(
logger,
"auto-graft-sync: begin graft sync";
"subgraph" => name.to_string(),
"hash" => graft.base.to_string(),
"depth" => depth,
"block" => graft.block
);
let subgraft_raw_manifest = resolve_raw_manifest(resolver, logger, &graft.base).await?;

let deployment = graft.base.clone();

let name = &deployment[deployment.len().saturating_sub(10)..];
let name = format!("auto-graft-sync/{}", name);
let name =
SubgraphName::new(name.clone()).map_err(|_| SubgraphRegistrarError::NameNotValid(name))?;

info!(
logger,
"auto-graft-sync: create subgraph";
"subgraph" => name.to_string(),
"hash" => graft.base.to_string()
);

let _ = store.create_subgraph(name.clone())?;
info!(logger, "Created subgraph"; "subgraph_name" => name.to_string(), "id" => deployment.to_string());

let locator = create_subgraph_version::<C, S, P>(
logger,
store.clone(),
chains.clone(),
name.clone(),
graft.base.clone(),
None,
None,
subgraft_raw_manifest.clone(),
node_id.clone(),
debug_fork.clone(),
version_switching_mode,
resolver,
history_blocks_override,
Some(depth + 1),
provider.clone(),
)
.await?;

info!(
logger,
"auto-graft-sync: awaiting subgraph sync";
"subgraph" => name.to_string(),
"hash" => graft.base.to_string()
);

info!(&logger, "auto-graft-sync: starting graft sync"; "subgraph" => name.to_string(), "hash" => graft.base.to_string());
provider
.start(locator.clone(), Some(graft.block))
.await
.map_err(SubgraphRegistrarError::AutoGraftSubgraphAssignmentError)?;

info!(&logger, "auto-graft-sync: waiting for graft sync"; "subgraph" => name.to_string(), "hash" => graft.base.to_string());
graft
.await_sync(store.clone(), Duration::from_secs(1))
.await?;

info!(
logger,
"auto-graft-sync: sync complete";
"subgraph" => name.to_string(),
"graft-hash" => graft.base.to_string(),
"depth" => depth,
"hash" => graft.base.to_string()
);
Ok(locator)
}

async fn resolve_raw_manifest(
resolver: &Arc<dyn LinkResolver>,
logger: &Logger,
deployment_hash: &DeploymentHash,
) -> Result<serde_yaml::Mapping, SubgraphRegistrarError> {
let subgraft_raw_manifest: serde_yaml::Mapping = {
let file_bytes = resolver
.cat(&logger, &deployment_hash.to_ipfs_link())
.await
.map_err(|e| {
SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e))
})?;

serde_yaml::from_slice(&file_bytes)
.map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))?
};
Ok(subgraft_raw_manifest)
}
8 changes: 8 additions & 0 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ pub trait BlockRefetcher<C: Blockchain>: Send + Sync {
/// BlockStreamBuilder is an abstraction that would separate the logic for building streams from the blockchain trait
#[async_trait]
pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
fn can_directly_resolve_blocks(&self) -> bool {
false
}

fn directly_resolve_block_from_number(&self, _number: BlockNumber) -> Option<BlockPtr> {
None
}

async fn build_firehose(
&self,
chain: &C,
Expand Down
Loading
Loading