Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregations (write side) #5082

Merged
merged 28 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d1b195f
graph: Remove TypeInfo.fields; use the fields of the included type
lutter Oct 4, 2023
93a39d4
graph: Merge TypeInfo and TypeKind into one enum
lutter Oct 4, 2023
5911dad
graph: Simplify TypeInfo
lutter Oct 4, 2023
eb976a3
graph, store: Make using InputSchema::object_type on an interface an …
lutter Oct 5, 2023
9f942a6
graph: Add a very simple schema test for timeseries
lutter Sep 30, 2023
d975f94
graph: Validate timeseries and aggregation types
lutter Oct 2, 2023
31a8c8c
graph: Aggregation type
lutter Oct 3, 2023
040a433
store: Test store get/set disallows interfaces
lutter Oct 12, 2023
8483f8f
graph, runtime: Disallow store_set/get/remove for all but @entity
lutter Oct 12, 2023
150dc63
graph, store: Generate DDL for aggregations
lutter Oct 4, 2023
a7bd9d5
graph: Provide context when parsing an id fails
lutter Nov 16, 2023
dfc8472
graph: Parse @entity flags into our ObjectType
lutter Nov 16, 2023
362572c
all: Remove explicit BlockPtr argument from process_mapping_trigger
lutter Oct 12, 2023
ef09512
all: Provide the block time to mapping handlers
lutter Oct 12, 2023
d6f4813
all: Pass the timestamp of the current block to the store
lutter Oct 26, 2023
4b7a332
core, graph, store: Store block time in PoI
lutter Oct 27, 2023
6bd54f8
graph, substreams: Suppress some doc generation to avoid name conflicts
lutter Oct 28, 2023
fa3ad33
graph, store: Track time when last rollup for timeseries happened
lutter Oct 30, 2023
cc788a9
graph, runtime: Override user-supplied timestamp in store.set
lutter Nov 22, 2023
28cc721
graph, store: Perform rollups of aggregations
lutter Oct 30, 2023
c28381e
all: Thread the spec version into InputSchema validation
lutter Dec 7, 2023
0f6498d
graph: Gate aggregations and Int8 id on new spec version
lutter Dec 7, 2023
90587d5
graph, store: Add a min aggregate fn
lutter Dec 15, 2023
ac6a1dd
store, graph: Add first and last aggregate functions
lutter Dec 15, 2023
a54af2b
docs: Document aggregations
lutter Dec 15, 2023
a5b98d7
graph, store: Rename count to `AggregateFn::Count`
lutter Jan 12, 2024
8eaa3e5
substreams: Make a missing timestamp an error
lutter Jan 12, 2024
c3f4ec4
graph: Fix typo
lutter Jan 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion chain/arweave/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
#[path = "protobuf/sf.arweave.r#type.v1.rs"]
mod pbcodec;

use graph::{blockchain::Block as BlockchainBlock, blockchain::BlockPtr, prelude::BlockNumber};
use graph::{
blockchain::Block as BlockchainBlock,
blockchain::{BlockPtr, BlockTime},
prelude::BlockNumber,
};

pub use pbcodec::*;

Expand All @@ -28,6 +32,10 @@ impl BlockchainBlock for Block {
number: self.number().saturating_sub(1),
})
}

fn timestamp(&self) -> BlockTime {
BlockTime::since_epoch(i64::try_from(self.timestamp).unwrap(), 0)
}
}

impl AsRef<[u8]> for BigInt {
Expand Down
1 change: 1 addition & 0 deletions chain/arweave/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl blockchain::DataSource<Chain> for DataSource {
trigger.cheap_clone(),
handler.clone(),
block.ptr(),
block.timestamp(),
)))
}

Expand Down
12 changes: 11 additions & 1 deletion chain/cosmos/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub(crate) use crate::protobuf::pbcodec::*;

