Skip to content

Commit

Permalink
chain, graph : move subgraph trigger scanning back to TriggersAdapter…
Browse files Browse the repository at this point in the history
…Wrapper
  • Loading branch information
incrypto32 committed Sep 11, 2024
1 parent c35774c commit d6be3d7
Show file tree
Hide file tree
Showing 16 changed files with 228 additions and 145 deletions.
14 changes: 12 additions & 2 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ use graph::{
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
};
use prost::Message;
use std::collections::HashSet;
use std::sync::Arc;

use crate::adapter::TriggerFilter;
use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate};
use crate::trigger::{self, ArweaveTrigger};
use crate::Block as ArweaveBlock;
use crate::{
codec,
data_source::{DataSource, UnresolvedDataSource},
Expand Down Expand Up @@ -138,7 +140,7 @@ impl Blockchain for Chain {

let firehose_mapper = Arc::new(FirehoseMapper {
adapter,
filter: filter.filter.clone(),
filter: filter.chain_filter.clone(),
});

Ok(Box::new(FirehoseBlockStream::new(
Expand Down Expand Up @@ -198,7 +200,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_from: BlockNumber,
_to: BlockNumber,
_filter: &Arc<TriggerFilterWrapper<Chain>>,
_filter: &TriggerFilter,
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}
Expand Down Expand Up @@ -266,6 +268,14 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
number: block.number.saturating_sub(1),
}))
}

async fn load_blocks_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
) -> Result<Vec<ArweaveBlock>, Error> {
todo!()
}
}

