Skip to content

Commit

Permalink
update ethereum RPCs and Services
Browse files Browse the repository at this point in the history
  • Loading branch information
cortze committed Mar 26, 2024
1 parent e74c3aa commit 65efbdf
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.0.0-20190807091052-3d65705ee9f1 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.50.0 // indirect
github.com/prometheus/common v0.51.1 // indirect
github.com/prometheus/procfs v0.13.0 // indirect
github.com/protolambda/bls12-381-util v0.1.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,8 @@ github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16
github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.50.0 h1:YSZE6aa9+luNa2da6/Tik0q0A5AbR+U003TItK57CPQ=
github.com/prometheus/common v0.50.0/go.mod h1:wHFBCEVWVmHMUpg7pYcOm2QUR/ocQdYSJVQJKnHc3xQ=
github.com/prometheus/common v0.51.1 h1:eIjN50Bwglz6a/c3hAgSMcofL3nD+nFQkV6Dd4DsQCw=
github.com/prometheus/common v0.51.1/go.mod h1:lrWtQx+iDfn2mbH5GUzlH9TSHyfZpHkSiG1W7y3sF2Q=
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
Expand Down
5 changes: 5 additions & 0 deletions pkg/crawler/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,13 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf config.EthereumCrawlerConfig)
func (c *EthereumCrawler) Run() {
// init all the eth_protocols
c.EthNode.ServeBeaconPing(c.Host.Host())
c.EthNode.ServeBeaconGoodbye(c.Host.Host())
c.EthNode.ServeBeaconStatus(c.Host.Host())
c.EthNode.ServeBeaconMetadata(c.Host.Host())
c.EthNode.ServeBeaconBlocksByRootV2(c.Host.Host())
c.EthNode.ServeBeaconBlocksByRangeV2(c.Host.Host())
c.EthNode.ServeBeaconBlobsByRootV1(c.Host.Host())
c.EthNode.ServeBeaconBlobsByRangeV1(c.Host.Host())

// initialization secuence for the crawler
c.Events.Start(c.ctx)
Expand Down
66 changes: 66 additions & 0 deletions pkg/networks/ethereum/beacon_blobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package ethereum

import (
"context"

"github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods"
"github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
log "github.com/sirupsen/logrus"
)

func (en *LocalEthereumNode) ServeBeaconBlobsByRangeV1(h host.Host) {
go func() {
sCtxFn := func() context.Context {
reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout)
return reqCtx
}
comp := new(reqresp.SnappyCompression)
listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) {
blobsRange := new(methods.BlobsByRangeReqV1)
err := handler.ReadRequest(blobsRange)
if err != nil {
_ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse blobs_by_range request")
log.Errorf("failed to read blobs_by_range request: %v from %s", err, peerId.String())
} else {
log.Info("dropped blobs_by_range request", *blobsRange)
}
}
b := methods.BlobsByRangeRPCv1
streamHandler := b.MakeStreamHandler(sCtxFn, comp, listenReq)
h.SetStreamHandler(b.Protocol, streamHandler)
log.Info("Started serving blobs_by_range")
// wait untill the ctx is down
<-en.ctx.Done() // TODO: do it better
log.Info("Stopped serving blobs_by_range")
}()
}

func (en *LocalEthereumNode) ServeBeaconBlobsByRootV1(h host.Host) {
go func() {
sCtxFn := func() context.Context {
reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout)
return reqCtx
}
comp := new(reqresp.SnappyCompression)
listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) {
blobRoots := new(methods.BlobByRootV1)
err := handler.ReadRequest(blobRoots)
if err != nil {
_ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse blobs_by_root request")
log.Errorf("failed to read blobs_by_root request: %v from %s", err, peerId.String())
} else {
log.Info("dropped blobs_by_root request", *blobRoots)
}
}
b := methods.BlobsByRootRPCv1
streamHandler := b.MakeStreamHandler(sCtxFn, comp, listenReq)
h.SetStreamHandler(b.Protocol, streamHandler)
log.Info("Started serving blobs_by_root")
// wait untill the ctx is down
<-en.ctx.Done() // TODO: do it better
log.Info("Stopped serving blobs_by_root")
}()
}
66 changes: 66 additions & 0 deletions pkg/networks/ethereum/beacon_blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package ethereum

import (
"context"

"github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/methods"
"github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
log "github.com/sirupsen/logrus"
)

func (en *LocalEthereumNode) ServeBeaconBlocksByRangeV2(h host.Host) {
go func() {
sCtxFn := func() context.Context {
reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout)
return reqCtx
}
comp := new(reqresp.SnappyCompression)
listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) {
blockRange := new(methods.BlocksByRootReq)
err := handler.ReadRequest(blockRange)
if err != nil {
_ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse block_by_range request")
log.Errorf("failed to read block_by_range request: %v from %s", err, peerId.String())
} else {
log.Infof("dropped block_by_range request %v", *blockRange)
}
}
m := methods.BlocksByRangeRPCv2
streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq)
h.SetStreamHandler(m.Protocol, streamHandler)
log.Info("Started serving block_by_range")
// wait untill the ctx is down
<-en.ctx.Done() // TODO: do it better
log.Info("Stopped serving block_by_range")
}()
}

