From 65788ff78eab0bd40848d14be452d4af4ff07fa6 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 26 Feb 2020 16:43:07 -0500 Subject: [PATCH 1/6] Do not skip service/operation indexing for firehose Signed-off-by: Yuri Shkuro --- plugin/storage/cassandra/spanstore/writer.go | 14 +++++++--- .../cassandra/spanstore/writer_test.go | 26 ++++++++++++++----- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/plugin/storage/cassandra/spanstore/writer.go b/plugin/storage/cassandra/spanstore/writer.go index ecb16388a0c..6c3e0f973fe 100644 --- a/plugin/storage/cassandra/spanstore/writer.go +++ b/plugin/storage/cassandra/spanstore/writer.go @@ -141,10 +141,15 @@ func (s *SpanWriter) WriteSpan(span *model.Span) error { return err } } - if s.storageMode&indexFlag == indexFlag && !span.Flags.IsFirehoseEnabled() { - if err := s.writeIndexes(span, ds); err != nil { + if s.storageMode&indexFlag == indexFlag { + if err := s.writeServiceOperationIndex(span, ds); err != nil { return err } + if !span.Flags.IsFirehoseEnabled() { + if err := s.writeOtherIndexes(span, ds); err != nil { + return err + } + } } return nil } @@ -171,7 +176,7 @@ func (s *SpanWriter) writeSpan(span *model.Span, ds *dbmodel.Span) error { return nil } -func (s *SpanWriter) writeIndexes(span *model.Span, ds *dbmodel.Span) error { +func (s *SpanWriter) writeServiceOperationIndex(span *model.Span, ds *dbmodel.Span) error { spanKind, _ := span.GetSpanKind() if err := s.saveServiceNameAndOperationName(dbmodel.Operation{ ServiceName: ds.ServiceName, @@ -181,7 +186,10 @@ func (s *SpanWriter) writeIndexes(span *model.Span, ds *dbmodel.Span) error { // should this be a soft failure? return s.logError(ds, err, "Failed to insert service name and operation name", s.logger) } + return nil +} +func (s *SpanWriter) writeOtherIndexes(span *model.Span, ds *dbmodel.Span) error { if err := s.indexByTags(span, ds); err != nil { return s.logError(ds, err, "Failed to index tags", s.logger) } diff --git a/plugin/storage/cassandra/spanstore/writer_test.go b/plugin/storage/cassandra/spanstore/writer_test.go index 5966b7d9427..36e9d6b288e 100644 --- a/plugin/storage/cassandra/spanstore/writer_test.go +++ b/plugin/storage/cassandra/spanstore/writer_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/uber/jaeger-lib/metrics/metricstest" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" @@ -352,13 +353,21 @@ func TestStorageMode_IndexOnly_WithFilter(t *testing.T) { func TestStorageMode_IndexOnly_FirehoseSpan(t *testing.T) { withSpanWriter(0, func(w *spanWriterTest) { - - w.writer.serviceNamesWriter = func(serviceName string) error { return nil } - w.writer.operationNamesWriter = func(operation dbmodel.Operation) error { return nil } + serviceWritten := atomic.NewString("") + operationWritten := &atomic.Value{} + w.writer.serviceNamesWriter = func(serviceName string) error { + serviceWritten.Store(serviceName) + return nil + } + w.writer.operationNamesWriter = func(operation dbmodel.Operation) error { + operationWritten.Store(operation) + return nil + } span := &model.Span{ - TraceID: model.NewTraceID(0, 1), + TraceID: model.NewTraceID(0, 1), + OperationName: "package-delivery", Process: &model.Process{ - ServiceName: "service-a", + ServiceName: "planet-express", }, Flags: model.Flags(8), } @@ -366,9 +375,14 @@ func TestStorageMode_IndexOnly_FirehoseSpan(t *testing.T) { err := w.writer.WriteSpan(span) assert.NoError(t, err) w.session.AssertExpectations(t) - w.session.AssertNotCalled(t, "Query", stringMatcher(serviceOperationIndex)) w.session.AssertNotCalled(t, "Query", stringMatcher(serviceNameIndex)) w.session.AssertNotCalled(t, "Query", stringMatcher(durationIndex)) + assert.Equal(t, "planet-express", serviceWritten.Load()) + assert.Equal(t, dbmodel.Operation{ + ServiceName: "planet-express", + SpanKind: "", + OperationName: "package-delivery", + }, operationWritten.Load()) }, StoreIndexesOnly()) } From 19369d0d7b033c541d2d80406e3982f4bd1c8d3c Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 26 Feb 2020 19:13:58 -0500 Subject: [PATCH 2/6] review Signed-off-by: Yuri Shkuro --- plugin/storage/cassandra/spanstore/writer.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/plugin/storage/cassandra/spanstore/writer.go b/plugin/storage/cassandra/spanstore/writer.go index 6c3e0f973fe..2ffc278c6f5 100644 --- a/plugin/storage/cassandra/spanstore/writer.go +++ b/plugin/storage/cassandra/spanstore/writer.go @@ -142,14 +142,9 @@ func (s *SpanWriter) WriteSpan(span *model.Span) error { } } if s.storageMode&indexFlag == indexFlag { - if err := s.writeServiceOperationIndex(span, ds); err != nil { + if err := s.writeIndexes(span, ds); err != nil { return err } - if !span.Flags.IsFirehoseEnabled() { - if err := s.writeOtherIndexes(span, ds); err != nil { - return err - } - } } return nil } @@ -176,7 +171,7 @@ func (s *SpanWriter) writeSpan(span *model.Span, ds *dbmodel.Span) error { return nil } -func (s *SpanWriter) writeServiceOperationIndex(span *model.Span, ds *dbmodel.Span) error { +func (s *SpanWriter) writeIndexes(span *model.Span, ds *dbmodel.Span) error { spanKind, _ := span.GetSpanKind() if err := s.saveServiceNameAndOperationName(dbmodel.Operation{ ServiceName: ds.ServiceName, @@ -186,10 +181,11 @@ func (s *SpanWriter) writeServiceOperationIndex(span *model.Span, ds *dbmodel.Sp // should this be a soft failure? return s.logError(ds, err, "Failed to insert service name and operation name", s.logger) } - return nil -} -func (s *SpanWriter) writeOtherIndexes(span *model.Span, ds *dbmodel.Span) error { + if span.Flags.IsFirehoseEnabled() { + return nil // skipping expensive indexing + } + if err := s.indexByTags(span, ds); err != nil { return s.logError(ds, err, "Failed to index tags", s.logger) } From 597bcb2585b18b331c6df2b709c49ccbd9ab1d44 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Thu, 27 Feb 2020 11:17:17 -0500 Subject: [PATCH 3/6] log commands Signed-off-by: Yuri Shkuro --- scripts/travis/cassandra-integration-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/travis/cassandra-integration-test.sh b/scripts/travis/cassandra-integration-test.sh index 2cfbe55e69e..4f516bf8cfc 100755 --- a/scripts/travis/cassandra-integration-test.sh +++ b/scripts/travis/cassandra-integration-test.sh @@ -1,6 +1,6 @@ #!/bin/bash -set -e +set -ex # Clean up before starting. docker rm -f cassandra || true From f8978c508a0aeabf4c9663b0a8c3188645e3c464 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Thu, 27 Feb 2020 11:22:27 -0500 Subject: [PATCH 4/6] Use unqualified path to cqlsh, as the old path does not seem valid anymore. which cqlsh prints /opt/cassandra/bin/cqlsh Signed-off-by: Yuri Shkuro --- plugin/storage/cassandra/schema/docker.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/cassandra/schema/docker.sh b/plugin/storage/cassandra/schema/docker.sh index 4a55902c330..c95f7a1ca2f 100755 --- a/plugin/storage/cassandra/schema/docker.sh +++ b/plugin/storage/cassandra/schema/docker.sh @@ -3,7 +3,7 @@ # This script is used in the Docker image jaegertracing/jaeger-cassandra-schema # that allows installing Jaeger keyspace and schema without installing cqlsh. -CQLSH=${CQLSH:-"/usr/bin/cqlsh"} +CQLSH=${CQLSH:-"cqlsh"} CQLSH_HOST=${CQLSH_HOST:-"cassandra"} CQLSH_SSL=${CQLSH_SSL:-""} CASSANDRA_WAIT_TIMEOUT=${CASSANDRA_WAIT_TIMEOUT:-"60"} From b89a791317ab0c576a18a403603cca7ab1d75258 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Thu, 27 Feb 2020 11:46:16 -0500 Subject: [PATCH 5/6] different path Signed-off-by: Yuri Shkuro --- plugin/storage/cassandra/schema/docker.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/cassandra/schema/docker.sh b/plugin/storage/cassandra/schema/docker.sh index c95f7a1ca2f..d0265e8f93d 100755 --- a/plugin/storage/cassandra/schema/docker.sh +++ b/plugin/storage/cassandra/schema/docker.sh @@ -3,7 +3,7 @@ # This script is used in the Docker image jaegertracing/jaeger-cassandra-schema # that allows installing Jaeger keyspace and schema without installing cqlsh. -CQLSH=${CQLSH:-"cqlsh"} +CQLSH=${CQLSH:-"/opt/cassandra/bin/cqlsh"} CQLSH_HOST=${CQLSH_HOST:-"cassandra"} CQLSH_SSL=${CQLSH_SSL:-""} CASSANDRA_WAIT_TIMEOUT=${CASSANDRA_WAIT_TIMEOUT:-"60"} From 1e1cbd524b2177737bd696d5bd2b96d1ed5e0d30 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Thu, 27 Feb 2020 12:00:35 -0500 Subject: [PATCH 6/6] fix flaky test Signed-off-by: Yuri Shkuro --- cmd/agent/app/reporter/client_metrics_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/agent/app/reporter/client_metrics_test.go b/cmd/agent/app/reporter/client_metrics_test.go index 8d1651b0207..345cc386773 100644 --- a/cmd/agent/app/reporter/client_metrics_test.go +++ b/cmd/agent/app/reporter/client_metrics_test.go @@ -256,15 +256,16 @@ func TestClientMetricsReporter_Expire(t *testing.T) { t.Run(fmt.Sprintf("iter%d:gauge=%d,log=%s", i, test.expGauge, test.expLog), func(t *testing.T) { // Expire loop runs every 100us, and removes the client after 5ms. // We check for condition in each test for up to 5ms (10*500us). + var gaugeValue int64 = -1 for i := 0; i < 10; i++ { _, gauges := tr.mb.Snapshot() - if gauges["client_stats.connected_clients"] == int64(test.expGauge) { + gaugeValue = gauges["client_stats.connected_clients"] + if gaugeValue == int64(test.expGauge) { break } time.Sleep(500 * time.Microsecond) } - tr.mb.AssertGaugeMetrics(t, - metricstest.ExpectedMetric{Name: "client_stats.connected_clients", Value: test.expGauge}) + assert.EqualValues(t, test.expGauge, gaugeValue) tr.assertLog(t, test.expLog, clientUUID) // sleep between tests long enough to exceed the 5ms TTL.