Skip to content

Commit

Permalink
Make starting the pprof server optional. (apache#91)
Browse files Browse the repository at this point in the history
* Make starting the pprof server optional.

* Add debug option for log level.
  • Loading branch information
cckellogg authored and aahmed-se committed Nov 12, 2019
1 parent 1325df1 commit 163fba0
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 79 deletions.
72 changes: 36 additions & 36 deletions perf/perf-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
package main

import (
"context"
"encoding/json"
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/spf13/cobra"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar"
)

// ConsumeArgs define the parameters required by consume
Expand All @@ -36,24 +36,30 @@ type ConsumeArgs struct {
ReceiverQueueSize int
}

var consumeArgs ConsumeArgs
func newConsumerCommand() *cobra.Command {
consumeArgs := ConsumeArgs{}
cmd := &cobra.Command{
Use: "consume <topic>",
Short: "Consume from topic",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
stop := stopCh()
if FlagProfile {
RunProfiling(stop)
}
consumeArgs.Topic = args[0]
consume(&consumeArgs, stop)
},
}

var cmdConsume = &cobra.Command{
Use: "consume <topic>",
Short: "Consume from topic",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
consumeArgs.Topic = args[0]
consume()
},
}
flags := cmd.Flags()
flags.StringVarP(&consumeArgs.SubscriptionName, "subscription", "s", "sub", "Subscription name")
flags.IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size", "r", 1000, "Receiver queue size")

func initConsumer() {
cmdConsume.Flags().StringVarP(&consumeArgs.SubscriptionName, "subscription", "s", "sub", "Subscription name")
cmdConsume.Flags().IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size", "r", 1000, "Receiver queue size")
return cmd
}

func consume() {
func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
b, _ := json.MarshalIndent(clientArgs, "", " ")
log.Info("Client config: ", string(b))
b, _ = json.MarshalIndent(consumeArgs, "", " ")
Expand All @@ -80,32 +86,24 @@ func consume() {

defer consumer.Close()

ctx := context.Background()

var msgReceived int64
var bytesReceived int64

go func() {
for {
msg, err := consumer.Receive(ctx)
if err != nil {
return
}

atomic.AddInt64(&msgReceived, 1)
atomic.AddInt64(&bytesReceived, int64(len(msg.Payload())))

if err := consumer.Ack(msg); err != nil {
return
}
}
}()
// keep message stats
msgReceived := int64(0)
bytesReceived := int64(0)

// Print stats of the consume rate
tick := time.NewTicker(10 * time.Second)

for {
select {
case cm, ok := <-consumer.Chan():
if !ok {
return
}
msgReceived++
bytesReceived += int64(len(cm.Message.Payload()))
if err := consumer.Ack(cm.Message); err != nil {
return
}
case <-tick.C:
currentMsgReceived := atomic.SwapInt64(&msgReceived, 0)
currentBytesReceived := atomic.SwapInt64(&bytesReceived, 0)
Expand All @@ -114,6 +112,8 @@ func consume() {

log.Infof(`Stats - Consume rate: %6.1f msg/s - %6.1f Mbps`,
msgRate, bytesRate*8/1024/1024)
case <-stop:
return
}
}
}
63 changes: 39 additions & 24 deletions perf/perf-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"encoding/json"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/beefsack/go-rate"
"github.com/bmizerany/perks/quantile"
"github.com/spf13/cobra"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar"
)

// ProduceArgs define the parameters required by produce
Expand All @@ -39,39 +40,46 @@ type ProduceArgs struct {
ProducerQueueSize int
}

var produceArgs ProduceArgs

var cmdProduce = &cobra.Command{
Use: "produce ",
Short: "Produce on a topic and measure performance",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
produceArgs.Topic = args[0]
produce()
},
}
func newProducerCommand() *cobra.Command {
produceArgs := ProduceArgs{}
cmd := &cobra.Command{
Use: "produce ",
Short: "Produce on a topic and measure performance",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
stop := stopCh()
if FlagProfile {
RunProfiling(stop)
}
produceArgs.Topic = args[0]
produce(&produceArgs, stop)
},
}

func initProducer() {
cmdProduce.Flags().IntVarP(&produceArgs.Rate, "rate", "r", 100, "Publish rate. Set to 0 to go unthrottled")
cmdProduce.Flags().IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1, "Batching grouping time in millis")
cmdProduce.Flags().IntVarP(&produceArgs.MessageSize, "size", "s", 1024, "Message size")
cmdProduce.Flags().IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000, "Produce queue size")
// add flags
flags := cmd.Flags()
flags.IntVarP(&produceArgs.Rate, "rate", "r", 100,
"Publish rate. Set to 0 to go unthrottled")
flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
"Batching grouping time in millis")
flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
"Message size")
flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
"Produce queue size")

