Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up unused node methods #874

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/grpc/node/node_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/proto/node/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ service Dispersal {
// StoreBlobs is simiar to StoreChunks, but it stores the blobs using a different storage schema
// so that the stored blobs can later be aggregated by AttestBatch method to a bigger batch.
// StoreBlobs + AttestBatch will eventually replace and deprecate StoreChunks method.
// DEPRECATED: StoreBlobs method is not used
rpc StoreBlobs(StoreBlobsRequest) returns (StoreBlobsReply) {}
// AttestBatch is used to aggregate the batches stored by StoreBlobs method to a bigger batch.
// It will return a signature at the end to attest to the aggregated batch.
// DEPRECATED: AttestBatch method is not used
rpc AttestBatch(AttestBatchRequest) returns (AttestBatchReply) {}
// Retrieve node info metadata
rpc NodeInfo(NodeInfoRequest) returns (NodeInfoReply) {}
Expand Down
119 changes: 2 additions & 117 deletions node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ import (
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/shirou/gopsutil/mem"
"github.com/wealdtech/go-merkletree/v2"
"github.com/wealdtech/go-merkletree/v2/keccak256"

_ "go.uber.org/automaxprocs"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"
)

// Server implements the Node proto APIs.
Expand Down Expand Up @@ -170,125 +168,12 @@ func (s *Server) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*p
return reply, err
}

func (s *Server) validateStoreBlobsRequest(in *pb.StoreBlobsRequest) error {
if in.GetReferenceBlockNumber() == 0 {
return api.NewErrorInvalidArg("missing reference_block_number in request")
}

if len(in.GetBlobs()) == 0 {
return api.NewErrorInvalidArg("missing blobs in request")
}
for _, blob := range in.Blobs {
if blob.GetHeader() == nil {
return api.NewErrorInvalidArg("missing blob header in request")
}
if node.ValidatePointsFromBlobHeader(blob.GetHeader()) != nil {
return api.NewErrorInvalidArg("invalid points contained in the blob header in request")
}
if len(blob.GetHeader().GetQuorumHeaders()) == 0 {
return api.NewErrorInvalidArg("missing quorum headers in request")
}
if len(blob.GetHeader().GetQuorumHeaders()) != len(blob.GetBundles()) {
return api.NewErrorInvalidArg("the number of quorums must be the same as the number of bundles")
}
for _, q := range blob.GetHeader().GetQuorumHeaders() {
if q.GetQuorumId() > core.MaxQuorumID {
return api.NewErrorInvalidArg(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, q.GetQuorumId()))
}
if err := core.ValidateSecurityParam(q.GetConfirmationThreshold(), q.GetAdversaryThreshold()); err != nil {
return err
}
}
if in.GetReferenceBlockNumber() != blob.GetHeader().GetReferenceBlockNumber() {
return api.NewErrorInvalidArg("reference_block_number must be the same for all blobs")
}
}
return nil
}

func (s *Server) StoreBlobs(ctx context.Context, in *pb.StoreBlobsRequest) (*pb.StoreBlobsReply, error) {
start := time.Now()

err := s.validateStoreBlobsRequest(in)
if err != nil {
return nil, err
}

blobHeadersSize := 0
bundleSize := 0
for _, blob := range in.Blobs {
blobHeadersSize += proto.Size(blob.GetHeader())
for _, bundle := range blob.GetBundles() {
bundleSize += proto.Size(bundle)
}
}
s.node.Logger.Info("StoreBlobs RPC request received", "numBlobs", len(in.Blobs), "reqMsgSize", proto.Size(in), "blobHeadersSize", blobHeadersSize, "bundleSize", bundleSize, "referenceBlockNumber", in.GetReferenceBlockNumber())

// Process the request
blobs, err := node.GetBlobMessages(in.GetBlobs(), s.node.Config.NumBatchDeserializationWorkers)
if err != nil {
return nil, err
}

s.node.Metrics.ObserveLatency("StoreBlobs", "deserialization", float64(time.Since(start).Milliseconds()))
s.node.Logger.Info("StoreBlobsRequest deserialized", "duration", time.Since(start))

signatures, err := s.node.ProcessBlobs(ctx, blobs, in.GetBlobs())
if err != nil {
return nil, err
}

signaturesBytes := make([]*wrappers.BytesValue, len(signatures))
for i, sig := range signatures {
if sig == nil {
signaturesBytes[i] = nil
continue
}
signaturesBytes[i] = wrapperspb.Bytes(sig.Serialize())
}

return &pb.StoreBlobsReply{Signatures: signaturesBytes}, nil
return &pb.StoreBlobsReply{}, api.NewErrorUnimplemented()
}