use graph::blockchain::Block as BlockchainBlock;
use graph::blockchain::{Block as BlockchainBlock, BlockTime};
use graph::{
blockchain::BlockPtr,
prelude::{anyhow::anyhow, BlockNumber, Error},
Expand Down Expand Up @@ -80,6 +80,11 @@ impl BlockchainBlock for Block {
fn parent_ptr(&self) -> Option<BlockPtr> {
self.parent_ptr().unwrap()
}

fn timestamp(&self) -> BlockTime {
let time = self.header().unwrap().time.as_ref().unwrap();
BlockTime::since_epoch(time.seconds, time.nanos as u32)
}
}

impl HeaderOnlyBlock {
Expand Down Expand Up @@ -137,6 +142,11 @@ impl BlockchainBlock for HeaderOnlyBlock {
fn parent_ptr(&self) -> Option<BlockPtr> {
self.parent_ptr().unwrap()
}

fn timestamp(&self) -> BlockTime {
let time = self.header().unwrap().time.as_ref().unwrap();
BlockTime::since_epoch(time.seconds, time.nanos as u32)
}
}

impl EventData {
Expand Down
1 change: 1 addition & 0 deletions chain/cosmos/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl blockchain::DataSource<Chain> for DataSource {
trigger.cheap_clone(),
handler,
block.ptr(),
block.timestamp(),
)))
}

Expand Down
11 changes: 10 additions & 1 deletion chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{anyhow, bail, Result};
use anyhow::{Context, Error};
use graph::blockchain::client::ChainClient;
use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms};
use graph::blockchain::{BlockIngestor, BlockchainKind, TriggersAdapterSelector};
use graph::blockchain::{BlockIngestor, BlockTime, BlockchainKind, TriggersAdapterSelector};
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::{FirehoseEndpoint, ForkStep};
Expand Down Expand Up @@ -588,6 +588,15 @@ impl Block for BlockFinality {
BlockFinality::NonFinal(block) => json::to_value(&block.ethereum_block),
}
}

fn timestamp(&self) -> BlockTime {
let ts = match self {
BlockFinality::Final(block) => block.timestamp,
BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp,
};
let ts = i64::try_from(ts.as_u64()).unwrap();
BlockTime::since_epoch(ts, 0)
}
}

pub struct DummyDataSourceTemplate;
Expand Down
14 changes: 13 additions & 1 deletion chain/ethereum/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ mod pbcodec;

use anyhow::format_err;
use graph::{
blockchain::{Block as BlockchainBlock, BlockPtr, ChainStoreBlock, ChainStoreData},
blockchain::{
self, Block as BlockchainBlock, BlockPtr, BlockTime, ChainStoreBlock, ChainStoreData,
},
prelude::{
web3,
web3::types::{Bytes, H160, H2048, H256, H64, U256, U64},
Expand Down Expand Up @@ -449,6 +451,11 @@ impl BlockchainBlock for Block {
fn data(&self) -> Result<jsonrpc_core::serde_json::Value, jsonrpc_core::serde_json::Error> {
self.header().to_json()
}

fn timestamp(&self) -> BlockTime {
let ts = self.header().timestamp.as_ref().unwrap();
BlockTime::since_epoch(ts.seconds, ts.nanos as u32)
}
}

impl HeaderOnlyBlock {
Expand Down Expand Up @@ -502,6 +509,11 @@ impl BlockchainBlock for HeaderOnlyBlock {
fn data(&self) -> Result<jsonrpc_core::serde_json::Value, jsonrpc_core::serde_json::Error> {
self.header().to_json()
}

fn timestamp(&self) -> blockchain::BlockTime {
let ts = self.header().timestamp.as_ref().unwrap();
blockchain::BlockTime::since_epoch(ts.seconds, ts.nanos as u32)
}
}

#[cfg(test)]
Expand Down
7 changes: 5 additions & 2 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,11 @@ impl DataSource {
fn matches_trigger_address(&self, trigger: &EthereumTrigger) -> bool {
let Some(ds_address) = self.address else {
// 'wildcard' data sources match any trigger address.
return true
return true;
};

let Some(trigger_address) = trigger.address() else {
return true
return true;
};

ds_address == *trigger_address
Expand Down Expand Up @@ -644,6 +644,7 @@ impl DataSource {
},
handler.handler,
block.block_ptr(),
block.timestamp(),
)))
}
EthereumTrigger::Log(log_ref) => {
Expand Down Expand Up @@ -753,6 +754,7 @@ impl DataSource {
},
event_handler.handler,
block.block_ptr(),
block.timestamp(),
logging_extras,
)))
}
Expand Down Expand Up @@ -863,6 +865,7 @@ impl DataSource {
},
handler.handler,
block.block_ptr(),
block.timestamp(),
logging_extras,
)))
}
Expand Down
12 changes: 11 additions & 1 deletion chain/near/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod substreams_triggers;

