Skip to content

Commit ede2071

Browse files
committed
Subgraph Composition: Support multiple subgraph datasources
1 parent 63ea9d7 commit ede2071

File tree

35 files changed

+2224
-175
lines changed

35 files changed

+2224
-175
lines changed

Diff for: chain/arweave/src/chain.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ use graph::{
2727
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
2828
};
2929
use prost::Message;
30-
use std::collections::HashSet;
30+
use std::collections::BTreeSet;
3131
use std::sync::Arc;
3232

3333
use crate::adapter::TriggerFilter;
@@ -272,7 +272,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
272272
async fn load_block_ptrs_by_numbers(
273273
&self,
274274
_logger: Logger,
275-
_block_numbers: HashSet<BlockNumber>,
275+
_block_numbers: BTreeSet<BlockNumber>,
276276
) -> Result<Vec<ArweaveBlock>, Error> {
277277
todo!()
278278
}

Diff for: chain/cosmos/src/chain.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@ use graph::components::network_provider::ChainName;
44
use graph::env::EnvVars;
55
use graph::prelude::MetricsRegistry;
66
use graph::substreams::Clock;
7-
use std::collections::HashSet;
7+
use std::collections::BTreeSet;
88
use std::convert::TryFrom;
99
use std::sync::Arc;
1010

@@ -200,7 +200,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
200200
async fn load_block_ptrs_by_numbers(
201201
&self,
202202
_logger: Logger,
203-
_block_numbers: HashSet<BlockNumber>,
203+
_block_numbers: BTreeSet<BlockNumber>,
204204
) -> Result<Vec<Block>, Error> {
205205
todo!()
206206
}

Diff for: chain/ethereum/src/chain.rs

