Skip to content
This repository was archived by the owner on Dec 3, 2024. It is now read-only.

Commit 2fe63ce

Browse files
authored
Merge pull request #17 from yarelm/flush
feat: add close method to sink
2 parents 53151e0 + 575d7e8 commit 2fe63ce

File tree

3 files changed

+40
-13
lines changed

3 files changed

+40
-13
lines changed

example/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@ func main() {
5656
DebugLogs: true,
5757
ReportingInterval: 35 * time.Second,
5858
})
59+
defer ss.Close(context.Background())
60+
5961
cfg := metrics.DefaultConfig("go-metrics-stackdriver")
6062
cfg.EnableHostname = false
61-
metrics.NewGlobal(metrics.DefaultConfig("go-metrics-stackdriver"), ss)
63+
metrics.NewGlobal(cfg, ss)
6264

6365
// start listener
6466
log.Printf("starting server")
@@ -89,7 +91,7 @@ func main() {
8991
<-c
9092
log.Printf("ctrl+c detected... shutting down")
9193
cancel()
92-
srv.Shutdown(ctx)
94+
srv.Shutdown(context.Background())
9395
}()
9496

9597
// generate data

stackdriver.go

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,12 @@ type Logger interface {
5050
// Sink performs in-process aggregation of metrics to limit calls to
5151
// stackdriver.
5252
type Sink struct {
53-
client *monitoring.MetricClient
54-
interval time.Duration
55-
firstTime time.Time
53+
client *monitoring.MetricClient
54+
interval time.Duration
55+
firstTime time.Time
56+
closeCtx context.Context
57+
closeCtxCancel context.CancelFunc
58+
closeDoneC chan struct{}
5659

5760
gauges map[string]*gauge
5861
counters map[string]*counter
@@ -199,10 +202,13 @@ func NewSink(client *monitoring.MetricClient, config *Config) *Sink {
199202
Job: config.Job,
200203
TaskID: config.TaskID,
201204
},
202-
debugLogs: config.DebugLogs,
203-
log: config.Logger,
205+
debugLogs: config.DebugLogs,
206+
log: config.Logger,
207+
closeDoneC: make(chan struct{}),
204208
}
205209

210+
s.closeCtx, s.closeCtxCancel = context.WithCancel(context.Background())
211+
206212
if s.log == nil {
207213
s.log = log.New(os.Stderr, "go-metrics-stackdriver: ", log.LstdFlags)
208214
}
@@ -263,7 +269,7 @@ func NewSink(client *monitoring.MetricClient, config *Config) *Sink {
263269
s.reset()
264270

265271
// run cancelable goroutine that reports on interval
266-
go s.flushMetrics(context.Background())
272+
go s.reportOnInterval()
267273

268274
return s
269275
}
@@ -274,7 +280,7 @@ func isValidMetricsPrefix(s string) bool {
274280
return err == nil && match
275281
}
276282

277-
func (s *Sink) flushMetrics(ctx context.Context) {
283+
func (s *Sink) reportOnInterval() {
278284
if s.interval == 0*time.Second {
279285
return
280286
}
@@ -284,11 +290,12 @@ func (s *Sink) flushMetrics(ctx context.Context) {
284290

285291
for {
286292
select {
287-
case <-ctx.Done():
288-
s.log.Println("stopped flushing metrics")
293+
case <-s.closeCtx.Done():
294+
s.log.Println("stopped reporting metrics on interval")
295+
close(s.closeDoneC)
289296
return
290297
case <-ticker.C:
291-
s.report(ctx)
298+
s.report(s.closeCtx)
292299
}
293300
}
294301
}
@@ -561,6 +568,20 @@ func (s *Sink) AddSampleWithLabels(key []string, val float32, labels []metrics.L
561568
}
562569
}
563570

571+
// Close closes the sink and flushes any remaining data.
572+
func (s *Sink) Close(ctx context.Context) error {
573+
s.closeCtxCancel()
574+
select {
575+
case <-ctx.Done():
576+
s.log.Println("sink close finished prematurely")
577+
return ctx.Err()
578+
case <-s.closeDoneC:
579+
}
580+
s.report(ctx)
581+
582+
return nil
583+
}
584+
564585
var _ metrics.MetricSink = (*Sink)(nil)
565586

566587
// Series holds the naming for a timeseries metric.

stackdriver_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ func TestNewSinkSetCustomPrefix(t *testing.T) {
127127
for _, tc := range tests {
128128
t.Run(tc.name, func(t *testing.T) {
129129
ss := NewSink(nil, &Config{Prefix: tc.configPrefix})
130+
defer ss.Close(context.Background())
130131

131132
if ss.prefix != tc.expectedPrefix {
132133
t.Errorf("prefix should be initalized as '" + tc.expectedPrefix + "' but got " + ss.prefix)
@@ -1044,7 +1045,7 @@ func newTestSink(interval time.Duration, client *monitoring.MetricClient) *Sink
10441045
s.extractor = DefaultLabelExtractor
10451046
s.log = log.New(os.Stderr, "go-metrics-stackdriver: ", log.LstdFlags)
10461047
s.reset()
1047-
go s.flushMetrics(context.Background())
1048+
go s.reportOnInterval()
10481049
return s
10491050
}
10501051

@@ -1123,6 +1124,7 @@ func TestCustomMonitorResource(t *testing.T) {
11231124
Type: "k8s_container",
11241125
},
11251126
})
1127+
defer sink.Close(context.Background())
11261128

11271129
monitoredResourceDiff(t, sink, labels)
11281130
}
@@ -1132,6 +1134,7 @@ func TestCustomMonitorResourceWithDefaultLabels(t *testing.T) {
11321134
ProjectID: "example_project",
11331135
Prefix: sPtr(""),
11341136
})
1137+
defer sink.Close(context.Background())
11351138

11361139
labels := defaultMonitoredResource(sink.taskInfo).GetLabels()
11371140

@@ -1160,6 +1163,7 @@ func TestCustomMonitorResourceWithInvalidLabels(t *testing.T) {
11601163
Type: "k8s_container",
11611164
},
11621165
})
1166+
defer sink.Close(context.Background())
11631167

11641168
if diff := cmp.Diff(sink.monitoredResource.GetLabels(), invalidLabels); diff == "" {
11651169
t.Error("Monitored Resource labels should not be equal")

0 commit comments

Comments
 (0)