use graph::{
blockchain::Block as BlockchainBlock,
blockchain::BlockPtr,
blockchain::{BlockPtr, BlockTime},
prelude::{hex, web3::types::H256, BlockNumber},
};
use std::convert::TryFrom;
Expand Down Expand Up @@ -75,6 +75,11 @@ impl BlockchainBlock for Block {
fn parent_ptr(&self) -> Option<BlockPtr> {
self.parent_ptr()
}

fn timestamp(&self) -> BlockTime {
let ts = i64::try_from(self.header().timestamp).unwrap();
BlockTime::since_epoch(ts, 0)
}
}

impl HeaderOnlyBlock {
Expand All @@ -101,6 +106,11 @@ impl BlockchainBlock for HeaderOnlyBlock {
fn parent_ptr(&self) -> Option<BlockPtr> {
self.header().parent_ptr()
}

fn timestamp(&self) -> BlockTime {
let ts = i64::try_from(self.header().timestamp).unwrap();
BlockTime::since_epoch(ts, 0)
}
}

impl execution_outcome::Status {
Expand Down
1 change: 1 addition & 0 deletions chain/near/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ impl blockchain::DataSource<Chain> for DataSource {
trigger.cheap_clone(),
handler.clone(),
block.ptr(),
block.timestamp(),
)))
}

Expand Down
4 changes: 4 additions & 0 deletions chain/starknet/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ impl BlockchainBlock for Block {
})
}
}

fn timestamp(&self) -> graph::blockchain::BlockTime {
graph::blockchain::BlockTime::since_epoch(self.timestamp as i64, 0)
}
}
1 change: 1 addition & 0 deletions chain/starknet/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl blockchain::DataSource<Chain> for DataSource {
trigger.clone(),
handler,
block.ptr(),
block.timestamp(),
)))
}

