Skip to content

PikaZ76/chainstorage

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Table of Contents generated with DocToc

Overview

ChainStorage is the foundational component of ChainNode and Chainsformer. These projects represent the crypto data processing suites (code name ChainStack) widely adopted within Coinbase.

ChainStorage is inspired by the Change Data Capture paradigm, commonly used in the big data world. It continuously replicates the changes (i.e. new blocks) on the blockchain, and acts like a distributed file system for the blockchain.

It aims to provide an efficient and flexible way to access the on-chain data:

  • Efficiency is optimized by storing data in horizontally-scalable storage with a key-value schema. At Coinbase's production environment, ChainStorage can serve up to 1,500 blocks per second, enabling teams to build various indexers cost-effectively.
  • Flexibility is improved by decoupling data interpretation from data ingestion. ChainStorage stores the raw data, and the parsing is deferred until the data is consumed. The parsers are shipped as part of the SDK and run on the consumer side. Thanks to the ELT (Extract, Load, Transform) architecture, we can quickly iterate on the parser without ingesting the data from the blockchain again.

Quick Start

This section will guide you through setting up ChainStorage on your local machine for development.

Prerequisites

  1. Go (version 1.22):
    brew install go@1.22
    brew unlink go
    brew link go@1.22
    Verify your Go installation:
    go version
  2. Protocol Buffer Compiler (protobuf): Used for code generation based on .proto files.
    brew install protobuf@29
    brew unlink protobuf
    brew link protobuf@29
    Verify your installation:
    protoc --version

Initial Setup

This command (run only once) installs necessary Go tools for development, like linters and code generators.

make bootstrap

Build the Project

This command compiles the ChainStorage Go programs.

make build

You'll run this command whenever you make changes to the Go source code.

Generate Protocol Buffers

ChainStorage uses Protocol Buffers to define data structures. This command generates Go code from those definitions.

make proto

You'll need to run this if you change any .proto files (usually located in the protos/ directory).

Command Line

the cmd/admin tool consists of multiple sub command.

admin is a utility for managing chainstorage

Usage:
  admin [command]

Available Commands:
  backfill    Backfill a block
  block       Fetch a block
  completion  Generate the autocompletion script for the specified shell
  event       tool for managing events storage
  help        Help about any command
  sdk
  validator
  workflow    tool for managing chainstorage workflows

Flags:
      --blockchain string   blockchain full name (e.g. ethereum)
      --env string          one of [local, development, production]
  -h, --help                help for admin
      --meta                output metadata only
      --network string      network name (e.g. mainnet)
      --out string          output filepath: default format is json; use a .pb extension for protobuf format
      --parser string       parser type: one of native, mesh, or raw (default "native")

Use "admin [command] --help" for more information about a command.

All sub-commands require the blockchain, env, network flags.

Block Command

This command allows you to fetch and inspect individual blocks from a specified blockchain and network.

Usage Example:

Fetch block #46147 from Ethereum mainnet, using your local configuration:

go run ./cmd/admin/main.go block --blockchain ethereum --network mainnet --env local --height 46147
  • block: The command to fetch block data.
  • --blockchain ethereum --network mainnet --env local: These flags specify the target (Ethereum mainnet) and the configuration environment (local).
  • --height 46147: The specific block number you want to retrieve.

You can also fetch blocks from other supported blockchains and networks by changing the flag values:

Fetch a block from Ethereum Goerli testnet:

# Assuming Goerli is configured and data is available
go run ./cmd/admin/main.go block --blockchain ethereum --network goerli --env local --height 12345

Backfill Command (development)

Backfill a block from BSC mainnet:

go run ./cmd/admin backfill --blockchain bsc --network mainnet --env development --start-height 10408613 --end-height 10408614

Stream Command

Stream block events from a specific event sequence id:

go run ./cmd/admin sdk stream --blockchain ethereum --network mainnet --env development --sequence 2228575 --event-tag 1

Testing

Unit Test

# Run everything
make test

# Run the blockchain package only
make test TARGET=internal/blockchain/...

Integration Test

# Run everything
make integration

# Run the storage package only
make integration TARGET=internal/storage/...

# If test class implemented with test suite, add suite name before the test name
make integration TARGET=internal/blockchain/... TEST_FILTER=TestIntegrationPolygonTestSuite/TestPolygonGetBlock

Functional Test

