node, store: Add 'graphman chain ingest' command #5945

Merged
merged 2 commits on Apr 18, 2025

35 changes: 33 additions & 2 deletions node/src/bin/manager.rs
@@ -8,7 +8,7 @@ use graph::components::network_provider::ChainName;
use graph::endpoint::EndpointMetrics;
use graph::env::ENV_VARS;
use graph::log::logger_with_levels;
use graph::prelude::{MetricsRegistry, BLOCK_NUMBER_MAX};
use graph::prelude::{BlockNumber, MetricsRegistry, BLOCK_NUMBER_MAX};
use graph::{data::graphql::load_manager::LoadManager, prelude::chrono, prometheus::Registry};
use graph::{
prelude::{
@@ -585,6 +585,19 @@ pub enum ChainCommand {
#[clap(value_parser = clap::builder::NonEmptyStringValueParser::new())]
chain_name: String,
},

    /// Ingest a block into the block cache.
    ///
    /// This will overwrite any blocks we may already have in the block
    /// cache, and can therefore be used to get rid of duplicate blocks in
    /// the block cache as well as to make sure that a certain block is in
    /// the cache.
Ingest {
/// The name of the chain
name: String,
/// The block number to ingest
number: BlockNumber,
},
}

#[derive(Clone, Debug, Subcommand)]
@@ -1013,7 +1026,19 @@ impl Context {
self,
chain_name: &str,
) -> anyhow::Result<(Arc<ChainStore>, Arc<EthereumAdapter>)> {
let networks = self.networks().await?;
let logger = self.logger.clone();
let registry = self.metrics_registry();
let metrics = Arc::new(EndpointMetrics::mock());
let networks = Networks::from_config_for_chain(
logger,
&self.config,
registry,
metrics,
&[],
chain_name,
)
.await?;

let chain_store = self.chain_store(chain_name)?;
let ethereum_adapter = networks
.ethereum_rpcs(chain_name.into())
@@ -1450,6 +1475,12 @@ async fn main() -> anyhow::Result<()> {
}
}
}
Ingest { name, number } => {
let logger = ctx.logger.cheap_clone();
let (chain_store, ethereum_adapter) =
ctx.chain_store_and_adapter(&name).await?;
commands::chain::ingest(&logger, chain_store, ethereum_adapter, number).await
}
}
}
Stats(cmd) => {
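
Taken together, the clap variant and the dispatch arm above add a `graphman chain ingest <NAME> <NUMBER>` subcommand. A likely invocation, assuming the usual global `--config` flag that graphman already takes (the chain name and block number below are placeholders):

    graphman --config config.toml chain ingest mainnet 17500000
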
50 changes: 47 additions & 3 deletions node/src/chain.rs
@@ -48,10 +48,39 @@ pub enum ProviderNetworkStatus {
},
}

pub trait ChainFilter: Send + Sync {
fn filter(&self, chain_name: &str) -> bool;
}

pub struct AnyChainFilter;

impl ChainFilter for AnyChainFilter {
fn filter(&self, _: &str) -> bool {
true
}
}

pub struct OneChainFilter {
chain_name: String,
}

impl OneChainFilter {
pub fn new(chain_name: String) -> Self {
Self { chain_name }
}
}

impl ChainFilter for OneChainFilter {
fn filter(&self, chain_name: &str) -> bool {
self.chain_name == chain_name
}
}

pub fn create_substreams_networks(
logger: Logger,
config: &Config,
endpoint_metrics: Arc<EndpointMetrics>,
chain_filter: &dyn ChainFilter,
) -> Vec<AdapterConfiguration> {
debug!(
logger,
@@ -63,7 +92,13 @@ pub fn create_substreams_networks(
let mut networks_by_kind: BTreeMap<(BlockchainKind, ChainName), Vec<Arc<FirehoseEndpoint>>> =
BTreeMap::new();

for (name, chain) in &config.chains.chains {
let filtered_chains = config
.chains
.chains
.iter()
.filter(|(name, _)| chain_filter.filter(name));

for (name, chain) in filtered_chains {
let name: ChainName = name.as_str().into();
for provider in &chain.providers {
if let ProviderDetails::Substreams(ref firehose) = provider.details {
@@ -113,6 +148,7 @@ pub fn create_firehose_networks(
logger: Logger,
config: &Config,
endpoint_metrics: Arc<EndpointMetrics>,
chain_filter: &dyn ChainFilter,
) -> Vec<AdapterConfiguration> {
debug!(
logger,
@@ -124,7 +160,13 @@
let mut networks_by_kind: BTreeMap<(BlockchainKind, ChainName), Vec<Arc<FirehoseEndpoint>>> =
BTreeMap::new();

for (name, chain) in &config.chains.chains {
let filtered_chains = config
.chains
.chains
.iter()
.filter(|(name, _)| chain_filter.filter(name));

for (name, chain) in filtered_chains {
let name: ChainName = name.as_str().into();
for provider in &chain.providers {
let logger = logger.cheap_clone();
@@ -179,18 +221,20 @@

/// Parses all Ethereum connection strings and returns their network names and
/// `EthereumAdapter`.
pub async fn create_all_ethereum_networks(
pub async fn create_ethereum_networks(
logger: Logger,
registry: Arc<MetricsRegistry>,
config: &Config,
endpoint_metrics: Arc<EndpointMetrics>,
chain_filter: &dyn ChainFilter,
) -> anyhow::Result<Vec<AdapterConfiguration>> {
let eth_rpc_metrics = Arc::new(ProviderEthRpcMetrics::new(registry));
let eth_networks_futures = config
.chains
.chains
.iter()
.filter(|(_, chain)| chain.protocol == BlockchainKind::Ethereum)
.filter(|(name, _)| chain_filter.filter(name))
.map(|(name, _)| {
create_ethereum_networks_for_chain(
&logger,
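
The `ChainFilter` trait introduced above only has to answer a yes/no question per chain name; `AnyChainFilter` and `OneChainFilter` are the two implementations this PR ships. As an illustration of the trait contract only (hypothetical, not part of the PR), a filter that accepts a fixed set of chains could look like this:

use std::collections::HashSet;

/// Hypothetical example, not part of this PR: accept any chain whose
/// name is contained in a fixed set of names.
pub struct SetChainFilter {
    names: HashSet<String>,
}

impl SetChainFilter {
    pub fn new(names: impl IntoIterator<Item = String>) -> Self {
        Self {
            names: names.into_iter().collect(),
        }
    }
}

impl ChainFilter for SetChainFilter {
    fn filter(&self, chain_name: &str) -> bool {
        self.names.contains(chain_name)
    }
}

Such a filter would plug into `create_ethereum_networks`, `create_firehose_networks`, and `create_substreams_networks` the same way as the two filters above, since they all take `&dyn ChainFilter`.
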
33 changes: 33 additions & 0 deletions node/src/manager/commands/chain.rs
@@ -10,11 +10,16 @@ use graph::cheap_clone::CheapClone;
use graph::components::network_provider::ChainIdentifierStore;
use graph::components::network_provider::ChainName;
use graph::components::store::StoreError;
use graph::futures03::compat::Future01CompatExt as _;
use graph::prelude::BlockNumber;
use graph::prelude::ChainStore as _;
use graph::prelude::LightEthereumBlockExt;
use graph::prelude::{anyhow, anyhow::bail};
use graph::slog::Logger;
use graph::{components::store::BlockStore as _, prelude::anyhow::Error};
use graph_chain_ethereum::chain::BlockFinality;
use graph_chain_ethereum::EthereumAdapter;
use graph_chain_ethereum::EthereumAdapterTrait as _;
use graph_store_postgres::add_chain;
use graph_store_postgres::find_chain;
use graph_store_postgres::update_chain_name;
@@ -259,3 +264,31 @@ pub fn change_block_cache_shard(

Ok(())
}

pub async fn ingest(
logger: &Logger,
chain_store: Arc<ChainStore>,
ethereum_adapter: Arc<EthereumAdapter>,
number: BlockNumber,
) -> Result<(), Error> {
let Some(block) = ethereum_adapter
.block_by_number(logger, number)
.compat()
.await
.map_err(|e| anyhow!("error getting block number {number}: {}", e))?
else {
bail!("block number {number} not found");
};
let ptr = block.block_ptr();
// For inserting the block, it doesn't matter whether the block is final or not.
let block = Arc::new(BlockFinality::Final(Arc::new(block)));
chain_store.upsert_block(block).await?;

let rows = chain_store.confirm_block_hash(ptr.number, &ptr.hash)?;

println!("Inserted block {}", ptr);
if rows > 0 {
println!(" (also deleted {rows} duplicate row(s) with that number)");
}
Ok(())
}
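
One detail worth noting in `ingest` above: `block_by_number` on the adapter still returns a futures 0.1 future, which is why it is bridged with `.compat()` before being awaited. A minimal, self-contained sketch of that bridging pattern, independent of this PR (the crate aliases `futures01`/`futures03` are assumed here; the import above shows graph-node re-exporting the 0.3 compat layer as `graph::futures03::compat`):

use futures01::future as future01; // futures 0.1
use futures03::compat::Future01CompatExt as _; // provides `.compat()`

async fn bridged() -> Result<u64, ()> {
    // Wrap a futures 0.1 future so that async/await code can drive it.
    future01::ok::<u64, ()>(42).compat().await
}
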
57 changes: 51 additions & 6 deletions node/src/network_setup.rs
@@ -30,8 +30,8 @@ use graph_store_postgres::{BlockStore, ChainHeadUpdateListener};
use std::{any::Any, cmp::Ordering, sync::Arc, time::Duration};

use crate::chain::{
create_all_ethereum_networks, create_firehose_networks, create_substreams_networks,
networks_as_chains,
create_ethereum_networks, create_firehose_networks, create_substreams_networks,
networks_as_chains, AnyChainFilter, ChainFilter, OneChainFilter,
};

#[derive(Debug, Clone)]
@@ -183,31 +183,38 @@ impl Networks {
.await
}

pub async fn from_config(
async fn from_config_inner(
logger: Logger,
config: &crate::config::Config,
registry: Arc<MetricsRegistry>,
endpoint_metrics: Arc<EndpointMetrics>,
provider_checks: &[Arc<dyn ProviderCheck>],
chain_filter: &dyn ChainFilter,
) -> Result<Networks> {
if config.query_only(&config.node) {
return Ok(Networks::noop());
}

let eth = create_all_ethereum_networks(
let eth = create_ethereum_networks(
logger.cheap_clone(),
registry,
&config,
endpoint_metrics.cheap_clone(),
chain_filter,
)
.await?;
let firehose = create_firehose_networks(
logger.cheap_clone(),
&config,
endpoint_metrics.cheap_clone(),
chain_filter,
);
let substreams = create_substreams_networks(
logger.cheap_clone(),
&config,
endpoint_metrics,
chain_filter,
);
let substreams =
create_substreams_networks(logger.cheap_clone(), &config, endpoint_metrics);
let adapters: Vec<_> = eth
.into_iter()
.chain(firehose.into_iter())
@@ -217,6 +224,44 @@
Ok(Networks::new(&logger, adapters, provider_checks))
}

pub async fn from_config_for_chain(
logger: Logger,
config: &crate::config::Config,
registry: Arc<MetricsRegistry>,
endpoint_metrics: Arc<EndpointMetrics>,
provider_checks: &[Arc<dyn ProviderCheck>],
chain_name: &str,
) -> Result<Networks> {
let filter = OneChainFilter::new(chain_name.to_string());
Self::from_config_inner(
logger,
config,
registry,
endpoint_metrics,
provider_checks,
&filter,
)
.await
}

pub async fn from_config(
logger: Logger,
config: &crate::config::Config,
registry: Arc<MetricsRegistry>,
endpoint_metrics: Arc<EndpointMetrics>,
provider_checks: &[Arc<dyn ProviderCheck>],
) -> Result<Networks> {
Self::from_config_inner(
logger,
config,
registry,
endpoint_metrics,
provider_checks,
&AnyChainFilter,
)
.await
}

fn new(
logger: &Logger,
adapters: Vec<AdapterConfiguration>,