Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix archive storage not querying old spans older than maxSpanAge #1617

Merged
merged 6 commits into from
Jun 21, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ type SpanReader struct {
// The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day,
// this will be rounded down to UTC 00:00 of that day.
maxSpanAge time.Duration
maxNumSpans int
serviceOperationStorage *ServiceOperationStorage
spanIndexPrefix []string
serviceIndexPrefix []string
spanConverter dbmodel.ToDomain
timeRangeIndices timeRangeIndexFn
sourceFn sourceFn
}

// SpanReaderParams holds constructor params for NewSpanReader
Expand All @@ -124,17 +124,19 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
client: p.Client,
logger: p.Logger,
maxSpanAge: p.MaxSpanAge,
maxNumSpans: p.MaxNumSpans,
serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, p.Logger, 0), // the decorator takes care of metrics
spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex),
serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex),
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases),
sourceFn: getSourceFn(p.Archive, p.MaxNumSpans),
}
}

type timeRangeIndexFn func(indexName []string, startTime time.Time, endTime time.Time) []string

type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource

func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn {
if archive {
var archivePrefix string
Expand All @@ -159,6 +161,20 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn {
return timeRangeIndices
}

func getSourceFn(archive bool, maxNumSpans int) sourceFn {
return func(query elastic.Query, nextTime uint64) *elastic.SearchSource {
s := elastic.NewSearchSource().
Query(query).
Size(defaultDocCount).
TerminateAfter(maxNumSpans).
Sort("startTime", true)
if !archive {
s.SearchAfter(nextTime)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This query uses scroll API https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html.

SearchAfter says to include results with startTime parameter after nextTime. In archive storage we want to include all results.

}
return s
}
}

// timeRangeIndices returns the array of indices that we need to query, based on query params
func timeRangeIndices(indexNames []string, startTime time.Time, endTime time.Time) []string {
var indices []string
Expand Down Expand Up @@ -305,6 +321,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour))
nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour))

fmt.Println(indices)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still required?

searchAfterTime := make(map[model.TraceID]uint64)
totalDocumentsFetched := make(map[model.TraceID]int)
tracesMap := make(map[model.TraceID]*model.Trace)
Expand All @@ -318,16 +335,13 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
if val, ok := searchAfterTime[traceID]; ok {
nextTime = val
}

s := s.sourceFn(query, nextTime)

searchRequests[i] = elastic.NewSearchRequest().
IgnoreUnavailable(true).
Type(spanType).
Source(
elastic.NewSearchSource().
Query(query).
Size(defaultDocCount).
TerminateAfter(s.maxNumSpans).
Sort("startTime", true).
SearchAfter(nextTime))
Source(s)
}
// set traceIDs to empty
traceIDs = nil
Expand Down
68 changes: 54 additions & 14 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es"
Expand All @@ -43,6 +45,7 @@ const (
password = "changeme" // the elasticsearch default password
indexPrefix = "integration-test"
tagKeyDeDotChar = "@"
maxSpanAge = time.Hour * 72
)

type ESStorageIntegration struct {
Expand All @@ -53,7 +56,7 @@ type ESStorageIntegration struct {
logger *zap.Logger
}

func (s *ESStorageIntegration) initializeES(allTagsAsFields bool) error {
func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error {
rawClient, err := elastic.NewClient(
elastic.SetURL(queryURL),
elastic.SetBasicAuth(username, password),
Expand All @@ -70,22 +73,22 @@ func (s *ESStorageIntegration) initializeES(allTagsAsFields bool) error {
dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix)
s.DependencyReader = dependencyStore
s.DependencyWriter = dependencyStore
s.initSpanstore(allTagsAsFields)
s.initSpanstore(allTagsAsFields, archive)
s.CleanUp = func() error {
return s.esCleanUp(allTagsAsFields)
return s.esCleanUp(allTagsAsFields, archive)
}
s.Refresh = s.esRefresh
s.esCleanUp(allTagsAsFields)
s.esCleanUp(allTagsAsFields, archive)
return nil
}

func (s *ESStorageIntegration) esCleanUp(allTagsAsFields bool) error {
func (s *ESStorageIntegration) esCleanUp(allTagsAsFields, archive bool) error {
_, err := s.client.DeleteIndex("*").Do(context.Background())
s.initSpanstore(allTagsAsFields)
s.initSpanstore(allTagsAsFields, archive)
return err
}

func (s *ESStorageIntegration) initSpanstore(allTagsAsFields bool) {
func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) {
bp, _ := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background())
client := eswrapper.WrapESClient(s.client, bp)
spanMapping, serviceMapping := es.GetMappings(5, 1)
Expand All @@ -99,14 +102,16 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields bool) {
TagDotReplacement: tagKeyDeDotChar,
SpanMapping: spanMapping,
ServiceMapping: serviceMapping,
Archive: archive,
})
s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{
Client: client,
Logger: s.logger,
MetricsFactory: metrics.NullFactory,
IndexPrefix: indexPrefix,
MaxSpanAge: 72 * time.Hour,
MaxSpanAge: maxSpanAge,
TagDotReplacement: tagKeyDeDotChar,
Archive: archive,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: formatting looks wrong here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check passed fine

})
}

Expand All @@ -129,22 +134,57 @@ func healthCheck() error {
return errors.New("elastic search is not ready")
}

func testElasticsearchStorage(t *testing.T, allTagsAsFields bool) {
func testElasticsearchStorage(t *testing.T, allTagsAsFields, archive bool) {
if os.Getenv("STORAGE") != "elasticsearch" {
t.Skip("Integration test against ElasticSearch skipped; set STORAGE env var to elasticsearch to run this")
}
if err := healthCheck(); err != nil {
t.Fatal(err)
}
s := &ESStorageIntegration{}
require.NoError(t, s.initializeES(allTagsAsFields))
s.IntegrationTestAll(t)
require.NoError(t, s.initializeES(allTagsAsFields, archive))

if archive {
t.Run("ArchiveTrace", s.testArchiveTrace)
} else {
s.IntegrationTestAll(t)
}
}

func TestElasticsearchStorage(t *testing.T) {
testElasticsearchStorage(t, false)
testElasticsearchStorage(t, false, false)
}

func TestElasticsearchStorage_AllTagsAsObjectFields(t *testing.T) {
testElasticsearchStorage(t, true, false)
}

func TestElasticsearchStorage_Archive(t *testing.T) {
testElasticsearchStorage(t, false, true)
}

func TestElasticsearchStorageAllTagsAsObjectFields(t *testing.T) {
testElasticsearchStorage(t, true)
func (s *StorageIntegration) testArchiveTrace(t *testing.T) {
defer s.cleanUp(t)
tId := model.NewTraceID(uint64(11), uint64(22))
expected := &model.Span{
OperationName: "archive_span",
StartTime: time.Now().Add(-maxSpanAge*5),
TraceID: tId,
SpanID: model.NewSpanID(55),
References: []model.SpanRef{},
Process: model.NewProcess("archived_service", model.KeyValues{}),
}

require.NoError(t, s.SpanWriter.WriteSpan(expected))
s.refresh(t)

var actual *model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), tId)
return err == nil && len(actual.Spans) == 1
})
if !assert.True(t, found) {
CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual)
}
}