Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable ES doc count #2453

Merged
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ Changes by Version
### Backend Changes

#### Breaking Changes
* Configurable ES doc count ([#2453](https://github.com/jaegertracing/jaeger/pull/2453), [@albertteoh](https://github.com/albertteoh))

The `--es.max-num-spans` flag has been deprecated in favour of `--es.max-doc-count`.
`--es.max-num-spans` is marked for removal in v1.21.0 as indicated in the flag description.

If both `--es.max-num-spans` and `--es.max-doc-count` are set, the lesser of the two will be used.

Setting `--es.max-doc-count` (default: 10,000) caps the number of results returned by every Elasticsearch
query at the configured value, which limits the following counts in the Jaeger UI:

* Services
* Operations
* Dependencies (edges in a dependency graph)
* Span fetch size for a trace

#### New Features

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ import (
)

// Settings shared by the Elasticsearch integration test in this file.
const (
host = "0.0.0.0"
esPort = "9200"
esHostPort = host + ":" + esPort
esURL = "http://" + esHostPort
indexPrefix = "integration-test"
tagKeyDeDotChar = "@"
maxSpanAge = time.Hour * 72
numShards = 5
numReplicas = 0
host = "0.0.0.0"
esPort = "9200"
esHostPort = host + ":" + esPort
esURL = "http://" + esHostPort
indexPrefix = "integration-test"
tagKeyDeDotChar = "@"
maxSpanAge = time.Hour * 72
numShards = 5
numReplicas = 0
// document limit handed to both the span reader config (MaxDocCount) and the dependency store
defaultMaxDocCount = 10_000
)

type IntegrationTest struct {
Expand Down Expand Up @@ -119,12 +120,12 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error {
IndexPrefix: indexPrefix,
TagDotReplacement: tagKeyDeDotChar,
MaxSpanAge: maxSpanAge,
MaxNumSpans: 10_000,
MaxDocCount: defaultMaxDocCount,
})
s.SpanReader = reader

depMapping := es.GetDependenciesMappings(numShards, numReplicas, esVersion)
depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix)
depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, defaultMaxDocCount)
if err := depStore.CreateTemplates(depMapping); err != nil {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *StorageFactory) CreateSpanReader() (spanstore.Reader, error) {
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.GetIndexPrefix(),
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxNumSpans: cfg.GetMaxNumSpans(),
MaxDocCount: cfg.GetMaxDocCount(),
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
}
Expand All @@ -100,7 +100,7 @@ func (s *StorageFactory) CreateDependencyReader() (dependencystore.Reader, error
if err != nil {
return nil, err
}
return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix()), nil
return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetMaxDocCount()), nil
}

// CreateArchiveSpanReader creates archive spanstore.Reader
Expand All @@ -115,7 +115,7 @@ func (s *StorageFactory) CreateArchiveSpanReader() (spanstore.Reader, error) {
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.GetIndexPrefix(),
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxNumSpans: cfg.GetMaxNumSpans(),
MaxDocCount: cfg.GetMaxDocCount(),
TagDotReplacement: cfg.GetTagDotReplacement(),
}), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ const (

timestampField = "timestamp"

// default number of documents to fetch in a query
// see search.max_buckets and index.max_result_window
defaultDocCount = 10_000
indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20
)

Expand All @@ -47,20 +44,22 @@ type DependencyStore struct {
client esclient.ElasticsearchClient
logger *zap.Logger
indexPrefix string
maxDocCount int
}

var _ dependencystore.Reader = (*DependencyStore)(nil)
var _ dependencystore.Writer = (*DependencyStore)(nil)

// NewDependencyStore creates a DependencyStore that uses the given Elasticsearch client and logger.
//
// A non-empty indexPrefix is joined to the dependency index base name with "-", and the
// stored index prefix always ends in "-".
// maxDocCount is kept on the store; GetDependencies uses it as the size of its search request.
func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string) *DependencyStore {
func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore {
if indexPrefix != "" {
indexPrefix += "-"
}
return &DependencyStore{
client: client,
logger: logger,
indexPrefix: indexPrefix + dependencyIndexBaseName + "-",
maxDocCount: maxDocCount,
}
}

Expand All @@ -84,10 +83,10 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D

// GetDependencies implements dependencystore.Reader
func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
searchBody := getSearchBody(endTs, lookback)
searchBody := getSearchBody(endTs, lookback, r.maxDocCount)

indices := dailyIndices(r.indexPrefix, endTs, lookback)
response, err := r.client.Search(ctx, searchBody, defaultDocCount, indices...)
response, err := r.client.Search(ctx, searchBody, r.maxDocCount, indices...)
if err != nil {
return nil, err
}
Expand All @@ -106,12 +105,12 @@ func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time,
return dbmodel.ToDomainDependencies(dependencies), nil
}

