Skip to content

Commit

Permalink
v0.6.5-beta
Browse files Browse the repository at this point in the history
  • Loading branch information
prezha committed Feb 13, 2023
1 parent d7de885 commit 474477f
Show file tree
Hide file tree
Showing 12 changed files with 921 additions and 2 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*.dll
*.so
*.dylib
pubsubctl*

# Test binary, built with `go test -c`
*.test
Expand All @@ -12,4 +13,7 @@
*.out

# Dependency directories (remove the comment below to include it)
# vendor/
vendor/

# backup
backup
61 changes: 60 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,61 @@
# pubsubctl
pubsubctl is a basic Google Cloud Platform Pub/Sub [Emulator] CLI
pubsubctl is a basic Google Pub/Sub Emulator/Cloud CLI

## quick start - basic _emulator_ usage
1. start pubsub emulator in a separate session (to monitor logs)
- using the gCloud Docker image:
> docker run --rm -ti -p 127.0.0.1:8085:8085 gcr.io/google.com/cloudsdktool/google-cloud-cli:latest gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=my-project --log-http --verbosity=debug --user-output-enabled
- using the Google Cloud CLI:
> gcloud beta emulators pubsub start --host-port=127.0.0.1:8085 --project=my-project --log-http --verbosity=debug --user-output-enabled
2. set the environment variables
> export PUBSUB_EMULATOR_HOST=127.0.0.1:8085
> export PUBSUB_PROJECT_ID=my-project
3. create topic and subscription
> pubsubctl create -t my-topic -s my-sub
4. publish a message to topic
> pubsubctl publish -t my-topic -m "my message"
5. receive message from subscription
> pubsubctl receive -s my-sub
## getting help
```
$ ./pubsubctl --help
pubsubctl v0.6.5-beta
pubsubctl is a basic Google Cloud Platform Pub/Sub [Emulator] CLI
Usage:
pubsubctl [command]
Available Commands:
completion Generate the autocompletion script for the specified shell
create create topic/subscription
delete delete topic and/or subscription
help Help about any command
list list topics and/or subscriptions
publish publish message
receive receive message
test test pubsub emulator
Flags:
--cloud use cloud pubsub instead of the emulator
-h, --help help for pubsubctl
--host string [address:port] of the emulator host, defaulting to PUBSUB_EMULATOR_HOST environment variable value (if set), ignored if 'cloud' flag is also set (default "localhost:8085")
-p, --project string pubsub project, defaulting to PUBSUB_PROJECT_ID environment variable value (if set) (default "default")
-s, --subscription string pubsub subscription
--timeout duration time to wait for command execution (value <=0 disables timeout) (default 5s)
-t, --topic string pubsub topic
Use "pubsubctl [command] --help" for more information about a command.
```

## build from source
- linux
> GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o pubsubctl
- macos
> GOOS=darwin GOARCH=arm64 go build -ldflags="-w -s" -o pubsubctl
## references
- https://cloud.google.com/pubsub/docs/overview
- https://cloud.google.com/pubsub/docs/emulator
- https://cloud.google.com/sdk/gcloud/reference/beta/emulators/pubsub
87 changes: 87 additions & 0 deletions cmd/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package cmd

import (
"context"
"fmt"
"log"

"cloud.google.com/go/pubsub"
"github.com/spf13/cobra"
)

func init() {
rootCmd.AddCommand(createCmd)
}

var createCmd = &cobra.Command{
Use: "create",
Short: "create topic/subscription",
Long: `create the specified topic and optionally subscription (if given) under the pubsub project/topic, eg:
pubsubctl create [--project=<projectID>] --topic=<topicID> [--subscription=<subID>]`,
Run: func(cmd *cobra.Command, args []string) {
topic, err := createTopic(ctx, client, topicID)
if err != nil {
log.Fatalf("cannot create topic %q: %v", topicID, err)
}
defer topic.Stop()
log.Printf("created %q", topic.String())

if subID != "" {
sub, err := createSubscription(ctx, client, topic, subID)
if err != nil {
log.Fatalf("cannot create subscription %q: %v", subID, err)
}
log.Printf("created %q", sub.String())
}
},
}

// createTopic returns exiting topic or creates new one.
func createTopic(ctx context.Context, client *pubsub.Client, id string) (*pubsub.Topic, error) {
if id == "" {
return nil, fmt.Errorf("nil topic")
}

topic := client.Topic(id)
ok, err := topic.Exists(ctx)
if err != nil {
return nil, err
}
if !ok {
topic, err = client.CreateTopic(ctx, id)
if err != nil {
return nil, err
}
}
return topic, nil
}

// createSubscription returns exiting subscription for topic or creates new one.
func createSubscription(ctx context.Context, client *pubsub.Client, topic *pubsub.Topic, id string) (*pubsub.Subscription, error) {
if id == "" {
return nil, fmt.Errorf("nil subscription")
}

sub := client.Subscription(id)
ok, err := sub.Exists(ctx)
if err != nil {
return nil, err
}

// delete existing subscription if its config cannot be fetched or it's not linked to same topic name
if ok {
cfg, err := sub.Config(ctx)
if err != nil || cfg.Topic.String() != topic.String() {
if err := delete(ctx, client, sub.String()); err != nil {
return nil, fmt.Errorf("cannot delete existing subscription: %v", err)
}
}
}

sub, err = client.CreateSubscription(ctx, id, pubsub.SubscriptionConfig{Topic: topic})
if err != nil {
return nil, err
}

return sub, nil
}
83 changes: 83 additions & 0 deletions cmd/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package cmd