Before running the functional test, you need to provide the endpoint group config by creating secrets.yml. See here for more details.

# Run everything
make functional

# Run the workflow package only
make functional TARGET=internal/workflow/...

# Run TestIntegrationEthereumGetBlock only
make functional TARGET=internal/blockchain/... TEST_FILTER='TestIntegrationEthereumGetBlock$$'

# If test class implemented with test suite, add suite name before the test name
make functional TARGET=internal/blockchain/... TEST_FILTER=TestIntegrationPolygonTestSuite/TestPolygonGetBlock

Configuration

Configuration in ChainStorage tells the system:

  1. Which blockchain to connect to (like Ethereum, Bitcoin, etc.)
  2. How to connect to that blockchain (which nodes/servers to use)
  3. Where to store the data it collects
  4. How to process and manage the data

Dependency overview

To understand the structure and elements of ChainStorage's config, it's important to comprehend its dependencies.

ChainStorage needs several services to work properly:

  1. Blockchain Nodes: These are servers that maintain a copy of the blockchain. ChainStorage connects to these to get blockchain data.

    • Example: An Ethereum node that provides information about Ethereum transactions and blocks
  2. Storage Systems:

    • Blob storage - current implementation is on AWS S3, and the local service is provied by localstack
    • Key value storage - current implemnentation is based on dynamodb and the local service is provied by localstack
    • Dead Letter queue - current implementation is on SQS and the local service is provied by localstack
  3. Workflow Engine (Temporal):Temporal is a workflow engine that orchestrates the data ingestion workflow. It calls ChainStorage service endpoint to complete various of tasks.

Template location and generated config

The config template directory is in config_templates/config, which make config reads this directory and generates the config into the config/chainstorage directory.

Creating New Configurations

Every new asset in ChainStorage consists of ChainStorage configuration files. These configuration files are generated from .template.yml template files using:

make config

these templates will be under a directory dedicated to storing the config templates in a structure that mirrors the final config structure of the config directories. All configurations from this directory will be generated within the final respective config directories

Environment Variables

ChainStorage depends on the following environment variables to resolve the path of the configuration. The directory structure is as follows: config/{namespace}/{blockchain}/{network}/{environment}.yml.

  • CHAINSTORAGE_NAMESPACE: A {namespace} is logical grouping of several services, each of which manages its own blockchain and network. The default namespace is chainstorage. To deploy a different namespace, set the env var to the name of a subdirectory of ./config.
  • CHAINSTORAGE_CONFIG: This env var, in the format of {blockchain}-{network}, determines the blockchain and network managed by the service. The naming is defined in c3/common.
  • CHAINSTORAGE_ENVIRONMENT: This env var controls the {environment} in which the service is deployed. Possible values include production , development, and local (which is also the default value).

Template Format and Inheritance

Configuration templates are composable and inherit configuration properties from "parent templates", which can be defined in base.template.yml, local.template.yml, development.template.yml, and production.template.yml. These parent templates are merged into the final blockchain and network specific base.template.yml, local.template.yml, development.template.yml, production.template.yml configurations respectively.

In the following example, config/chainstorage/ethereum/mainnet/base.yml inherits from config_templates/base.template.yml and config_templates/chainstorage/ethereum/mainnet/base.template.yml, with the latter taking precedence over the former.

config
  chainstorage
    ethereum
      mainnet
        base.yml
        development.yml
        local.yml
        production.yml
config_templates
  chainstorage
    ethereum
      mainnet
        base.template.yml
        development.template.yml
        local.template.yml
        production.template.yml
    base.template.yml
    development.template.yml
    local.template.yml
    production.template.yml

The template language supports string substitution for the Config-Name and Environment using the {{, }} tags.

Example:

foo: {{blockchain}}-{{network}}-{{environment}}

The blockchain, {{blockchain}}, network, {{network}}, and environment, {{environment}} template variables are derived from the directory and file naming schemes associated with cloud and ChainStorage configurations.

Endpoint Group

Endpoint group is an abstraction for one or more JSON-RPC endpoints. EndpointProvider uses the endpoint_group config to implement client-side routing to the node provider.

ChainStorage utilizes two endpoint groups to speed up data ingestion:

  • master: This endpoint group is used to resolve the canonical chain and determine what blocks to ingest next. Typically, sticky session is turned on for this group to ensure stronger data consistency between the requests.
  • slave: This endpoint group is used to ingest the data from the blockchain. During data ingestion, the new blocks are ingested in parallel and out of order. Typically, the endpoints are selected in a round-robin fashion, but you may increase the weights to send more traffic to certain endpoints.