Expand Down
2 changes: 1 addition & 1 deletion chain/substreams/examples/substreams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn main() -> Result<(), Error> {
Some(event) => match event {
Err(_) => {}
Ok(block_stream_event) => match block_stream_event {
BlockStreamEvent::ProcessWasmBlock(_, _, _, _) => {
BlockStreamEvent::ProcessWasmBlock(_, _, _, _, _) => {
unreachable!("Cannot happen with this mapper")
}
BlockStreamEvent::Revert(_, _) => {}
Expand Down
8 changes: 7 additions & 1 deletion chain/substreams/src/block_ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,13 @@ impl SubstreamsBlockIngestor {

while let Some(message) = stream.next().await {
let (block, cursor) = match message {
Ok(BlockStreamEvent::ProcessWasmBlock(_block_ptr, _data, _handler, _cursor)) => {
Ok(BlockStreamEvent::ProcessWasmBlock(
_block_ptr,
_block_time,
_data,
_handler,
_cursor,
)) => {
unreachable!("Block ingestor should never receive raw blocks");
}
Ok(BlockStreamEvent::ProcessBlock(triggers, cursor)) => {
Expand Down
6 changes: 5 additions & 1 deletion chain/substreams/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{data_source::*, EntityChanges, TriggerData, TriggerFilter, TriggersA
use anyhow::Error;
use graph::blockchain::client::ChainClient;
use graph::blockchain::{
BasicBlockchainBuilder, BlockIngestor, EmptyNodeCapabilities, NoopRuntimeAdapter,
BasicBlockchainBuilder, BlockIngestor, BlockTime, EmptyNodeCapabilities, NoopRuntimeAdapter,
};
use graph::components::store::DeploymentCursorTracker;
use graph::env::EnvVars;
Expand Down Expand Up @@ -57,6 +57,10 @@ impl blockchain::Block for Block {
fn parent_ptr(&self) -> Option<BlockPtr> {
None
}

fn timestamp(&self) -> BlockTime {
BlockTime::NONE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this type be called something like MaybeBlockTime or something just to indicate it may be none even though it's not an option?

I guess the substreams notion of blocks doesn't fit the aggregations model anyway

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The NONE thing should really go away (maybe except for tests) I just put it there for the few cases where I wasn't sure what's happening (like substreams)

}
}

pub struct Chain {
Expand Down
22 changes: 20 additions & 2 deletions chain/substreams/src/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use anyhow::{anyhow, Error};
use graph::blockchain::block_stream::{
BlockStreamEvent, BlockStreamMapper, BlockWithTriggers, FirehoseCursor, SubstreamsError,
};
use graph::blockchain::BlockTime;
use graph::data::store::scalar::Bytes;
use graph::data::store::IdType;
use graph::data::value::Word;
use graph::data_source::CausalityRegion;
use graph::prelude::{async_trait, BigInt, BlockHash, BlockNumber, Logger, Value};
use graph::prelude::{BigDecimal, BlockPtr};
use graph::schema::InputSchema;
use graph::slog::error;
use graph::substreams::Clock;
use prost::Message;

Expand Down Expand Up @@ -41,15 +43,15 @@ impl BlockStreamMapper<Chain> for WasmBlockMapper {

async fn handle_substreams_block(
&self,
_logger: &Logger,
logger: &Logger,
clock: Clock,
cursor: FirehoseCursor,
block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
let Clock {
id,
number,
timestamp: _,
timestamp,
} = clock;

let block_ptr = BlockPtr {
Expand All @@ -59,8 +61,24 @@ impl BlockStreamMapper<Chain> for WasmBlockMapper {

let block_data = block.into_boxed_slice();

// `timestamp` is an `Option`, but it should always be set
let timestamp = match timestamp {
None => {
error!(logger,
"Substream block is missing a timestamp";
"cursor" => cursor.to_string(),
"number" => number,
);
return Err(anyhow!(
"Substream block is missing a timestamp at cursor {cursor}, block number {number}"
));
}
Some(ts) => BlockTime::since_epoch(ts.seconds, ts.nanos as u32),
};

Ok(BlockStreamEvent::ProcessWasmBlock(
block_ptr,
timestamp,
block_data,
self.handler.clone(),
cursor,
Expand Down
4 changes: 3 additions & 1 deletion core/src/subgraph/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::polling_monitor::{
use anyhow::{self, Error};
use bytes::Bytes;
use graph::{
blockchain::Blockchain,
blockchain::{BlockTime, Blockchain},
components::{
store::{DeploymentId, SubgraphFork},
subgraph::{HostMetrics, MappingError, RuntimeHost as _, SharedProofOfIndexing},
Expand Down Expand Up @@ -133,6 +133,7 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
&self,
logger: &Logger,
block_ptr: BlockPtr,
block_time: BlockTime,
block_data: Box<[u8]>,
handler: String,
mut state: BlockState<C>,
Expand Down Expand Up @@ -161,6 +162,7 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
.process_block(
logger,
block_ptr,
block_time,
block_data,
handler,
state,
Expand Down
Loading
Loading