Skip to content

Commit

Permalink
Do not skip service/operation indexing for firehose
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information
Yuri Shkuro committed Feb 26, 2020
1 parent 5fb7d72 commit 090cd92
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
14 changes: 11 additions & 3 deletions plugin/storage/cassandra/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,15 @@ func (s *SpanWriter) WriteSpan(span *model.Span) error {
return err
}
}
if s.storageMode&indexFlag == indexFlag && !span.Flags.IsFirehoseEnabled() {
if err := s.writeIndexes(span, ds); err != nil {
if s.storageMode&indexFlag == indexFlag {
if err := s.writeServiceOperationIndex(span, ds); err != nil {
return err
}
if !span.Flags.IsFirehoseEnabled() {
if err := s.writeOtherIndexes(span, ds); err != nil {
return err
}
}
}
return nil
}
Expand All @@ -171,7 +176,7 @@ func (s *SpanWriter) writeSpan(span *model.Span, ds *dbmodel.Span) error {
return nil
}

func (s *SpanWriter) writeIndexes(span *model.Span, ds *dbmodel.Span) error {
func (s *SpanWriter) writeServiceOperationIndex(span *model.Span, ds *dbmodel.Span) error {
spanKind, _ := span.GetSpanKind()
if err := s.saveServiceNameAndOperationName(dbmodel.Operation{
ServiceName: ds.ServiceName,
Expand All @@ -181,7 +186,10 @@ func (s *SpanWriter) writeIndexes(span *model.Span, ds *dbmodel.Span) error {
// should this be a soft failure?
return s.logError(ds, err, "Failed to insert service name and operation name", s.logger)
}
return nil
}

func (s *SpanWriter) writeOtherIndexes(span *model.Span, ds *dbmodel.Span) error {
if err := s.indexByTags(span, ds); err != nil {
return s.logError(ds, err, "Failed to index tags", s.logger)
}
Expand Down
26 changes: 20 additions & 6 deletions plugin/storage/cassandra/spanstore/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -352,23 +353,36 @@ func TestStorageMode_IndexOnly_WithFilter(t *testing.T) {

func TestStorageMode_IndexOnly_FirehoseSpan(t *testing.T) {
withSpanWriter(0, func(w *spanWriterTest) {

w.writer.serviceNamesWriter = func(serviceName string) error { return nil }
w.writer.operationNamesWriter = func(operation dbmodel.Operation) error { return nil }
serviceWritten := atomic.NewString("")
operationWritten := &atomic.Value{}
w.writer.serviceNamesWriter = func(serviceName string) error {
serviceWritten.Store(serviceName)
return nil
}
w.writer.operationNamesWriter = func(operation dbmodel.Operation) error {
operationWritten.Store(operation)
return nil
}
span := &model.Span{
TraceID: model.NewTraceID(0, 1),
TraceID: model.NewTraceID(0, 1),
OperationName: "package-delivery",
Process: &model.Process{
ServiceName: "service-a",
ServiceName: "planet-express",
},
Flags: model.Flags(8),
}

err := w.writer.WriteSpan(span)
assert.NoError(t, err)
w.session.AssertExpectations(t)
w.session.AssertNotCalled(t, "Query", stringMatcher(serviceOperationIndex))
w.session.AssertNotCalled(t, "Query", stringMatcher(serviceNameIndex))
w.session.AssertNotCalled(t, "Query", stringMatcher(durationIndex))
assert.Equal(t, "planet-express", serviceWritten.Load())
assert.Equal(t, dbmodel.Operation{
ServiceName: "planet-express",
SpanKind: "",
OperationName: "package-delivery",
}, operationWritten.Load())
}, StoreIndexesOnly())
}

Expand Down

0 comments on commit 090cd92

Please sign in to comment.