func (s *Server) AttestBatch(ctx context.Context, in *pb.AttestBatchRequest) (*pb.AttestBatchReply, error) {
start := time.Now()

// Validate the batch root
blobHeaderHashes := make([][32]byte, len(in.GetBlobHeaderHashes()))
for i, hash := range in.GetBlobHeaderHashes() {
if len(hash) != 32 {
return nil, api.NewErrorInvalidArg("invalid blob header hash")
}
var h [32]byte
copy(h[:], hash)
blobHeaderHashes[i] = h
}
batchHeader, err := node.GetBatchHeader(in.GetBatchHeader())
if err != nil {
return nil, fmt.Errorf("failed to get the batch header: %w", err)
}
err = s.node.ValidateBatchContents(ctx, blobHeaderHashes, batchHeader)
if err != nil {
return nil, fmt.Errorf("failed to validate the batch header root: %w", err)
}

// Store the mapping from batch header + blob index to blob header hashes
err = s.node.Store.StoreBatchBlobMapping(ctx, batchHeader, blobHeaderHashes)
if err != nil {
return nil, fmt.Errorf("failed to store the batch blob mapping: %w", err)
}

// Sign the batch header
batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
if err != nil {
return nil, fmt.Errorf("failed to get the batch header hash: %w", err)
}
sig := s.node.KeyPair.SignMessage(batchHeaderHash)

s.node.Logger.Info("AttestBatch complete", "duration", time.Since(start))
return &pb.AttestBatchReply{
Signature: sig.Serialize(),
}, nil
return &pb.AttestBatchReply{}, api.NewErrorUnimplemented()
}

