Skip to content

Commit

Permalink
[v2] Paginate metadata records in controller (#914)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 21, 2024
1 parent 6ada199 commit 41e80d5
Show file tree
Hide file tree
Showing 10 changed files with 539 additions and 203 deletions.
16 changes: 9 additions & 7 deletions disperser/cmd/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,21 @@ func NewConfig(ctx *cli.Context) (Config, error) {
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
LoggerConfig: *loggerConfig,
EncodingManagerConfig: controller.EncodingManagerConfig{
PullInterval: ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name),
EncodingRequestTimeout: ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name),
StoreTimeout: ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name),
NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name),
NumRelayAssignment: uint16(numRelayAssignments),
AvailableRelays: relays,
EncoderAddress: ctx.GlobalString(flags.EncoderAddressFlag.Name),
PullInterval: ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name),
EncodingRequestTimeout: ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name),
StoreTimeout: ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name),
NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name),
NumRelayAssignment: uint16(numRelayAssignments),
AvailableRelays: relays,
EncoderAddress: ctx.GlobalString(flags.EncoderAddressFlag.Name),
MaxNumBlobsPerIteration: int32(ctx.GlobalInt(flags.MaxNumBlobsPerIterationFlag.Name)),
},
DispatcherConfig: controller.DispatcherConfig{
PullInterval: ctx.GlobalDuration(flags.DispatcherPullIntervalFlag.Name),
FinalizationBlockDelay: ctx.GlobalUint64(flags.FinalizationBlockDelayFlag.Name),
NodeRequestTimeout: ctx.GlobalDuration(flags.NodeRequestTimeoutFlag.Name),
NumRequestRetries: ctx.GlobalInt(flags.NumRequestRetriesFlag.Name),
MaxBatchSize: int32(ctx.GlobalInt(flags.MaxBatchSizeFlag.Name)),
},
NumConcurrentEncodingRequests: ctx.GlobalInt(flags.NumConcurrentEncodingRequestsFlag.Name),
NumConcurrentDispersalRequests: ctx.GlobalInt(flags.NumConcurrentDispersalRequestsFlag.Name),
Expand Down
18 changes: 18 additions & 0 deletions disperser/cmd/controller/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONCURRENT_ENCODING_REQUESTS"),
Value: 250,
}
// MaxNumBlobsPerIterationFlag is an optional flag that caps how many blobs are
// encoded in a single iteration. It defaults to 128.
MaxNumBlobsPerIterationFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-num-blobs-per-iteration"),
Usage: "Max number of blobs to encode in a single iteration",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_NUM_BLOBS_PER_ITERATION"),
Value: 128,
}