If your node provider, e.g. QuickNode, already has built-in load balancing, your endpoint group may contain only one endpoint, as illustrated by the following configuration:

chain:
  client:
    master:
      endpoint_group: |
        {
          "endpoints": [
            {
              "name": "quicknode-foo-bar-sticky",
              "url": "https://foo-bar.matic.quiknode.pro/****",
              "weight": 1
            }
          ],
          "sticky_session": {
            "header_hash": "x-session-hash"
          }
        }
    slave:
      endpoint_group: |
        {
          "endpoints": [
            {
              "name": "quicknode-foo-bar-round-robin",
              "url": "https://foo-bar.matic.quiknode.pro/****",
              "weight": 1
            }
          ]
        }

Overriding the Configuration

You can override configurations in two ways:

  1. Environment Variables: You may override any configuration using an environment variable. The environment variable should be prefixed with "CHAINSTORAGE_". For nested dictionary, use underscore to separate the keys.

For example, you may override the endpoint group config at runtime by injecting the following environment variables:

  • master: CHAINSTORAGE_CHAIN_CLIENT_MASTER_ENDPOINT_GROUP
  • slave: CHAINSTORAGE_CHAIN_CLIENT_SLAVE_ENDPOINT_GROUP

Security Best Practice - PostgreSQL Credentials: For sensitive data like database passwords, always use environment variables instead of hardcoding them in config files:

# PostgreSQL credentials (never put these in config files!)
export CHAINSTORAGE_AWS_POSTGRES_USER="your_username"
export CHAINSTORAGE_AWS_POSTGRES_PASSWORD="your_secure_password"

# Storage type configuration
export CHAINSTORAGE_STORAGE_TYPE_META="POSTGRES"
  1. secrets.yml: Alternatively, you may override the configuration by creating secrets.yml within the same directory. Its attributes will be merged into the runtime configuration and take the highest precedence. Note that this file may contain credentials and is excluded from check-in by .gitignore.

Example config/chainstorage/ethereum/mainnet/.secrets.yml:

storage_type:
  meta: POSTGRES
aws:
  postgres:
    user: your_username
    password: your_secure_password

Development

Running Server

Start the dockers by the docker-compose file from project root folder:

make localstack

If you have developed ChainStorage before locally with previous docker compose file and got below error message

nc: bad address 'postgresql'

Please remove existing ChainStorage container and reran make localstack

The next step is to start the server locally:

# Ethereum Mainnet
# Use aws local stack
make server

# If want to start testnet (goerli) server
# Use aws local stack
make server CHAINSTORAGE_CONFIG=ethereum_goerli

Running with PostgreSQL

ChainStorage supports PostgreSQL as an alternative to DynamoDB for metadata storage. Here's how to set it up:

1. Start PostgreSQL Database

You can use Docker to run PostgreSQL locally:

# Start PostgreSQL container
docker run --name chainstorage-postgres \
  -e POSTGRES_USER=temporal \
  -e POSTGRES_PASSWORD=temporal \
  -e POSTGRES_DB=postgres \
  -p 5432:5432 \
  -d postgres:13

Or add it to your existing docker-compose setup.

2. Configure Meta Storage Type

Create or modify your local config to use PostgreSQL instead of DynamoDB. You have two options:

Option A: Create a local secrets file (recommended for development)

Create config/chainstorage/{blockchain}/{network}/.secrets.yml (e.g., config/chainstorage/ethereum/mainnet/.secrets.yml):

storage_type:
  meta: POSTGRES

Option B: Set via environment variable

export CHAINSTORAGE_STORAGE_TYPE_META=POSTGRES

3. Set PostgreSQL Credentials

Since PostgreSQL credentials should not be hardcoded in config files, set them via environment variables:

# PostgreSQL connection details
export CHAINSTORAGE_AWS_POSTGRES_USER="temporal"
export CHAINSTORAGE_AWS_POSTGRES_PASSWORD="temporal"
export CHAINSTORAGE_AWS_POSTGRES_HOST="localhost"
export CHAINSTORAGE_AWS_POSTGRES_PORT="5432"
export CHAINSTORAGE_AWS_POSTGRES_SSL_MODE="require"