return cmd
}

func produce() {
func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
b, _ := json.MarshalIndent(clientArgs, "", " ")
log.Info("Client config: ", string(b))
b, _ = json.MarshalIndent(produceArgs, "", " ")
log.Info("Producer config: ", string(b))

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: clientArgs.ServiceURL,
})

client, err := NewClient()
if err != nil {
log.Fatal(err)
}

defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Expand All @@ -84,7 +92,6 @@ func produce() {
if err != nil {
log.Fatal(err)
}

defer producer.Close()

ctx := context.Background()
Expand All @@ -100,6 +107,12 @@ func produce() {
}

for {
select {
case <-stop:
return
default:
}

if rateLimiter != nil {
rateLimiter.Wait()
}
Expand All @@ -126,6 +139,8 @@ func produce() {

for {
select {
case <-stop:
return
case <-tick.C:
messageRate := float64(messagesPublished) / float64(10)
log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
Expand Down
101 changes: 82 additions & 19 deletions perf/pulsar-perf-go.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,113 @@
package main

import (
"context"
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"

"github.com/spf13/cobra"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar"
)

// global flags
var FlagProfile bool
var flagDebug bool

type ClientArgs struct {
ServiceURL string
}

var clientArgs ClientArgs

func main() {
// use `go tool pprof http://localhost:3000/debug/pprof/profile` to get pprof file(cpu info)
// use `go tool pprof http://localhost:3000/debug/pprof/heap` to get inuse_space file
go func() {
listenAddr := net.JoinHostPort("localhost", "3000")
fmt.Printf("Profile server listening on %s\n", listenAddr)
profileRedirect := http.RedirectHandler("/debug/pprof", http.StatusSeeOther)
http.Handle("/", profileRedirect)
err := fmt.Errorf("%v", http.ListenAndServe(listenAddr, nil))
fmt.Println(err.Error())
}()
func NewClient() (pulsar.Client, error) {
clientOpts := pulsar.ClientOptions{
URL: clientArgs.ServiceURL,
}
return pulsar.NewClient(clientOpts)
}

func initLogger(debug bool) {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
TimestampFormat: "15:04:05.000",
})
log.SetLevel(log.InfoLevel)
level := log.InfoLevel
if debug {
level = log.DebugLevel
}
log.SetLevel(level)
}

initProducer()
initConsumer()
func main() {
rootCmd := &cobra.Command{
PersistentPreRun: func(cmd *cobra.Command, args []string) {
initLogger(flagDebug)
},
Use: "pulsar-perf-go",
}

var rootCmd = &cobra.Command{Use: "pulsar-perf-go"}
rootCmd.Flags().StringVarP(&clientArgs.ServiceURL, "service-url", "u",
flags := rootCmd.PersistentFlags()
flags.BoolVar(&FlagProfile, "profile", false, "enable profiling")
flags.BoolVar(&flagDebug, "debug", false, "enable debug output")
flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u",
"pulsar://localhost:6650", "The Pulsar service URL")
rootCmd.AddCommand(cmdProduce, cmdConsume)

rootCmd.AddCommand(newProducerCommand())
rootCmd.AddCommand(newConsumerCommand())

err := rootCmd.Execute()
if err != nil {
panic("execute root cmd error, please check.")
fmt.Fprintf(os.Stderr, "executing command error=%+v\n", err)
os.Exit(1)
}
}

func stopCh() <-chan struct{} {
stop := make(chan struct{})
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
go func() {
for {
select {
case <-signalCh:
close(stop)
}
}
}()
return stop
}

func RunProfiling(stop <-chan struct{}) {
go func() {
if err := serveProfiling("0.0.0.0:6060", stop); err != nil && err != http.ErrServerClosed {
log.WithError(err).Error("Unable to start debug profiling server")
}
}()
}

// use `go tool pprof http://addr/debug/pprof/profile` to get pprof file(cpu info)
// use `go tool pprof http://addr/debug/pprof/heap` to get inuse_space file
func serveProfiling(addr string, stop <-chan struct{}) error {
s := http.Server{
Addr: addr,
Handler: http.DefaultServeMux,
}
go func() {
<-stop
log.Infof("Shutting down pprof server")
s.Shutdown(context.Background())
}()

fmt.Printf("Starting pprof server at: %s\n", addr)
fmt.Printf(" use `go tool pprof http://%s/debug/pprof/prof` to get pprof file(cpu info)\n", addr)
fmt.Printf(" use `go tool pprof http://%s/debug/pprof/heap` to get inuse_space file\n", addr)
fmt.Println()

return s.ListenAndServe()
}

0 comments on commit 163fba0

Please sign in to comment.