-
Notifications
You must be signed in to change notification settings - Fork 2.6k
/
Copy pathelasticsearch_bulk.go
226 lines (187 loc) · 6.75 KB
/
elasticsearch_bulk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package elasticsearchexporter contains an opentelemetry-collector exporter
// for Elasticsearch.
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"time"
"github.com/cenkalti/backoff/v4"
elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
esutil7 "github.com/elastic/go-elasticsearch/v7/esutil"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
)
// Aliases pinning the exporter to the v7 Elasticsearch client API, so a
// future client-major-version bump only needs to touch this one spot.
type (
	esClientCurrent           = elasticsearch7.Client
	esConfigCurrent           = elasticsearch7.Config
	esBulkIndexerCurrent      = esutil7.BulkIndexer
	esBulkIndexerItem         = esutil7.BulkIndexerItem
	esBulkIndexerResponseItem = esutil7.BulkIndexerResponseItem
)
// clientLogger adapts a zap.Logger so it satisfies the estransport.Logger
// interface the Elasticsearch client requires for request logging.
type clientLogger zap.Logger
// LogRoundTrip should not modify the request or response, except for consuming and closing the body.
// Implementations have to check for nil values in request and response.
func (cl *clientLogger) LogRoundTrip(requ *http.Request, resp *http.Response, err error, _ time.Time, dur time.Duration) error {
	logger := (*zap.Logger)(cl)
	// A transport-level failure takes precedence over any response data.
	if err != nil {
		logger.Error("Request failed.", zap.NamedError("reason", err))
		return nil
	}
	if resp != nil {
		logger.Debug("Request roundtrip completed.",
			zap.String("path", sanitize.String(requ.URL.Path)),
			zap.String("method", requ.Method),
			zap.Duration("duration", dur),
			zap.String("status", resp.Status))
	}
	return nil
}
// RequestBodyEnabled makes the client pass a copy of request body to the logger.
// Disabled for now; body logging would be a debug-verbosity setting.
func (*clientLogger) RequestBodyEnabled() bool {
	// TODO: introduce setting log the bodies for more detailed debug logs
	return false
}
// ResponseBodyEnabled makes the client pass a copy of response body to the logger.
// Disabled for now; body logging would be a debug-verbosity setting.
func (*clientLogger) ResponseBodyEnabled() bool {
	// TODO: introduce setting log the bodies for more detailed debug logs
	return false
}
// newElasticsearchClient builds a v7 Elasticsearch client from the exporter
// configuration: TLS, headers, endpoints/cloud ID, auth, retry policy, and
// node discovery. Request logging is routed through the supplied zap logger.
func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurrent, error) {
	tlsCfg, err := config.ClientConfig.LoadTLSConfig()
	if err != nil {
		return nil, err
	}

	transport := newTransport(config, tlsCfg)

	httpHeaders := make(http.Header)
	for key, value := range config.Headers {
		httpHeaders.Add(key, value)
	}

	// TODO: validate settings:
	// - try to parse address and validate scheme (address must be a valid URL)
	// - check if cloud ID is valid

	// The client counts retries on top of the initial send, so convert the
	// configured total number of publish attempts into a retry count.
	retries := config.Retry.MaxRequests - 1
	disableRetry := !config.Retry.Enabled || retries <= 0
	if disableRetry {
		retries = 0
	}

	return elasticsearch7.NewClient(esConfigCurrent{
		Transport: transport,

		// configure connection setup
		Addresses: config.Endpoints,
		CloudID:   config.CloudID,
		Username:  config.Authentication.User,
		Password:  string(config.Authentication.Password),
		APIKey:    string(config.Authentication.APIKey),
		Header:    httpHeaders,

		// configure retry behavior
		RetryOnStatus:        retryOnStatus,
		DisableRetry:         disableRetry,
		EnableRetryOnTimeout: config.Retry.Enabled,
		//RetryOnError: retryOnError, // should be used from esclient version 8 onwards
		MaxRetries:   retries,
		RetryBackoff: createElasticsearchBackoffFunc(&config.Retry),

		// configure sniffing
		DiscoverNodesOnStart:  config.Discovery.OnStart,
		DiscoverNodesInterval: config.Discovery.Interval,

		// configure internal metrics reporting and logging
		EnableMetrics:     false, // TODO
		EnableDebugLogger: false, // TODO
		Logger:            (*clientLogger)(logger),
	})
}
// newTransport clones the default HTTP transport and overlays the
// exporter-specific TLS configuration and buffer sizes, when set.
func newTransport(config *Config, tlsCfg *tls.Config) *http.Transport {
	t := http.DefaultTransport.(*http.Transport).Clone()
	if tlsCfg != nil {
		t.TLSClientConfig = tlsCfg
	}
	// Zero or negative sizes mean "keep the transport defaults".
	if config.ReadBufferSize > 0 {
		t.ReadBufferSize = config.ReadBufferSize
	}
	if config.WriteBufferSize > 0 {
		t.WriteBufferSize = config.WriteBufferSize
	}
	return t
}
// newBulkIndexer creates the esutil bulk indexer that batches and flushes
// documents to Elasticsearch according to the exporter's flush settings.
// Indexer-level errors (not per-item failures) are reported through OnError.
func newBulkIndexer(logger *zap.Logger, client *elasticsearch7.Client, config *Config) (esBulkIndexerCurrent, error) {
	// TODO: add debug logger
	return esutil7.NewBulkIndexer(esutil7.BulkIndexerConfig{
		NumWorkers:    config.NumWorkers,
		FlushBytes:    config.Flush.Bytes,
		FlushInterval: config.Flush.Interval,
		Client:        client,
		Pipeline:      config.Pipeline,
		Timeout:       config.Timeout,

		OnError: func(_ context.Context, err error) {
			// Attach the error as a structured field instead of formatting it
			// into the message, consistent with LogRoundTrip above.
			logger.Error("Bulk indexer error", zap.NamedError("reason", err))
		},
	})
}
// createElasticsearchBackoffFunc returns the RetryBackoff callback installed
// into the Elasticsearch client config. It returns nil (no backoff) when
// retries are disabled. The callback maps the client's attempt counter to a
// wait duration drawn from an exponential backoff schedule.
//
// NOTE(review): a single ExponentialBackOff instance is captured by the
// returned closure and shared by every call; the reset on attempts == 1
// restarts the schedule for each new retry sequence, but interleaved retry
// sequences (e.g. concurrent requests) would share and perturb one schedule —
// presumably acceptable here, TODO confirm.
func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Duration {
	if !config.Enabled {
		return nil
	}
	expBackoff := backoff.NewExponentialBackOff()
	// Zero values keep the backoff library's defaults.
	if config.InitialInterval > 0 {
		expBackoff.InitialInterval = config.InitialInterval
	}
	if config.MaxInterval > 0 {
		expBackoff.MaxInterval = config.MaxInterval
	}
	expBackoff.Reset()
	return func(attempts int) time.Duration {
		// First attempt of a new retry sequence: restart the schedule.
		if attempts == 1 {
			expBackoff.Reset()
		}
		return expBackoff.NextBackOff()
	}
}
// shouldRetryEvent reports whether an HTTP status code from a bulk response
// item is in the retryable set (retryOnStatus).
func shouldRetryEvent(status int) bool {
	for _, code := range retryOnStatus {
		if code == status {
			return true
		}
	}
	return false
}
// pushDocuments enqueues a single document for bulk indexing into the given
// index. Retries on retryable per-item statuses are driven from the indexer's
// OnFailure callback, up to maxAttempts total attempts; exhausted or
// non-retryable failures are logged and the document is dropped.
// The returned error covers only the initial enqueue, not eventual indexing.
func pushDocuments(ctx context.Context, logger *zap.Logger, index string, document []byte, bulkIndexer esBulkIndexerCurrent, maxAttempts int) error {
	attempts := 1
	body := bytes.NewReader(document)
	item := esBulkIndexerItem{Action: createAction, Index: index, Body: body}
	// Setup error handler. The handler handles the per item response status based on the
	// selective ACKing in the bulk response.
	//
	// NOTE(review): the callback captures `attempts` and the shared `body`
	// reader; the body is rewound before each re-add so the same bytes are
	// resent. This assumes the retries for one item are not invoked
	// concurrently — TODO confirm against the esutil worker model.
	item.OnFailure = func(ctx context.Context, item esBulkIndexerItem, resp esBulkIndexerResponseItem, err error) {
		switch {
		case attempts < maxAttempts && shouldRetryEvent(resp.Status):
			logger.Debug("Retrying to index",
				zap.String("name", index),
				zap.Int("attempt", attempts),
				zap.Int("status", resp.Status),
				zap.NamedError("reason", err))
			attempts++
			// Rewind the document and re-enqueue it; enqueue errors are
			// intentionally ignored (best-effort retry).
			_, _ = body.Seek(0, io.SeekStart)
			_ = bulkIndexer.Add(ctx, item)
		case resp.Status == 0 && err != nil:
			// Encoding error. We didn't even attempt to send the event
			logger.Error("Drop docs: failed to add docs to the bulk request buffer.",
				zap.NamedError("reason", err))
		case err != nil:
			// Transport/request-level failure after at least one attempt.
			logger.Error("Drop docs: failed to index",
				zap.String("name", index),
				zap.Int("attempt", attempts),
				zap.Int("status", resp.Status),
				zap.NamedError("reason", err))
		default:
			// Non-retryable per-item rejection reported in the bulk response.
			logger.Error(fmt.Sprintf("Drop docs: failed to index: %#v", resp.Error),
				zap.Int("attempt", attempts),
				zap.Int("status", resp.Status))
		}
	}

	return bulkIndexer.Add(ctx, item)
}