4. Run the Server

Now start the server with PostgreSQL configuration:

# Method 1: Using exported environment variables
make server

# Method 2: Setting environment variables inline
CHAINSTORAGE_STORAGE_TYPE_META=POSTGRES \
CHAINSTORAGE_AWS_POSTGRES_USER="temporal" \
CHAINSTORAGE_AWS_POSTGRES_PASSWORD="temporal" \
make server

PostgreSQL Configuration Reference

The following environment variables can be used to configure PostgreSQL:

Environment Variable Config Path Description Default
CHAINSTORAGE_AWS_POSTGRES_HOST aws.postgres.host PostgreSQL hostname localhost
CHAINSTORAGE_AWS_POSTGRES_PORT aws.postgres.port PostgreSQL port 5432
CHAINSTORAGE_AWS_POSTGRES_USER aws.postgres.user PostgreSQL username (required)
CHAINSTORAGE_AWS_POSTGRES_PASSWORD aws.postgres.password PostgreSQL password (required)
CHAINSTORAGE_AWS_POSTGRES_DATABASE aws.postgres.database Database name chainstorage_{blockchain}_{network}
CHAINSTORAGE_AWS_POSTGRES_SSL_MODE aws.postgres.ssl_mode SSL mode require
CHAINSTORAGE_AWS_POSTGRES_MAX_CONNECTIONS aws.postgres.max_connections Maximum connection pool size 25
CHAINSTORAGE_AWS_POSTGRES_MIN_CONNECTIONS aws.postgres.min_connections Minimum connection pool size 5
CHAINSTORAGE_AWS_POSTGRES_CONNECT_TIMEOUT aws.postgres.connect_timeout Connection establishment timeout 30s
CHAINSTORAGE_AWS_POSTGRES_STATEMENT_TIMEOUT aws.postgres.statement_timeout Statement/transaction timeout 60s
CHAINSTORAGE_STORAGE_TYPE_META storage_type.meta Meta storage type DYNAMODB

Database Schema

ChainStorage will automatically create the necessary database schema and run migrations when it starts up. The database will contain tables for:

  • block_metadata - Block metadata and headers
  • canonical_blocks - Canonical chain state
  • block_events - Blockchain event log

PostgreSQL Setup

ChainStorage supports PostgreSQL as an alternative to DynamoDB for metadata storage with role-based access for enhanced security.

Local Development

Quick Start:

# Start PostgreSQL with automatic database initialization
docker-compose -f docker-compose-local-dev.yml up -d chainstorage-postgres

This automatically creates:

  • Shared chainstorage_worker and chainstorage_server roles
  • Databases for all supported networks (ethereum_mainnet, bitcoin_mainnet, etc.)
  • Proper permissions (worker: read-write, server: read-only)

Default Credentials:

  • Worker: chainstorage_worker / worker_password
  • Server: chainstorage_server / server_password

Manual Setup:

chainstorage admin setup-postgres \
  --blockchain ethereum \
  --network mainnet \
  --env local \
  --master-user postgres \
  --master-password postgres \
  --worker-password worker_password \
  --server-password server_password

Production/Development Setup

In production, databases are initialized using the db-init command:

# Connect to admin pod
kubectl exec -it deploy/chainstorage-admin-dev-console -c chainstorage-admin -- /bin/bash

# Initialize database for ethereum-mainnet
./admin db-init --blockchain ethereum --network mainnet --env dev

The db-init command:

  1. Reads master credentials from environment variables (injected by Kubernetes)
  2. Fetches network-specific credentials from AWS Secrets Manager (chainstorage/db-creds/{env})
  3. Creates the database (e.g., chainstorage_ethereum_mainnet)
  4. Creates network-specific users with passwords from the secret
  5. Grants appropriate permissions

Database Naming Convention

Databases follow the pattern: chainstorage_{blockchain}_{network}

Examples:

  • chainstorage_ethereum_mainnet
  • chainstorage_bitcoin_mainnet
  • chainstorage_polygon_testnet

Note: Hyphens in blockchain/network names are replaced with underscores.

Local Development Setup

Complete Local Environment

Start the full local development stack:

# Start all services (PostgreSQL, Temporal, LocalStack)
make localstack

# Load environment variables
source scripts/postgres-roles-local.env

Available Commands

Database Operations:

