Skip to content

Commit 03dfd1f

Browse files
authored
feat: add apm agent configuration package (#143)
* feat: add apm agent configuration package * fix: improve fields alignment
1 parent 9c826f2 commit 03dfd1f

File tree

8 files changed

+956
-0
lines changed

8 files changed

+956
-0
lines changed

agentcfg/elasticsearch.go

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package agentcfg // import "github.com/elastic/opentelemetry-collector-components/internal/agentcfg"
19+
20+
import (
21+
"context"
22+
"encoding/json"
23+
"errors"
24+
"fmt"
25+
"io"
26+
"net/http"
27+
"sync"
28+
"sync/atomic"
29+
"time"
30+
31+
"go.uber.org/zap"
32+
33+
"github.com/elastic/go-elasticsearch/v8"
34+
"github.com/elastic/go-elasticsearch/v8/esapi"
35+
)
36+
37+
const ElasticsearchIndexName = ".apm-agent-configuration"
38+
39+
const (
40+
// ErrInfrastructureNotReady is returned when a fetch request comes in while
41+
// the infrastructure is not ready to serve the request.
42+
// This may happen when the local cache is not initialized and no fallback fetcher is configured.
43+
ErrInfrastructureNotReady = "agentcfg infrastructure is not ready"
44+
45+
// ErrNoValidElasticsearchConfig is an error where the server is
46+
// not properly configured to fetch agent configuration.
47+
ErrNoValidElasticsearchConfig = "no valid elasticsearch config to fetch agent config"
48+
)
49+
50+
const (
51+
refreshCacheTimeout = 5 * time.Second
52+
loggerRateLimit = time.Minute
53+
)
54+
55+
// TODO:
56+
// - Add Otel tracer
57+
// - Collection metrics
58+
type ElasticsearchFetcher struct {
59+
last time.Time
60+
client *elasticsearch.Client
61+
logger *zap.Logger
62+
cache []AgentConfig
63+
cacheDuration time.Duration
64+
searchSize int
65+
mu sync.RWMutex
66+
invalidESCfg atomic.Bool
67+
cacheInitialized atomic.Bool
68+
}
69+
70+
func NewElasticsearchFetcher(
71+
client *elasticsearch.Client,
72+
cacheDuration time.Duration,
73+
logger *zap.Logger,
74+
) *ElasticsearchFetcher {
75+
return &ElasticsearchFetcher{
76+
client: client,
77+
cacheDuration: cacheDuration,
78+
searchSize: 100,
79+
logger: logger,
80+
}
81+
}
82+
83+
// Fetch finds a matching agent config based on the received query.
84+
func (f *ElasticsearchFetcher) Fetch(ctx context.Context, query Query) (Result, error) {
85+
if f.cacheInitialized.Load() {
86+
// Happy path: serve fetch requests using an initialized cache.
87+
f.mu.RLock()
88+
defer f.mu.RUnlock()
89+
return matchAgentConfig(query, f.cache), nil
90+
}
91+
92+
if f.invalidESCfg.Load() {
93+
return Result{}, errors.New(ErrNoValidElasticsearchConfig)
94+
}
95+
96+
return Result{}, errors.New(ErrInfrastructureNotReady)
97+
}
98+
99+
// Run refreshes the fetcher cache by querying Elasticsearch periodically.
100+
func (f *ElasticsearchFetcher) Run(ctx context.Context) error {
101+
refresh := func() bool {
102+
// refresh returns a bool that indicates whether Run should return
103+
// immediately without error, e.g. due to invalid Elasticsearch config.
104+
if err := f.refreshCache(ctx); err != nil {
105+
106+
f.logger.Error(fmt.Sprintf("refresh cache error: %s", err))
107+
if f.invalidESCfg.Load() {
108+
f.logger.Warn("stopping refresh cache background job: elasticsearch config is invalid")
109+
return true
110+
}
111+
} else {
112+
f.logger.Debug("refresh cache success")
113+
}
114+
return false
115+
}
116+
117+
// Trigger initial run.
118+
select {
119+
case <-ctx.Done():
120+
return ctx.Err()
121+
default:
122+
if stop := refresh(); stop {
123+
return nil
124+
}
125+
}
126+
127+
// Then schedule subsequent runs.
128+
t := time.NewTicker(f.cacheDuration)
129+
defer t.Stop()
130+
for {
131+
select {
132+
case <-ctx.Done():
133+
return ctx.Err()
134+
case <-t.C:
135+
if stop := refresh(); stop {
136+
return nil
137+
}
138+
}
139+
}
140+
}
141+
142+
type cacheResult struct {
143+
ScrollID string `json:"_scroll_id"`
144+
Hits struct {
145+
Hits []struct {
146+
Source struct {
147+
Settings map[string]string `json:"settings"`
148+
Service struct {
149+
Name string `json:"name"`
150+
Environment string `json:"environment"`
151+
} `json:"service"`
152+
AgentName string `json:"agent_name"`
153+
ETag string `json:"etag"`
154+
} `json:"_source"`
155+
} `json:"hits"`
156+
} `json:"hits"`
157+
}
158+
159+
func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
160+
scrollID := ""
161+
buffer := make([]AgentConfig, 0, len(f.cache))
162+
163+
// The refresh cache operation should complete within refreshCacheTimeout.
164+
ctx, cancel := context.WithTimeout(ctx, refreshCacheTimeout)
165+
defer cancel()
166+
167+
for {
168+
result, err := f.singlePageRefresh(ctx, scrollID)
169+
if err != nil {
170+
f.clearScroll(ctx, scrollID)
171+
return err
172+
}
173+
174+
for _, hit := range result.Hits.Hits {
175+
buffer = append(buffer, AgentConfig{
176+
ServiceName: hit.Source.Service.Name,
177+
ServiceEnvironment: hit.Source.Service.Environment,
178+
AgentName: hit.Source.AgentName,
179+
Etag: hit.Source.ETag,
180+
Config: hit.Source.Settings,
181+
})
182+
}
183+
scrollID = result.ScrollID
184+
if len(result.Hits.Hits) == 0 {
185+
break
186+
}
187+
}
188+
189+
f.clearScroll(ctx, scrollID)
190+
191+
f.mu.Lock()
192+
f.cache = buffer
193+
f.mu.Unlock()
194+
f.cacheInitialized.Store(true)
195+
f.last = time.Now()
196+
return nil
197+
}
198+
199+
func (f *ElasticsearchFetcher) clearScroll(ctx context.Context, scrollID string) {
200+
resp, err := esapi.ClearScrollRequest{
201+
ScrollID: []string{scrollID},
202+
}.Do(ctx, f.client)
203+
if err != nil {
204+
f.logger.Warn(fmt.Sprintf("failed to clear scroll: %v", err))
205+
return
206+
}
207+
208+
if resp.IsError() {
209+
f.logger.Warn(fmt.Sprintf("clearscroll request returned error: %s", resp.Status()))
210+
}
211+
212+
resp.Body.Close()
213+
}
214+
215+
func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID string) (cacheResult, error) {
216+
var result cacheResult
217+
var err error
218+
var resp *esapi.Response
219+
220+
switch scrollID {
221+
case "":
222+
resp, err = esapi.SearchRequest{
223+
Index: []string{ElasticsearchIndexName},
224+
Size: &f.searchSize,
225+
Scroll: f.cacheDuration,
226+
}.Do(ctx, f.client)
227+
default:
228+
resp, err = esapi.ScrollRequest{
229+
ScrollID: scrollID,
230+
Scroll: f.cacheDuration,
231+
}.Do(ctx, f.client)
232+
}
233+
if err != nil {
234+
return result, err
235+
}
236+
defer resp.Body.Close()
237+
238+
if resp.StatusCode >= http.StatusBadRequest {
239+
// Elasticsearch returns 401 on unauthorized requests and 403 on insufficient permission
240+
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
241+
f.invalidESCfg.Store(true)
242+
}
243+
bodyBytes, err := io.ReadAll(resp.Body)
244+
if err == nil {
245+
f.logger.Debug(fmt.Sprintf("refresh cache elasticsearch returned status %d: %s", resp.StatusCode, string(bodyBytes)))
246+
}
247+
return result, fmt.Errorf("refresh cache elasticsearch returned status %d", resp.StatusCode)
248+
}
249+
return result, json.NewDecoder(resp.Body).Decode(&result)
250+
}

0 commit comments

Comments
 (0)