Skip to content

Substreams development kit for Ethereum chains, contains Firehose Block model and helpers as well as utilities for Ethereum ABI encoding/decoding.

License

Notifications You must be signed in to change notification settings

streamingfast/substreams-ethereum

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Substreams Ethereum

Substreams development kit for Ethereum chains, contains Rust Firehose Block model and helpers as well as utilities for Ethereum ABI encoding/decoding.

Usage

[package]
name = "substreams-acme"
version = 0.1.2

[lib]
crate-type = ["cdylib"]

[dependencies]
substreams-ethereum = "0.6.0"

Development

We manually keep in sync the rendered Rust Firehose Block models with the actual Protocol Buffer definitions file found in sf-ethereum and we commit them to Git.

This means changes to Protobuf files must be manually re-generated and commit, see below for how to do it.

Regenerate Rust Firehose Block from Protobuf

./gen.sh

Caveats

ABI with Tuple

Tuples are now supported for ABI generated code. It does generate Rust unamed tuples for event/function that uses them. The notes below can be used if you prefer to generate "struct" for your tuple in events instead. You can expand the collapsed Instructions section below to get detailed instructions how you can "manually" generate the code.

Instructions

First step will be to modify a bit the ABI so decoding code is generated correctly. This initial generated code will then be copied over and modified to decode the tuple correctly.

The idea will be to "explode" the inner tuple into it's own "event" in the ABI, this will generate some code for the struct representing the tuple as well as the decoding code for the struct itself. We will then tweak this generated code to wire everything together.

From:

[
  {
    "anonymous": false,
    "inputs": [
      {
        "indexed": false,
        "internalType": "bytes32",
        "name": "orderHash",
        "type": "bytes32"
      },
      {
        "components": [
          {
            "internalType": "enum ItemType",
            "name": "itemType",
            "type": "uint8"
          },
          {
            "internalType": "uint256",
            "name": "amount",
            "type": "uint256"
          }
        ],
        "indexed": false,
        "internalType": "struct SpentItem[]",
        "name": "offer",
        "type": "tuple[]"
      }
    ],
    "name": "OrderFulfilled",
    "type": "event"
  }
]

We explode SpentItem to it's own type and replace the offer type tuple[] to address[] to make it compile correctly:

[
  {
    "anonymous": false,
    "inputs": [
      {
        "indexed": false,
        "internalType": "bytes32",
        "name": "orderHash",
        "type": "bytes32"
      },
      {
        "components": [
          {
            "internalType": "enum ItemType",
            "name": "itemType",
            "type": "uint8"
          },
          {
            "internalType": "uint256",
            "name": "amount",
            "type": "uint256"
          }
        ],
        "indexed": false,
        "internalType": "struct SpentItem[]",
        "name": "offer",
        "type": "address[]"
      }
    ],
    "name": "OrderFulfilled",
    "type": "event"
  },
  {
    "anonymous": false,
    "name": "SpentItem",
    "type": "event",
    "inputs": [
      {
        "internalType": "enum ItemType",
        "name": "itemType",
        "type": "uint8"
      },
      {
        "internalType": "uint256",
        "name": "amount",
        "type": "uint256"
      }
    ]
  }
]

Note No need to remove the components or change the internalType value, they are ignored.

Perform a cargo build using this modified ABI so that code is generated in src/abi/<file>.rs, it's wrong right now but we are going to copy it somewhere else and make it work.

Find the generated code for the OrderFulfilled event within the src/abi/<file>.rs and copy it over to a new file src/events.rs. You should copy over the pub struct OrderFulfilled block, the impl OrderFulfilled block and impl substreams_ethereum::Event for OrderFulfilled block, the SpentItem structure and the impl SpentItem block:

