Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Paginate metadata records in controller #914

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions disperser/cmd/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,21 @@ func NewConfig(ctx *cli.Context) (Config, error) {
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
LoggerConfig: *loggerConfig,
EncodingManagerConfig: controller.EncodingManagerConfig{
PullInterval: ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name),
EncodingRequestTimeout: ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name),
StoreTimeout: ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name),
NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name),
NumRelayAssignment: uint16(numRelayAssignments),
AvailableRelays: relays,
EncoderAddress: ctx.GlobalString(flags.EncoderAddressFlag.Name),
PullInterval: ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name),
EncodingRequestTimeout: ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name),
StoreTimeout: ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name),
NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name),
NumRelayAssignment: uint16(numRelayAssignments),
AvailableRelays: relays,
EncoderAddress: ctx.GlobalString(flags.EncoderAddressFlag.Name),
MaxNumBlobsPerIteration: int32(ctx.GlobalInt(flags.MaxNumBlobsPerIterationFlag.Name)),
},
DispatcherConfig: controller.DispatcherConfig{
PullInterval: ctx.GlobalDuration(flags.DispatcherPullIntervalFlag.Name),
FinalizationBlockDelay: ctx.GlobalUint64(flags.FinalizationBlockDelayFlag.Name),
NodeRequestTimeout: ctx.GlobalDuration(flags.NodeRequestTimeoutFlag.Name),
NumRequestRetries: ctx.GlobalInt(flags.NumRequestRetriesFlag.Name),
MaxBatchSize: int32(ctx.GlobalInt(flags.MaxBatchSizeFlag.Name)),
},
NumConcurrentEncodingRequests: ctx.GlobalInt(flags.NumConcurrentEncodingRequestsFlag.Name),
NumConcurrentDispersalRequests: ctx.GlobalInt(flags.NumConcurrentDispersalRequestsFlag.Name),
Expand Down
18 changes: 18 additions & 0 deletions disperser/cmd/controller/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONCURRENT_ENCODING_REQUESTS"),
Value: 250,
}
MaxNumBlobsPerIterationFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-num-blobs-per-iteration"),
Usage: "Max number of blobs to encode in a single iteration",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_NUM_BLOBS_PER_ITERATION"),
Value: 128,
}