+9-10
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ use graph::{
3939
},
4040
};
4141
use prost::Message;
42-
use std::collections::HashSet;
42+
use std::collections::{BTreeSet, HashSet};
4343
use std::future::Future;
4444
use std::iter::FromIterator;
4545
use std::sync::Arc;
@@ -747,7 +747,7 @@ pub struct TriggersAdapter {
747747
async fn fetch_unique_blocks_from_cache(
748748
logger: &Logger,
749749
chain_store: Arc<dyn ChainStore>,
750-
block_numbers: HashSet<BlockNumber>,
750+
block_numbers: BTreeSet<BlockNumber>,
751751
) -> (Vec<Arc<ExtendedBlockPtr>>, Vec<i32>) {
752752
// Load blocks from the cache
753753
let blocks_map = chain_store
@@ -795,7 +795,7 @@ async fn fetch_unique_blocks_from_cache(
795795
async fn load_blocks<F, Fut>(
796796
logger: &Logger,
797797
chain_store: Arc<dyn ChainStore>,
798-
block_numbers: HashSet<BlockNumber>,
798+
block_numbers: BTreeSet<BlockNumber>,
799799
fetch_missing: F,
800800
) -> Result<Vec<BlockFinality>>
801801
where
@@ -843,7 +843,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
843843
async fn load_block_ptrs_by_numbers(
844844
&self,
845845
logger: Logger,
846-
block_numbers: HashSet<BlockNumber>,
846+
block_numbers: BTreeSet<BlockNumber>,
847847
) -> Result<Vec<BlockFinality>> {
848848
match &*self.chain_client {
849849
ChainClient::Firehose(endpoints) => {
@@ -1200,7 +1200,6 @@ mod tests {
12001200
use graph::{slog, tokio};
12011201

12021202
use super::*;
1203-
use std::collections::HashSet;
12041203
use std::sync::Arc;
12051204

12061205
// Helper function to create test blocks
@@ -1224,7 +1223,7 @@ mod tests {
12241223
let block = create_test_block(1, "block1");
12251224
chain_store.blocks.insert(1, vec![block.clone()]);
12261225

1227-
let block_numbers: HashSet<_> = vec![1].into_iter().collect();
1226+
let block_numbers: BTreeSet<_> = vec![1].into_iter().collect();
12281227

12291228
let (blocks, missing) =
12301229
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
@@ -1246,7 +1245,7 @@ mod tests {
12461245
.blocks
12471246
.insert(1, vec![block1.clone(), block2.clone()]);
12481247

1249-
let block_numbers: HashSet<_> = vec![1].into_iter().collect();
1248+
let block_numbers: BTreeSet<_> = vec![1].into_iter().collect();
12501249

12511250
let (blocks, missing) =
12521251
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
@@ -1266,7 +1265,7 @@ mod tests {
12661265
let block = create_test_block(1, "block1");
12671266
chain_store.blocks.insert(1, vec![block.clone()]);
12681267

1269-
let block_numbers: HashSet<_> = vec![1, 2].into_iter().collect();
1268+
let block_numbers: BTreeSet<_> = vec![1, 2].into_iter().collect();
12701269

12711270
let (blocks, missing) =
12721271
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
@@ -1287,7 +1286,7 @@ mod tests {
12871286
chain_store.blocks.insert(1, vec![block1.clone()]);
12881287
chain_store.blocks.insert(2, vec![block2.clone()]);
12891288

1290-
let block_numbers: HashSet<_> = vec![1, 2].into_iter().collect();
1289+
let block_numbers: BTreeSet<_> = vec![1, 2].into_iter().collect();
12911290

12921291
let (blocks, missing) =
12931292
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
@@ -1316,7 +1315,7 @@ mod tests {
13161315
.blocks
13171316
.insert(2, vec![block2a.clone(), block2b.clone()]);
13181317

1319-
let block_numbers: HashSet<_> = vec![1, 2, 3].into_iter().collect();
1318+
let block_numbers: BTreeSet<_> = vec![1, 2, 3].into_iter().collect();
13201319

13211320
let (blocks, missing) =
13221321
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;

Diff for: chain/near/src/chain.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ use graph::{
3232
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
3333
};
3434
use prost::Message;
35-
use std::collections::HashSet;
35+
use std::collections::BTreeSet;
3636
use std::sync::Arc;
3737

3838
use crate::adapter::TriggerFilter;
@@ -328,7 +328,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
328328
async fn load_block_ptrs_by_numbers(
329329
&self,
330330
_logger: Logger,
331-
_block_numbers: HashSet<BlockNumber>,
331+
_block_numbers: BTreeSet<BlockNumber>,
332332
) -> Result<Vec<Block>> {
333333
unimplemented!()
334334
}

Diff for: chain/substreams/src/trigger.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ use graph::{
1616
};
1717
use graph_runtime_wasm::module::ToAscPtr;
1818
use lazy_static::__Deref;
19-
use std::{collections::HashSet, sync::Arc};
19+
use std::{collections::BTreeSet, sync::Arc};
2020

2121
use crate::{Block, Chain, NoopDataSourceTemplate, ParsedChanges};
2222

@@ -139,7 +139,7 @@ impl blockchain::TriggersAdapter<Chain> for TriggersAdapter {
139139
async fn load_block_ptrs_by_numbers(
140140
&self,
141141
_logger: Logger,
142-
_block_numbers: HashSet<BlockNumber>,
142+
_block_numbers: BTreeSet<BlockNumber>,
143143
) -> Result<Vec<Block>, Error> {
144144
unimplemented!()
145145
}

Diff for: core/src/subgraph/runner.rs

+1
Original file line number | Diff line number | Diff line change
@@ -155,6 +155,7 @@ where
155155
.iter()
156156
.map(|handler| handler.entity.clone())
157157
.collect(),
158+
manifest_idx: ds.manifest_idx,
158159
})
159160
.collect::<Vec<_>>();
160161

Diff for: graph/src/blockchain/block_stream.rs

+98-54
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@ use anyhow::Error;
77
use async_stream::stream;
88
use futures03::Stream;
99
use prost_types::Any;
10-
use std::collections::{BTreeMap, HashMap, HashSet};
10+
use std::collections::{BTreeMap, BTreeSet, HashMap};
1111
use std::fmt;
1212
use std::sync::Arc;
1313
use std::time::Instant;
@@ -339,53 +339,127 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
339339
pub async fn blocks_with_subgraph_triggers(
340340
&self,
341341
logger: &Logger,
342-
subgraph_filter: &SubgraphFilter,
342+
filters: &[SubgraphFilter],
343343
range: SubgraphTriggerScanRange<C>,
344344
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
345-
let store = self
346-
.source_subgraph_stores
347-
.get(&subgraph_filter.subgraph)
348-
.ok_or_else(|| anyhow!("Store not found for subgraph: {}", subgraph_filter.subgraph))?;
345+
if filters.is_empty() {
346+
return Err(anyhow!("No subgraph filters provided"));
347+
}
348+
349+
let (blocks, hash_to_entities) = match range {
350+
SubgraphTriggerScanRange::Single(block) => {
351+
let hash_to_entities = self
352+
.fetch_entities_for_filters(filters, block.number(), block.number())
353+
.await?;
354+
355+
(vec![block], hash_to_entities)
356+
}
357+
SubgraphTriggerScanRange::Range(from, to) => {
358+
let hash_to_entities = self.fetch_entities_for_filters(filters, from, to).await?;
359+
360+
// Get block numbers that have entities
361+
let mut block_numbers: BTreeSet<_> = hash_to_entities
362+
.iter()
363+
.flat_map(|(_, entities, _)| entities.keys().copied())
364+
.collect();
365+
366+
// Always include the last block in the range
367+
block_numbers.insert(to);
368+
369+
let blocks = self
370+
.adapter
371+
.load_block_ptrs_by_numbers(logger.clone(), block_numbers)
372+
.await?;
349373

350-
let schema = <dyn crate::components::store::SourceableStore>::input_schema(store);
374+
(blocks, hash_to_entities)
375+
}
376+
};
351377

352-
let adapter = self.adapter.clone();
378+
create_subgraph_triggers::<C>(logger.clone(), blocks, hash_to_entities).await
379+
}
380+
381+
async fn fetch_entities_for_filters(
382+
&self,
383+
filters: &[SubgraphFilter],
384+
from: BlockNumber,
385+
to: BlockNumber,
386+
) -> Result<
387+
Vec<(
388+
DeploymentHash,
389+
BTreeMap<BlockNumber, Vec<EntitySourceOperation>>,
390+
u32,
391+
)>,
392+
Error,
393+
> {
394+
let futures = filters
395+
.iter()
396+
.filter_map(|filter| {
397+
self.source_subgraph_stores
398+
.get(&filter.subgraph)
399+
.map(|store| {
400+
let store = store.clone();
401+
let schema = store.input_schema();
402+
403+
async move {
404+
let entities =
405+
get_entities_for_range(&store, filter, &schema, from, to).await?;
406+
Ok::<_, Error>((filter.subgraph.clone(), entities, filter.manifest_idx))
407+
}
408+
})
409+
})
410+
.collect::<Vec<_>>();
411+
412+
if futures.is_empty() {
413+
return Ok(Vec::new());
414+
}
353415

354-
scan_subgraph_triggers::<C>(logger, store, &adapter, &schema, &subgraph_filter, range).await
416+
futures03::future::try_join_all(futures).await
355417
}
356418
}
357419

358420
fn create_subgraph_trigger_from_entities(
359-
filter: &SubgraphFilter,
421+
subgraph: &DeploymentHash,
360422
entities: Vec<EntitySourceOperation>,
423+
manifest_idx: u32,
361424
) -> Vec<subgraph::TriggerData> {
362425
entities
363426
.into_iter()
364427
.map(|entity| subgraph::TriggerData {
365-
source: filter.subgraph.clone(),
428+
source: subgraph.clone(),
366429
entity,
430+
source_idx: manifest_idx,
367431
})
368432
.collect()
369433
}
370434

371435
async fn create_subgraph_triggers<C: Blockchain>(
372436
logger: Logger,
373437
blocks: Vec<C::Block>,
374-
filter: &SubgraphFilter,
375-
mut entities: BTreeMap<BlockNumber, Vec<EntitySourceOperation>>,
438+
subgraph_data: Vec<(
439+
DeploymentHash,
440+
BTreeMap<BlockNumber, Vec<EntitySourceOperation>>,
441+
u32,
442+
)>,
376443
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
377444
let logger_clone = logger.cheap_clone();
378-
379445
let blocks: Vec<BlockWithTriggers<C>> = blocks
380446
.into_iter()
381447
.map(|block| {
382448
let block_number = block.number();
383-
let trigger_data = entities
384-
.remove(&block_number)
385-
.map(|e| create_subgraph_trigger_from_entities(filter, e))
386-
.unwrap_or_else(Vec::new);
449+
let mut all_trigger_data = Vec::new();
450+
451+
for (hash, entities, manifest_idx) in subgraph_data.iter() {
452+
if let Some(block_entities) = entities.get(&block_number) {
453+
let trigger_data = create_subgraph_trigger_from_entities(
454+
hash,
455+
block_entities.clone(),
456+
*manifest_idx,
457+
);
458+
all_trigger_data.extend(trigger_data);
459+
}
460+
}
387461

388-
BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger_clone)
462+
BlockWithTriggers::new_with_subgraph_triggers(block, all_trigger_data, &logger_clone)
389463
})
390464
.collect();
391465

@@ -397,36 +471,6 @@ pub enum SubgraphTriggerScanRange<C: Blockchain> {
397471
Range(BlockNumber, BlockNumber),
398472
}
399473

400-
async fn scan_subgraph_triggers<C: Blockchain>(
401-
logger: &Logger,
402-
store: &Arc<dyn SourceableStore>,
403-
adapter: &Arc<dyn TriggersAdapter<C>>,
404-
schema: &InputSchema,
405-
filter: &SubgraphFilter,
406-
range: SubgraphTriggerScanRange<C>,
407-
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
408-
match range {
409-
SubgraphTriggerScanRange::Single(block) => {
410-
let entities =
411-
get_entities_for_range(store, filter, schema, block.number(), block.number())
412-
.await?;
413-
create_subgraph_triggers::<C>(logger.clone(), vec![block], filter, entities).await
414-
}
415-
SubgraphTriggerScanRange::Range(from, to) => {
416-
let entities = get_entities_for_range(store, filter, schema, from, to).await?;
417-
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
418-
// Ensure the 'to' block is included in the block_numbers
419-
block_numbers.insert(to);
420-
421-
let blocks = adapter
422-
.load_block_ptrs_by_numbers(logger.clone(), block_numbers)
423-
.await?;
424-
425-
create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
426-
}
427-
}
428-
}
429-
430474
#[derive(Debug, Clone, Eq, PartialEq)]
431475
pub enum EntityOperationKind {
432476
Create,
@@ -474,11 +518,11 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
474518
to: BlockNumber,
475519
filter: &Arc<TriggerFilterWrapper<C>>,
476520
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
477-
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
521+
if !filter.subgraph_filter.is_empty() {
478522
let blocks_with_triggers = self
479523
.blocks_with_subgraph_triggers(
480524
logger,
481-
subgraph_filter,
525+
&filter.subgraph_filter,
482526
SubgraphTriggerScanRange::Range(from, to),
483527
)
484528
.await?;
@@ -504,11 +548,11 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
504548
"block_hash" => block.hash().hash_hex(),
505549
);
506550

507-
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
551+
if !filter.subgraph_filter.is_empty() {
508552
let blocks_with_triggers = self
509553
.blocks_with_subgraph_triggers(
510554
logger,
511-
subgraph_filter,
555+
&filter.subgraph_filter,
512556
SubgraphTriggerScanRange::Single(block),
513557
)
514558
.await?;
@@ -594,7 +638,7 @@ pub trait TriggersAdapter<C: Blockchain>: Send + Sync {
594638
async fn load_block_ptrs_by_numbers(
595639
&self,
596640
logger: Logger,
597-
block_numbers: HashSet<BlockNumber>,
641+
block_numbers: BTreeSet<BlockNumber>,
598642
) -> Result<Vec<C::Block>>;
599643
}
600644

0 commit comments

Comments
 (0)