Skip to content

Commit

Permalink
Tool to replay a pull subscription to a topic (#350)
Browse files Browse the repository at this point in the history
This tool uses the local gcloud credentials to listen to a pull
subscription (intended for the pull subscription we created by default
for each dead letter topic), and send it back to a topic (the original
topic).

Example usage: 
```
❯ go run . --source=nghia-...-dlq-pjxu --dest=nghia-...-us-central1 --projectID nghia-...
Listening for messages.
Found message: { "content": "this is a test" }
Found message: { "content": "this is a test" }
Found message: { "content": "this is a test" }
Found message: { "content": "this is a test" }
Found message: { "content": "this is a test" }
Found message: { "content": "this is a test" }
Found message: { "content": "this is a test" }
Found message: { "content": "this is a test" }
Found message: { "content": "this is a test" }
Replayed message: { "content": "this is a test" }
Replayed message: { "content": "this is a test" }
Replayed message: { "content": "this is a test" }
Replayed message: { "content": "this is a test" }
Replayed message: { "content": "this is a test" }
Replayed message: { "content": "this is a test" }
Replayed message: { "content": "this is a test" }
Replayed message: { "content": "this is a test" }
Replayed message: { "content": "this is a test" }
Found message: { "content": "this is a test" }
Replayed message: { "content": "this is a test" }
No messages received in the last 10s . Exiting.
```

Signed-off-by: Nghia Tran <tcnghia@gmail.com>
  • Loading branch information
tcnghia authored May 16, 2024
1 parent 99bad0f commit 3e90d0e
Showing 1 changed file with 95 additions and 0 deletions.
95 changes: 95 additions & 0 deletions cmd/replayer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2024 Chainguard, Inc.
SPDX-License-Identifier: Apache-2.0
*/
package main

import (
"context"
"flag"
"fmt"
"log"
"os"
"time"

"cloud.google.com/go/pubsub"
)

const PollTimeout = 10 * time.Second

// Pulls messages from a pull subscription and replays them to a topic.
// This is useful for replaying messages from a pull subscription of a dead-letter topic
// to the original topic.
//
// Usage:
//
// replayer --source=dead-letter-pull-sub --dest=original-topic --projectID=project-id
func main() {
var srcSub, dstTop, prjID string
flag.StringVar(&srcSub, "source", "", "source subscription")
flag.StringVar(&dstTop, "dest", "", "destination topic")
flag.StringVar(&prjID, "projectID", "", "project id")

flag.Parse()
if srcSub == "" {
log.Fatal("--source is required")
}
if dstTop == "" {
log.Fatal("--dest is required")
}

if prjID == "" {
log.Fatal("--projectID is required")
}
ctx := context.Background()
client, err := pubsub.NewClient(ctx, prjID)
if err != nil {
log.Fatalf("pubsub.NewClient: %v", err)
}
defer client.Close()

sub := client.Subscription(srcSub)
top := client.Topic(dstTop)

fmt.Println("Listening for messages.")

lastReceived := time.Now()
go exitOnIdling(ctx, &lastReceived)

// Receive blocks until the context is cancelled or an error occurs.
_ = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
lastReceived = time.Now()
fmt.Println("Found message:", string(msg.Data))

// TODO: supporting a filter, either based on message content or attributes.
// if filter(msg) {
// msg.Nack()
// return
// }
result := top.Publish(ctx, msg)
if _, err := result.Get(ctx); err == nil {
fmt.Printf("Replayed message: %s\n", string(msg.Data))
msg.Ack()
} else {
fmt.Printf("Failed to publish message: %v\n", err)
msg.Nack()
}
})
}

// exitOnIdling exits the program if no messages are received in the last PollTimeout.
func exitOnIdling(_ context.Context, lastReceived *time.Time) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
// nolint:all for { select {} } is the recommended way.
for {
select {
case <-ticker.C:
if time.Since(*lastReceived) > PollTimeout {
fmt.Println("No messages received in the last", PollTimeout, ". Exiting.")
// nolint:all We can exit without running ticker.Stop()
os.Exit(0)
}
}
}
}

0 comments on commit 3e90d0e

Please sign in to comment.