# Set up PostgreSQL database and roles for a new network
go run ./cmd/admin setup-postgres \
  --blockchain ethereum \
  --network mainnet \
  --env local \
  --master-user postgres \
  --master-password postgres \
  --host localhost \
  --port 5433

# Initialize databases from AWS Secrets Manager (production)
go run ./cmd/admin db-init \
  --secret-name chainstorage/db-init/prod \
  --aws-region us-east-1

# Migrate data from DynamoDB to PostgreSQL
chainstorage admin migrate-dynamodb-to-postgres \
  --blockchain ethereum \
  --network mainnet \
  --env local \
  --start-height 1000000 \
  --end-height 1001000

Command Reference

Command Description Example
setup-postgres Create database and roles setup-postgres --master-user postgres --master-password postgres
db-init Initialize from AWS Secrets Manager db-init --blockchain ethereum --network mainnet --env dev
migrate-dynamodb-to-postgres Migrate data from DynamoDB to PostgreSQL migrate-dynamodb-to-postgres --start-height 1000000 --end-height 1001000

AWS localstack

Check S3 files: You can checkout the config from config/chainstorage/{{Blockchain}}/{{network}}/{{evironment}} for the value of S3 bucket name, dynamoDB tables, and SQS name.

aws s3 --no-sign-request --region local --endpoint-url http://localhost:4566 ls --recursive example-chainstorage-ethereum-mainnet-dev/

Check DynamoDB rows:

aws dynamodb --no-sign-request --region local --endpoint-url http://localhost:4566 scan --table-name example_chainstorage_blocks_ethereum_mainnet

Check DLQ:

aws sqs --no-sign-request --region local --endpoint-url http://localhost:4566/000000000000/example_chainstorage_blocks_ethereum_mainnet_dlq receive-message --queue-url "http://localhost:4566/000000000000/example_chainstorage_blocks_ethereum_mainnet_dlq" --max-number-of-messages 10 --visibility-timeout 2

Temporal Workflow

Open Temporal UI in a browser by entering the URL: http://localhost:8080/namespaces/chainstorage-ethereum-mainnet/workflows

Start the backfill workflow:

go run ./cmd/admin workflow start --workflow backfiller --input '{"StartHeight": 11000000, "EndHeight": 11000100, "NumConcurrentExtractors": 24}' --blockchain ethereum --network mainnet --env local

Start the benchmarker workflow:

go run ./cmd/admin workflow start --workflow benchmarker --input '{"StartHeight": 1, "EndHeight": 12000000, "NumConcurrentExtractors": 24, "StepSize":1000000, "SamplesToTest":500}' --blockchain ethereum --network mainnet --env local

Start the monitor workflow:

go run ./cmd/admin workflow start --workflow monitor --blockchain ethereum --network mainnet --env local --input '{"StartHeight": 11000000}'

Start the poller workflow:

go run ./cmd/admin workflow start --workflow poller --input '{"Tag": 0, "MaxBlocksToSync": 100, "Parallelism":4}' --blockchain ethereum --network mainnet --env local

NOTE: the recommended value for "parallelism" depend on the capacity of your node provider. If you are not sure what value should be used, just drop it from the command.

Start the streamer workflow:

go run ./cmd/admin workflow start --workflow streamer --input '{}' --blockchain ethereum --network goerli --env local

Start the migrator workflow (event-driven migration from DynamoDB to PostgreSQL):

# Migrate events by sequence range
go run ./cmd/admin workflow start --workflow migrator --input '{"StartEventSequence": 1, "EndEventSequence": 1000, "Tag": 2, "EventTag": 3, "BatchSize": 500, "Parallelism": 2, "CheckpointSize": 10000}' --blockchain ethereum --network mainnet --env local

# Auto-resume from last migrated position
go run ./cmd/admin workflow start --workflow migrator --input '{"StartEventSequence": 0, "EndEventSequence": 100000, "Tag": 2, "EventTag": 3, "AutoResume": true}' --blockchain ethereum --network mainnet --env local

# Continuous sync mode with auto-resume
go run ./cmd/admin workflow start --workflow migrator --input '{"StartEventSequence": 0, "EndEventSequence": 0, "Tag": 2, "EventTag": 3, "AutoResume": true, "ContinuousSync": true, "BatchSize": 500,"Parallelism": 2, "CheckpointSize": 10000}' --blockchain ethereum --network mainnet --env local