#[derive(Debug, Clone, PartialEq)]
pub struct OrderFulfilled {
    pub order_hash: [u8; 32usize],
    pub offer: Vec<Vec<u8>>,
}
impl OrderFulfilled {
    const TOPIC_ID: [u8; 32] = [
        227u8,
        56u8,
        222u8,
        32u8,
        39u8,
        120u8,
        0u8,
        226u8,
        120u8,
        84u8,
        168u8,
        160u8,
        171u8,
        38u8,
        80u8,
        66u8,
        198u8,
        237u8,
        193u8,
        186u8,
        154u8,
        14u8,
        209u8,
        73u8,
        102u8,
        185u8,
        47u8,
        163u8,
        179u8,
        98u8,
        194u8,
        244u8,
    ];
    pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        if log.topics.len() != 1usize {
            return false;
        }
        if log.data.len() < 96usize {
            return false;
        }
        return log.topics.get(0).expect("bounds already checked").as_ref()
            == Self::TOPIC_ID;
    }
    pub fn decode(
        log: &substreams_ethereum::pb::eth::v2::Log,
    ) -> Result<Self, String> {
        let mut values = ethabi::decode(
                &[
                    ethabi::ParamType::FixedBytes(32usize),
                    ethabi::ParamType::Array(
                        Box::new(ethabi::ParamType::Address),
                    ),
                ],
                log.data.as_ref(),
            )
            .map_err(|e| format!("unable to decode log.data: {:?}", e))?;
        values.reverse();
        Ok(Self {
            order_hash: {
                let mut result = [0u8; 32];
                let v = values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_fixed_bytes()
                    .expect(INTERNAL_ERR);
                result.copy_from_slice(&v);
                result
            },
            offer: values
                .pop()
                .expect(INTERNAL_ERR)
                .into_array()
                .expect(INTERNAL_ERR)
                .into_iter()
                .map(|inner| {
                    inner.into_address().expect(INTERNAL_ERR).as_bytes().to_vec()
                })
                .collect(),
        })
    }
}
impl substreams_ethereum::Event for OrderFulfilled {
    const NAME: &'static str = "OrderFulfilled";
    fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        Self::match_log(log)
    }
    fn decode(
        log: &substreams_ethereum::pb::eth::v2::Log,
    ) -> Result<Self, String> {
        Self::decode(log)
    }
}
#[derive(Debug, Clone, PartialEq)]
pub struct SpentItem {
    pub item_type: substreams::scalar::BigInt,
    pub amount: substreams::scalar::BigInt,
}
impl SpentItem {
    const TOPIC_ID: [u8; 32] = [
        18u8,
        7u8,
        103u8,
        62u8,
        30u8,
        101u8,
        94u8,
        85u8,
        209u8,
        209u8,
        166u8,
        82u8,
        139u8,
        137u8,
        197u8,
        45u8,
        11u8,
        224u8,
        230u8,
        74u8,
        27u8,
        234u8,
        238u8,
        52u8,
        150u8,
        245u8,
        214u8,
        202u8,
        230u8,
        104u8,
        138u8,
        22u8,
    ];
    pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        if log.topics.len() != 1usize {
            return false;
        }
        if log.data.len() != 64usize {
            return false;
        }
        return log.topics.get(0).expect("bounds already checked").as_ref()
            == Self::TOPIC_ID;
    }
    pub fn decode(
        log: &substreams_ethereum::pb::eth::v2::Log,
    ) -> Result<Self, String> {
        let mut values = ethabi::decode(
                &[
                    ethabi::ParamType::Uint(8usize),
                    ethabi::ParamType::Uint(256usize),
                ],
                log.data.as_ref(),
            )
            .map_err(|e| format!("unable to decode log.data: {:?}", e))?;
        values.reverse();
        Ok(Self {
            item_type: {
                let mut v = [0 as u8; 32];
                values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_uint()
                    .expect(INTERNAL_ERR)
                    .to_big_endian(v.as_mut_slice());
                substreams::scalar::BigInt::from_unsigned_bytes_be(&v)
            },
            amount: {
                let mut v = [0 as u8; 32];
                values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_uint()
                    .expect(INTERNAL_ERR)
                    .to_big_endian(v.as_mut_slice());
                substreams::scalar::BigInt::from_unsigned_bytes_be(&v)
            },
        })
    }
}
impl substreams_ethereum::Event for SpentItem {
    const NAME: &'static str = "SpentItem";
    fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        Self::match_log(log)
    }
    fn decode(
        log: &substreams_ethereum::pb::eth::v2::Log,
    ) -> Result<Self, String> {
        Self::decode(log)
    }
}

This src/events.rs file right now not compiled/included in the project because in Rust, a module is included only if it's "defined" somewhere, so let's define the module. In src/lib.rs, at the top of the file, add:

mod events

Let's start to modify our incorrect generated code to make it correct. First define INTERNAL_ERR constant because it's used normally, put it at top of the src/events.rs file:

const INTERNAL_ERR: &str = "decode event internal error";

