diff --git a/cmd/trace-agent/sampler.go b/cmd/trace-agent/sampler.go index 9f3c82ba9..9dbb1d59f 100644 --- a/cmd/trace-agent/sampler.go +++ b/cmd/trace-agent/sampler.go @@ -3,6 +3,7 @@ package main import ( "fmt" "reflect" + "sync/atomic" "time" log "github.com/cihub/seelog" @@ -16,9 +17,10 @@ import ( // Sampler chooses wich spans to write to the API type Sampler struct { // For stats - keptTraceCount int - totalTraceCount int - lastFlush time.Time + keptTraceCount uint64 + totalTraceCount uint64 + + lastFlush time.Time // actual implementation of the sampling logic engine sampler.Engine @@ -62,10 +64,10 @@ func (s *Sampler) Run() { // Add samples a trace and returns true if trace was sampled (should be kept), false otherwise func (s *Sampler) Add(t processedTrace) bool { - s.totalTraceCount++ + atomic.AddUint64(&s.totalTraceCount, 1) if s.engine.Sample(t.Trace, t.Root, t.Env) { - s.keptTraceCount++ + atomic.AddUint64(&s.keptTraceCount, 1) return true } @@ -79,12 +81,9 @@ func (s *Sampler) Stop() { // logStats reports statistics and update the info exposed. func (s *Sampler) logStats() { - for now := range time.Tick(10 * time.Second) { - keptTraceCount := s.keptTraceCount - totalTraceCount := s.totalTraceCount - s.keptTraceCount = 0 - s.totalTraceCount = 0 + keptTraceCount := atomic.SwapUint64(&s.keptTraceCount, 0) + totalTraceCount := atomic.SwapUint64(&s.totalTraceCount, 0) duration := now.Sub(s.lastFlush) s.lastFlush = now diff --git a/writer/fixtures_test.go b/writer/fixtures_test.go index 07f707069..7c9c44c60 100644 --- a/writer/fixtures_test.go +++ b/writer/fixtures_test.go @@ -99,8 +99,7 @@ func (c *testPayloadSender) Start() { // Run executes the core loop of this sender. func (c *testPayloadSender) Run() { - c.exitWG.Add(1) - defer c.exitWG.Done() + defer close(c.exit) for { select { @@ -140,8 +139,7 @@ type testPayloadSenderMonitor struct { sender PayloadSender - exit chan struct{} - exitWG sync.WaitGroup + exit chan struct{} } // newTestPayloadSenderMonitor creates a new testPayloadSenderMonitor monitoring the specified sender. @@ -159,8 +157,7 @@ func (m *testPayloadSenderMonitor) Start() { // Run executes the core loop of this monitor. func (m *testPayloadSenderMonitor) Run() { - m.exitWG.Add(1) - defer m.exitWG.Done() + defer close(m.exit) for { select { @@ -187,8 +184,8 @@ func (m *testPayloadSenderMonitor) Run() { // Stop stops this payload monitor and waits for it to stop. func (m *testPayloadSenderMonitor) Stop() { - close(m.exit) - m.exitWG.Wait() + m.exit <- struct{}{} + <-m.exit } // SuccessPayloads returns a slice containing all successful payloads. diff --git a/writer/payload.go b/writer/payload.go index d32b13b2f..e8bc14572 100644 --- a/writer/payload.go +++ b/writer/payload.go @@ -3,7 +3,6 @@ package writer import ( "container/list" "fmt" - "sync" "time" log "github.com/cihub/seelog" @@ -73,8 +72,7 @@ type BasePayloadSender struct { monitor chan interface{} endpoint Endpoint - exit chan struct{} - exitWG sync.WaitGroup + exit chan struct{} } // NewBasePayloadSender creates a new instance of a BasePayloadSender using the provided endpoint. @@ -94,8 +92,8 @@ func (s *BasePayloadSender) Send(payload *Payload) { // Stop asks this sender to stop and waits until it correctly stops. func (s *BasePayloadSender) Stop() { - close(s.exit) - s.exitWG.Wait() + s.exit <- struct{}{} + <-s.exit close(s.in) close(s.monitor) } @@ -192,8 +190,7 @@ func (s *QueuablePayloadSender) Start() { // Run executes the QueuablePayloadSender main logic synchronously. func (s *QueuablePayloadSender) Run() { - s.exitWG.Add(1) - defer s.exitWG.Done() + defer close(s.exit) for { select { diff --git a/writer/service_writer.go b/writer/service_writer.go index 1c1f3bea4..bddaf22c9 100644 --- a/writer/service_writer.go +++ b/writer/service_writer.go @@ -53,8 +53,7 @@ func (w *ServiceWriter) Start() { // Run runs the main loop of the writer goroutine. If buffers // services read from input chan and flushes them when necessary. func (w *ServiceWriter) Run() { - w.exitWG.Add(1) - defer w.exitWG.Done() + defer close(w.exit) // for now, simply flush every x seconds flushTicker := time.NewTicker(w.conf.FlushPeriod) @@ -112,8 +111,8 @@ func (w *ServiceWriter) Run() { // Stop stops the main Run loop. func (w *ServiceWriter) Stop() { - close(w.exit) - w.exitWG.Wait() + w.exit <- struct{}{} + <-w.exit w.BaseWriter.Stop() } diff --git a/writer/stats_writer.go b/writer/stats_writer.go index e089e3945..1e6b3235d 100644 --- a/writer/stats_writer.go +++ b/writer/stats_writer.go @@ -71,8 +71,7 @@ func (w *StatsWriter) Start() { // Run runs the event loop of the writer's main goroutine. It reads stat buckets // from InStats, builds stat payloads and sends them out using the base writer. func (w *StatsWriter) Run() { - w.exitWG.Add(1) - defer w.exitWG.Done() + defer close(w.exit) log.Debug("starting stats writer") @@ -89,8 +88,8 @@ func (w *StatsWriter) Run() { // Stop stops the writer func (w *StatsWriter) Stop() { - close(w.exit) - w.exitWG.Wait() + w.exit <- struct{}{} + <-w.exit // Closing the base writer, among other things, will close the // w.payloadSender.Monitor() channel, stoping the monitoring diff --git a/writer/trace_writer.go b/writer/trace_writer.go index 6c540ec3e..bcc1758ef 100644 --- a/writer/trace_writer.go +++ b/writer/trace_writer.go @@ -76,8 +76,7 @@ func (w *TraceWriter) Start() { // Run runs the main loop of the writer goroutine. It sends traces to the payload constructor, flushing it periodically // and collects stats which are also reported periodically. func (w *TraceWriter) Run() { - w.exitWG.Add(1) - defer w.exitWG.Done() + defer close(w.exit) // for now, simply flush every x seconds flushTicker := time.NewTicker(w.conf.FlushPeriod) @@ -137,8 +136,8 @@ func (w *TraceWriter) Run() { // Stop stops the main Run loop. func (w *TraceWriter) Stop() { - close(w.exit) - w.exitWG.Wait() + w.exit <- struct{}{} + <-w.exit w.BaseWriter.Stop() } diff --git a/writer/writer.go b/writer/writer.go index 7833c9617..bc9a5a4f1 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -1,8 +1,6 @@ package writer import ( - "sync" - "github.com/DataDog/datadog-trace-agent/statsd" log "github.com/cihub/seelog" @@ -15,8 +13,7 @@ type BaseWriter struct { statsClient statsd.StatsClient - exit chan struct{} - exitWG *sync.WaitGroup + exit chan struct{} } // NewBaseWriter creates a new instance of a BaseWriter. @@ -35,7 +32,6 @@ func NewBaseWriter(conf *config.AgentConfig, path string, senderFactory func(End payloadSender: senderFactory(endpoint), statsClient: statsd.Client, exit: make(chan struct{}), - exitWG: &sync.WaitGroup{}, } }