// getSearchBody builds the search body used by GetDependencies.
// The query is a range on the timestamp field from endTs-lookback to endTs (both inclusive),
// and Size is set to the document limit for the request.
func getSearchBody(endTs time.Time, lookback time.Duration) esclient.SearchBody {
func getSearchBody(endTs time.Time, lookback time.Duration, maxDocCount int) esclient.SearchBody {
return esclient.SearchBody{
Query: &esclient.Query{
RangeQueries: map[string]esclient.RangeQuery{timestampField: {GTE: endTs.Add(-lookback), LTE: endTs}},
},
Size: defaultDocCount,
Size: maxDocCount,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore/dbmodel"
)

const defaultMaxDocCount = 10_000

func TestCreateTemplates(t *testing.T) {
client := &mockClient{}
store := NewDependencyStore(client, zap.NewNop(), "foo")
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
template := "template"
err := store.CreateTemplates(template)
require.NoError(t, err)
Expand All @@ -46,7 +48,7 @@ func TestCreateTemplates(t *testing.T) {

func TestWriteDependencies(t *testing.T) {
client := &mockClient{}
store := NewDependencyStore(client, zap.NewNop(), "foo")
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
dependencies := []model.DependencyLink{{Parent: "foo", Child: "bar", CallCount: 1}}
tsNow := time.Now()
err := store.WriteDependencies(tsNow, dependencies)
Expand Down Expand Up @@ -85,7 +87,7 @@ func TestGetDependencies(t *testing.T) {
},
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.NoError(t, err)
assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{
Expand All @@ -107,7 +109,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) {
},
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Contains(t, err.Error(), "invalid character")
assert.Nil(t, dependencies)
Expand All @@ -118,7 +120,7 @@ func TestGetDependencies_err_client(t *testing.T) {
client := &mockClient{
searchErr: searchErr,
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
tsNow := time.Now()
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Error(t, err)
Expand All @@ -141,7 +143,7 @@ const query = `{

func TestSearchBody(t *testing.T) {
date := time.Date(2020, 8, 30, 15, 0, 0, 0, time.UTC)
sb := getSearchBody(date, time.Hour)
sb := getSearchBody(date, time.Hour, defaultMaxDocCount)
jsonQuery, err := json.MarshalIndent(sb, "", " ")
require.NoError(t, err)
assert.Equal(t, query, string(jsonQuery))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,15 @@ func findTraceIDsSearchBody(converter dbmodel.ToDomain, query *spanstore.TraceQu
}
}

// getServicesSearchBody builds the search body used by GetServices.
// The body carries no query, only an aggregation: the getServicesAggregation template
// formatted with the document limit and passed through as raw JSON.
func getServicesSearchBody() esclient.SearchBody {
aggs := fmt.Sprintf(getServicesAggregation, defaultDocCount)
func getServicesSearchBody(maxDocCount int) esclient.SearchBody {
aggs := fmt.Sprintf(getServicesAggregation, maxDocCount)
return esclient.SearchBody{
Aggregations: json.RawMessage(aggs),
}
}

func getOperationsSearchBody(serviceName string) esclient.SearchBody {
aggs := fmt.Sprintf(getOperationsAggregation, defaultDocCount)
func getOperationsSearchBody(serviceName string, maxDocCount int) esclient.SearchBody {
aggs := fmt.Sprintf(getOperationsAggregation, maxDocCount)
return esclient.SearchBody{
Aggregations: json.RawMessage(aggs),
Query: &esclient.Query{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
)

const (
defaultMaxDocCount = 10_000
servicesSearchBodyFixture = `{
"aggs": {
"serviceName": {
Expand Down Expand Up @@ -231,14 +232,14 @@ const (
)

// TestGetServicesSearchBody checks that the services search body, marshalled to indented JSON,
// matches servicesSearchBodyFixture exactly.
func TestGetServicesSearchBody(t *testing.T) {
sb := getServicesSearchBody()
sb := getServicesSearchBody(defaultMaxDocCount)
jsonQuery, err := json.MarshalIndent(sb, "", " ")
require.NoError(t, err)
assert.Equal(t, servicesSearchBodyFixture, string(jsonQuery))
}

func TestGetOperationsSearchBody(t *testing.T) {
sb := getOperationsSearchBody("foo")
sb := getOperationsSearchBody("foo", defaultMaxDocCount)
jsonQuery, err := json.MarshalIndent(sb, "", " ")
require.NoError(t, err)
assert.Equal(t, operationsSearchBodyFixture, string(jsonQuery))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ import (
const (
// by default UI fetches 20 results
defaultNumTraces = 20
// default number of documents to fetch in a query
// see search.max_buckets and index.max_result_window
defaultDocCount = 10_000

spanIndexBaseName = "jaeger-span"
serviceIndexBaseName = "jaeger-service"
Expand Down Expand Up @@ -62,8 +59,7 @@ type Reader struct {
serviceIndexName indexNameProvider
spanIndexName indexNameProvider
maxSpanAge time.Duration
// maximum number of spans to fetch per query in multi search
maxNumberOfSpans int
maxDocCount int
archive bool
}

Expand All @@ -75,7 +71,7 @@ type Config struct {
UseReadWriteAliases bool
IndexPrefix string
MaxSpanAge time.Duration
MaxNumSpans int
MaxDocCount int
TagDotReplacement string
}

Expand All @@ -86,7 +82,7 @@ func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, co
logger: logger,
archive: config.Archive,
maxSpanAge: config.MaxSpanAge,
maxNumberOfSpans: config.MaxNumSpans,
maxDocCount: config.MaxDocCount,
converter: dbmodel.NewToDomain(config.TagDotReplacement),
spanIndexName: newIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive),
serviceIndexName: newIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.UseReadWriteAliases, config.Archive),
Expand Down Expand Up @@ -162,7 +158,7 @@ func (r *Reader) findTraceIDs(ctx context.Context, query *spanstore.TraceQueryPa

// GetServices implements spanstore.Reader
func (r *Reader) GetServices(ctx context.Context) ([]string, error) {
searchBody := getServicesSearchBody()
searchBody := getServicesSearchBody(r.maxDocCount)
currentTime := time.Now()
indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime)
response, err := r.client.Search(ctx, searchBody, 0, indices...)
Expand All @@ -182,7 +178,7 @@ func (r *Reader) GetServices(ctx context.Context) ([]string, error) {

// GetOperations implements spanstore.Reader
func (r *Reader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {
searchBody := getOperationsSearchBody(query.ServiceName)
searchBody := getOperationsSearchBody(query.ServiceName, r.maxDocCount)
currentTime := time.Now()
indices := r.serviceIndexName.get(currentTime.Add(-r.maxSpanAge), currentTime)
response, err := r.client.Search(ctx, searchBody, 0, indices...)
Expand Down Expand Up @@ -290,8 +286,8 @@ func (r *Reader) multiSearchRequests(indices []string, traceIDs []model.TraceID,
s := esclient.SearchBody{
Indices: indices,
Query: traceIDQuery(traceID),
Size: defaultDocCount,
TerminateAfter: r.maxNumberOfSpans,
Size: r.maxDocCount,
TerminateAfter: r.maxDocCount,
}
if !r.archive {
s.SearchAfter = []interface{}{nextTime}
Expand Down
16 changes: 8 additions & 8 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Configuration struct {
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxNumSpans int `mapstructure:"-"` // defines maximum number of spans to fetch from storage per query
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
Expand Down Expand Up @@ -87,7 +87,7 @@ type ClientBuilder interface {
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
GetMaxNumSpans() int
GetMaxDocCount() int
GetIndexPrefix() string
GetTagsFilePath() string
GetAllTagsAsFields() bool
Expand Down Expand Up @@ -197,9 +197,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.MaxSpanAge == 0 {
c.MaxSpanAge = source.MaxSpanAge
}
if c.MaxNumSpans == 0 {
c.MaxNumSpans = source.MaxNumSpans
}
if c.NumShards == 0 {
c.NumShards = source.NumShards
}
Expand Down Expand Up @@ -233,6 +230,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.Tags.File == "" {
c.Tags.File = source.Tags.File
}
if c.MaxDocCount == 0 {
c.MaxDocCount = source.MaxDocCount
}
}

// GetNumShards returns number of shards from Configuration
Expand All @@ -250,9 +250,9 @@ func (c *Configuration) GetMaxSpanAge() time.Duration {
return c.MaxSpanAge
}

// GetMaxNumSpans returns max spans allowed per query from Configuration
func (c *Configuration) GetMaxNumSpans() int {
return c.MaxNumSpans
// GetMaxDocCount returns the MaxDocCount value from Configuration: the maximum number of
// documents (results) to fetch from storage per query.
func (c *Configuration) GetMaxDocCount() int {
return c.MaxDocCount
}

// GetIndexPrefix returns index prefix
Expand Down
Loading