More datasets (#64)
* Decode logs based on event signature

* Remove examples

* event-signature flag, add to schema even in event of no logs in batch

* fmt

* fix tests, doc comments

* apply clippy lints + formatter

* initial commit

* clean merged code

---------

Co-authored-by: Erik Reppel <erik@zora.co>
sslivkoff and Erik Reppel authored Sep 16, 2023
1 parent a2941ce commit 5e4c4e9
Showing 39 changed files with 3,499 additions and 249 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -33,6 +33,7 @@ governor = "0.5.1"
hex = "0.4.3"
indexmap = "2.0.0"
indicatif = "0.17.5"
lazy_static = "1.4.0"
polars = { version = "0.32.1", features = [
"parquet",
"string_encoding",
3 changes: 2 additions & 1 deletion README.md
@@ -90,12 +90,13 @@ Many `cryo` cli options will affect output schemas by adding/removing columns or
#### Schema Design Guide

An attempt is made to ensure that the dataset schemas conform to a common set of design guidelines:
- By default, rows should contain enough information be order-able
- By default, rows should contain enough information in their columns to be order-able (unless the rows do not have an intrinsic order)
- Columns should be named by their JSON-RPC or ethers.rs defaults, except in cases where a much more explicit name is available
- To make joins across tables easier, a given piece of information should use the same datatype and column name across tables when possible
- Large ints such as `u256` should allow multiple conversions. A `value` column of type `u256` should allow: `value_binary`, `value_string`, `value_f32`, `value_f64`, `value_u32`, `value_u64`, and `value_d128` (see the sketch below)
- Columns related to non-identifying cryptographic signatures are omitted by default. For example, `state_root` of a block or `v`/`r`/`s` of a transaction
- Integer values that can never be negative should be stored as unsigned integers
- Every table should allow an optional `chain_id` column so that data from multiple chains can be easily stored in the same table.

Standard types across tables:
- `block_number`: `u32`
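
As a concrete illustration of the `u256` guideline above, the sketch below builds a one-row polars frame holding the same logical `value` under several of the listed representations. The numbers and the helper name are made up for the example; this is not code from the repository.

```rust
use polars::prelude::*;

// Hypothetical one-row frame: one logical `value`, multiple representations.
fn value_representations() -> PolarsResult<DataFrame> {
    df!(
        "value_string" => &["1000000000000000000"],       // full precision, as text
        "value_f64"    => &[1e18_f64],                    // lossy float, convenient for math
        "value_u64"    => &[1_000_000_000_000_000_000u64] // exact whenever value < 2^64
    )
}
```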
34 changes: 31 additions & 3 deletions crates/cli/src/args.rs
@@ -155,9 +155,37 @@ pub struct Args {
#[arg(long, help_heading = "Output Options")]
pub no_report: bool,

/// Address
#[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
pub address: Option<Vec<String>>,

/// To Address
#[arg(long, help_heading = "Dataset-specific Options", num_args(1..), value_name="TO")]
pub to_address: Option<Vec<String>>,

/// From Address
#[arg(long, help_heading = "Dataset-specific Options", num_args(1..), value_name="FROM")]
pub from_address: Option<Vec<String>>,

/// [eth_calls] Call data to use for eth_calls
#[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
pub call_data: Option<Vec<String>>,

/// [eth_calls] Function to use for eth_calls
#[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
pub function: Option<Vec<String>>,

/// [eth_calls] Inputs to use for eth_calls
#[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
pub inputs: Option<Vec<String>>,

/// [slots] Slots
#[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
pub slots: Option<Vec<String>>,

/// [logs] filter logs by contract address
#[arg(long, help_heading = "Dataset-specific Options")]
pub contract: Option<String>,
pub contract: Option<Vec<String>>,

/// [logs] filter logs by topic0
#[arg(long, visible_alias = "event", help_heading = "Dataset-specific Options")]
@@ -178,14 +206,14 @@ pub struct Args {
/// [logs] Blocks per request
#[arg(
long,
value_name = "BLOCKS",
value_name = "SIZE",
default_value_t = 1,
help_heading = "Dataset-specific Options"
)]
pub inner_request_size: u64,

/// [logs] event signature to parse
#[arg(long, help_heading = "Dataset-specific Options")]
#[arg(long, value_name = "SIGNATURE", help_heading = "Dataset-specific Options")]
pub event_signature: Option<String>,
}
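
To make the new `--event-signature` flag concrete, here is a minimal sketch of decoding a raw log against a human-readable event signature with ethers-rs. The function name and wiring are illustrative assumptions, not cryo's internal implementation.

```rust
// Sketch: decode a log's topics/data using a signature like
// "Transfer(address indexed from, address indexed to, uint256 value)".
use ethers::abi::{HumanReadableParser, RawLog, Token};
use ethers::types::Log;

fn decode_with_signature(signature: &str, log: &Log) -> Option<Vec<Token>> {
    // parse the human-readable signature into an ABI event definition
    let event = HumanReadableParser::parse_event(signature).ok()?;
    let raw = RawLog { topics: log.topics.clone(), data: log.data.to_vec() };
    // decoded params could then be written out as extra columns of the logs dataset
    let decoded = event.parse_log(raw).ok()?;
    Some(decoded.params.into_iter().map(|param| param.value).collect())
}
```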

12 changes: 10 additions & 2 deletions crates/cli/src/parse/blocks.rs
@@ -1,7 +1,8 @@
use ethers::prelude::*;
use polars::prelude::*;
use std::collections::HashMap;

use cryo_freeze::{BlockChunk, Chunk, ChunkData, Fetcher, ParseError, Subchunk};
use cryo_freeze::{BlockChunk, Chunk, ChunkData, Datatype, Fetcher, ParseError, Subchunk, Table};

use crate::args::Args;

@@ -123,8 +124,15 @@ async fn postprocess_block_chunks<P: JsonRpcClient>(
pub(crate) async fn get_default_block_chunks<P: JsonRpcClient>(
args: &Args,
fetcher: Arc<Fetcher<P>>,
schemas: &HashMap<Datatype, Table>,
) -> Result<Vec<(Chunk, Option<String>)>, ParseError> {
let block_chunks = parse_block_inputs(&String::from(r"0:latest"), &fetcher).await?;
let default_blocks = schemas
.keys()
.map(|datatype| datatype.dataset().default_blocks())
.find(|blocks| !blocks.is_none())
.unwrap_or(Some("0:latest".to_string()))
.unwrap();
let block_chunks = parse_block_inputs(&default_blocks, &fetcher).await?;
postprocess_block_chunks(block_chunks, args, fetcher).await
}
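
The selection above reads as: use the first dataset-provided default block range found in the requested schemas, otherwise fall back to `0:latest`. A standalone sketch of just that rule (not part of the diff):

```rust
// Hypothetical helper isolating the fallback logic over per-dataset defaults.
fn pick_default_blocks(defaults: &[Option<&str>]) -> String {
    defaults
        .iter()
        .copied()
        .find_map(|blocks| blocks.map(String::from))
        .unwrap_or_else(|| "0:latest".to_string())
}

#[test]
fn default_blocks_fallback() {
    assert_eq!(pick_default_blocks(&[None, Some("14000000:latest")]), "14000000:latest");
    assert_eq!(pick_default_blocks(&[None, None]), "0:latest");
}
```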

7 changes: 3 additions & 4 deletions crates/cli/src/parse/mod.rs
@@ -1,12 +1,11 @@
mod args;
mod blocks;
mod file_output;
mod parse_utils;
mod query;
mod schemas;
mod source;
mod transactions;

pub use args::*;
// use blocks::*;
// use file_output::*;
// use query::*;
// use source::*;
use schemas::*;
83 changes: 83 additions & 0 deletions crates/cli/src/parse/parse_utils.rs
@@ -0,0 +1,83 @@
use cryo_freeze::ParseError;
use std::collections::HashMap;

pub(crate) fn hex_string_to_binary(hex_string: &String) -> Result<Vec<u8>, ParseError> {
let hex_string = hex_string.strip_prefix("0x").unwrap_or(hex_string);
hex::decode(hex_string)
.map_err(|_| ParseError::ParseError("could not parse data as hex".to_string()))
}

pub(crate) fn hex_strings_to_binary(hex_strings: &[String]) -> Result<Vec<Vec<u8>>, ParseError> {
hex_strings
.iter()
.map(|x| {
hex::decode(x.strip_prefix("0x").unwrap_or(x))
.map_err(|_| ParseError::ParseError("could not parse data as hex".to_string()))
})
.collect::<Result<Vec<_>, _>>()
}

#[derive(Eq, PartialEq, Hash)]
pub(crate) enum BinaryInputList {
Explicit,
ParquetColumn(String, String),
}

type ParsedBinaryArg = HashMap<BinaryInputList, Vec<Vec<u8>>>;

/// parse binary argument list
/// each argument can be a hex string or a parquet column reference
/// each parquet column is loaded into its own list, hex strings loaded into another
pub(crate) fn parse_binary_arg(
inputs: &[String],
default_column: &str,
) -> Result<ParsedBinaryArg, ParseError> {
let mut parsed = HashMap::new();

// separate into files vs explicit
let (files, hex_strings): (Vec<&String>, Vec<&String>) =
inputs.iter().partition(|tx| std::path::Path::new(tx).exists());

// files columns
for path in files {
let reference = parse_file_column_reference(path, default_column)?;
let values = cryo_freeze::read_binary_column(&reference.path, &reference.column)
.map_err(|_e| ParseError::ParseError("could not read input".to_string()))?;
let key = BinaryInputList::ParquetColumn(reference.path, reference.column);
parsed.insert(key, values);
}

// explicit binary strings
if !hex_strings.is_empty() {
let hex_strings: Vec<String> = hex_strings.into_iter().cloned().collect();
let binary_vec = hex_strings_to_binary(&hex_strings)?;
parsed.insert(BinaryInputList::Explicit, binary_vec);
};

Ok(parsed)
}

struct FileColumnReference {
path: String,
column: String,
}

fn parse_file_column_reference(
path: &str,
default_column: &str,
) -> Result<FileColumnReference, ParseError> {
let (path, column) = if path.contains(':') {
let pieces: Vec<&str> = path.split(':').collect();
if pieces.len() == 2 {
(pieces[0], pieces[1])
} else {
return Err(ParseError::ParseError("could not parse path column".to_string()))
}
} else {
(path, default_column)
};

let parsed = FileColumnReference { path: path.to_string(), column: column.to_string() };

Ok(parsed)
}
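
A minimal usage sketch of the helpers above. The parquet file name and default column are made up for illustration, and the snippet assumes it sits in the same module so the `pub(crate)` items are visible.

```rust
// Illustrative only: hex strings are grouped under BinaryInputList::Explicit,
// while each input that exists as a file becomes its own
// BinaryInputList::ParquetColumn(path, column) entry.
#[allow(dead_code)]
fn example_parse_binary_arg() -> Result<(), ParseError> {
    let inputs = vec![
        "0xdeadbeef".to_string(),    // explicit hex value
        "./txs.parquet".to_string(), // existing parquet file, read via the default column
    ];
    let parsed = parse_binary_arg(&inputs, "transaction_hash")?;
    let n_values: usize = parsed.values().map(|group| group.len()).sum();
    println!("parsed {} binary values in {} groups", n_values, parsed.len());
    Ok(())
}
```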