func (en *LocalEthereumNode) ServeBeaconBlocksByRootV2(h host.Host) {
go func() {
sCtxFn := func() context.Context {
reqCtx, _ := context.WithTimeout(en.ctx, RPCTimeout)
return reqCtx
}
comp := new(reqresp.SnappyCompression)
listenReq := func(ctx context.Context, peerId peer.ID, handler reqresp.ChunkedRequestHandler) {
blockRoot := new(methods.BlocksByRootReq)
err := handler.ReadRequest(blockRoot)
if err != nil {
_ = handler.WriteErrorChunk(reqresp.InvalidReqCode, "could not parse block_by_root request")
log.Error("failed to read block_by_root request: %v from %s", err, peerId.String())
} else {
log.Infof("dropped block_by_root request %v", *blockRoot)
}
}
m := methods.BlocksByRootRPCv2
streamHandler := m.MakeStreamHandler(sCtxFn, comp, listenReq)
h.SetStreamHandler(m.Protocol, streamHandler)
log.Info("Started serving block_by_root")
// wait untill the ctx is down
<-en.ctx.Done() // TODO: do it better
log.Info("Stopped serving block_by_root")
}()
}
140 changes: 140 additions & 0 deletions pkg/networks/ethereum/rpc/methods/blobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package methods

import (
"encoding/hex"
"fmt"
"github.com/migalabs/armiarma/pkg/networks/ethereum/rpc/reqresp"
"github.com/protolambda/ztyp/codec"
"github.com/protolambda/ztyp/tree"
"github.com/protolambda/ztyp/view"
)

// https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1
const (
MAX_BLOBS_PER_BLOCK int = 6
MAX_BLOBS_PER_RPC_REQ int = MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK
)

type BlobIdentifier struct {
BlockRoot Root
Index view.Uint64View
}

func (blobId *BlobIdentifier) Deserialize(dr *codec.DecodingReader) error {
return dr.FixedLenContainer(&blobId.BlockRoot, &blobId.Index)
}

func (blobId *BlobIdentifier) Serialize(w *codec.EncodingWriter) error {
return w.FixedLenContainer(&blobId.BlockRoot, &blobId.Index)
}

func (blobId *BlobIdentifier) ByteLength() uint64 {
return blobId.BlockRoot.FixedLength() + blobId.Index.FixedLength()
}

func (blobId *BlobIdentifier) FixedLength() uint64 {
return blobId.BlockRoot.FixedLength() + blobId.Index.FixedLength()
}

func (blobId *BlobIdentifier) HashTreeRoot(hFn tree.HashFn) Root {
return hFn.HashTreeRoot(&blobId.BlockRoot, &blobId.Index)
}

func (blobId *BlobIdentifier) String() string {
return fmt.Sprintf("%v", *blobId)
}

type BlobByRootV1 []BlobIdentifier

func (b BlobByRootV1) Deserialize(dr *codec.DecodingReader) error {
var idx int = 0
return dr.List(
func() codec.Deserializable {
i := idx
idx++
return &b[i]
},
uint64(len(b)),
uint64(MAX_BLOBS_PER_RPC_REQ))
}

func (b BlobByRootV1) Serialize(w *codec.EncodingWriter) error {
return w.List(func(i uint64) codec.Serializable {
return &b[i]
},
uint64(len(b)),
uint64(MAX_BLOBS_PER_RPC_REQ))
}

func (b BlobByRootV1) ByteLength() uint64 {
return uint64(len(b) * (32 + 8))
}

func (b BlobByRootV1) FixedLength() uint64 {
return 0
}

func (b BlobByRootV1) String() string {
if len(b) == 0 {
return "empty blobs-by-root request"
}
out := make([]byte, 0, len(b)*66)
for i, bId := range b {
hex.Encode(out[i*66:], bId.BlockRoot[:])
out[(i+1)*66-2] = ','
out[(i+1)*66-1] = ' '
}
return "blobs-by-root requested: " + string(out[:len(out)-1])
}

type BlobsByRangeReqV1 struct {
StartSlot Slot
Count view.Uint64View
}

func (b *BlobsByRangeReqV1) Data() map[string]interface{} {
return map[string]interface{}{
"start_slot": b.StartSlot,
"count": b.Count,
}
}

func (b *BlobsByRangeReqV1) Deserialize(dr *codec.DecodingReader) error {
return dr.FixedLenContainer(&b.StartSlot, &b.Count)
}

func (b *BlobsByRangeReqV1) Serialize(w *codec.EncodingWriter) error {
return w.FixedLenContainer(&b.StartSlot, &b.Count)
}

const blobsByRangeReqBytes uint64 = 8 + 8

func (b BlobsByRangeReqV1) ByteLength() uint64 {
return blobsByRangeReqBytes
}

func (b *BlobsByRangeReqV1) FixedLength() uint64 {
return blobsByRangeReqBytes
}

func (b *BlobsByRangeReqV1) HashTreeRoot(hFn tree.HashFn) Root {
return hFn.HashTreeRoot(&b.StartSlot, &b.Count)
}

func (b *BlobsByRangeReqV1) String() string {
return fmt.Sprintf("%v", *b)
}

var BlobsByRangeRPCv1 = reqresp.RPCMethod{
Protocol: "/eth2/beacon_chain/req/blob_sidecars_by_range/1/ssz_snappy",
RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobsByRangeReqV1) }, blobsByRangeReqBytes, blobsByRangeReqBytes),
ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobsByRangeReqV1) }, 0, uint64(0)),
DefaultResponseChunkCount: 20,
}

var BlobsByRootRPCv1 = reqresp.RPCMethod{
Protocol: "/eth2/beacon_chain/req/blob_sidecars_by_root/1/ssz_snappy",
RequestCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobByRootV1) }, 0, uint64((32+8)*MAX_BLOBS_PER_RPC_REQ)),
ResponseChunkCodec: reqresp.NewSSZCodec(func() reqresp.SerDes { return new(BlobByRootV1) }, 0, uint64(0)),
DefaultResponseChunkCount: 20,
}
Loading

0 comments on commit 65efbdf

Please sign in to comment.