Skip to content

Commit

Permalink
filter out expired blobs from dynamo query
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Sep 11, 2024
1 parent 0b78d17 commit 0622f30
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 68 deletions.
11 changes: 7 additions & 4 deletions disperser/batcher/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestFinalizedBlob(t *testing.T) {
blobIndex := uint32(10)
sigRecordHash := [32]byte{0}
inclusionProof := []byte{1, 2, 3, 4, 5}
expiry := uint64(time.Now().Add(time.Hour).Unix())
confirmationInfo := &disperser.ConfirmationInfo{
BatchHeaderHash: batchHeaderHash,
BlobIndex: blobIndex,
Expand All @@ -73,7 +74,7 @@ func TestFinalizedBlob(t *testing.T) {
BlobHash: metadataKey1.BlobHash,
MetadataHash: metadataKey1.MetadataHash,
BlobStatus: disperser.Processing,
Expiry: 0,
Expiry: expiry,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
Expand All @@ -86,7 +87,7 @@ func TestFinalizedBlob(t *testing.T) {
BlobHash: metadataKey2.BlobHash,
MetadataHash: metadataKey2.MetadataHash,
BlobStatus: disperser.Processing,
Expiry: 0,
Expiry: expiry + 1,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
Expand Down Expand Up @@ -164,11 +165,12 @@ func TestUnfinalizedBlob(t *testing.T) {
ConfirmationBlockNumber: uint32(150),
Fee: []byte{0},
}
expiry := uint64(time.Now().Add(100000).Unix())
metadata := &disperser.BlobMetadata{
BlobHash: metadataKey.BlobHash,
MetadataHash: metadataKey.MetadataHash,
BlobStatus: disperser.Processing,
Expiry: 0,
Expiry: expiry,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
Expand Down Expand Up @@ -234,11 +236,12 @@ func TestNoReceipt(t *testing.T) {
ConfirmationBlockNumber: uint32(150),
Fee: []byte{0},
}
expiry := uint64(time.Now().Add(100000).Unix())
metadata := &disperser.BlobMetadata{
BlobHash: metadataKey.BlobHash,
MetadataHash: metadataKey.MetadataHash,
BlobStatus: disperser.Processing,
Expiry: 0,
Expiry: expiry,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
Expand Down
35 changes: 30 additions & 5 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
const (
statusIndexName = "StatusIndex"
batchIndexName = "BatchIndex"
expiryIndexName = "Status-Expiry-Index"
)

// BlobMetadataStore is a blob metadata storage backed by DynamoDB
Expand Down Expand Up @@ -121,7 +122,7 @@ func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []
// Because this function scans the entire index, it should only be used for status with a limited number of items.
// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented.
func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status disperser.BlobStatus) ([]*disperser.BlobMetadata, error) {
items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, statusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{
items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
},
Expand All @@ -143,11 +144,11 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
return metadata, nil
}

// GetBlobMetadataByStatusCount returns the count of all the metadata with the given status
// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status
// Because this function scans the entire index, it should only be used for status with a limited number of items.
// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented.
func (s *BlobMetadataStore) GetBlobMetadataByStatusCount(ctx context.Context, status disperser.BlobStatus) (int32, error) {
count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, statusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{
func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status disperser.BlobStatus) (int32, error) {
count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
},
Expand Down Expand Up @@ -180,7 +181,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co
}
}

queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, statusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{
queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
},
Expand Down Expand Up @@ -440,6 +441,10 @@ func GenerateTableSchema(metadataTableName string, readCapacityUnits int64, writ
AttributeName: aws.String("BlobIndex"),
AttributeType: types.ScalarAttributeTypeN,
},
{
AttributeName: aws.String("Expiry"),
AttributeType: types.ScalarAttributeTypeN,
},
},
KeySchema: []types.KeySchemaElement{
{
Expand Down Expand Up @@ -493,6 +498,26 @@ func GenerateTableSchema(metadataTableName string, readCapacityUnits int64, writ
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
{
IndexName: aws.String(expiryIndexName),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("BlobStatus"),
KeyType: types.KeyTypeHash,
},
{
AttributeName: aws.String("Expiry"),
KeyType: types.KeyTypeRange,
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
Expand Down
Loading

0 comments on commit 0622f30

Please sign in to comment.