Skip to content

Commit

Permalink
firehose ingestor transforms
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Nov 30, 2022
1 parent be4782f commit dbe8e22
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 9 deletions.
27 changes: 24 additions & 3 deletions graph/proto/ethereum/transforms.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ option go_package = "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethe

// CombinedFilter is a combination of "LogFilters" and "CallToFilters"
//
// It transforms the requested stream in two ways:
// It transforms the requested stream in two ways:
// 1. STRIPPING
// The block data is stripped from all transactions that don't
// match any of the filters.
// match any of the filters.
//
// 2. SKIPPING
// If an "block index" covers a range containing a
Expand All @@ -25,7 +25,7 @@ message CombinedFilter {
repeated LogFilter log_filters = 1;
repeated CallToFilter call_filters = 2;

// Always send all blocks. if they don't match any log_filters or call_filters,
// Always send all blocks. if they don't match any log_filters or call_filters,
// all the transactions will be filtered out, sending only the header.
bool send_all_block_headers = 3;
}
Expand Down Expand Up @@ -60,5 +60,26 @@ message CallToFilter {
repeated bytes signatures = 2;
}

// Deprecated: LightBlock is deprecated, replaced by HeaderOnly, note however that the new transform
// does not have any transactions traces returned, so it's not a direct replacement.
message LightBlock {
}

// HeaderOnly returns only the block's header and few top-level core information for the block. Useful
// for cases where no transactions information is required at all.
//
// The structure that would will have access to after:
//
// ```
// Block {
// int32 ver = 1;
// bytes hash = 2;
// uint64 number = 3;
// uint64 size = 4;
// BlockHeader header = 5;
// }
// ```
//
// Everything else will be empty.
message HeaderOnly {
}
30 changes: 29 additions & 1 deletion graph/src/blockchain/firehose_block_ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,43 @@ use std::{marker::PhantomData, sync::Arc, time::Duration};
use crate::{
blockchain::Block as BlockchainBlock,
components::store::ChainStore,
firehose::{self, decode_firehose_block, FirehoseEndpoint},
firehose::{self, decode_firehose_block, FirehoseEndpoint, HeaderOnly},
prelude::{error, info, Logger},
util::backoff::ExponentialBackoff,
};
use anyhow::{Context, Error};
use futures03::StreamExt;
use prost::Message;
use prost_types::Any;
use slog::trace;
use tonic::Streaming;

const TRANSFORM_ETHEREUM_HEADER_ONLY: &str =
"type.googleapis.com/sf.ethereum.transform.v1.HeaderOnly";

pub enum Transforms {
EthereumHeaderOnly,
}

impl Into<Any> for &Transforms {
fn into(self) -> Any {
match self {
Transforms::EthereumHeaderOnly => Any {
type_url: TRANSFORM_ETHEREUM_HEADER_ONLY.to_owned(),
value: HeaderOnly {}.encode_to_vec(),
},
}
}
}

pub struct FirehoseBlockIngestor<M>
where
M: prost::Message + BlockchainBlock + Default + 'static,
{
chain_store: Arc<dyn ChainStore>,
endpoint: Arc<FirehoseEndpoint>,
logger: Logger,
default_transforms: Vec<Transforms>,

phantom: PhantomData<M>,
}
Expand All @@ -37,9 +58,15 @@ where
endpoint,
logger,
phantom: PhantomData {},
default_transforms: vec![],
}
}

pub fn with_transforms(mut self, transforms: Vec<Transforms>) -> Self {
self.default_transforms = transforms;
self
}

pub async fn run(self) {
let mut latest_cursor = self.fetch_head_cursor().await;
let mut backoff =
Expand All @@ -59,6 +86,7 @@ where
start_block_num: -1,
cursor: latest_cursor.clone(),
final_blocks_only: false,
transforms: self.default_transforms.iter().map(|t| t.into()).collect(),
..Default::default()
})
.await;
Expand Down
27 changes: 24 additions & 3 deletions graph/src/firehose/sf.ethereum.transform.v1.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/// CombinedFilter is a combination of "LogFilters" and "CallToFilters"
///
/// It transforms the requested stream in two ways:
/// It transforms the requested stream in two ways:
/// 1. STRIPPING
/// The block data is stripped from all transactions that don't
/// match any of the filters.
/// match any of the filters.
///
/// 2. SKIPPING
/// If an "block index" covers a range containing a
Expand All @@ -22,7 +22,7 @@ pub struct CombinedFilter {
pub log_filters: ::prost::alloc::vec::Vec<LogFilter>,
#[prost(message, repeated, tag="2")]
pub call_filters: ::prost::alloc::vec::Vec<CallToFilter>,
/// Always send all blocks. if they don't match any log_filters or call_filters,
/// Always send all blocks. if they don't match any log_filters or call_filters,
/// all the transactions will be filtered out, sending only the header.
#[prost(bool, tag="3")]
pub send_all_block_headers: bool,
Expand Down Expand Up @@ -64,6 +64,27 @@ pub struct CallToFilter {
#[prost(bytes="vec", repeated, tag="2")]
pub signatures: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
}
/// Deprecated: LightBlock is deprecated, replaced by HeaderOnly, note however that the new transform
/// does not have any transactions traces returned, so it's not a direct replacement.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LightBlock {
}
/// HeaderOnly returns only the block's header and few top-level core information for the block. Useful
/// for cases where no transactions information is required at all.
///
/// The structure that would will have access to after:
///
/// ```
/// Block {
/// int32 ver = 1;
/// bytes hash = 2;
/// uint64 number = 3;
/// uint64 size = 4;
/// BlockHeader header = 5;
/// }
/// ```
///
/// Everything else will be empty.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HeaderOnly {
}
8 changes: 6 additions & 2 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ethereum::{
BlockIngestor as EthereumBlockIngestor, EthereumAdapterTrait, EthereumNetworks, RuntimeAdapter,
};
use git_testament::{git_testament, render_testament};
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms};
use graph::blockchain::{Block as BlockchainBlock, Blockchain, BlockchainKind, BlockchainMap};
use graph::components::store::BlockStore;
use graph::data::graphql::effort::LoadManager;
Expand Down Expand Up @@ -970,12 +970,16 @@ fn start_firehose_block_ingestor<C, M>(

match store.block_store().chain_store(network_name.as_ref()) {
Some(s) => {
let block_ingestor = FirehoseBlockIngestor::<M>::new(
let mut block_ingestor = FirehoseBlockIngestor::<M>::new(
s,
endpoint.clone(),
logger.new(o!("component" => "FirehoseBlockIngestor", "provider" => endpoint.provider.clone())),
);

if C::KIND == BlockchainKind::Ethereum {
block_ingestor = block_ingestor.with_transforms(vec![Transforms::EthereumHeaderOnly]);
}

// Run the Firehose block ingestor in the background
graph::spawn(block_ingestor.run());
},
Expand Down

0 comments on commit dbe8e22

Please sign in to comment.