Skip to content

Commit

Permalink
Improve DLQ merge and purge command (#4075)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Mar 31, 2021
1 parent 5a10dc4 commit bd5859b
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions tools/cli/adminDLQCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,17 @@ func AdminPurgeDLQMessages(c *cli.Context) {
adminClient := cFactory.ServerAdminClient(c)
for shardID := lowerShardBound; shardID <= upperShardBound; shardID++ {
ctx, cancel := newContext(c)
if err := adminClient.PurgeDLQMessages(ctx, &types.PurgeDLQMessagesRequest{
err := adminClient.PurgeDLQMessages(ctx, &types.PurgeDLQMessagesRequest{
Type: toQueueType(dlqType),
SourceCluster: sourceCluster,
ShardID: int32(shardID),
InclusiveEndMessageID: lastMessageID,
}); err != nil {
cancel()
ErrorAndExit("Failed to purge dlq", err)
}
})
cancel()
if err != nil {
fmt.Printf("Failed to purge DLQ message in shard %v with error: %v.\n", shardID, err)
continue
}
time.Sleep(10 * time.Millisecond)
fmt.Printf("Successfully purge DLQ Messages in shard %v.\n", shardID)
}
Expand All @@ -173,8 +174,8 @@ func AdminMergeDLQMessages(c *cli.Context) {
}

adminClient := cFactory.ServerAdminClient(c)
ShardIDLoop:
for shardID := lowerShardBound; shardID <= upperShardBound; shardID++ {
ctx, cancel := newContext(c)
request := &types.MergeDLQMessagesRequest{
Type: toQueueType(dlqType),
SourceCluster: sourceCluster,
Expand All @@ -184,10 +185,12 @@ func AdminMergeDLQMessages(c *cli.Context) {
}

for {
ctx, cancel := newContext(c)
response, err := adminClient.MergeDLQMessages(ctx, request)
cancel()
if err != nil {
fmt.Printf("Failed to merge DLQ message in shard %v with error: %v.\n", shardID, err)
break
continue ShardIDLoop
}

if len(response.NextPageToken) == 0 {
Expand All @@ -196,7 +199,6 @@ func AdminMergeDLQMessages(c *cli.Context) {

request.NextPageToken = response.NextPageToken
}
cancel()
fmt.Printf("Successfully merged all messages in shard %v.\n", shardID)
}
}
Expand Down

0 comments on commit bd5859b

Please sign in to comment.