# Custom batch size and parallelism for large migrations
go run ./cmd/admin workflow start --workflow migrator --input '{"StartEventSequence": 1000000, "EndEventSequence": 2000000, "Tag": 1, "EventTag": 0, "BatchSize": 10000, "Parallelism": 16, "CheckpointSize": 100000}' --blockchain ethereum --network mainnet --env local

Note: The migrator uses an event-driven architecture where events are fetched by sequence number and blocks are extracted from BLOCK_ADDED events. This ensures data consistency and proper handling of blockchain reorganizations.

Start the cross validator workflow:

go run ./cmd/admin workflow start --workflow cross_validator --input '{"StartHeight": 15500000, "Tag": 0}' --blockchain ethereum --network mainnet --env local

Start the event backfiller workflow:

go run ./cmd/admin workflow start --workflow event_backfiller --input '{"Tag": 0, "EventTag": 0, "StartSequence": 1000, "EndSequence": 2000}' --blockchain ethereum --network mainnet --env local

Start the replicator workflow:

go run ./cmd/admin workflow start --workflow replicator --input '{"Tag": 0, "StartHeight": 1000000, "EndHeight": 1001000}' --blockchain ethereum --network mainnet --env local

Stop the monitor workflow:

go run ./cmd/admin workflow stop --workflow monitor --blockchain ethereum --network mainnet --env local

Stop a versioned streamer workflow:

go run ./cmd/admin workflow stop --workflow streamer --blockchain ethereum --network mainnet --env local --workflowID {workflowID}

Using Temporal CLI to check the status of the workflow:

brew install tctl

tctl --address localhost:7233 --namespace chainstorage-ethereum-mainnet workflow show --workflow_id workflow.backfiller

Failover

Nodes Failover

ChainStorage supports nodes failover feature to mitigate the issue from nodes which may impact our SLA. When primary clusters are down, we can choose to switch over to failover clusters to mitigate incidents, instead of waiting for nodes get fully recovered.

Check this comment to get more details of the primary/secondary cluster definition.

To check if failover clusters are provided, please go to config service and check the endpoints configurations.

Failover in Workflows

Both Backfiller and Poller workflows provide failover feature without updating configs in the config service which requires approvals as well as redeployment.

To use failover clusters for those two workflows, you simply need to set the Failover workflow param as true when you trigger it.

Note: By default, the Failover workflow param should be set as false, which means the primary clusters should always be first choice. If you intend to use failover clusters, please update endpoints configs in config service instead.

Automatic Failover in Poller Workflow

Poller workflow provides an automatic failover mechanism so that we don't need to manually restart workflows with Failover param.

This feature is guarded by the failover_enabled configuration. Once this feature is enabled, when the workflow execution using primary clusters fails, it will automatically trigger a new workflow run with Failover=true. Implementation Code.

Ref: Temporal Continue-As-New

Checking Workflow Statuses

Install tctl, it is a command-line tool that you can use to interact with a Temporal cluster. More info can be found here: https://docs.temporal.io/tctl/

brew install tctl

APIs

APIs List

Supported APIs List

# local
grpcurl --plaintext localhost:9090 coinbase.chainstorage.ChainStorage/GetLatestBlock
grpcurl --plaintext -d '{"start_height": 0, "end_height": 10}' localhost:9090 coinbase.chainstorage.ChainStorage/GetBlockFilesByRange
grpcurl --plaintext -d '{"sequence_num": 2223387}' localhost:9090 coinbase.chainstorage.ChainStorage/StreamChainEvents
grpcurl --plaintext -d '{"initial_position_in_stream": "EARLIEST"}' localhost:9090 coinbase.chainstorage.ChainStorage/StreamChainEvents
grpcurl --plaintext -d '{"initial_position_in_stream": "LATEST"}' localhost:9090 coinbase.chainstorage.ChainStorage/StreamChainEvents
grpcurl --plaintext -d '{"initial_position_in_stream": "13222054"}' localhost:9090 coinbase.chainstorage.ChainStorage/StreamChainEvents

SDK

Chainstorage also provides SDK, and you can find supported methods here

