Skip to content

Commit 1710c0a

Browse files
committed
firehose connection loadbalance
1 parent d0282c6 commit 1710c0a

File tree

3 files changed

+32
-28
lines changed

3 files changed

+32
-28
lines changed

graph/src/firehose/endpoints.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use futures03::StreamExt;
1111
use http::uri::{Scheme, Uri};
1212
use rand::prelude::IteratorRandom;
1313
use slog::Logger;
14-
use std::{collections::BTreeMap, fmt::Display, iter, sync::Arc, time::Duration};
14+
use std::{collections::BTreeMap, fmt::Display, sync::Arc, time::Duration};
1515
use tonic::{
1616
metadata::MetadataValue,
1717
transport::{Channel, ClientTlsConfig},
@@ -42,7 +42,6 @@ impl FirehoseEndpoint {
4242
token: Option<String>,
4343
filters_enabled: bool,
4444
compression_enabled: bool,
45-
conn_pool_size: u16,
4645
) -> Self {
4746
let uri = url
4847
.as_ref()
@@ -75,8 +74,7 @@ impl FirehoseEndpoint {
7574
// Timeout on each request, so the timeout to estabilish each 'Blocks' stream.
7675
.timeout(Duration::from_secs(120));
7776

78-
// Load balancing on a same endpoint is useful because it creates a connection pool.
79-
let channel = Channel::balance_list(iter::repeat(endpoint).take(conn_pool_size as usize));
77+
let channel = Channel::balance_list(vec![endpoint].into_iter());
8078

8179
FirehoseEndpoint {
8280
provider: provider.as_ref().to_string(),
@@ -267,10 +265,12 @@ impl FirehoseEndpoints {
267265
self.0.len()
268266
}
269267

268+
// selects the FirehoseEndpoint with the lest amount of references, which will help with spliting
269+
// the load naively across the entire list.
270270
pub fn random(&self) -> Option<&Arc<FirehoseEndpoint>> {
271-
// Select from the matching adapters randomly
272-
let mut rng = rand::thread_rng();
273-
self.0.iter().choose(&mut rng)
271+
self.0
272+
.iter()
273+
.min_by(|x, y| Arc::strong_count(x).cmp(&Arc::strong_count(y)))
274274
}
275275

276276
pub fn remove(&mut self, provider: &str) {

node/src/chain.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -124,19 +124,22 @@ pub fn create_substreams_networks(
124124
"provider" => &provider.label,
125125
);
126126

127-
let endpoint = FirehoseEndpoint::new(
128-
&provider.label,
129-
&firehose.url,
130-
firehose.token.clone(),
131-
firehose.filters_enabled(),
132-
firehose.compression_enabled(),
133-
firehose.conn_pool_size,
134-
);
135-
136127
let parsed_networks = networks_by_kind
137128
.entry(chain.protocol)
138129
.or_insert_with(|| FirehoseNetworks::new());
139-
parsed_networks.insert(name.to_string(), Arc::new(endpoint));
130+
131+
for i in 0..firehose.conn_pool_size {
132+
parsed_networks.insert(
133+
name.to_string(),
134+
Arc::new(FirehoseEndpoint::new(
135+
&format!("{}-{}", provider.label, i),
136+
&firehose.url,
137+
firehose.token.clone(),
138+
firehose.filters_enabled(),
139+
firehose.compression_enabled(),
140+
)),
141+
);
142+
}
140143
}
141144
}
142145
}
@@ -166,19 +169,21 @@ pub fn create_firehose_networks(
166169
"provider" => &provider.label,
167170
);
168171

169-
let endpoint = FirehoseEndpoint::new(
170-
&provider.label,
171-
&firehose.url,
172-
firehose.token.clone(),
173-
firehose.filters_enabled(),
174-
firehose.compression_enabled(),
175-
firehose.conn_pool_size,
176-
);
177-
178172
let parsed_networks = networks_by_kind
179173
.entry(chain.protocol)
180174
.or_insert_with(|| FirehoseNetworks::new());
181-
parsed_networks.insert(name.to_string(), Arc::new(endpoint));
175+
for i in 0..firehose.conn_pool_size {
176+
parsed_networks.insert(
177+
name.to_string(),
178+
Arc::new(FirehoseEndpoint::new(
179+
&format!("{}-{}", provider.label, i),
180+
&firehose.url,
181+
firehose.token.clone(),
182+
firehose.filters_enabled(),
183+
firehose.compression_enabled(),
184+
)),
185+
);
186+
}
182187
}
183188
}
184189
}

tests/src/fixture/ethereum.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ pub async fn chain(blocks: Vec<BlockWithTriggers<Chain>>, stores: &Stores) -> Ch
3434
None,
3535
true,
3636
false,
37-
0,
3837
))]
3938
.into();
4039

0 commit comments

Comments
 (0)