Skip to content

Commit

Permalink
[v2] Dispatcher Implementation (#871)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 13, 2024
1 parent 01f8be7 commit 999fb15
Show file tree
Hide file tree
Showing 11 changed files with 915 additions and 16 deletions.
21 changes: 21 additions & 0 deletions core/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,27 @@ func BlobHeaderFromProto(h *pb.BlobHeader) (*BlobHeader, error) {
}, nil
}

func SerializeMerkleProof(proof *merkletree.Proof) []byte {
proofBytes := make([]byte, 0)
for _, hash := range proof.Hashes {
proofBytes = append(proofBytes, hash[:]...)
}
return proofBytes
}

func DeserializeMerkleProof(data []byte) (*merkletree.Proof, error) {
proof := &merkletree.Proof{}
if len(data)%32 != 0 {
return nil, fmt.Errorf("invalid proof length")
}
for i := 0; i < len(data); i += 32 {
var hash [32]byte
copy(hash[:], data[i:i+32])
proof.Hashes = append(proof.Hashes, hash[:])
}
return proof, nil
}

func encode(obj any) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
Expand Down
10 changes: 1 addition & 9 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (b *Batcher) updateConfirmationInfo(
blobsToRetry = append(blobsToRetry, batchData.blobs[blobIndex])
continue
}
proof = serializeProof(merkleProof)
proof = core.SerializeMerkleProof(merkleProof)
}

confirmationInfo := &disperser.ConfirmationInfo{
Expand Down Expand Up @@ -563,14 +563,6 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
return nil
}

func serializeProof(proof *merkletree.Proof) []byte {
proofBytes := make([]byte, 0)
for _, hash := range proof.Hashes {
proofBytes = append(proofBytes, hash[:]...)
}
return proofBytes
}

func (b *Batcher) parseBatchIDFromReceipt(txReceipt *types.Receipt) (uint32, error) {
if len(txReceipt.Logs) == 0 {
return 0, errors.New("failed to get transaction receipt with logs")
Expand Down
2 changes: 1 addition & 1 deletion disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
Value: strconv.Itoa(int(status)),
},
":updatedAt": &types.AttributeValueMemberN{
Value: strconv.FormatInt(time.Now().Unix(), 10),
Value: strconv.FormatInt(int64(lastUpdatedAt), 10),
}})
if err != nil {
return nil, err
Expand Down
44 changes: 44 additions & 0 deletions disperser/common/v2/blobstore/dynamo_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, queued, 1)
assert.Equal(t, metadata1, queued[0])
// query to get newer blobs should result in 0 results
queued, err = blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued, metadata1.UpdatedAt+100)
assert.NoError(t, err)
assert.Len(t, queued, 0)

certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified, 0)
assert.NoError(t, err)
assert.Len(t, certified, 1)
Expand Down Expand Up @@ -153,6 +158,45 @@ func TestBlobMetadataStoreCerts(t *testing.T) {
err = blobMetadataStore.PutBlobCertificate(ctx, blobCert1, fragmentInfo)
assert.ErrorIs(t, err, common.ErrAlreadyExists)

// get multiple certs
numCerts := 100
keys := make([]corev2.BlobKey, numCerts)
for i := 0; i < numCerts; i++ {
blobCert := &corev2.BlobCertificate{
BlobHeader: &corev2.BlobHeader{
BlobVersion: 0,
QuorumNumbers: []core.QuorumID{0},
BlobCommitments: mockCommitment,
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x123",
BinIndex: uint32(i),
CumulativePayment: big.NewInt(321),
},
Signature: []byte("signature"),
},
RelayKeys: []corev2.RelayKey{0},
}
blobKey, err := blobCert.BlobHeader.BlobKey()
assert.NoError(t, err)
keys[i] = blobKey
err = blobMetadataStore.PutBlobCertificate(ctx, blobCert, fragmentInfo)
assert.NoError(t, err)
}

certs, fragmentInfos, err := blobMetadataStore.GetBlobCertificates(ctx, keys)
assert.NoError(t, err)
assert.Len(t, certs, numCerts)
assert.Len(t, fragmentInfos, numCerts)
binIndexes := make(map[uint32]struct{})
for i := 0; i < numCerts; i++ {
assert.Equal(t, fragmentInfos[i], fragmentInfo)
binIndexes[certs[i].BlobHeader.PaymentMetadata.BinIndex] = struct{}{}
}
assert.Len(t, binIndexes, numCerts)
for i := 0; i < numCerts; i++ {
assert.Contains(t, binIndexes, uint32(i))
}

deleteItems(t, []commondynamodb.Key{
{
"PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey.Hex()},
Expand Down
Loading

0 comments on commit 999fb15

Please sign in to comment.