Skip to content

firehose connection loadbalance #4083

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 8 additions & 19 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints};
use graph::prelude::{MetricsRegistry, TryFutureExt};
use graph::{
anyhow,
blockchain::{
block_stream::{
BlockStreamEvent, BlockWithTriggers, FirehoseError,
Expand Down Expand Up @@ -108,19 +107,13 @@ impl Blockchain for Chain {
.triggers_adapter(&deployment, &NodeCapabilities {}, unified_api_version)
.expect(&format!("no adapter for network {}", self.name,));

let firehose_endpoint = match self.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available")),
};

let firehose_endpoint = self.firehose_endpoints.random()?;
let logger = self
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});
let firehose_mapper = Arc::new(FirehoseMapper {});

Ok(Box::new(FirehoseBlockStream::new(
deployment.hash,
Expand Down Expand Up @@ -156,12 +149,8 @@ impl Blockchain for Chain {
logger: &Logger,
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
let firehose_endpoint = match self.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available").into()),
};

firehose_endpoint
self.firehose_endpoints
.random()?
.block_ptr_for_number::<codec::Block>(logger, number)
.map_err(Into::into)
.await
Expand Down Expand Up @@ -249,9 +238,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}
}

pub struct FirehoseMapper {
endpoint: Arc<FirehoseEndpoint>,
}
pub struct FirehoseMapper {}

#[async_trait]
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
Expand Down Expand Up @@ -314,9 +301,10 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
async fn block_ptr_for_number(
&self,
logger: &Logger,
endpoint: &Arc<FirehoseEndpoint>,
number: BlockNumber,
) -> Result<BlockPtr, Error> {
self.endpoint
endpoint
.block_ptr_for_number::<codec::Block>(logger, number)
.await
}
Expand All @@ -327,6 +315,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
async fn final_block_ptr_for(
&self,
_logger: &Logger,
_endpoint: &Arc<FirehoseEndpoint>,
block: &codec::Block,
) -> Result<BlockPtr, Error> {
Ok(block.ptr())
Expand Down
26 changes: 8 additions & 18 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use graph::cheap_clone::CheapClone;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::prelude::MetricsRegistry;
use graph::{
anyhow::anyhow,
blockchain::{
block_stream::{
BlockStream, BlockStreamEvent, BlockWithTriggers, FirehoseError,
Expand Down Expand Up @@ -106,19 +105,14 @@ impl Blockchain for Chain {
.triggers_adapter(&deployment, &NodeCapabilities {}, unified_api_version)
.unwrap_or_else(|_| panic!("no adapter for network {}", self.name));

let firehose_endpoint = match self.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow!("no firehose endpoint available",)),
};
let firehose_endpoint = self.firehose_endpoints.random()?;

let logger = self
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});
let firehose_mapper = Arc::new(FirehoseMapper {});

Ok(Box::new(FirehoseBlockStream::new(
deployment.hash,
Expand Down Expand Up @@ -154,10 +148,7 @@ impl Blockchain for Chain {
logger: &Logger,
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
let firehose_endpoint = match self.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow!("no firehose endpoint available").into()),
};
let firehose_endpoint = self.firehose_endpoints.random()?;

firehose_endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
Expand Down Expand Up @@ -324,9 +315,7 @@ fn build_tx_context(tx: &codec::TxResult) -> codec::TransactionContext {
}
}

pub struct FirehoseMapper {
endpoint: Arc<FirehoseEndpoint>,
}
pub struct FirehoseMapper {}

#[async_trait]
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
Expand Down Expand Up @@ -389,21 +378,22 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
async fn block_ptr_for_number(
&self,
logger: &Logger,
endpoint: &Arc<FirehoseEndpoint>,
number: BlockNumber,
) -> Result<BlockPtr, Error> {
self.endpoint
endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
.await
}

async fn final_block_ptr_for(
&self,
logger: &Logger,
endpoint: &Arc<FirehoseEndpoint>,
block: &codec::Block,
) -> Result<BlockPtr, Error> {
// Cosmos provides instant block finality.
self.endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, block.number())
self.block_ptr_for_number(logger, endpoint, block.number())
.await
}
}
Expand Down
1 change: 0 additions & 1 deletion chain/ethereum/examples/firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ async fn main() -> Result<(), Error> {
token,
false,
false,
1,
));

loop {
Expand Down
20 changes: 7 additions & 13 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,14 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
chain.name, requirements
));

let firehose_endpoint = match chain.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available",)),
};
let firehose_endpoint = chain.firehose_endpoints.random()?;

let logger = chain
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});
let firehose_mapper = Arc::new(FirehoseMapper {});

