Skip to content

Commit

Permalink
feat: indexer indexes blocks and txs of mev-commit chain to any plugg…
Browse files Browse the repository at this point in the history
…able storage
  • Loading branch information
kant committed Jul 30, 2024
1 parent 1387398 commit 110dffa
Show file tree
Hide file tree
Showing 14 changed files with 1,748 additions and 0 deletions.
1 change: 1 addition & 0 deletions indexer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# indexer
121 changes: 121 additions & 0 deletions indexer/cmd/indexer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/primev/mev-commit/indexer/internal/indexer"
"github.com/primev/mev-commit/indexer/pkg/ethclient"
"github.com/primev/mev-commit/indexer/pkg/logutil"
"github.com/primev/mev-commit/indexer/pkg/store"
"github.com/primev/mev-commit/indexer/pkg/store/elasticsearch"

"log/slog"

"github.com/urfave/cli/v2"
)

func main() {
app := &cli.App{
Name: "blockchain-indexer",
Usage: "Index blockchain data into Elasticsearch",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "ethereum-endpoint",
EnvVars: []string{"ETHEREUM_ENDPOINT"},
Value: "http://localhost:8545",
Usage: "Ethereum node endpoint",
},
&cli.StringFlag{
Name: "elasticsearch-endpoint",
EnvVars: []string{"ELASTICSEARCH_ENDPOINT"},
Value: "http://localhost:9200",
Usage: "Elasticsearch endpoint",
},
&cli.StringFlag{
Name: "es-username",
EnvVars: []string{"ES_USERNAME"},
Value: "",
Usage: "Elasticsearch username",
},
&cli.StringFlag{
Name: "es-password",
EnvVars: []string{"ES_PASSWORD"},
Value: "",
Usage: "Elasticsearch password",
},
&cli.DurationFlag{
Name: "index-interval",
EnvVars: []string{"INDEX_INTERVAL"},
Value: 15 * time.Second,
Usage: "Interval between indexing operations",
},
&cli.StringFlag{
Name: "log-level",
EnvVars: []string{"LOG_LEVEL"},
Value: "info",
Usage: "Log level (debug, info, warn, error)",
},
},
Action: run,
}

if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}

func run(cliCtx *cli.Context) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ethClient, err := ethclient.NewW3EthereumClient(cliCtx.String("ethereum-endpoint"))
if err != nil {
slog.Error("Failed to create Ethereum client", "error", err)
return err
}

esClient, err := elasticsearch.NewESClient(cliCtx.String("elasticsearch-endpoint"), cliCtx.String("es-username"), cliCtx.String("es-password"))
if err != nil {
slog.Error("Failed to create Elasticsearch client", "error", err)
return err
}
defer func() {
if err := esClient.Close(ctx); err != nil {
slog.Error("Failed to close Elasticsearch client", "error", err)
}
}()

var esStorage store.Storage = esClient

blockchainIndexer := indexer.NewBlockchainIndexer(
ethClient,
esStorage,
cliCtx.Duration("index-interval"),
)

// Set log level
logutil.SetLogLevel(cliCtx.String("log-level"))

if err := blockchainIndexer.Start(ctx); err != nil {
slog.Error("Failed to start blockchain indexer", "error", err)
return err
}

// Set up graceful shutdown
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

// Wait for interrupt signal
<-c
slog.Info("Shutting down gracefully...")
cancel()
// Wait for some time to allow ongoing operations to complete
time.Sleep(5 * time.Second)
return nil
}
101 changes: 101 additions & 0 deletions indexer/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/primev/mev-commit/indexer/pkg/indexer"
"github.com/urfave/cli/v2"
"log/slog"
)

func main() {
app := &cli.App{
Name: "blockchain-indexer",
Usage: "Index blockchain data into Elasticsearch",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "ethereum-endpoint",
EnvVars: []string{"ETHEREUM_ENDPOINT"},
Value: "http://localhost:8545",
Usage: "Ethereum node endpoint",
},
&cli.StringFlag{
Name: "elasticsearch-endpoint",
EnvVars: []string{"ELASTICSEARCH_ENDPOINT"},
Value: "http://localhost:9200",
Usage: "Elasticsearch endpoint",
},
&cli.DurationFlag{
Name: "index-interval",
EnvVars: []string{"INDEX_INTERVAL"},
Value: 15 * time.Second,
Usage: "Interval between indexing operations",
},
&cli.StringFlag{
Name: "log-level",
EnvVars: []string{"LOG_LEVEL"},
Value: "info",
Usage: "Log level (debug, info, warn, error)",
},
},
Action: run,
}

if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}

func run(cliCtx *cli.Context) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ethClient, err := indexer.NewW3EthereumClient(cliCtx.String("ethereum-endpoint"))
if err != nil {
slog.Error("Failed to create Ethereum client", "error", err)
return err
}

esClient, err := indexer.NewESClient(cliCtx.String("elasticsearch-endpoint"))
if err != nil {
slog.Error("Failed to create Elasticsearch client", "error", err)
return err
}
defer func() {
if err := esClient.Close(ctx); err != nil {
slog.Error("Failed to close Elasticsearch client", "error", err)
}
}()

blockchainIndexer := indexer.NewBlockchainIndexer(
ethClient,
esClient,
cliCtx.Duration("index-interval"),
)

// Set log level
indexer.SetLogLevel(cliCtx.String("log-level"))

if err := blockchainIndexer.Start(ctx); err != nil {
slog.Error("Failed to start blockchain indexer", "error", err)
return err
}

// Set up graceful shutdown
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

// Wait for interrupt signal
<-c
slog.Info("Shutting down gracefully...")
cancel()
// Wait for some time to allow ongoing operations to complete
time.Sleep(5 * time.Second)
return nil
}
49 changes: 49 additions & 0 deletions indexer/find_missing_block_num.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from elasticsearch import Elasticsearch, helpers

# Initialize Elasticsearch client with authentication
es = Elasticsearch(
["http://localhost:9200"], # Replace with your Elasticsearch host if different
basic_auth=("elastic", "mev-commit")
)

# Function to get all numbers using scroll API
def get_all_numbers():
numbers = []
scroll_size = 10000

# Initial search request
response = es.search(
index="blocks",
body={
"size": scroll_size,
"_source": ["number"],
"sort": [{"number": "asc"}]
},
scroll='2m'
)

# Get the scroll ID
scroll_id = response['_scroll_id']

# Get the first batch of numbers
numbers.extend([hit['_source']['number'] for hit in response['hits']['hits']])

# Continue scrolling until no more hits
while len(response['hits']['hits']):
response = es.scroll(scroll_id=scroll_id, scroll='2m')
numbers.extend([hit['_source']['number'] for hit in response['hits']['hits']])

return numbers

# Get all numbers
all_numbers = get_all_numbers()

# Find missing numbers
missing_numbers = []
for i in range(len(all_numbers) - 1):
current_number = all_numbers[i]
next_number = all_numbers[i + 1]
if next_number != current_number + 1:
missing_numbers.extend(range(current_number + 1, next_number))

print("Missing numbers:", missing_numbers)
56 changes: 56 additions & 0 deletions indexer/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
module github.com/primev/mev-commit/indexer

go 1.22

require (
github.com/elastic/go-elasticsearch/v8 v8.14.0
github.com/ethereum/go-ethereum v1.14.6
github.com/lmittmann/tint v1.0.5
github.com/lmittmann/w3 v0.16.7
github.com/urfave/cli/v2 v2.27.1
)

require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/rivo/uniseg v0.4.2 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/supranational/blst v0.3.11 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
Loading

0 comments on commit 110dffa

Please sign in to comment.