Skip to content
This repository was archived by the owner on Dec 3, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ func main() {
DebugLogs: true,
ReportingInterval: 35 * time.Second,
})
defer ss.Close(context.Background())

cfg := metrics.DefaultConfig("go-metrics-stackdriver")
cfg.EnableHostname = false
metrics.NewGlobal(metrics.DefaultConfig("go-metrics-stackdriver"), ss)
metrics.NewGlobal(cfg, ss)

// start listener
log.Printf("starting server")
Expand Down Expand Up @@ -89,7 +91,7 @@ func main() {
<-c
log.Printf("ctrl+c detected... shutting down")
cancel()
srv.Shutdown(ctx)
srv.Shutdown(context.Background())
}()

// generate data
Expand Down
41 changes: 31 additions & 10 deletions stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ type Logger interface {
// Sink performs in-process aggregation of metrics to limit calls to
// stackdriver.
type Sink struct {
client *monitoring.MetricClient
interval time.Duration
firstTime time.Time
client *monitoring.MetricClient
interval time.Duration
firstTime time.Time
closeCtx context.Context
closeCtxCancel context.CancelFunc
closeDoneC chan struct{}

gauges map[string]*gauge
counters map[string]*counter
Expand Down Expand Up @@ -199,10 +202,13 @@ func NewSink(client *monitoring.MetricClient, config *Config) *Sink {
Job: config.Job,
TaskID: config.TaskID,
},
debugLogs: config.DebugLogs,
log: config.Logger,
debugLogs: config.DebugLogs,
log: config.Logger,
closeDoneC: make(chan struct{}),
}

s.closeCtx, s.closeCtxCancel = context.WithCancel(context.Background())

if s.log == nil {
s.log = log.New(os.Stderr, "go-metrics-stackdriver: ", log.LstdFlags)
}
Expand Down Expand Up @@ -263,7 +269,7 @@ func NewSink(client *monitoring.MetricClient, config *Config) *Sink {
s.reset()

// run cancelable goroutine that reports on interval
go s.flushMetrics(context.Background())
go s.reportOnInterval()

return s
}
Expand All @@ -274,7 +280,7 @@ func isValidMetricsPrefix(s string) bool {
return err == nil && match
}

func (s *Sink) flushMetrics(ctx context.Context) {
func (s *Sink) reportOnInterval() {
if s.interval == 0*time.Second {
return
}
Expand All @@ -284,11 +290,12 @@ func (s *Sink) flushMetrics(ctx context.Context) {

for {
select {
case <-ctx.Done():
s.log.Println("stopped flushing metrics")
case <-s.closeCtx.Done():
s.log.Println("stopped reporting metrics on interval")
close(s.closeDoneC)
return
case <-ticker.C:
s.report(ctx)
s.report(s.closeCtx)
}
}
}
Expand Down Expand Up @@ -561,6 +568,20 @@ func (s *Sink) AddSampleWithLabels(key []string, val float32, labels []metrics.L
}
}

// Close closes the sink and flushes any remaining data.
func (s *Sink) Close(ctx context.Context) error {
s.closeCtxCancel()
select {
case <-ctx.Done():
s.log.Println("sink close finished prematurely")
return ctx.Err()
case <-s.closeDoneC:
}
s.report(ctx)

return nil
}

var _ metrics.MetricSink = (*Sink)(nil)

// Series holds the naming for a timeseries metric.
Expand Down
6 changes: 5 additions & 1 deletion stackdriver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func TestNewSinkSetCustomPrefix(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ss := NewSink(nil, &Config{Prefix: tc.configPrefix})
defer ss.Close(context.Background())

if ss.prefix != tc.expectedPrefix {
t.Errorf("prefix should be initalized as '" + tc.expectedPrefix + "' but got " + ss.prefix)
Expand Down Expand Up @@ -1044,7 +1045,7 @@ func newTestSink(interval time.Duration, client *monitoring.MetricClient) *Sink
s.extractor = DefaultLabelExtractor
s.log = log.New(os.Stderr, "go-metrics-stackdriver: ", log.LstdFlags)
s.reset()
go s.flushMetrics(context.Background())
go s.reportOnInterval()
return s
}

Expand Down Expand Up @@ -1123,6 +1124,7 @@ func TestCustomMonitorResource(t *testing.T) {
Type: "k8s_container",
},
})
defer sink.Close(context.Background())

monitoredResourceDiff(t, sink, labels)
}
Expand All @@ -1132,6 +1134,7 @@ func TestCustomMonitorResourceWithDefaultLabels(t *testing.T) {
ProjectID: "example_project",
Prefix: sPtr(""),
})
defer sink.Close(context.Background())

labels := defaultMonitoredResource(sink.taskInfo).GetLabels()

Expand Down Expand Up @@ -1160,6 +1163,7 @@ func TestCustomMonitorResourceWithInvalidLabels(t *testing.T) {
Type: "k8s_container",
},
})
defer sink.Close(context.Background())

if diff := cmp.Diff(sink.monitoredResource.GetLabels(), invalidLabels); diff == "" {
t.Error("Monitored Resource labels should not be equal")
Expand Down