Now a tricky part, we need to update the TOPIC_ID constant because it's wrong right now. If you already know the event ID (which is the topics #0), perfect. If you don't, you can compute it by taking the keccak256 hash of the event definition. You need the event name and its types to have it, in our case it's OrderFulfilled(bytes32,(uint8,uint256)[])

Warning The event definition must be without space nor extra punctuation, a single wrong character will make the whole event ID wrong.

You can use jq -r '.[] | select(.name == "OrderFulfilled") | .inputs[].type' | tr "\n" "," and jq -r '.[] | select(.name == "SpentItem") | .inputs[].type' | tr "\n" "," on the "modified" ABI file we just did to get the correct ordered types. Then assemble it correctly changing the second field of OrderFulfilled to be the tuple definition (uint8,uint256)[] (it's returned as address[] because of our modification).

Now that we have our event definition, we can compute the keccak256 hash. We use a CLI tool keccak-256sum to do it (installation instructions):

$ printf 'OrderFulfilled(bytes32,(uint8,uint256)[])' | keccak-256sum
e86f4727db138d4b9cb776888b1d2239562eafaa38dd110b7d5def7698ccfd41  -

So our event topic 0 is e86f4727db138d4b9cb776888b1d2239562eafaa38dd110b7d5def7698ccfd41. Now we just need to change the TOPIC_ID constant definition in impl OrderFulfilled definition to become:

const TOPIC_ID: [u8; 32] = hex_literal::hex!("e86f4727db138d4b9cb776888b1d2239562eafaa38dd110b7d5def7698ccfd41");

Now, within pub struct OrderFulfilled, change offer field (defined right now as offer: Vec<Vec<u8>>) which normally holds an array of tuple to it's correct value offer: Vec<SpentItem>:

pub struct OrderFulfilled {
    pub order_hash: [u8; 32usize],
    pub offer: Vec<SpentItem>,
}

Then find the event "token" definition within the pub fn decode(log: <type>) which is right now:

let mut values = ethabi::decode(
    &[
        ethabi::ParamType::FixedBytes(32usize),
        ethabi::ParamType::Array(Box::new(ethabi::ParamType::Address)),
    ],
    log.data.as_ref(),
)

Determine the field that is a tuple, in our case the second field and change it to be define as a ethabi::ParamType::Tuple type.

let mut values = ethabi::decode(
    &[
        ethabi::ParamType::FixedBytes(32usize),
        ethabi::ParamType::Array(Box::new(ethabi::ParamType::Tuple(vec![
            ethabi::ParamType::Uint(8usize),
            ethabi::ParamType::Uint(256usize),
        ]))),
    ],
    log.data.as_ref(),

Info The SpentItem::decode function has a let mut values definition variable right at the beginning of the function that list the correct element to put for the tuple, no need to manually define the list, simply copy it over.

Now still within OrderFulfilled function pub fn decode(log: <type>), find the offer: piece of code that does the actual decoding work, it looks right now like:

 ...
 },
 offer: values
            .pop()
            .expect(INTERNAL_ERR)
            .into_array()
            .expect(INTERNAL_ERR)
            .into_iter()
            .map(|inner| {
                inner
                    .into_address()
                    .expect(INTERNAL_ERR)
                    .as_bytes()
                    .to_vec()
            })
            .collect(),

Change it so it forwards its decoding to SpentItem structure:

 ...
 },
 offer: values
            .pop()
            .expect(INTERNAL_ERR)
            .into_array()
            .expect(INTERNAL_ERR)
            .into_iter()
            .map(|inner| {
                let fields = inner.into_tuple().expect(INTERNAL_ERR);
                SpentItem::decode(fields).expect(INTERNAL_ERR);
            })
            .collect(),

And the final modification within the impl OrderFulfilled structure is to modify slightly the pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool definition. It contains a code that ensure the log.data has a certain number of bytes, our modified ABI will produce the wrong validation code for log.data, so let's remove it:

pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
    if log.topics.len() != 1usize {
        return false;
    }
    if log.data.len() < 96usize {
        return false;
    }
    return log.topics.get(0).expect("bounds already checked").as_ref() == Self::TOPIC_ID;
}

Should become:

pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
    if log.topics.len() != 1usize {
        return false;
    }
    return log.topics.get(0).expect("bounds already checked").as_ref() == Self::TOPIC_ID;
}

Note Making the computation is too cumbersome to be performed manually, the topic count and validation of topic 0 value is enough to match only events you need.

Now, let's move our focus on the SpentItem implementation. Within the impl SpentItem block, delete:

  • TOPIC_ID constant
  • match_log function

