Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/scollector: bugfix - datapoint validation #1856

Merged
merged 1 commit into from
Aug 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ var (
// Dropped is the number of dropped data points due to a full queue.
dropped int64

// Dropped is the number of discarded data points due to being invalid
discarded int64

// Sent is the number of sent data points.
sent int64

Expand All @@ -71,6 +74,7 @@ var (

const (
descCollectAlloc = "Total number of bytes allocated and still in use by the runtime (via runtime.ReadMemStats)."
descCollectDiscarded = "Counter of discarded data points due to being invalid."
descCollectDropped = "Counter of dropped data points due to the queue being full."
descCollectGoRoutines = "Total number of goroutines that currently exist (via runtime.NumGoroutine)."
descCollectGcCpuFraction = "fraction of CPU time used by GC"
Expand Down Expand Up @@ -131,6 +135,13 @@ func InitChan(tsdbhost *url.URL, root string, ch chan *opentsdb.DataPoint) error
slock.Unlock()
return
})
Set("collect.discarded", Tags, func() (i interface{}) {
slock.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you even need to lock if you are using the atomic operations. You could use atomic.LoadInt64 and not need locks at all. I wish we had done all these stats like that from the start.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

want me to move all those to atomic.* ? I'd be happy to.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes please. I'm going to merge this for now, but if you wanted to do that in a seperate pr it would be appreciated. Thanks!

i = discarded
slock.Unlock()
return
})

Set("collect.sent", Tags, func() (i interface{}) {
slock.Lock()
i = sent
Expand Down Expand Up @@ -176,6 +187,7 @@ func InitChan(tsdbhost *url.URL, root string, ch chan *opentsdb.DataPoint) error
metadata.AddMetricMeta(metricRoot+"collect.queued", metadata.Gauge, metadata.Item, descCollectQueued)
metadata.AddMetricMeta(metricRoot+"collect.sent", metadata.Counter, metadata.PerSecond, descCollectSent)
metadata.AddMetricMeta(metricRoot+"collect.dropped", metadata.Counter, metadata.PerSecond, descCollectDropped)
metadata.AddMetricMeta(metricRoot+"collect.discarded", metadata.Counter, metadata.PerSecond, descCollectDiscarded)
// Make sure these get zeroed out instead of going unknown on restart
Add("collect.post.error", Tags, 0)
Add("collect.post.bad_status", Tags, 0)
Expand Down
6 changes: 5 additions & 1 deletion collect/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
func queuer() {
for dp := range tchan {
if err := dp.Clean(); err != nil {
atomic.AddInt64(&dropped, 1)
atomic.AddInt64(&discarded, 1)
continue // if anything gets this far that can't be made valid, just drop it silently.
}
qlock.Lock()
Expand All @@ -32,6 +32,10 @@ func queuer() {
queue = append(queue, dp)
select {
case dp = <-tchan:
if err := dp.Clean(); err != nil {
atomic.AddInt64(&discarded, 1)
break // if anything gets this far that can't be made valid, just drop it silently.
}
continue
default:
}
Expand Down
3 changes: 3 additions & 0 deletions opentsdb/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ func (t TagSet) Clean() error {
if err != nil {
return fmt.Errorf("cleaning value %s for tag %s: %s", v, k, err)
}
if kc == "" || vc == "" {
return fmt.Errorf("cleaning value [%s] for tag [%s] result in an empty string", v, k)
}
if kc != k || vc != v {
delete(t, k)
t[kc] = vc
Expand Down