Skip to content

Commit

Permalink
node minibatch retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Aug 20, 2024
1 parent ec77268 commit e165edb
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 14 deletions.
59 changes: 58 additions & 1 deletion node/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func TestStoreBlobs(t *testing.T) {
}
}

func TestAttestBatch(t *testing.T) {
func TestMinibatchDispersalAndRetrieval(t *testing.T) {
server := newTestServer(t, true)

reqToCopy, _, _, blobHeaders, _ := makeStoreChunksRequest(t, 66, 33)
Expand Down Expand Up @@ -503,6 +503,63 @@ func TestAttestBatch(t *testing.T) {
})
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "commit not found in db"))

// Test GetChunks
p := &peer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP("0.0.0.0"),
Port: 3000,
},
}
ctx = peer.NewContext(context.Background(), p)
retrieveChunksReply, err := server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 0,
QuorumId: 0,
})
assert.NoError(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat)
assert.Len(t, retrieveChunksReply.GetChunks(), 1)
recovered, err := new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0])
assert.NoError(t, err)
chunk, err := new(encoding.Frame).Deserialize(encodedChunk)
assert.NoError(t, err)
assert.Equal(t, recovered, chunk)

retrieveChunksReply, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 1,
QuorumId: 0,
})
assert.NoError(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat)
assert.Len(t, retrieveChunksReply.GetChunks(), 1)
recovered, err = new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0])
assert.NoError(t, err)
chunk, err = new(encoding.Frame).Deserialize(encodedChunk)
assert.NoError(t, err)
assert.Equal(t, recovered, chunk)

retrieveChunksReply, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 0,
QuorumId: 0,
})
assert.NoError(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, retrieveChunksReply.ChunkEncodingFormat)
assert.Len(t, retrieveChunksReply.GetChunks(), 1)
recovered, err = new(encoding.Frame).Deserialize(retrieveChunksReply.GetChunks()[0])
assert.NoError(t, err)
chunk, err = new(encoding.Frame).Deserialize(encodedChunk)
assert.NoError(t, err)
assert.Equal(t, recovered, chunk)

_, err = server.RetrieveChunks(ctx, &pb.RetrieveChunksRequest{
BatchHeaderHash: batchHeaderHash[:],
BlobIndex: 1,
QuorumId: 1,
})
assert.ErrorContains(t, err, "quorum ID 1 not found in blob header")
}

func TestRetrieveChunks(t *testing.T) {
Expand Down
30 changes: 24 additions & 6 deletions node/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,12 +694,28 @@ func (s *Store) GetBlobHeaderByHeaderHash(ctx context.Context, blobHeaderHash [3
// format of the bytes.
func (s *Store) GetChunks(ctx context.Context, batchHeaderHash [32]byte, blobIndex int, quorumID core.QuorumID) ([][]byte, node.ChunkEncodingFormat, error) {
log := s.logger

var err error
blobKey, err := EncodeBlobKey(batchHeaderHash, blobIndex, quorumID)
if err != nil {
return nil, node.ChunkEncodingFormat_UNKNOWN, err
}
data, err := s.db.Get(blobKey)
var data []byte
data, err = s.db.Get(blobKey)
if errors.Is(err, leveldb.ErrNotFound) {
// If the blob is not found, try to get the blob header hash and get the blob by the hash (stored via minibatch dispersal).
var blobHeaderHash [32]byte
blobHeaderHash, err = s.GetBlobHeaderHashAtIndex(ctx, batchHeaderHash, blobIndex)
if err != nil {
return nil, node.ChunkEncodingFormat_UNKNOWN, err
}
var key []byte
key, err = EncodeBlobKeyByHash(blobHeaderHash, quorumID)
if err != nil {
return nil, node.ChunkEncodingFormat_UNKNOWN, fmt.Errorf("failed to generate the key for storing blob: %w", err)
}
data, err = s.db.Get(key)
}

if err != nil {
return nil, node.ChunkEncodingFormat_UNKNOWN, err
}
Expand All @@ -713,16 +729,18 @@ func (s *Store) GetChunks(ctx context.Context, batchHeaderHash [32]byte, blobInd
return chunks, format, nil
}

func (s *Store) GetBlobHeaderHashAtIndex(ctx context.Context, batchHeaderHash [32]byte, blobIndex int) ([]byte, error) {
func (s *Store) GetBlobHeaderHashAtIndex(ctx context.Context, batchHeaderHash [32]byte, blobIndex int) ([32]byte, error) {
var res [32]byte
blobIndexKey := EncodeBlobIndexKey(batchHeaderHash, blobIndex)
data, err := s.db.Get(blobIndexKey)
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return nil, ErrKeyNotFound
return res, ErrKeyNotFound
}
return nil, err
return res, err
}
return data, nil
copy(res[:], data)
return res, nil
}

// HasKey returns if a given key has been stored.
Expand Down
17 changes: 12 additions & 5 deletions node/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,16 @@ func TestStoreBatchSuccess(t *testing.T) {
assert.Nil(t, err)
assert.True(t, s.HasKey(ctx, blobKey2))

// Check the chunks.
chunks, format, err := s.GetChunks(ctx, batchHeaderHash, 0, 0)
assert.Nil(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, format)
assert.Equal(t, chunks, blobsProto[0].Bundles[0].Chunks)
chunks, format, err = s.GetChunks(ctx, batchHeaderHash, 1, 0)
assert.Nil(t, err)
assert.Equal(t, pb.ChunkEncodingFormat_GOB, format)
assert.Equal(t, chunks, blobsProto[1].Bundles[0].Chunks)

// Store the batch again it should be no-op.
_, err = s.StoreBatch(ctx, batchHeader, blobs, blobsProto)
assert.NotNil(t, err)
Expand Down Expand Up @@ -452,15 +462,12 @@ func TestStoreBatchBlobMapping(t *testing.T) {
assert.True(t, s.HasKey(ctx, blobIndexKey0))
assert.True(t, s.HasKey(ctx, blobIndexKey1))

var h [32]byte
bhh0, err := s.GetBlobHeaderHashAtIndex(ctx, batchHeaderHash, 0)
assert.Nil(t, err)
copy(h[:], bhh0)
assert.Equal(t, blobHeaderHash0, h)
assert.Equal(t, blobHeaderHash0, bhh0)
bhh1, err := s.GetBlobHeaderHashAtIndex(ctx, batchHeaderHash, 1)
assert.Nil(t, err)
copy(h[:], bhh1)
assert.Equal(t, blobHeaderHash1, h)
assert.Equal(t, blobHeaderHash1, bhh1)

// Check blob headers by GetBlobHeader method
var protoBlobHeader pb.BlobHeader
Expand Down
3 changes: 1 addition & 2 deletions node/store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ func EncodeBlobKey(batchHeaderHash [32]byte, blobIndex int, quorumID core.Quorum
}

func EncodeBlobKeyByHash(blobHeaderHash [32]byte, quorumID core.QuorumID) ([]byte, error) {
prefix := []byte(blobHeaderPrefix)
buf := bytes.NewBuffer(append(prefix, blobHeaderHash[:]...))
buf := bytes.NewBuffer(blobHeaderHash[:])
err := binary.Write(buf, binary.LittleEndian, quorumID)
if err != nil {
return nil, err
Expand Down

0 comments on commit e165edb

Please sign in to comment.