And remove fully the impl substreams_ethereum::Event for SpentItem block. Last thing to do is to tweak the SpentItem::decode function by changing its current signature:

pub fn decode(log: &substreams_ethereum::pb::eth::v2::Log) -> Result<Self, String>

So that it accepts a Vec<ethabi::Token> instead and rename the variable to values as well as making it mutable:

pub fn decode(mut values: Vec<ethabi::Token>) -> Result<Self, String>

And finally, remove the previous let mut values definition in the function body that we have right now which looks like:

let mut values = ethabi::decode(
    &[
        ethabi::ParamType::Uint(8usize),
        ethabi::ParamType::Uint(256usize),
    ],
    log.data.as_ref(),
)
.map_err(|e| format!("unable to decode log.data: {:?}",

Note You must keep the values.reverse() part just below, it's important for proper functioning of the decoding code.

Now everything is done and you can use OrderFulfilled to decode your event that contains a tuple. The final code looks like that:

const INTERNAL_ERR: &str = "decode event internal error";
#[derive(Debug, Clone, PartialEq)]
pub struct OrderFulfilled {
    pub order_hash: [u8; 32usize],
    pub offer: Vec<SpentItem>,
}
impl OrderFulfilled {
    const TOPIC_ID: [u8; 32] =
        hex_literal::hex!("e86f4727db138d4b9cb776888b1d2239562eafaa38dd110b7d5def7698ccfd41");

    pub fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        if log.topics.len() != 1usize {
            return false;
        }
        return log.topics.get(0).expect("bounds already checked").as_ref() == Self::TOPIC_ID;
    }

    pub fn decode(log: &substreams_ethereum::pb::eth::v2::Log) -> Result<Self, String> {
        let mut values = ethabi::decode(
            &[
                ethabi::ParamType::FixedBytes(32usize),
                ethabi::ParamType::Array(Box::new(ethabi::ParamType::Tuple(vec![
                    ethabi::ParamType::Uint(8usize),
                    ethabi::ParamType::Uint(256usize),
                ]))),
            ],
            log.data.as_ref(),
        )
        .map_err(|e| format!("unable to decode log.data: {:?}", e))?;
        values.reverse();
        Ok(Self {
            order_hash: {
                let mut result = [0u8; 32];
                let v = values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_fixed_bytes()
                    .expect(INTERNAL_ERR);
                result.copy_from_slice(&v);
                result
            },
            offer: values
                .pop()
                .expect(INTERNAL_ERR)
                .into_array()
                .expect(INTERNAL_ERR)
                .into_iter()
                .map(|inner| {
                    let fields = inner.into_tuple().expect(INTERNAL_ERR);
                    SpentItem::decode(fields).unwrap()
                })
                .collect(),
        })
    }
}
impl substreams_ethereum::Event for OrderFulfilled {
    const NAME: &'static str = "OrderFulfilled";
    fn match_log(log: &substreams_ethereum::pb::eth::v2::Log) -> bool {
        Self::match_log(log)
    }
    fn decode(log: &substreams_ethereum::pb::eth::v2::Log) -> Result<Self, String> {
        Self::decode(log)
    }
}
#[derive(Debug, Clone, PartialEq)]
pub struct SpentItem {
    pub item_type: substreams::scalar::BigInt,
    pub amount: substreams::scalar::BigInt,
}
impl SpentItem {
    pub fn decode(mut values: Vec<ethabi::Token>) -> Result<Self, String> {
        values.reverse();
        Ok(Self {
            item_type: {
                let mut v = [0 as u8; 32];
                values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_uint()
                    .expect(INTERNAL_ERR)
                    .to_big_endian(v.as_mut_slice());
                substreams::scalar::BigInt::from_unsigned_bytes_be(&v)
            },
            amount: {
                let mut v = [0 as u8; 32];
                values
                    .pop()
                    .expect(INTERNAL_ERR)
                    .into_uint()
                    .expect(INTERNAL_ERR)
                    .to_big_endian(v.as_mut_slice());
                substreams::scalar::BigInt::from_unsigned_bytes_be(&v)
            },
        })
    }
}

If you struggle with something, reach out to us on Discord and we are going to help you out.

About

Substreams development kit for Ethereum chains, contains Firehose Block model and helpers as well as utilities for Ethereum ABI encoding/decoding.

Resources

License

Stars

Watchers

Forks

Packages

No packages published