Skip to content

Commit 03c2911

Browse files
committed
firehose connection loadbalance
1 parent efd47b0 commit 03c2911

File tree

16 files changed

+153
-122
lines changed

16 files changed

+153
-122
lines changed

chain/arweave/src/chain.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use graph::data::subgraph::UnifiedMappingApiVersion;
44
use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints};
55
use graph::prelude::{MetricsRegistry, TryFutureExt};
66
use graph::{
7-
anyhow,
87
blockchain::{
98
block_stream::{
109
BlockStreamEvent, BlockWithTriggers, FirehoseError,
@@ -108,19 +107,13 @@ impl Blockchain for Chain {
108107
.triggers_adapter(&deployment, &NodeCapabilities {}, unified_api_version)
109108
.expect(&format!("no adapter for network {}", self.name,));
110109

111-
let firehose_endpoint = match self.firehose_endpoints.random() {
112-
Some(e) => e.clone(),
113-
None => return Err(anyhow::format_err!("no firehose endpoint available")),
114-
};
115-
110+
let firehose_endpoint = self.firehose_endpoints.random()?;
116111
let logger = self
117112
.logger_factory
118113
.subgraph_logger(&deployment)
119114
.new(o!("component" => "FirehoseBlockStream"));
120115

121-
let firehose_mapper = Arc::new(FirehoseMapper {
122-
endpoint: firehose_endpoint.cheap_clone(),
123-
});
116+
let firehose_mapper = Arc::new(FirehoseMapper {});
124117

125118
Ok(Box::new(FirehoseBlockStream::new(
126119
deployment.hash,
@@ -156,12 +149,8 @@ impl Blockchain for Chain {
156149
logger: &Logger,
157150
number: BlockNumber,
158151
) -> Result<BlockPtr, IngestorError> {
159-
let firehose_endpoint = match self.firehose_endpoints.random() {
160-
Some(e) => e.clone(),
161-
None => return Err(anyhow::format_err!("no firehose endpoint available").into()),
162-
};
163-
164-
firehose_endpoint
152+
self.firehose_endpoints
153+
.random()?
165154
.block_ptr_for_number::<codec::Block>(logger, number)
166155
.map_err(Into::into)
167156
.await
@@ -249,9 +238,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
249238
}
250239
}
251240

252-
pub struct FirehoseMapper {
253-
endpoint: Arc<FirehoseEndpoint>,
254-
}
241+
pub struct FirehoseMapper {}
255242

256243
#[async_trait]
257244
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
@@ -314,9 +301,10 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
314301
async fn block_ptr_for_number(
315302
&self,
316303
logger: &Logger,
304+
endpoint: &Arc<FirehoseEndpoint>,
317305
number: BlockNumber,
318306
) -> Result<BlockPtr, Error> {
319-
self.endpoint
307+
endpoint
320308
.block_ptr_for_number::<codec::Block>(logger, number)
321309
.await
322310
}
@@ -327,6 +315,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
327315
async fn final_block_ptr_for(
328316
&self,
329317
_logger: &Logger,
318+
_endpoint: &Arc<FirehoseEndpoint>,
330319
block: &codec::Block,
331320
) -> Result<BlockPtr, Error> {
332321
Ok(block.ptr())

chain/cosmos/src/chain.rs

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use graph::cheap_clone::CheapClone;
55
use graph::data::subgraph::UnifiedMappingApiVersion;
66
use graph::prelude::MetricsRegistry;
77
use graph::{
8-
anyhow::anyhow,
98
blockchain::{
109
block_stream::{
1110
BlockStream, BlockStreamEvent, BlockWithTriggers, FirehoseError,
@@ -106,19 +105,14 @@ impl Blockchain for Chain {
106105
.triggers_adapter(&deployment, &NodeCapabilities {}, unified_api_version)
107106
.unwrap_or_else(|_| panic!("no adapter for network {}", self.name));
108107

109-
let firehose_endpoint = match self.firehose_endpoints.random() {
110-
Some(e) => e.clone(),
111-
None => return Err(anyhow!("no firehose endpoint available",)),
112-
};
108+
let firehose_endpoint = self.firehose_endpoints.random()?;
113109

114110
let logger = self
115111
.logger_factory
116112
.subgraph_logger(&deployment)
117113
.new(o!("component" => "FirehoseBlockStream"));
118114

119-
let firehose_mapper = Arc::new(FirehoseMapper {
120-
endpoint: firehose_endpoint.cheap_clone(),
121-
});
115+
let firehose_mapper = Arc::new(FirehoseMapper {});
122116

123117
Ok(Box::new(FirehoseBlockStream::new(
124118
deployment.hash,
@@ -154,10 +148,7 @@ impl Blockchain for Chain {
154148
logger: &Logger,
155149
number: BlockNumber,
156150
) -> Result<BlockPtr, IngestorError> {
157-
let firehose_endpoint = match self.firehose_endpoints.random() {
158-
Some(e) => e.clone(),
159-
None => return Err(anyhow!("no firehose endpoint available").into()),
160-
};
151+
let firehose_endpoint = self.firehose_endpoints.random()?;
161152

162153
firehose_endpoint
163154
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
@@ -324,9 +315,7 @@ fn build_tx_context(tx: &codec::TxResult) -> codec::TransactionContext {
324315
}
325316
}
326317

327-
pub struct FirehoseMapper {
328-
endpoint: Arc<FirehoseEndpoint>,
329-
}
318+
pub struct FirehoseMapper {}
330319

331320
#[async_trait]
332321
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
@@ -389,21 +378,22 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
389378
async fn block_ptr_for_number(
390379
&self,
391380
logger: &Logger,
381+
endpoint: &Arc<FirehoseEndpoint>,
392382
number: BlockNumber,
393383
) -> Result<BlockPtr, Error> {
394-
self.endpoint
384+
endpoint
395385
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
396386
.await
397387
}
398388

399389
async fn final_block_ptr_for(
400390
&self,
401391
logger: &Logger,
392+
endpoint: &Arc<FirehoseEndpoint>,
402393
block: &codec::Block,
403394
) -> Result<BlockPtr, Error> {
404395
// Cosmos provides instant block finality.
405-
self.endpoint
406-
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, block.number())
396+
self.block_ptr_for_number(logger, endpoint, block.number())
407397
.await
408398
}
409399
}

chain/ethereum/examples/firehose.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ async fn main() -> Result<(), Error> {
2525
token,
2626
false,
2727
false,
28-
1,
2928
));
3029

3130
loop {

chain/ethereum/src/chain.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,14 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
6969
chain.name, requirements
7070
));
7171

72-
let firehose_endpoint = match chain.firehose_endpoints.random() {
73-
Some(e) => e.clone(),
74-
None => return Err(anyhow::format_err!("no firehose endpoint available",)),
75-
};
72+
let firehose_endpoint = chain.firehose_endpoints.random()?;
7673

7774
let logger = chain
7875
.logger_factory
7976
.subgraph_logger(&deployment)
8077
.new(o!("component" => "FirehoseBlockStream"));
8178

82-
let firehose_mapper = Arc::new(FirehoseMapper {
83-
endpoint: firehose_endpoint.cheap_clone(),
84-
});
79+
let firehose_mapper = Arc::new(FirehoseMapper {});
8580

8681
Ok(Box::new(FirehoseBlockStream::new(
8782
deployment.hash,
@@ -584,9 +579,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
584579
}
585580
}
586581

587-
pub struct FirehoseMapper {
588-
endpoint: Arc<FirehoseEndpoint>,
589-
}
582+
pub struct FirehoseMapper {}
590583

591584
#[async_trait]
592585
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
@@ -660,16 +653,18 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
660653
async fn block_ptr_for_number(
661654
&self,
662655
logger: &Logger,
656+
endpoint: &Arc<FirehoseEndpoint>,
663657
number: BlockNumber,
664658
) -> Result<BlockPtr, Error> {
665-
self.endpoint
659+
endpoint
666660
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
667661
.await
668662
}
669663

670664
async fn final_block_ptr_for(
671665
&self,
672666
logger: &Logger,
667+
endpoint: &Arc<FirehoseEndpoint>,
673668
block: &BlockFinality,
674669
) -> Result<BlockPtr, Error> {
675670
// Firehose for Ethereum has an hard-coded confirmations for finality sets to 200 block
@@ -680,8 +675,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
680675
_ => 0,
681676
};
682677

683-
self.endpoint
684-
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, final_block_number)
678+
self.block_ptr_for_number(logger, endpoint, final_block_number)
685679
.await
686680
}
687681
}

chain/near/src/chain.rs

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use graph::data::subgraph::UnifiedMappingApiVersion;
44
use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints};
55
use graph::prelude::{MetricsRegistry, TryFutureExt};
66
use graph::{
7-
anyhow,
87
anyhow::Result,
98
blockchain::{
109
block_stream::{
@@ -50,19 +49,14 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
5049
.triggers_adapter(&deployment, &NodeCapabilities {}, unified_api_version)
5150
.expect(&format!("no adapter for network {}", chain.name,));
5251

53-
let firehose_endpoint = match chain.firehose_endpoints.random() {
54-
Some(e) => e.clone(),
55-
None => return Err(anyhow::format_err!("no firehose endpoint available")),
56-
};
52+
let firehose_endpoint = chain.firehose_endpoints.random()?;
5753

5854
let logger = chain
5955
.logger_factory
6056
.subgraph_logger(&deployment)
6157
.new(o!("component" => "FirehoseBlockStream"));
6258

63-
let firehose_mapper = Arc::new(FirehoseMapper {
64-
endpoint: firehose_endpoint.cheap_clone(),
65-
});
59+
let firehose_mapper = Arc::new(FirehoseMapper {});
6660

6761
Ok(Box::new(FirehoseBlockStream::new(
6862
deployment.hash,
@@ -200,10 +194,7 @@ impl Blockchain for Chain {
200194
logger: &Logger,
201195
number: BlockNumber,
202196
) -> Result<BlockPtr, IngestorError> {
203-
let firehose_endpoint = match self.firehose_endpoints.random() {
204-
Some(e) => e.clone(),
205-
None => return Err(anyhow::format_err!("no firehose endpoint available").into()),
206-
};
197+
let firehose_endpoint = self.firehose_endpoints.random()?;
207198

208199
firehose_endpoint
209200
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
@@ -318,9 +309,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
318309
}
319310
}
320311

321-
pub struct FirehoseMapper {
322-
endpoint: Arc<FirehoseEndpoint>,
323-
}
312+
pub struct FirehoseMapper {}
324313

325314
#[async_trait]
326315
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
@@ -384,22 +373,23 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
384373
async fn block_ptr_for_number(
385374
&self,
386375
logger: &Logger,
376+
endpoint: &Arc<FirehoseEndpoint>,
387377
number: BlockNumber,
388378
) -> Result<BlockPtr, Error> {
389-
self.endpoint
379+
endpoint
390380
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, number)
391381
.await
392382
}
393383

394384
async fn final_block_ptr_for(
395385
&self,
396386
logger: &Logger,
387+
endpoint: &Arc<FirehoseEndpoint>,
397388
block: &codec::Block,
398389
) -> Result<BlockPtr, Error> {
399390
let final_block_number = block.header().last_final_block_height as BlockNumber;
400391

401-
self.endpoint
402-
.block_ptr_for_number::<codec::HeaderOnlyBlock>(logger, final_block_number)
392+
self.block_ptr_for_number(logger, endpoint, final_block_number)
403393
.await
404394
}
405395
}

chain/substreams/examples/substreams.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ async fn main() -> Result<(), Error> {
4646
token,
4747
false,
4848
false,
49-
1,
5049
));
5150

5251
let mut stream: SubstreamsBlockStream<graph_chain_substreams::Chain> =

chain/substreams/src/block_stream.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
3939
filter: Arc<TriggerFilter>,
4040
_unified_api_version: UnifiedMappingApiVersion,
4141
) -> Result<Box<dyn BlockStream<Chain>>> {
42-
let firehose_endpoint = match chain.endpoints.random() {
43-
Some(e) => e.clone(),
44-
None => return Err(anyhow::format_err!("no firehose endpoint available")),
45-
};
42+
let firehose_endpoint = chain.endpoints.random()?;
4643

4744
let mapper = Arc::new(Mapper {});
4845

core/src/subgraph/runner.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,11 @@ where
9999
let block_stream_canceler = CancelGuard::new();
100100
let block_stream_cancel_handle = block_stream_canceler.handle();
101101

102-
let mut block_stream = new_block_stream(&self.inputs, &self.ctx.filter)
103-
.await?
104-
.map_err(CancelableError::Error)
105-
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
102+
let mut block_stream =
103+
new_block_stream(&self.inputs, &self.ctx.filter, &self.metrics.subgraph)
104+
.await?
105+
.map_err(CancelableError::Error)
106+
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
106107

107108
// Keep the stream's cancel guard around to be able to shut it down when the subgraph
108109
// deployment is unassigned

core/src/subgraph/stream.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::subgraph::inputs::IndexingInputs;
22
use graph::blockchain::block_stream::{BlockStream, BufferedBlockStream};
33
use graph::blockchain::Blockchain;
4-
use graph::prelude::Error;
4+
use graph::prelude::{Error, SubgraphInstanceMetrics};
55
use std::sync::Arc;
66

77
const BUFFERED_BLOCK_STREAM_SIZE: usize = 100;
@@ -10,6 +10,7 @@ const BUFFERED_FIREHOSE_STREAM_SIZE: usize = 1;
1010
pub async fn new_block_stream<C: Blockchain>(
1111
inputs: &IndexingInputs<C>,
1212
filter: &C::TriggerFilter,
13+
metrics: &SubgraphInstanceMetrics,
1314
) -> Result<Box<dyn BlockStream<C>>, Error> {
1415
let is_firehose = inputs.chain.is_firehose_supported();
1516

@@ -37,10 +38,13 @@ pub async fn new_block_stream<C: Blockchain>(
3738
inputs.unified_api_version.clone(),
3839
),
3940
}
40-
.await?;
41+
.await;
42+
if is_firehose && block_stream.is_err() {
43+
metrics.firehose_connection_errors.inc();
44+
}
4145

4246
Ok(BufferedBlockStream::spawn_from_stream(
43-
block_stream,
47+
block_stream?,
4448
buffer_size,
4549
))
4650
}

graph/src/blockchain/block_stream.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use super::{Block, BlockPtr, Blockchain};
1010
use crate::anyhow::Result;
1111
use crate::components::store::{BlockNumber, DeploymentLocator};
1212
use crate::data::subgraph::UnifiedMappingApiVersion;
13-
use crate::firehose;
13+
use crate::firehose::{self, FirehoseEndpoint};
1414
use crate::substreams::BlockScopedData;
1515
use crate::{prelude::*, prometheus::labels};
1616

@@ -254,6 +254,7 @@ pub trait FirehoseMapper<C: Blockchain>: Send + Sync {
254254
async fn block_ptr_for_number(
255255
&self,
256256
logger: &Logger,
257+
endpoint: &Arc<FirehoseEndpoint>,
257258
number: BlockNumber,
258259
) -> Result<BlockPtr, Error>;
259260

@@ -271,6 +272,7 @@ pub trait FirehoseMapper<C: Blockchain>: Send + Sync {
271272
async fn final_block_ptr_for(
272273
&self,
273274
logger: &Logger,
275+
endpoint: &Arc<FirehoseEndpoint>,
274276
block: &C::Block,
275277
) -> Result<BlockPtr, Error>;
276278
}

0 commit comments

Comments
 (0)