Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Commit

Permalink
Add debug logs (#504)
Browse files Browse the repository at this point in the history
* Add debug logs

- Log tracer and reporter closes
- Log samplers on updates to sampling strategies

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Address feedback

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Fix lint

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Trigger build

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Add test for String

Signed-off-by: Prithvi Raj <p.r@uber.com>

* make fmt

Signed-off-by: Prithvi Raj <p.r@uber.com>

* Trigger build

Signed-off-by: Prithvi Raj <p.r@uber.com>
  • Loading branch information
vprithvi authored Apr 22, 2020
1 parent 5b28adb commit 8e7ec7b
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 5 deletions.
2 changes: 2 additions & 0 deletions reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func (r *remoteReporter) Report(span *Span) {

// Close implements Close() method of Reporter by waiting for the queue to be drained.
func (r *remoteReporter) Close() {
r.logger.Debugf("closing reporter")
if swapped := atomic.CompareAndSwapInt64(&r.closed, 0, 1); !swapped {
r.logger.Error("Repeated attempt to close the reporter is ignored")
return
Expand Down Expand Up @@ -307,6 +308,7 @@ func (r *remoteReporter) processQueue() {
r.metrics.ReporterSuccess.Inc(int64(flushed))
// to reduce the number of gauge stats, we only emit queue length on flush
r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
r.logger.Debugf("flushed %d spans", flushed)
}
span.Release()
case reporterQueueItemClose:
Expand Down
11 changes: 6 additions & 5 deletions reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ func (s *reporterSuite) assertCounter(t *testing.T, name string, tags map[string
assert.Equal(t, expectedValue, getValue(), "expected counter: name=%s, tags=%+v", name, tags)
}

func (s *reporterSuite) assertLogs(t *testing.T, expectedLogs string) {
func (s *reporterSuite) assertLogsContain(t *testing.T, expectedLogs string) {
for i := 0; i < 1000; i++ {
if s.logger.String() == expectedLogs {
break
}
time.Sleep(time.Millisecond)
}
assert.Equal(t, expectedLogs, s.logger.String(), "expected logs: %s", expectedLogs)
assert.Contains(t, s.logger.String(), expectedLogs, "expected logs: %s", expectedLogs)
}

func TestRemoteReporterAppend(t *testing.T) {
Expand Down Expand Up @@ -136,11 +136,12 @@ func TestRemoteReporterFailedFlushViaAppend(t *testing.T) {
s.tracer.StartSpan("sp1").Finish()
s.tracer.StartSpan("sp2").Finish()
s.sender.assertFlushedSpans(t, 2)
s.assertLogs(t, "ERROR: error reporting Jaeger span \"sp2\": flush error\n")
s.assertLogsContain(t, "ERROR: error reporting Jaeger span \"sp2\": flush error\n")
s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "err"}, 2)
s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "ok"}, 0)
s.close() // causes explicit flush that also fails with the same error
s.assertLogs(t, "ERROR: error reporting Jaeger span \"sp2\": flush error\nERROR: failed to flush Jaeger spans to server: flush error\n")
s.assertLogsContain(t, "ERROR: error reporting Jaeger span \"sp2\": flush error\n")
s.assertLogsContain(t, "ERROR: failed to flush Jaeger spans to server: flush error\n")
}

func TestRemoteReporterAppendWithPoolAllocator(t *testing.T) {
Expand Down Expand Up @@ -183,7 +184,7 @@ func TestRemoteReporterDoubleClose(t *testing.T) {
reporter := NewRemoteReporter(&fakeSender{}, ReporterOptions.QueueSize(1), ReporterOptions.Logger(logger))
reporter.Close()
reporter.Close()
assert.Equal(t, "ERROR: Repeated attempt to close the reporter is ignored\n", logger.String())
assert.Contains(t, logger.String(), "ERROR: Repeated attempt to close the reporter is ignored\n")
}

func TestRemoteReporterReportAfterClose(t *testing.T) {
Expand Down
22 changes: 22 additions & 0 deletions sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package jaeger
import (
"fmt"
"math"
"strings"
"sync"

"github.com/uber/jaeger-client-go/thrift-gen/sampling"
Expand Down Expand Up @@ -319,6 +320,10 @@ func (s *GuaranteedThroughputProbabilisticSampler) update(lowerBound, samplingRa
}
}

func (s GuaranteedThroughputProbabilisticSampler) String() string {
return fmt.Sprintf("GuaranteedThroughputProbabilisticSampler(lowerBound=%f, samplingRate=%f)", s.lowerBound, s.samplingRate)
}

// -----------------------

// PerOperationSampler is a delegating sampler that applies GuaranteedThroughputProbabilisticSampler
Expand Down Expand Up @@ -456,6 +461,23 @@ func (s *PerOperationSampler) Close() {
s.defaultSampler.Close()
}

func (s *PerOperationSampler) String() string {
var sb strings.Builder

fmt.Fprintf(&sb, "PerOperationSampler(defaultSampler=%v, ", s.defaultSampler)
fmt.Fprintf(&sb, "lowerBound=%f, ", s.lowerBound)
fmt.Fprintf(&sb, "maxOperations=%d, ", s.maxOperations)
fmt.Fprintf(&sb, "operationNameLateBinding=%t, ", s.operationNameLateBinding)
fmt.Fprintf(&sb, "numOperations=%d,\n", len(s.samplers))
fmt.Fprintf(&sb, "samplers=[")
for operationName, sampler := range s.samplers {
fmt.Fprintf(&sb, "\n(operationName=%s, sampler=%v)", operationName, sampler)
}
fmt.Fprintf(&sb, "])")

return sb.String()
}

// Equal is not used.
// TODO (breaking change) remove this in the future
func (s *PerOperationSampler) Equal(other Sampler) bool {
Expand Down
1 change: 1 addition & 0 deletions sampler_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (s *RemotelyControlledSampler) updateSamplerViaUpdaters(strategy interface{
return err
}
if sampler != nil {
s.logger.Debugf("sampler updated: %+v", sampler)
s.sampler = sampler
return nil
}
Expand Down
25 changes: 25 additions & 0 deletions sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,31 @@ func TestAdaptiveSampler(t *testing.T) {
assert.False(t, decision.Sample)
}

func TestPerOperationSampler_String(t *testing.T) {
strategies := &sampling.PerOperationSamplingStrategies{
DefaultSamplingProbability: testDefaultSamplingProbability,
DefaultLowerBoundTracesPerSecond: 2.0,
PerOperationStrategies: []*sampling.OperationSamplingStrategy{
{
Operation: testOperationName,
ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{SamplingRate: 1},
},
},
}

sampler := NewPerOperationSampler(PerOperationSamplerParams{
MaxOperations: testDefaultMaxOperations,
Strategies: strategies,
})

assert.Equal(t,
"PerOperationSampler(defaultSampler=ProbabilisticSampler(samplingRate=0.5), lowerBound=2.000000, "+
"maxOperations=10, operationNameLateBinding=false, numOperations=1,\nsamplers=[\n"+
"(operationName=op, "+
"sampler=GuaranteedThroughputProbabilisticSampler(lowerBound=2.000000, samplingRate=1.000000))])",
sampler.String())
}

func TestAdaptiveSamplerErrors(t *testing.T) {
strategies := &sampling.PerOperationSamplingStrategies{
DefaultSamplingProbability: testDefaultSamplingProbability,
Expand Down
1 change: 1 addition & 0 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ func (t *Tracer) Extract(

// Close releases all resources used by the Tracer and flushes any remaining buffered spans.
func (t *Tracer) Close() error {
t.logger.Debugf("closing tracer")
t.reporter.Close()
t.sampler.Close()
if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
Expand Down

0 comments on commit 8e7ec7b

Please sign in to comment.