Note:

  • GetBlocksByRangeWithTag is not equivalent to the batch version of GetBlockWithTag since you don't have a way to specify the block hash. So when you use GetBlocksByRangeWithTag and if it goes beyond the current tip of chain due to reorg, you'll get back the FailedPrecondition error because it exceeds the latest watermark.

    In conclusion, it's safe to use GetBlocksByRangeWithTag for backfilling since the reorg will not happen for past blocks, however, you'd be suggested to use GetBlockWithTag for recent blocks (e.g. streaming case).

Data Processing Pattern

Below are several patterns you can choose for data processing.

  1. If you want the most up-to-date blocks, you need to use the streaming APIs to handle the chain reorg events:

  2. Unified batch and streaming: - Download, let's say 10k events, using GetChainEvents. - Break down 10k events into small batches, e.g. 20 events/batch. - Process those batches in parallel. - For events in each batch, it can be processed either sequentially or in parallel using GetBlockWithTag. - Update watermark once all small batches have been processed. - Repeat above steps.

    With the above pattern, you can unify batch and streaming use cases. When your data pipeline is close to the tip, GetChainEvents will simply return all available blocks.

  3. Separate workflows for backfilling and live streaming: Use GetBlocksByRangeWithTag for backfilling and then switch over to StreamChainEvents for live streaming.

  4. If you don't want to deal with chain reorg, you may use the batch APIs as follows:

  • Maintain a distance (irreversibleDistance) to the tip, the irreversible distance can be queried using GetChainMetadata.
  • Get the latest block height (latest) using GetLatestBlock.
  • Poll for new data from current watermark block to the block (latest - irreversibleDistance) using GetBlocksByRangeWithTag.
  • Repeat above steps periodically.

Examples

See below for a few examples for implementing a simple indexer using the SDK. Note that the examples are provided in increasing complexity.

Batch

In this example, we use the blocks API to fetch the confirmed blocks as follows:

  1. Fetch the maximum reorg distance (irreversibleDistance).
  2. Fetch the latest block height (latest).
  3. Poll for new blocks from the checkpoint up to the latest confirmed block (latest - irreversibleDistance). using GetBlocksByRange.
  4. Update the checkpoint.
  5. Repeat above steps periodically.

Stream

This example demonstrates how to stream the latest blocks and handle chain reorgs. The worker processes the events sequentially and relies on BlockchainEvent_Type to construct the canonical chain. For example, given +1, +2, +3, -3, -2, +2', +3' as the events, the canonical chain would be +1, +2', +3'.

Unified

The last example showcases how to turn the data processing into an embarrassingly parallel problem by leveraging the mono-increasing sequence number. In this example, though the events are processed in parallel and out of order, the logical ordering guarantee is preserved.

  1. Download, say 10k events, using GetChainEvents. Note that this API is non-blocking, and it returns all the available events if the requested amount is not available. This enables us to unify batch and stream processing.
  2. Break down 10k events into small batches, e.g. 20 events/batch.
  3. Distribute those batches to a number of workers for parallel processing. Note that this step is not part of the example.
  4. For events in each batch, it can be processed either sequentially or in parallel using GetBlockWithTag.
  5. Implement versioning using the mono-increasing sequence numbers provided by the events. See here for more details.
  6. Update watermark once all the batches have been processed.
  7. Repeat above steps.

Data Migration Tool

Tool to migrate blockchain data from DynamoDB to PostgreSQL with complete reorg support and data integrity preservation.

Overview

The migration tool performs a comprehensive transfer of blockchain data:

  • Block metadata from DynamoDB to PostgreSQL (block_metadata + canonical_blocks tables)
  • Events from DynamoDB to PostgreSQL (block_events table)
  • Complete reorg data including both canonical and non-canonical blocks
  • Event ID-based migration for efficient sequential processing

Critical Requirements:

  1. Block metadata must be migrated before events (foreign key dependencies)
  2. Migration preserves complete blockchain history including all reorg blocks
  3. Canonical block identification is maintained through migration ordering

Basic Usage

# Migrate both blocks and events for a height range
go run cmd/admin/*.go migrate \
  --env=local \
  --blockchain=ethereum \
  --network=mainnet \
  --start-height=1000000 \
  --end-height=1001000 \
  --tag=1 \
  --event-tag=0

Contact Us

We have set up a Discord server soon. Here is the link to join (limited 10) https://discord.com/channels/1079683467018764328/1079683467786334220.

About

The File System For a Multi-Blockchain World

Resources

License

Contributing

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Go 98.2%
  • Python 1.3%
  • Other 0.5%