// Dispatcher Flags
DispatcherPullIntervalFlag = cli.DurationFlag{
Expand Down Expand Up @@ -150,6 +157,13 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "NODE_CLIENT_CACHE_NUM_ENTRIES"),
Value: 400,
}
MaxBatchSizeFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-batch-size"),
Usage: "Max number of blobs to disperse in a batch",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BATCH_SIZE"),
Value: 128,
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -160,6 +174,7 @@ var requiredFlags = []cli.Flag{
EncodingPullIntervalFlag,
AvailableRelaysFlag,
EncoderAddressFlag,

DispatcherPullIntervalFlag,
NodeRequestTimeoutFlag,
NumConnectionsToNodesFlag,
Expand All @@ -172,10 +187,13 @@ var optionalFlags = []cli.Flag{
NumEncodingRetriesFlag,
NumRelayAssignmentFlag,
NumConcurrentEncodingRequestsFlag,
MaxNumBlobsPerIterationFlag,

FinalizationBlockDelayFlag,
NumRequestRetriesFlag,
NumConcurrentDispersalRequestsFlag,
NodeClientCacheNumEntriesFlag,
MaxBatchSizeFlag,
}

var Flags []cli.Flag
Expand Down
4 changes: 2 additions & 2 deletions disperser/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func RunController(ctx *cli.Context) error {
}
encodingPool := workerpool.New(config.NumConcurrentEncodingRequests)
encodingManager, err := controller.NewEncodingManager(
config.EncodingManagerConfig,
&config.EncodingManagerConfig,
blobMetadataStore,
encodingPool,
encoderClient,
Expand Down Expand Up @@ -131,7 +131,7 @@ func RunController(ctx *cli.Context) error {
return fmt.Errorf("failed to create node client manager: %v", err)
}
dispatcher, err := controller.NewDispatcher(
config.DispatcherConfig,
&config.DispatcherConfig,
blobMetadataStore,
dispatcherPool,
ics,
Expand Down
149 changes: 149 additions & 0 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ var (
ErrInvalidStateTransition = errors.New("invalid state transition")
)

// StatusIndexCursor is a pagination cursor (exclusive start position) into the
// blob-status index, used by GetBlobMetadataByStatusPaginated to resume a scan.
type StatusIndexCursor struct {
	// BlobKey is the key of the last blob already returned; nil means start
	// from the beginning of the index.
	BlobKey *corev2.BlobKey
	// UpdatedAt is the UpdatedAt value of the last blob already returned.
	UpdatedAt uint64
}

// BlobMetadataStore is a blob metadata storage backed by DynamoDB
type BlobMetadataStore struct {
dynamoDBClient commondynamodb.Client
Expand Down Expand Up @@ -123,6 +128,19 @@ func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey corev2
return err
}

// DeleteBlobMetadata removes the blob metadata item for the given blob key.
func (s *BlobMetadataStore) DeleteBlobMetadata(ctx context.Context, blobKey corev2.BlobKey) error {
	// The metadata item is addressed by the blob key (PK) and the fixed
	// metadata sort key (SK).
	key := map[string]types.AttributeValue{
		"PK": &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()},
		"SK": &types.AttributeValueMemberS{Value: blobMetadataSK},
	}
	return s.dynamoDBClient.DeleteItem(ctx, s.tableName, key)
}

func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey corev2.BlobKey) (*v2.BlobMetadata, error) {
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Expand Down Expand Up @@ -151,6 +169,7 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey corev2.

// GetBlobMetadataByStatus returns all the metadata with the given status that were updated after lastUpdatedAt
// Because this function scans the entire index, it should only be used for status with a limited number of items.
// Results are ordered by UpdatedAt in ascending order.
func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus, lastUpdatedAt uint64) ([]*v2.BlobMetadata, error) {
items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND UpdatedAt > :updatedAt", commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{
Expand All @@ -174,6 +193,110 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
return metadata, nil
}

// GetBlobMetadataByStatusPaginated returns all the metadata with the given status that were updated after the given cursor.
// It also returns a new cursor (last evaluated key) to be used for the next page
// even when there are no more results or there are no results at all.
// This cursor can be used to get new set of results when they become available.
// Therefore, it's possible to get an empty result from a request with exclusive start key returned from previous response.
func (s *BlobMetadataStore) GetBlobMetadataByStatusPaginated(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about actually build a cursor that streams the data?
The StatusIndexCursor is really a position or offset; and an actual Cursor will take a cursor position/offset, and keep iterating forward (and internalizing the bookkeeping of moving position).
A Cursor (or Iterator) will provide Next(), Item(), Position() etc. so the client (encoder manager and dispatcher) can stream through the data.
It'll be a better abstraction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that's a better abstraction for querying individual blobs one by one.
However, there isn't a use case for that right now as the controller will always request a batch of the blobs by status.
We can probably make your cursor abstraction work with batches, but I'm not sure if the usability will significantly improve vs. effort to introduce this new abstraction

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My cursor abstraction can take in status which serves as a filter of blobs to iterate over

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main point wasn't that it can't take in status. It was more about the need to support iteration one by one vs. batches

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No the underlying mechanism isn't one by one -- it's the same by a small batch and buffer the results. Consuming that buffer is one by one, which is no difference than what's written here with a for loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

punting this as discussed offline

ctx context.Context,
status v2.BlobStatus,
exclusiveStartKey *StatusIndexCursor,
limit int32,
) ([]*v2.BlobMetadata, *StatusIndexCursor, error) {
var cursor map[string]types.AttributeValue
if exclusiveStartKey != nil {
pk := blobKeyPrefix
if exclusiveStartKey.BlobKey != nil && len(exclusiveStartKey.BlobKey) == 32 {
pschork marked this conversation as resolved.
Show resolved Hide resolved
pk = blobKeyPrefix + exclusiveStartKey.BlobKey.Hex()
}
cursor = map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: pk,
},
"SK": &types.AttributeValueMemberS{
Value: blobMetadataSK,
},
"UpdatedAt": &types.AttributeValueMemberN{
Value: strconv.FormatUint(exclusiveStartKey.UpdatedAt, 10),
pschork marked this conversation as resolved.
Show resolved Hide resolved
},
"BlobStatus": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
},
}
} else {
cursor = map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: blobKeyPrefix,
},
"SK": &types.AttributeValueMemberS{
Value: blobMetadataSK,
},
"UpdatedAt": &types.AttributeValueMemberN{
Value: "0",
},
"BlobStatus": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
},
}
}
res, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, StatusIndexName, "BlobStatus = :status", commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
},
}, limit, cursor)
if err != nil {
return nil, nil, err
}