import (
"context"
"fmt"
"log"
"strings"

"cloud.google.com/go/pubsub"
"github.com/spf13/cobra"
)

func init() {
rootCmd.AddCommand(deleteCmd)
}

var deleteCmd = &cobra.Command{
Use: "delete",
Short: "delete topic and/or subscription",
Long: `delete specified object with the given path path, eg:
pubsubctl delete [--project=<projectID>] --topic=<topicID>
or
pubsubctl delete [--project=<projectID>] --topic=<topicID> --subscription=<subID>
or
pubsubctl delete [--project=<projectID>] --subscription=<subID>
or
pubsubctl delete <path>`,
Run: func(cmd *cobra.Command, args []string) {
if topicID == "" && subID == "" && len(args) == 0 {
log.Fatalf("topic, subscription or path must be specified")
}

if topicID != "" {
path := fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)
if err := delete(ctx, client, path); err != nil {
log.Fatalf("cannot delete topic %q: %v", path, err)
}
log.Printf("topic %q deleted", path)
}

if subID != "" {
path := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subID)
if err := delete(ctx, client, path); err != nil {
log.Fatalf("cannot delete subscription %q: %v", path, err)
}
log.Printf("subscription %q deleted", path)
}

if len(args) > 0 {
path := args[0]
if err := delete(ctx, client, path); err != nil {
log.Fatalf("cannot delete path %q: %v", path, err)
}
log.Printf("path %q deleted", path)
}
},
}

// delete deletes object with given path.
// ref: https://cloud.google.com/pubsub/docs/admin#delete_a_topic
// ref: https://cloud.google.com/pubsub/docs/create-subscription#delete_subscription
// ref: https://cloud.google.com/pubsub/docs/create-subscription#detach_a_subscription_from_a_topic
func delete(ctx context.Context, client *pubsub.Client, path string) error {
p := strings.Split(path, "/")
if len(p) != 4 {
return fmt.Errorf("invalid path %q", path)
}

obj := p[2]
id := p[3]
switch obj {
case "topics":
t := client.Topic(id)
defer t.Stop()
err := t.Delete(ctx)
return err
case "subscriptions":
s := client.Subscription(id)
return s.Delete(ctx)
default:
return fmt.Errorf("invalid path %q", path)
}
}
117 changes: 117 additions & 0 deletions cmd/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package cmd

import (
"context"
"encoding/json"
"fmt"
"log"

"cloud.google.com/go/pubsub"
"github.com/spf13/cobra"
"google.golang.org/api/iterator"
)

func init() {
rootCmd.AddCommand(listCmd)
}

var listCmd = &cobra.Command{
Use: "list",
Short: "list topics and/or subscriptions",
Long: `list topics and/or subscriptions under the specified pubsub project, eg:
pubsubctl list [--project=<projectID>] [topics | subscriptions]`,
Run: func(cmd *cobra.Command, args []string) {
obj := ""
if len(args) > 0 {
obj = args[0]
}
if obj != "" {
out, err := list(ctx, client, obj)
if err != nil {
log.Fatalf("cannot list %q: %v", obj, err)
}
log.Printf("%s under project %q: %s", obj, projectID, out)
return
}

out, err := list(ctx, client, "topics")
if err != nil {
log.Fatalf("cannot list topics: %v", err)
}
log.Printf("%q project's topics: %s", projectID, out)

out, err = list(ctx, client, "subscriptions")
if err != nil {
log.Fatalf("cannot list subscriptions: %v", err)
}
log.Printf("%q project's subscriptions: %s", projectID, out)
},
}

func list(ctx context.Context, client *pubsub.Client, obj string) (string, error) {
switch obj {
case "topics":
t, err := topics(ctx, client)
if err != nil {
return "", err
}
b, err := json.MarshalIndent(t, "", " ")
if err != nil {
return "", err
}
return string(b), nil
case "subscriptions":
s, err := subscriptions(ctx, client)
if err != nil {
return "", err
}
b, err := json.MarshalIndent(s, "", " ")
if err != nil {
return "", err
}
return string(b), nil
default:
return "", fmt.Errorf("invalid list option %q, valid ones are: \"topics\" or \"subscriptions\"", obj)
}
}

func topics(ctx context.Context, client *pubsub.Client) (map[string][]string, error) {
it := client.Topics(ctx)
list := map[string][]string{}
for {
t, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, fmt.Errorf("cannot iterate pubsub topics: %v", err)
}
subs, err := subIter(t.Subscriptions(ctx))
if err != nil {
return nil, fmt.Errorf("cannot iterate pubsub subscriptions of topic %q: %v", t.String(), err)
}

list[t.String()] = subs
}
return list, nil
}

func subscriptions(ctx context.Context, client *pubsub.Client) ([]string, error) {
it := client.Subscriptions(ctx)
return subIter(it)
}

func subIter(it *pubsub.SubscriptionIterator) ([]string, error) {
list := []string{}
for {
s, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, fmt.Errorf("cannot iterate pubsub subscriptions: %v", err)
}
list = append(list, s.String())
}
return list, nil
}
Loading

0 comments on commit 474477f

Please sign in to comment.