Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix msgs and bytes per sec rates #66

Merged
merged 2 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions nats-top.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
ui "gopkg.in/gizak/termui.v1"
)

const version = "0.5.0"
const version = "0.5.2"

var (
host = flag.String("s", "127.0.0.1", "The nats server host.")
Expand Down Expand Up @@ -403,7 +403,6 @@ func generateParagraphCSV(
inMsgs, inBytes, inMsgsRate, inBytesRate,
outMsgs, outBytes, outMsgsRate, outBytesRate,
)

text += fmt.Sprintf("\n\nConnections Polled:[__DELIM__]%d\n", numConns)

displaySubs := engine.DisplaySubs
Expand Down
65 changes: 35 additions & 30 deletions util/toputils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -17,16 +16,18 @@ import (
const DisplaySubscriptions = 1

type Engine struct {
Host string
Port int
HttpClient *http.Client
Uri string
Conns int
SortOpt server.SortOpt
Delay int
DisplaySubs bool
StatsCh chan *Stats
ShutdownCh chan struct{}
Host string
Port int
HttpClient *http.Client
Uri string
Conns int
SortOpt server.SortOpt
Delay int
DisplaySubs bool
StatsCh chan *Stats
ShutdownCh chan struct{}
LastStats *Stats
LastPollTime time.Time
}

func NewEngine(host string, port int, conns int, delay int) *Engine {
Expand Down Expand Up @@ -92,34 +93,24 @@ func (engine *Engine) Request(path string) (interface{}, error) {
// which can modify how poll values then sends to channel.
func (engine *Engine) MonitorStats() error {
delay := time.Duration(engine.Delay) * time.Second
isFirstTime := true
lastPollTime := time.Now()

for {
select {
case <-engine.ShutdownCh:
return nil
case <-time.After(delay):
stats, newLastPollTime := engine.fetchStats(isFirstTime, lastPollTime)
if stats != nil && errors.Is(stats.Error, errDud) {
isFirstTime = false
lastPollTime = newLastPollTime
}

engine.StatsCh <- stats
engine.StatsCh <- engine.fetchStats()
}
}
}

func (engine *Engine) FetchStatsSnapshot() *Stats {
stats, _ := engine.fetchStats(true, time.Now())

return stats
return engine.fetchStats()
}

var errDud = fmt.Errorf("")

func (engine *Engine) fetchStats(isFirstTime bool, lastPollTime time.Time) (*Stats, time.Time) {
func (engine *Engine) fetchStats() *Stats {
var inMsgsDelta int64
var outMsgsDelta int64
var inBytesDelta int64
Expand Down Expand Up @@ -147,7 +138,7 @@ func (engine *Engine) fetchStats(isFirstTime bool, lastPollTime time.Time) (*Sta
result, err := engine.Request("/varz")
if err != nil {
stats.Error = err
return stats, time.Time{}
return stats
}

if varz, ok := result.(*server.Varz); ok {
Expand All @@ -160,14 +151,24 @@ func (engine *Engine) fetchStats(isFirstTime bool, lastPollTime time.Time) (*Sta
result, err := engine.Request("/connz")
if err != nil {
stats.Error = err
return stats, time.Time{}
return stats
}

if connz, ok := result.(*server.Connz); ok {
stats.Connz = connz
}
}

var isFirstTime bool
if engine.LastStats != nil {
inMsgsLastVal = engine.LastStats.Varz.InMsgs
outMsgsLastVal = engine.LastStats.Varz.OutMsgs
inBytesLastVal = engine.LastStats.Varz.InBytes
outBytesLastVal = engine.LastStats.Varz.OutBytes
} else {
isFirstTime = true
}

// Periodic snapshot to get per sec metrics
inMsgsVal := stats.Varz.InMsgs
outMsgsVal := stats.Varz.OutMsgs
Expand All @@ -185,7 +186,7 @@ func (engine *Engine) fetchStats(isFirstTime bool, lastPollTime time.Time) (*Sta
outBytesLastVal = outBytesVal

now := time.Now()
tdelta := now.Sub(lastPollTime)
tdelta := now.Sub(engine.LastPollTime)

// Calculate rates but the first time
if !isFirstTime {
Expand All @@ -194,15 +195,19 @@ func (engine *Engine) fetchStats(isFirstTime bool, lastPollTime time.Time) (*Sta
inBytesRate = float64(inBytesDelta) / tdelta.Seconds()
outBytesRate = float64(outBytesDelta) / tdelta.Seconds()
}

stats.Rates = &Rates{
rates := &Rates{
InMsgsRate: inMsgsRate,
OutMsgsRate: outMsgsRate,
InBytesRate: inBytesRate,
OutBytesRate: outBytesRate,
}
stats.Rates = rates

return stats, now
// Snapshot stats.
engine.LastStats = stats
engine.LastPollTime = now

return stats
}

// SetupHTTPS sets up the http client and uri to use for polling.
Expand Down