Skip to content

Commit

Permalink
Move the substreams block stream process to earlier in the pipeline when parallel processing is possible
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Sep 20, 2023
1 parent 70a2c99 commit 0b02cda
Show file tree
Hide file tree
Showing 28 changed files with 599 additions and 390 deletions.
50 changes: 27 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use graph::prelude::{
BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
};
use graph::schema::InputSchema;
use graph::{
blockchain::{
block_stream::{
Expand Down Expand Up @@ -102,6 +103,18 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
)))
}

/// Builds a substreams-powered block stream.
///
/// Not supported by `EthereumStreamBuilder` — this chain produces blocks via
/// the firehose/polling builders in this impl instead, so reaching this method
/// indicates a caller bug. All parameters are intentionally unused.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
async fn build_substreams(
    &self,
    _chain: &Chain,
    _schema: Arc<InputSchema>,
    _deployment: DeploymentLocator,
    _block_cursor: FirehoseCursor,
    _subgraph_current_block: Option<BlockPtr>,
    _filter: Arc<<Chain as Blockchain>::TriggerFilter>,
) -> Result<Box<dyn BlockStream<Chain>>> {
    unimplemented!()
}

async fn build_polling(
&self,
chain: &Chain,
Expand Down
13 changes: 13 additions & 0 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::FirehoseEndpoint;
use graph::prelude::{MetricsRegistry, TryFutureExt};
use graph::schema::InputSchema;
use graph::{
anyhow::Result,
blockchain::{
Expand Down Expand Up @@ -40,6 +41,18 @@ pub struct NearStreamBuilder {}

#[async_trait]
impl BlockStreamBuilder<Chain> for NearStreamBuilder {
/// Builds a substreams-powered block stream.
///
/// Not supported by `NearStreamBuilder` — NEAR blocks are streamed through the
/// firehose builder below, so reaching this method indicates a caller bug.
/// All parameters are intentionally unused.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
async fn build_substreams(
    &self,
    _chain: &Chain,
    _schema: Arc<InputSchema>,
    _deployment: DeploymentLocator,
    _block_cursor: FirehoseCursor,
    _subgraph_current_block: Option<BlockPtr>,
    _filter: Arc<<Chain as Blockchain>::TriggerFilter>,
) -> Result<Box<dyn BlockStream<Chain>>> {
    unimplemented!()
}

async fn build_firehose(
&self,
chain: &Chain,
Expand Down
2 changes: 1 addition & 1 deletion chain/substreams/examples/substreams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn main() -> Result<(), Error> {
client,
None,
None,
Arc::new(Mapper {}),
Arc::new(Mapper { schema: None }),
package.modules.clone(),
module_name.to_string(),
vec![12369621],
Expand Down
2 changes: 1 addition & 1 deletion chain/substreams/src/block_ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl SubstreamsBlockIngestor {
#[async_trait]
impl BlockIngestor for SubstreamsBlockIngestor {
async fn run(self: Box<Self>) {
let mapper = Arc::new(Mapper {});
let mapper = Arc::new(Mapper { schema: None });
let mut latest_cursor = self.fetch_head_cursor().await;
let mut backoff =
ExponentialBackoff::new(Duration::from_millis(250), Duration::from_secs(30));
Expand Down
25 changes: 20 additions & 5 deletions chain/substreams/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use graph::{
components::store::DeploymentLocator,
data::subgraph::UnifiedMappingApiVersion,
prelude::{async_trait, BlockNumber, BlockPtr},
schema::InputSchema,
slog::o,
};

Expand All @@ -30,17 +31,18 @@ impl BlockStreamBuilder {
/// is very similar, so we can re-use the configuration and the builder for it.
/// This is probably something to improve but for now it works.
impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
async fn build_firehose(
async fn build_substreams(
&self,
chain: &Chain,
schema: Arc<InputSchema>,
deployment: DeploymentLocator,
block_cursor: FirehoseCursor,
_start_blocks: Vec<BlockNumber>,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<TriggerFilter>,
_unified_api_version: UnifiedMappingApiVersion,
filter: Arc<<Chain as Blockchain>::TriggerFilter>,
) -> Result<Box<dyn BlockStream<Chain>>> {
let mapper = Arc::new(Mapper {});
let mapper = Arc::new(Mapper {
schema: Some(schema),
});

let logger = chain
.logger_factory
Expand All @@ -62,6 +64,19 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
)))
}

/// Builds a firehose block stream.
///
/// Not supported by the substreams chain: this builder only serves blocks
/// through `build_substreams`, so this trait method is a stub and reaching it
/// indicates a caller bug. All parameters are intentionally unused.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
async fn build_firehose(
    &self,
    _chain: &Chain,
    _deployment: DeploymentLocator,
    _block_cursor: FirehoseCursor,
    _start_blocks: Vec<BlockNumber>,
    _subgraph_current_block: Option<BlockPtr>,
    _filter: Arc<TriggerFilter>,
    _unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Chain>>> {
    unimplemented!()
}

async fn build_polling(
&self,
_chain: &Chain,
Expand Down
34 changes: 24 additions & 10 deletions chain/substreams/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ use crate::{data_source::*, EntityChanges, TriggerData, TriggerFilter, TriggersA
use anyhow::Error;
use graph::blockchain::client::ChainClient;
use graph::blockchain::{
BasicBlockchainBuilder, BlockIngestor, BlockchainBuilder, EmptyNodeCapabilities,
NoopRuntimeAdapter,
BasicBlockchainBuilder, BlockIngestor, EmptyNodeCapabilities, NoopRuntimeAdapter,
};
use graph::components::store::DeploymentCursorTracker;
use graph::components::store::{DeploymentCursorTracker, EntityKey};
use graph::firehose::FirehoseEndpoints;
use graph::prelude::{BlockHash, CheapClone, LoggerFactory, MetricsRegistry};
use graph::prelude::{BlockHash, CheapClone, Entity, LoggerFactory, MetricsRegistry};
use graph::{
blockchain::{
self,
Expand All @@ -20,13 +19,29 @@ use graph::{
prelude::{async_trait, BlockNumber, ChainStore},
slog::Logger,
};

use std::sync::Arc;

/// Internal representation of the operations defined on the graph-out format
/// used by substreams.
///
/// `Unset` serves as a sentinel value: if an unknown value is sent, or the
/// value was empty, that is probably unintended behaviour. Parsing was moved
/// here for performance reasons, but validation is still performed during
/// trigger processing — so while `Unset` very likely indicates an error
/// somewhere upstream, as far as the stream is concerned we just pass it
/// along and let the downstream components deal with it.
#[derive(Debug, Clone)]
pub enum ParsedChanges {
    /// Sentinel for an unknown/empty operation; rejected later during trigger
    /// processing rather than here.
    Unset,
    /// Remove the entity identified by the given key.
    Delete(EntityKey),
    /// Create or update the entity stored under `key`.
    Upsert { key: EntityKey, entity: Entity },
}

/// A block as delivered by the substreams block stream.
#[derive(Default, Debug, Clone)]
pub struct Block {
    /// Hash identifying this block.
    pub hash: BlockHash,
    /// Height (number) of this block.
    pub number: BlockNumber,
    /// Raw entity changes as received in the substreams graph-out format.
    pub changes: EntityChanges,
    /// Changes pre-parsed into `ParsedChanges` earlier in the pipeline (for
    /// performance); validation still happens during trigger processing.
    pub parsed_changes: Vec<ParsedChanges>,
}

impl blockchain::Block for Block {
Expand Down Expand Up @@ -112,19 +127,18 @@ impl Blockchain for Chain {
&self,
deployment: DeploymentLocator,
store: impl DeploymentCursorTracker,
start_blocks: Vec<BlockNumber>,
_start_blocks: Vec<BlockNumber>,
filter: Arc<Self::TriggerFilter>,
unified_api_version: UnifiedMappingApiVersion,
_unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
self.block_stream_builder
.build_firehose(
.build_substreams(
self,
store.input_schema(),
deployment,
store.firehose_cursor(),
start_blocks,
store.block_ptr(),
filter,
unified_api_version,
)
.await
}
Expand Down Expand Up @@ -177,7 +191,7 @@ impl Blockchain for Chain {
}
}

impl BlockchainBuilder<super::Chain> for BasicBlockchainBuilder {
impl blockchain::BlockchainBuilder<super::Chain> for BasicBlockchainBuilder {
fn build(self) -> super::Chain {
let BasicBlockchainBuilder {
logger_factory,
Expand Down
Loading

0 comments on commit 0b02cda

Please sign in to comment.