// No results
if len(res.Items) == 0 && res.LastEvaluatedKey == nil {
// return the same cursor
return nil, exclusiveStartKey, nil
}

metadata := make([]*v2.BlobMetadata, 0, len(res.Items))
for _, item := range res.Items {
m, err := UnmarshalBlobMetadata(item)
// Skip invalid/corrupt items
if err != nil {
s.logger.Errorf("failed to unmarshal blob metadata: %v", err)
continue
}
metadata = append(metadata, m)
}

lastEvaludatedKey := res.LastEvaluatedKey
if lastEvaludatedKey == nil {
lastItem := res.Items[len(res.Items)-1]
updatedAt, err := strconv.ParseUint(lastItem["UpdatedAt"].(*types.AttributeValueMemberN).Value, 10, 64)
if err != nil {
return nil, nil, err
}
bk, err := UnmarshalBlobKey(lastItem)
if err != nil {
return nil, nil, err
}
return metadata, &StatusIndexCursor{
BlobKey: &bk,
UpdatedAt: updatedAt,
}, nil
}

newCursor := StatusIndexCursor{}
err = attributevalue.UnmarshalMap(lastEvaludatedKey, &newCursor)
if err != nil {
return nil, nil, err
}
bk, err := UnmarshalBlobKey(lastEvaludatedKey)
if err != nil {
return nil, nil, err
}
newCursor.BlobKey = &bk

return metadata, &newCursor, nil
}

// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status
// Because this function scans the entire index, it should only be used for status with a limited number of items.
func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) {
Expand Down Expand Up @@ -203,6 +326,19 @@ func (s *BlobMetadataStore) PutBlobCertificate(ctx context.Context, blobCert *co
return err
}

// DeleteBlobCertificate removes the blob certificate item for the given blob key.
func (s *BlobMetadataStore) DeleteBlobCertificate(ctx context.Context, blobKey corev2.BlobKey) error {
	// The certificate item is addressed by the blob key (PK) and the fixed
	// certificate sort key (SK).
	key := map[string]types.AttributeValue{
		"PK": &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()},
		"SK": &types.AttributeValueMemberS{Value: blobCertSK},
	}
	return s.dynamoDBClient.DeleteItem(ctx, s.tableName, key)
}

func (s *BlobMetadataStore) GetBlobCertificate(ctx context.Context, blobKey corev2.BlobKey) (*corev2.BlobCertificate, *encoding.FragmentInfo, error) {
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Expand Down Expand Up @@ -357,6 +493,19 @@ func (s *BlobMetadataStore) PutBatchHeader(ctx context.Context, batchHeader *cor
return err
}

// DeleteBatchHeader removes the batch header item for the given batch header hash.
func (s *BlobMetadataStore) DeleteBatchHeader(ctx context.Context, batchHeaderHash [32]byte) error {
	// The batch header item is addressed by the hex-encoded header hash (PK)
	// and the fixed batch-header sort key (SK).
	key := map[string]types.AttributeValue{
		"PK": &types.AttributeValueMemberS{Value: batchHeaderKeyPrefix + hex.EncodeToString(batchHeaderHash[:])},
		"SK": &types.AttributeValueMemberS{Value: batchHeaderSK},
	}
	return s.dynamoDBClient.DeleteItem(ctx, s.tableName, key)
}

func (s *BlobMetadataStore) GetBatchHeader(ctx context.Context, batchHeaderHash [32]byte) (*corev2.BatchHeader, error) {
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Expand Down
Loading
Loading