pub struct FirehoseMapper {
Expand Down
15 changes: 12 additions & 3 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use graph::components::adapter::ChainId;
use graph::env::EnvVars;
use graph::prelude::{DeploymentHash, MetricsRegistry};
use graph::substreams::Clock;
use std::collections::HashSet;
use std::convert::TryFrom;
use std::sync::Arc;

Expand Down Expand Up @@ -33,7 +34,7 @@ use crate::data_source::{
DataSource, DataSourceTemplate, EventOrigin, UnresolvedDataSource, UnresolvedDataSourceTemplate,
};
use crate::trigger::CosmosTrigger;
use crate::{codec, TriggerFilter};
use crate::{codec, Block, TriggerFilter};

pub struct Chain {
logger_factory: LoggerFactory,
Expand Down Expand Up @@ -132,7 +133,7 @@ impl Blockchain for Chain {

let firehose_mapper = Arc::new(FirehoseMapper {
adapter,
filter: filter.filter.clone(),
filter: filter.chain_filter.clone(),
});

Ok(Box::new(FirehoseBlockStream::new(
Expand Down Expand Up @@ -197,6 +198,14 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
panic!("Should never be called since not used by FirehoseBlockStream")
}

async fn load_blocks_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
) -> Result<Vec<Block>, Error> {
unimplemented!()
}

async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
unimplemented!()
}
Expand All @@ -205,7 +214,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_from: BlockNumber,
_to: BlockNumber,
_filter: &Arc<TriggerFilterWrapper<Chain>>,
_filter: &TriggerFilter,
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}
Expand Down
6 changes: 3 additions & 3 deletions chain/ethereum/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1111,9 +1111,9 @@ pub trait EthereumAdapter: Send + Sync + 'static {

async fn load_blocks_by_numbers(
&self,
logger: Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
_logger: Logger,
_chain_store: Arc<dyn ChainStore>,
_block_numbers: HashSet<BlockNumber>,
) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send>;

/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
Expand Down
32 changes: 28 additions & 4 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
filter: Arc<TriggerFilterWrapper<Chain>>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Chain>>> {
let requirements = filter.filter.node_capabilities();
let requirements = filter.chain_filter.node_capabilities();
let adapter = TriggersAdapterWrapper::new(
chain
.triggers_adapter(&deployment, &requirements, unified_api_version.clone())
Expand Down Expand Up @@ -480,7 +480,7 @@ impl Blockchain for Chain {
store.firehose_cursor(),
start_blocks,
current_ptr,
filter.filter.clone(),
filter.chain_filter.clone(),
unified_api_version,
)
.await
Expand Down Expand Up @@ -717,7 +717,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
from: BlockNumber,
to: BlockNumber,
filter: &Arc<TriggerFilterWrapper<Chain>>,
filter: &TriggerFilter,
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
blocks_with_triggers(
self.chain_client
Expand All @@ -735,6 +735,30 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
.await
}

async fn load_blocks_by_numbers(
&self,
logger: Logger,
block_numbers: HashSet<BlockNumber>,
) -> Result<Vec<BlockFinality>> {
use graph::futures01::stream::Stream;

let adapter = self
.chain_client
.rpc()?
.cheapest_with(&self.capabilities)
.await?;

let blocks = adapter
.load_blocks_by_numbers(logger, self.chain_store.clone(), block_numbers)
.await
.map(|block| BlockFinality::Final(block))
.collect()
.compat()
.await?;

Ok(blocks)
}

async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
let chain_store = self.chain_store.clone();
chain_store.chain_head_ptr().await
Expand Down Expand Up @@ -771,7 +795,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
self.ethrpc_metrics.clone(),
block_number,
block_number,
&Arc::new(TriggerFilterWrapper::<Chain>::new(filter.clone(), vec![])), // TODO(krishna): This is temporary until we take TriggerFilterWrapper as param in triggers_in_block
filter,
self.unified_api_version.clone(),
)
.await?;
Expand Down
106 changes: 3 additions & 103 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered};
use graph::blockchain::client::ChainClient;
use graph::blockchain::BlockHash;
use graph::blockchain::ChainIdentifier;
use graph::blockchain::SubgraphFilter;
use graph::blockchain::TriggerFilterWrapper;

use graph::components::transaction_receipt::LightTransactionReceipt;
use graph::data::store::ethereum::call;
use graph::data::store::scalar;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::data::subgraph::API_VERSION_0_0_7;
use graph::data_source::subgraph;
use graph::futures01::stream;
use graph::futures01::Future;
use graph::futures01::Stream;
Expand All @@ -21,10 +19,6 @@ use graph::prelude::ethabi::ParamType;
use graph::prelude::ethabi::Token;
use graph::prelude::tokio::try_join;
use graph::prelude::web3::types::U256;
use graph::prelude::DeploymentHash;
use graph::prelude::Entity;
use graph::prelude::Value;
use graph::schema::InputSchema;
use graph::slog::o;
use graph::tokio::sync::RwLock;
use graph::tokio::time::timeout;
Expand Down Expand Up @@ -65,6 +59,7 @@ use crate::chain::BlockFinality;
use crate::trigger::LogRef;
use crate::Chain;
use crate::NodeCapabilities;
use crate::TriggerFilter;
use crate::{
adapter::{
ContractCall, ContractCallError, EthGetLogsFilter, EthereumAdapter as EthereumAdapterTrait,
Expand Down Expand Up @@ -1729,81 +1724,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
}
}

// TODO(krishna): Currently this is a mock implementation of subgraph triggers.
// This will be replaced with the actual implementation which will use the filters to
// query the database of the source subgraph and return the entity triggers.
async fn subgraph_triggers(
adapter: Arc<EthereumAdapter>,
logger: Logger,
chain_store: Arc<dyn ChainStore>,
_subgraph_metrics: Arc<SubgraphEthRpcMetrics>,
from: BlockNumber,
to: BlockNumber,
filter: &Arc<TriggerFilterWrapper<Chain>>,
_unified_api_version: UnifiedMappingApiVersion,
) -> Result<(Vec<BlockWithTriggers<crate::Chain>>, BlockNumber), Error> {
let logger2 = logger.cheap_clone();
let eth = adapter.clone();
let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?;
let to = to_ptr.block_number();

let first_filter = filter.subgraph_filter.first().unwrap();

let blocks = adapter
.load_blocks_by_numbers(
logger.cheap_clone(),
chain_store.clone(),
HashSet::from_iter(from..=to),
)
.await
.and_then(move |block| {
Ok(BlockWithTriggers::<Chain>::new_with_subgraph_triggers(
BlockFinality::Final(block.clone()),
vec![create_mock_subgraph_trigger(first_filter, &block)],
&logger2,
))
})
.collect()
.compat()
.await?;

Ok((blocks, to))
}

fn create_mock_subgraph_trigger(
filter: &SubgraphFilter,
block: &LightEthereumBlock,
) -> subgraph::TriggerData {
let mock_entity = create_mock_entity(block);
subgraph::TriggerData {
source: filter.subgraph.clone(),
entity: mock_entity,
entity_type: filter.entities.first().unwrap().clone(),
}
}

fn create_mock_entity(block: &LightEthereumBlock) -> Entity {
let id = DeploymentHash::new("test").unwrap();
let data_schema = InputSchema::parse_latest(
"type Block @entity { id: Bytes!, number: BigInt!, hash: Bytes! }",
id.clone(),
)
.unwrap();
let hash = Value::Bytes(scalar::Bytes::from(block.hash.unwrap().as_bytes().to_vec()));
let data = data_schema
.make_entity(vec![
("id".into(), hash.clone()),
(
"number".into(),
Value::BigInt(scalar::BigInt::from(block.number())),
),
("hash".into(), hash),
])
.unwrap();

data
}

/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved
/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block.
/// If a block contains no triggers, there may be no corresponding item in the stream.
Expand All @@ -1825,33 +1745,13 @@ pub(crate) async fn blocks_with_triggers(
subgraph_metrics: Arc<SubgraphEthRpcMetrics>,
from: BlockNumber,
to: BlockNumber,
filter: &Arc<TriggerFilterWrapper<Chain>>,
filter: &TriggerFilter,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<(Vec<BlockWithTriggers<crate::Chain>>, BlockNumber), Error> {
// Each trigger filter needs to be queried for the same block range
// and the blocks yielded need to be deduped. If any error occurs
// while searching for a trigger type, the entire operation fails.
let eth = adapter.clone();
let subgraph_filter = filter.subgraph_filter.clone();

// TODO(krishna): In the initial implementation we do not allow any other datasource type
// When using subgraph data sources, there if subgraph_filter is not empty, we can return
// by just processing the subgraph triggers.
if !subgraph_filter.is_empty() {
return subgraph_triggers(
adapter.clone(),
logger.clone(),
chain_store.clone(),
subgraph_metrics.clone(),
from,
to,
filter,
unified_api_version,
)
.await;
}

let filter = filter.filter.clone();
let call_filter = EthereumCallFilter::from(&filter.block);

// Scan the block range to find relevant triggers
Expand Down
16 changes: 13 additions & 3 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ use graph::{
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
};
use prost::Message;
use std::collections::HashSet;
use std::sync::Arc;

use crate::adapter::TriggerFilter;
use crate::codec::substreams_triggers::BlockAndReceipts;
use crate::codec::Block;
use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate};
use crate::trigger::{self, NearTrigger};
use crate::{
Expand Down Expand Up @@ -243,7 +245,7 @@ impl Blockchain for Chain {
deployment,
store.firehose_cursor(),
store.block_ptr(),
filter.filter.clone(),
filter.chain_filter.clone(),
)
.await;
}
Expand All @@ -255,7 +257,7 @@ impl Blockchain for Chain {
store.firehose_cursor(),
start_blocks,
store.block_ptr(),
filter.filter.clone(),
filter.chain_filter.clone(),
unified_api_version,
)
.await
Expand Down Expand Up @@ -318,11 +320,19 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_from: BlockNumber,
_to: BlockNumber,
_filter: &Arc<TriggerFilterWrapper<Chain>>,
_filter: &TriggerFilter,
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

async fn load_blocks_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
) -> Result<Vec<Block>> {
unimplemented!()
}

async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
unimplemented!()
}
Expand Down
Loading

0 comments on commit d6be3d7

Please sign in to comment.