Ok(Box::new(FirehoseBlockStream::new(
deployment.hash,
Expand Down Expand Up @@ -584,9 +579,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}
}

pub struct FirehoseMapper {
endpoint: Arc<FirehoseEndpoint>,
}
pub struct FirehoseMapper {}

#[async_trait]
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
Expand Down Expand Up @@ -660,16 +653,18 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
async fn block_ptr_for_number(
&self,
logger: &Logger,
endpoint: &Arc<FirehoseEndpoint>,
number: BlockNumber,
) -> Result<BlockPtr, Error> {
self.endpoint
endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
.await
}

async fn final_block_ptr_for(
&self,
logger: &Logger,
endpoint: &Arc<FirehoseEndpoint>,
block: &BlockFinality,
) -> Result<BlockPtr, Error> {
// Firehose for Ethereum has an hard-coded confirmations for finality sets to 200 block
Expand All @@ -680,8 +675,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
_ => 0,
};

self.endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, final_block_number)
self.block_ptr_for_number(logger, endpoint, final_block_number)
.await
}
}
26 changes: 8 additions & 18 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints};
use graph::prelude::{MetricsRegistry, TryFutureExt};
use graph::{
anyhow,
anyhow::Result,
blockchain::{
block_stream::{
Expand Down Expand Up @@ -50,19 +49,14 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
.triggers_adapter(&deployment, &NodeCapabilities {}, unified_api_version)
.expect(&format!("no adapter for network {}", chain.name,));

let firehose_endpoint = match chain.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available")),
};
let firehose_endpoint = chain.firehose_endpoints.random()?;

let logger = chain
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {
endpoint: firehose_endpoint.cheap_clone(),
});
let firehose_mapper = Arc::new(FirehoseMapper {});

Ok(Box::new(FirehoseBlockStream::new(
deployment.hash,
Expand Down Expand Up @@ -200,10 +194,7 @@ impl Blockchain for Chain {
logger: &Logger,
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
let firehose_endpoint = match self.firehose_endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available").into()),
};
let firehose_endpoint = self.firehose_endpoints.random()?;

firehose_endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
Expand Down Expand Up @@ -318,9 +309,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}
}

pub struct FirehoseMapper {
endpoint: Arc<FirehoseEndpoint>,
}
pub struct FirehoseMapper {}

#[async_trait]
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
Expand Down Expand Up @@ -384,22 +373,23 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
async fn block_ptr_for_number(
&self,
logger: &Logger,
endpoint: &Arc<FirehoseEndpoint>,
number: BlockNumber,
) -> Result<BlockPtr, Error> {
self.endpoint
endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
.await
}

async fn final_block_ptr_for(
&self,
logger: &Logger,
endpoint: &Arc<FirehoseEndpoint>,
block: &codec::Block,
) -> Result<BlockPtr, Error> {
let final_block_number = block.header().last_final_block_height as BlockNumber;

self.endpoint
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, final_block_number)
self.block_ptr_for_number(logger, endpoint, final_block_number)
.await
}
}
Expand Down
1 change: 0 additions & 1 deletion chain/substreams/examples/substreams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ async fn main() -> Result<(), Error> {
token,
false,
false,
1,
));

let mut stream: SubstreamsBlockStream<graph_chain_substreams::Chain> =
Expand Down
5 changes: 1 addition & 4 deletions chain/substreams/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
filter: Arc<TriggerFilter>,
_unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Chain>>> {
let firehose_endpoint = match chain.endpoints.random() {
Some(e) => e.clone(),
None => return Err(anyhow::format_err!("no firehose endpoint available")),
};
let firehose_endpoint = chain.endpoints.random()?;

let mapper = Arc::new(Mapper {});

Expand Down
9 changes: 5 additions & 4 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ where
let block_stream_canceler = CancelGuard::new();
let block_stream_cancel_handle = block_stream_canceler.handle();

let mut block_stream = new_block_stream(&self.inputs, &self.ctx.filter)
.await?
.map_err(CancelableError::Error)
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
let mut block_stream =
new_block_stream(&self.inputs, &self.ctx.filter, &self.metrics.subgraph)
.await?
.map_err(CancelableError::Error)
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));

// Keep the stream's cancel guard around to be able to shut it down when the subgraph
// deployment is unassigned
Expand Down
10 changes: 7 additions & 3 deletions core/src/subgraph/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::subgraph::inputs::IndexingInputs;
use graph::blockchain::block_stream::{BlockStream, BufferedBlockStream};
use graph::blockchain::Blockchain;
use graph::prelude::Error;
use graph::prelude::{Error, SubgraphInstanceMetrics};
use std::sync::Arc;

const BUFFERED_BLOCK_STREAM_SIZE: usize = 100;
Expand All @@ -10,6 +10,7 @@ const BUFFERED_FIREHOSE_STREAM_SIZE: usize = 1;
pub async fn new_block_stream<C: Blockchain>(
inputs: &IndexingInputs<C>,
filter: &C::TriggerFilter,
metrics: &SubgraphInstanceMetrics,
) -> Result<Box<dyn BlockStream<C>>, Error> {
let is_firehose = inputs.chain.is_firehose_supported();

Expand Down Expand Up @@ -37,10 +38,13 @@ pub async fn new_block_stream<C: Blockchain>(
inputs.unified_api_version.clone(),
),
}
.await?;
.await;
if is_firehose && block_stream.is_err() {
metrics.firehose_connection_errors.inc();
}

Ok(BufferedBlockStream::spawn_from_stream(
block_stream,
block_stream?,
buffer_size,
))
}
4 changes: 3 additions & 1 deletion graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{Block, BlockPtr, Blockchain};
use crate::anyhow::Result;
use crate::components::store::{BlockNumber, DeploymentLocator};
use crate::data::subgraph::UnifiedMappingApiVersion;
use crate::firehose;
use crate::firehose::{self, FirehoseEndpoint};
use crate::substreams::BlockScopedData;
use crate::{prelude::*, prometheus::labels};

Expand Down Expand Up @@ -254,6 +254,7 @@ pub trait FirehoseMapper<C: Blockchain>: Send + Sync {
async fn block_ptr_for_number(
&self,
logger: &Logger,
endpoint: &Arc<FirehoseEndpoint>,
number: BlockNumber,
) -> Result<BlockPtr, Error>;

Expand All @@ -271,6 +272,7 @@ pub trait FirehoseMapper<C: Blockchain>: Send + Sync {
async fn final_block_ptr_for(
&self,
logger: &Logger,
endpoint: &Arc<FirehoseEndpoint>,
block: &C::Block,
) -> Result<BlockPtr, Error>;
}
Expand Down
Loading