func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksRequest) (*pb.RetrieveChunksReply, error) {
Expand Down
181 changes: 0 additions & 181 deletions node/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,187 +381,6 @@ func TestStoreChunksRequestValidation(t *testing.T) {
assert.True(t, strings.Contains(err.Error(), "adversary threshold equals 0"))
}

func TestStoreBlobs(t *testing.T) {
server := newTestServer(t, true)

reqToCopy, _, _, blobHeaders, _ := makeStoreChunksRequest(t, 66, 33)
reqToCopy.BatchHeader = nil
req := &pb.StoreBlobsRequest{
Blobs: reqToCopy.Blobs,
ReferenceBlockNumber: 1,
}
reply, err := server.StoreBlobs(context.Background(), req)
assert.NoError(t, err)
assert.NotNil(t, reply.GetSignatures())
assert.Len(t, reply.GetSignatures(), len(blobHeaders))
for i, sig := range reply.GetSignatures() {
assert.NotNil(t, sig)
assert.NotNil(t, sig.Value)
batchHeader := &core.BatchHeader{
ReferenceBlockNumber: 1,
BatchRoot: [32]byte{},
}
_, err := batchHeader.SetBatchRoot([]*core.BlobHeader{blobHeaders[i]})
assert.NoError(t, err)
batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
assert.NoError(t, err)
point, err := new(core.Signature).Deserialize(sig.Value)
assert.NoError(t, err)
s := &core.Signature{G1Point: point}
ok := s.Verify(keyPair.GetPubKeyG2(), batchHeaderHash)
assert.True(t, ok)
}
}

func TestMinibatchDispersalAndRetrieval(t *testing.T) {
server := newTestServer(t, true)

reqToCopy, _, _, blobHeaders, _ := makeStoreChunksRequest(t, 66, 33)
reqToCopy.BatchHeader = nil
req := &pb.StoreBlobsRequest{
Blobs: reqToCopy.Blobs,
ReferenceBlockNumber: 1,
}
ctx := context.Background()
reply, err := server.StoreBlobs(ctx, req)
assert.NoError(t, err)
assert.NotNil(t, reply.GetSignatures())

assert.Len(t, blobHeaders, 2)
bhh0, err := blobHeaders[0].GetBlobHeaderHash()
assert.NoError(t, err)
bhh1, err := blobHeaders[1].GetBlobHeaderHash()
assert.NoError(t, err)
batchHeader := &core.BatchHeader{
ReferenceBlockNumber: 1,
BatchRoot: [32]byte{},
}
_, err = batchHeader.SetBatchRoot([]*core.BlobHeader{blobHeaders[0], blobHeaders[1]})
assert.NoError(t, err)
attestReq := &pb.AttestBatchRequest{
BatchHeader: &pb.BatchHeader{
BatchRoot: batchHeader.BatchRoot[:],
ReferenceBlockNumber: 1,
},
BlobHeaderHashes: [][]byte{bhh0[:], bhh1[:]},
}
attestReply, err := server.AttestBatch(ctx, attestReq)
assert.NotNil(t, reply)
assert.NoError(t, err)
sig := attestReply.GetSignature()
assert.NotNil(t, sig)
batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
assert.NoError(t, err)
point, err := new(core.Signature).Deserialize(sig)
assert.NoError(t, err)
s := &core.Signature{G1Point: point}
ok := s.Verify(keyPair.GetPubKeyG2(), batchHeaderHash)
assert.True(t, ok)

// Get blob headers
blobHeaderReply, err := server.GetBlobHeader(ctx, &pb.GetBlobHeaderRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 0,
QuorumId: 0,
})
assert.NoError(t, err)
assert.NotNil(t, blobHeaderReply)
blobHeader, err := node.GetBlobHeaderFromProto(blobHeaderReply.GetBlobHeader())
assert.NoError(t, err)
assert.Equal(t, blobHeader, blobHeaders[0])
proof := &merkletree.Proof{
Hashes: blobHeaderReply.GetProof().GetHashes(),
Index: uint64(blobHeaderReply.GetProof().GetIndex()),
}
ok, err = merkletree.VerifyProofUsing(bhh0[:], false, proof, [][]byte{batchHeader.BatchRoot[:]}, keccak256.New())
assert.NoError(t, err)
assert.True(t, ok)

blobHeaderReply, err = server.GetBlobHeader(ctx, &pb.GetBlobHeaderRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 1,
QuorumId: 0,
})
assert.NoError(t, err)
assert.NotNil(t, blobHeaderReply)
blobHeader, err = node.GetBlobHeaderFromProto(blobHeaderReply.GetBlobHeader())
assert.NoError(t, err)
assert.Equal(t, blobHeader, blobHeaders[1])
proof = &merkletree.Proof{
Hashes: blobHeaderReply.GetProof().GetHashes(),
Index: uint64(blobHeaderReply.GetProof().GetIndex()),
}
ok, err = merkletree.VerifyProofUsing(bhh1[:], false, proof, [][]byte{batchHeader.BatchRoot[:]}, keccak256.New())
assert.NoError(t, err)
assert.True(t, ok)

// non-existent blob index
_, err = server.GetBlobHeader(ctx, &pb.GetBlobHeaderRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 2,
QuorumId: 0,
})
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "commit not found in db"))

// Test GetChunks
p := &peer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP("0.0.0.0"),
Port: 3000,
},
}
ctx = peer.NewContext(context.Background(), p)
retrieveChunksReply, err := server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 0,
QuorumId: 0,
})
assert.NoError(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat)
assert.Len(t, retrieveChunksReply.GetChunks(), 1)
recovered, err := new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0])
assert.NoError(t, err)
chunk, err := new(encoding.Frame).Deserialize(encodedChunk)
assert.NoError(t, err)
assert.Equal(t, recovered, chunk)

retrieveChunksReply, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 1,
QuorumId: 0,
})
assert.NoError(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat)
assert.Len(t, retrieveChunksReply.GetChunks(), 1)
recovered, err = new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0])
assert.NoError(t, err)
chunk, err = new(encoding.Frame).Deserialize(encodedChunk)
assert.NoError(t, err)
assert.Equal(t, recovered, chunk)

retrieveChunksReply, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 0,
QuorumId: 0,
})
assert.NoError(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat)
assert.Len(t, retrieveChunksReply.GetChunks(), 1)
recovered, err = new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0])
assert.NoError(t, err)
chunk, err = new(encoding.Frame).Deserialize(encodedChunk)
assert.NoError(t, err)
assert.Equal(t, recovered, chunk)

_, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 1,
QuorumId: 1,
})
assert.ErrorContains(t, err, "quorum ID 1 not found in blob header")
}

func TestRetrieveChunks(t *testing.T) {
server := newTestServer(t, true)
batchHeaderHash, _, _, _ := storeChunks(t, server, false)
Expand Down
Loading
Loading