-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathwriter.go
154 lines (133 loc) · 4.92 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0
package spanstore
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/cache"
"github.com/jaegertracing/jaeger/pkg/es"
cfg "github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/internal/dbmodel"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)
const (
	// spanType is the document type set on span index requests.
	spanType = "span"
	// serviceType is presumably the document type for service documents;
	// it is not referenced in this part of the file — TODO confirm usage.
	serviceType = "service"
	// serviceCacheTTLDefault is the service cache TTL used when
	// SpanWriterParams.ServiceCacheTTL is zero.
	serviceCacheTTLDefault = 12 * time.Hour
	// indexCacheTTLDefault is not referenced in this part of the file.
	// NOTE(review): confirm it is still needed.
	indexCacheTTLDefault = 48 * time.Hour
)
// spanWriterMetrics groups the write metrics owned by a SpanWriter.
type spanWriterMetrics struct {
	// indexCreate is created under the "index_create" name in NewSpanWriter.
	indexCreate *spanstoremetrics.WriteMetrics
}
// serviceWriter is the hook SpanWriter calls for the service side of a span.
// It receives the service index name and the converted span.
type serviceWriter func(string, *dbmodel.Span)
// SpanWriter writes spans, and the service entries derived from them,
// through an es.Client obtained from the client func.
type SpanWriter struct {
	// client returns the es.Client to use; it is invoked for each operation
	// rather than stored once.
	client        func() es.Client
	logger        *zap.Logger
	writerMetrics spanWriterMetrics // TODO: build functions to wrap around each Do fn
	// indexCache cache.Cache
	// serviceWriter is invoked by WriteSpan when the service index name is non-empty.
	serviceWriter serviceWriter
	// spanConverter turns a domain span into its dbmodel representation.
	spanConverter dbmodel.FromDomain
	// spanServiceIndex maps a span start time to the span and service index names.
	spanServiceIndex spanAndServiceIndexFn
}
// SpanWriterParams holds constructor parameters for NewSpanWriter.
type SpanWriterParams struct {
	// Client returns the es.Client used by the writer.
	Client func() es.Client
	Logger *zap.Logger
	// MetricsFactory is used to create the writer's "index_create" metrics.
	MetricsFactory metrics.Factory
	// SpanIndex and ServiceIndex supply the DateLayout used when index names
	// are derived from the span start time.
	SpanIndex    cfg.IndexOptions
	ServiceIndex cfg.IndexOptions
	// IndexPrefix is applied to the span and service index base names.
	IndexPrefix cfg.IndexPrefix
	// AllTagsAsFields, TagKeysAsFields and TagDotReplacement are passed
	// through to dbmodel.NewFromDomain.
	AllTagsAsFields   bool
	TagKeysAsFields   []string
	TagDotReplacement string
	// Archive selects the archive span index; no service index name is
	// produced in that mode.
	Archive bool
	// UseReadWriteAliases selects the write-suffixed index names instead of
	// date-based ones.
	UseReadWriteAliases bool
	// ServiceCacheTTL is passed to NewServiceOperationStorage; zero means
	// serviceCacheTTLDefault.
	ServiceCacheTTL time.Duration
}
// NewSpanWriter builds a SpanWriter from p.
//
// A zero p.ServiceCacheTTL is replaced with serviceCacheTTLDefault before the
// service/operation storage is created. The returned writer uses that
// storage's Write method as its service writer.
func NewSpanWriter(p SpanWriterParams) *SpanWriter {
	// Start from the default and override only when the caller set a TTL.
	ttl := serviceCacheTTLDefault
	if p.ServiceCacheTTL != 0 {
		ttl = p.ServiceCacheTTL
	}
	svcOps := NewServiceOperationStorage(p.Client, p.Logger, ttl)

	w := &SpanWriter{
		client: p.Client,
		logger: p.Logger,
	}
	w.writerMetrics.indexCreate = spanstoremetrics.NewWriter(p.MetricsFactory, "index_create")
	w.serviceWriter = svcOps.Write
	w.spanConverter = dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement)
	w.spanServiceIndex = getSpanAndServiceIndexFn(p)
	return w
}
// CreateTemplates creates the span and service index templates, in that
// order, naming each by applying indexPrefix to "jaeger-span" and
// "jaeger-service". It stops at the first failure and returns that error
// wrapped with the template name; it returns nil when both succeed.
func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string, indexPrefix cfg.IndexPrefix) error {
	// Both names are computed up front; the array keeps span before service.
	templates := [...]struct{ name, body string }{
		{name: indexPrefix.Apply("jaeger-span"), body: spanTemplate},
		{name: indexPrefix.Apply("jaeger-service"), body: serviceTemplate},
	}
	for _, tmpl := range templates {
		if _, err := s.client().CreateTemplate(tmpl.name).Body(tmpl.body).Do(context.Background()); err != nil {
			return fmt.Errorf("failed to create template %q: %w", tmpl.name, err)
		}
	}
	return nil
}
// spanAndServiceIndexFn returns the names of the span and service indices,
// in that order, for a span with the given start time. WriteSpan skips the
// service write when the service index name is empty.
type spanAndServiceIndexFn func(spanTime time.Time) (string, string)
// getSpanAndServiceIndexFn selects the index-naming strategy described by p.
//
//   - Archive: the span index is the archive index (write suffix when
//     UseReadWriteAliases is set) and the service index name is empty.
//   - UseReadWriteAliases without Archive: both prefixes get a "write" suffix.
//   - Otherwise: both names are derived from the span time using the
//     DateLayout of SpanIndex and ServiceIndex respectively.
//
// The time argument is ignored in the first two strategies.
func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn {
	spanPrefix := p.IndexPrefix.Apply(spanIndexBaseName)
	servicePrefix := p.IndexPrefix.Apply(serviceIndexBaseName)

	// p is a private copy, so its flags can be checked once here instead of
	// on every call of the returned function.
	switch {
	case p.Archive && p.UseReadWriteAliases:
		return func(time.Time) (string, string) {
			return archiveIndex(spanPrefix, archiveWriteIndexSuffix), ""
		}
	case p.Archive:
		return func(time.Time) (string, string) {
			return archiveIndex(spanPrefix, archiveIndexSuffix), ""
		}
	case p.UseReadWriteAliases:
		return func(time.Time) (string, string) {
			return spanPrefix + "write", servicePrefix + "write"
		}
	default:
		return func(spanTime time.Time) (string, string) {
			spanIdx := indexWithDate(spanPrefix, p.SpanIndex.DateLayout, spanTime)
			serviceIdx := indexWithDate(servicePrefix, p.ServiceIndex.DateLayout, spanTime)
			return spanIdx, serviceIdx
		}
	}
}
// WriteSpan converts span to its dbmodel form and submits it for indexing.
//
// The index names come from the span's StartTime. When a service index name
// is returned, the converted span is first handed to the service writer; the
// span itself is then added to the span index. The context is unused, and
// the method always returns nil: failures of the underlying index request
// are not surfaced here.
func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error {
	spanIdx, serviceIdx := s.spanServiceIndex(span.StartTime)
	dbSpan := s.spanConverter.FromDomainEmbedProcess(span)

	// An empty service index name means there is no service entry to write.
	if len(serviceIdx) > 0 {
		s.writeService(serviceIdx, dbSpan)
	}
	s.writeSpan(spanIdx, dbSpan)

	s.logger.Debug("Wrote span to ES index", zap.String("index", spanIdx))
	return nil
}
// Close closes the es.Client returned by the client func and returns the
// error from that call.
func (s *SpanWriter) Close() error {
	return s.client().Close()
}
// keyInCache reports whether c returns a non-nil value for key.
func keyInCache(key string, c cache.Cache) bool {
	entry := c.Get(key)
	return entry != nil
}
// writeCache stores key in c, using the key itself as the cached value.
func writeCache(key string, c cache.Cache) {
	c.Put(key, key)
}
// writeService passes the converted span and the service index name to the
// configured serviceWriter.
func (s *SpanWriter) writeService(indexName string, jsonSpan *dbmodel.Span) {
	s.serviceWriter(indexName, jsonSpan)
}
// writeSpan builds an index request for jsonSpan targeting indexName with the
// span document type and adds it via the client's index service.
func (s *SpanWriter) writeSpan(indexName string, jsonSpan *dbmodel.Span) {
	indexReq := s.client().Index().Index(indexName).Type(spanType)
	// The body is passed as a pointer to the span pointer, as the original did.
	indexReq.BodyJson(&jsonSpan).Add()
}