// Dispatcher Flags
DispatcherPullIntervalFlag = cli.DurationFlag{
Expand Down Expand Up @@ -150,6 +157,13 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "NODE_CLIENT_CACHE_NUM_ENTRIES"),
Value: 400,
}
// MaxBatchSizeFlag is an optional flag that caps how many blobs are dispersed
// in one batch. It defaults to 128.
MaxBatchSizeFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-batch-size"),
Usage: "Max number of blobs to disperse in a batch",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BATCH_SIZE"),
Value: 128,
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -160,6 +174,7 @@ var requiredFlags = []cli.Flag{
EncodingPullIntervalFlag,
AvailableRelaysFlag,
EncoderAddressFlag,

DispatcherPullIntervalFlag,
NodeRequestTimeoutFlag,
NumConnectionsToNodesFlag,
Expand All @@ -172,10 +187,13 @@ var optionalFlags = []cli.Flag{
NumEncodingRetriesFlag,
NumRelayAssignmentFlag,
NumConcurrentEncodingRequestsFlag,
MaxNumBlobsPerIterationFlag,

FinalizationBlockDelayFlag,
NumRequestRetriesFlag,
NumConcurrentDispersalRequestsFlag,
NodeClientCacheNumEntriesFlag,
MaxBatchSizeFlag,
}

var Flags []cli.Flag
Expand Down
4 changes: 2 additions & 2 deletions disperser/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func RunController(ctx *cli.Context) error {
}
encodingPool := workerpool.New(config.NumConcurrentEncodingRequests)
encodingManager, err := controller.NewEncodingManager(
config.EncodingManagerConfig,
&config.EncodingManagerConfig,
blobMetadataStore,
encodingPool,
encoderClient,
Expand Down Expand Up @@ -131,7 +131,7 @@ func RunController(ctx *cli.Context) error {
return fmt.Errorf("failed to create node client manager: %v", err)
}
dispatcher, err := controller.NewDispatcher(
config.DispatcherConfig,
&config.DispatcherConfig,
blobMetadataStore,
dispatcherPool,
ics,
Expand Down
149 changes: 149 additions & 0 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ var (
ErrInvalidStateTransition = errors.New("invalid state transition")
)

// StatusIndexCursor identifies a position in the status index. It is accepted by
// GetBlobMetadataByStatusPaginated as the exclusive start key of a page and is
// returned by it as the starting point for the next page.
type StatusIndexCursor struct {
// BlobKey is the key of the blob at the cursor position; it may be nil, in
// which case only the blob key prefix is used when building the start key.
BlobKey *corev2.BlobKey
// UpdatedAt is the UpdatedAt attribute value of the item at the cursor position.
UpdatedAt uint64
}

// BlobMetadataStore is a blob metadata storage backed by DynamoDB
type BlobMetadataStore struct {
dynamoDBClient commondynamodb.Client
Expand Down Expand Up @@ -123,6 +128,19 @@ func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey corev2
return err
}

// DeleteBlobMetadata issues a DeleteItem request for the metadata item of the
// given blob key and returns whatever error the DynamoDB client reports.
func (s *BlobMetadataStore) DeleteBlobMetadata(ctx context.Context, blobKey corev2.BlobKey) error {
	// The metadata item is addressed by the prefixed blob key hex (PK) and the
	// fixed metadata sort key (SK).
	itemKey := map[string]types.AttributeValue{
		"PK": &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()},
		"SK": &types.AttributeValueMemberS{Value: blobMetadataSK},
	}
	return s.dynamoDBClient.DeleteItem(ctx, s.tableName, itemKey)
}

func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey corev2.BlobKey) (*v2.BlobMetadata, error) {
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Expand Down Expand Up @@ -151,6 +169,7 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey corev2.

// GetBlobMetadataByStatus returns all the metadata with the given status that were updated after lastUpdatedAt
// Because this function scans the entire index, it should only be used for status with a limited number of items.
// Results are ordered by UpdatedAt in ascending order.
func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus, lastUpdatedAt uint64) ([]*v2.BlobMetadata, error) {
items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND UpdatedAt > :updatedAt", commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{
Expand All @@ -174,6 +193,110 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
return metadata, nil
}

// GetBlobMetadataByStatusPaginated returns up to limit metadata records with the given status,
// starting after the position described by exclusiveStartKey in the status index.
// A nil exclusiveStartKey starts from UpdatedAt 0 with the bare blob key prefix.
//
// It also returns a new cursor to be used as exclusiveStartKey for the next page.
// A cursor is returned even when there are no more results: if the query reports no
// last evaluated key, the cursor is derived from the last item of the page, and if
// the query yields nothing at all, the cursor that was passed in is returned unchanged.
// This cursor can be used to fetch new results once they become available, so a request
// made with a cursor from a previous response may legitimately return an empty result.
//
// Items that fail to unmarshal are logged and skipped. An error is returned if the query
// fails or if the next cursor cannot be derived from the response.
func (s *BlobMetadataStore) GetBlobMetadataByStatusPaginated(
	ctx context.Context,
	status v2.BlobStatus,
	exclusiveStartKey *StatusIndexCursor,
	limit int32,
) ([]*v2.BlobMetadata, *StatusIndexCursor, error) {
	statusValue := strconv.Itoa(int(status))

	// Defaults used when no cursor is supplied: bare prefix and UpdatedAt 0.
	pk := blobKeyPrefix
	startUpdatedAt := uint64(0)
	if exclusiveStartKey != nil {
		startUpdatedAt = exclusiveStartKey.UpdatedAt
		// NOTE(review): BlobKey looks like a fixed-size array, so the length check is
		// presumably always true; kept as-is to preserve behavior — confirm and drop.
		if exclusiveStartKey.BlobKey != nil && len(exclusiveStartKey.BlobKey) == 32 {
			pk = blobKeyPrefix + exclusiveStartKey.BlobKey.Hex()
		}
	}
	cursor := map[string]types.AttributeValue{
		"PK":         &types.AttributeValueMemberS{Value: pk},
		"SK":         &types.AttributeValueMemberS{Value: blobMetadataSK},
		"UpdatedAt":  &types.AttributeValueMemberN{Value: strconv.FormatUint(startUpdatedAt, 10)},
		"BlobStatus": &types.AttributeValueMemberN{Value: statusValue},
	}

	res, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, StatusIndexName, "BlobStatus = :status", commondynamodb.ExpressionValues{
		":status": &types.AttributeValueMemberN{Value: statusValue},
	}, limit, cursor)
	if err != nil {
		return nil, nil, err
	}

	// No results: hand back the same cursor so the caller can retry from it later.
	if len(res.Items) == 0 && res.LastEvaluatedKey == nil {
		return nil, exclusiveStartKey, nil
	}

	metadata := make([]*v2.BlobMetadata, 0, len(res.Items))
	for _, item := range res.Items {
		m, err := UnmarshalBlobMetadata(item)
		// Skip invalid/corrupt items
		if err != nil {
			s.logger.Errorf("failed to unmarshal blob metadata: %v", err)
			continue
		}
		metadata = append(metadata, m)
	}

	lastEvaluatedKey := res.LastEvaluatedKey
	if lastEvaluatedKey == nil {
		// The early return above guarantees res.Items is non-empty here, so the
		// next cursor can be derived from the last item of this page.
		lastItem := res.Items[len(res.Items)-1]
		// Use a checked assertion: a missing or mistyped attribute must surface as
		// an error rather than a panic.
		updatedAtAttr, ok := lastItem["UpdatedAt"].(*types.AttributeValueMemberN)
		if !ok || updatedAtAttr == nil {
			return nil, nil, errors.New("last item in status index page has no numeric UpdatedAt attribute")
		}
		lastUpdatedAt, err := strconv.ParseUint(updatedAtAttr.Value, 10, 64)
		if err != nil {
			return nil, nil, err
		}
		bk, err := UnmarshalBlobKey(lastItem)
		if err != nil {
			return nil, nil, err
		}
		return metadata, &StatusIndexCursor{
			BlobKey:   &bk,
			UpdatedAt: lastUpdatedAt,
		}, nil
	}

	// UnmarshalMap is used to populate the cursor from the last evaluated key;
	// the blob key is parsed separately by UnmarshalBlobKey and set afterwards.
	newCursor := StatusIndexCursor{}
	err = attributevalue.UnmarshalMap(lastEvaluatedKey, &newCursor)
	if err != nil {
		return nil, nil, err
	}
	bk, err := UnmarshalBlobKey(lastEvaluatedKey)
	if err != nil {
		return nil, nil, err
	}
	newCursor.BlobKey = &bk

	return metadata, &newCursor, nil
}

// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status
// Because this function scans the entire index, it should only be used for status with a limited number of items.
func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) {
Expand Down Expand Up @@ -203,6 +326,19 @@ func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *co
return err
}

// DeleteBlobCertificate issues a DeleteItem request for the certificate item of
// the given blob key and returns whatever error the DynamoDB client reports.
func (s *BlobMetadataStore) DeleteBlobCertificate(ctx context.Context, blobKey corev2.BlobKey) error {
	// The certificate item shares the blob's partition key and is distinguished
	// by the certificate sort key.
	itemKey := map[string]types.AttributeValue{
		"PK": &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()},
		"SK": &types.AttributeValueMemberS{Value: blobCertSK},
	}
	return s.dynamoDBClient.DeleteItem(ctx, s.tableName, itemKey)
}

func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey corev2.BlobKey) (*corev2.BlobCertificate, *encoding.FragmentInfo, error) {
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Expand Down Expand Up @@ -357,6 +493,19 @@ func (s *BlobMetadataStore) PutBatchHeader(ctx context.Context, batchHeader *cor
return err
}

// DeleteBatchHeader issues a DeleteItem request for the batch header item
// identified by batchHeaderHash and returns whatever error the DynamoDB client
// reports.
func (s *BlobMetadataStore) DeleteBatchHeader(ctx context.Context, batchHeaderHash [32]byte) error {
	// The partition key is the batch header prefix followed by the hex-encoded hash.
	partitionKey := batchHeaderKeyPrefix + hex.EncodeToString(batchHeaderHash[:])
	itemKey := map[string]types.AttributeValue{
		"PK": &types.AttributeValueMemberS{Value: partitionKey},
		"SK": &types.AttributeValueMemberS{Value: batchHeaderSK},
	}
	return s.dynamoDBClient.DeleteItem(ctx, s.tableName, itemKey)
}

func (s *BlobMetadataStore) GetBatchHeader(ctx context.Context, batchHeaderHash [32]byte) (*corev2.BatchHeader, error) {
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Expand Down
Loading

0 comments on commit 41e80d5

Please sign in to comment.