Skip to content

Commit

Permalink
config-reloader: adds new metrics
Browse files Browse the repository at this point in the history
* adds metrics for config-reloader. It must improves observability.
* adds resyncInterval, to enforce config updates with last seen configuration
* return error if key with configuration not found at k8s Secret data

#916
  • Loading branch information
f41gh7 committed Apr 15, 2024
1 parent 21ab045 commit b0443f5
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ aliases:
## Next release
**Update note: [vmcluster](./api.md#vmcluster): remove fields `VMClusterSpec.VMInsert.Name`, `VMClusterSpec.VMStorage.Name`, `VMClusterSpec.VMSelect.Name`, they're marked as deprecated since v0.21.0. See [this pull request](https://github.com/VictoriaMetrics/operator/pull/907).**

- [config-reloader](./README.md): adds error metrics to the config-reloader container - `configreloader_last_reload_successful`, `configreloader_last_reload_errors_total`, `configreloader_config_last_reload_total`, `configreloader_k8s_watch_errors_total`, `configreloader_secret_content_update_errors_total`, `configreloader_last_reload_success_timestamp_seconds`. See this [issue](https://github.com/VictoriaMetrics/operator/issues/916) for details.
- [operator](./README.md): Changes error handling for reconcile. Operator sends `Events` into kubernetes API, if any error happened during object reconcile. See this [issue](https://github.com/VictoriaMetrics/operator/issues/900) for details.
- [operator](./README.md): updates base Docker image and prometheus_client to versions with with CVE fixes
- [operator](./README.md): adds reconcile retries on conflicts. See this [issue](https://github.com/VictoriaMetrics/operator/issues/901) for details.
Expand Down
16 changes: 13 additions & 3 deletions internal/config-reloader/file_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -45,7 +46,6 @@ func (fw *fileWatcher) startWatch(ctx context.Context, updates chan struct{}) er
update := func(fileName string) error {
newData, err := readFileContent(fileName)
if err != nil {
logger.Errorf("cannot read file: %s content, err: %s", fileName, err)
return fmt.Errorf("cannot read file: %s content, err: %w", fileName, err)
}
if bytes.Equal(prevContent, newData) {
Expand All @@ -72,11 +72,21 @@ func (fw *fileWatcher) startWatch(ctx context.Context, updates chan struct{}) er
}
go func() {
defer fw.wg.Done()

var t time.Ticker
if *resyncInternal > 0 {
t = *time.NewTicker(*resyncInternal)
defer t.Stop()
}
for {
select {
case <-ctx.Done():
return
case <-t.C:
if err := update(*configFileName); err != nil {
logger.Errorf("cannot update file at force resync :%s", err)
contentUpdateErrosTotal.Inc()
continue
}
case event := <-fw.w.Events:
if event.Name != *configFileName {
logger.Infof("file name not match: %s", event.Name)
Expand All @@ -85,6 +95,7 @@ func (fw *fileWatcher) startWatch(ctx context.Context, updates chan struct{}) er
logger.Infof("changed: %s, %s", event.Name, event.Op.String())
if err := update(*configFileName); err != nil {
logger.Errorf("cannot update file :%s", err)
contentUpdateErrosTotal.Inc()
continue
}
}
Expand Down Expand Up @@ -177,7 +188,6 @@ func (dw *dirWatcher) startWatch(ctx context.Context, updates chan struct{}) {
logger.Infof("update needed file for path: %s", path)
return nil
})

if err != nil {
logger.Errorf("cannot walk: %s", err)
return false, fmt.Errorf("cannot walk path: %s, err: %w", eventPath, err)
Expand Down
36 changes: 27 additions & 9 deletions internal/config-reloader/k8s_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,18 @@ func newKubernetesWatcher(ctx context.Context, secretName, namespace string) (*k
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
var s v1.SecretList
if err := c.List(ctx, &s, listOpts); err != nil {
k8sAPIWatchErrorsTotal.Inc()
return nil, fmt.Errorf("cannot get secret from k8s api: %w", err)
}

return &s, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return c.Watch(ctx, &v1.SecretList{}, listOpts)
wi, err := c.Watch(ctx, &v1.SecretList{}, listOpts)
if err != nil {
k8sAPIWatchErrorsTotal.Inc()
}
return wi, err
},
}, &v1.Secret{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

Expand Down Expand Up @@ -90,17 +95,15 @@ func (k *k8sWatcher) startWatch(ctx context.Context, updates chan struct{}) erro
updateSecret := func(secret *v1.Secret) error {
newData, ok := secret.Data[*configSecretKey]
if !ok {
// bad case no such key.
logger.Warnf("key not found")
return fmt.Errorf("key=%q with content not found at secret=%q", *configSecretKey, secret.Name)
}
if bytes.Equal(prevContent, newData) {
logger.Infof("secret config update not needed,file content the same")
return errNotModified
}
logger.Infof("updating local file content for secret: %s", secret.Name)
if err := writeNewContent(newData); err != nil {
logger.Errorf("cannot write file content to disk: %s", err)
return err
return fmt.Errorf("cannot write file content to disk: %w", err)
}
prevContent = newData
time.Sleep(time.Second)
Expand All @@ -113,30 +116,45 @@ func (k *k8sWatcher) startWatch(ctx context.Context, updates chan struct{}) erro
}
go k.inf.Run(ctx.Done())

var s v1.Secret
if err := k.c.Get(ctx, types.NamespacedName{Namespace: k.namespace, Name: k.secretName}, &s); err != nil {
var lastSecret v1.Secret
if err := k.c.Get(ctx, types.NamespacedName{Namespace: k.namespace, Name: k.secretName}, &lastSecret); err != nil {
logger.Fatalf("cannot get secret during init secretName: %s, namespace: %s, err: %s", k.secretName, k.namespace, err)
}
if err := updateSecret(&s); err != nil {
if err := updateSecret(&lastSecret); err != nil {
if *onlyInitConfig {
return err
}
logger.Errorf("cannot update secret: %s", err)
}
k.wg.Add(1)

go func() {
defer k.wg.Done()

var t time.Ticker
if *resyncInternal > 0 {
t = *time.NewTicker(*resyncInternal)
defer t.Stop()
}
for {
select {
case <-t.C:
if err := updateSecret(&lastSecret); err != nil {
if errors.Is(err, errNotModified) {
continue
}
contentUpdateErrosTotal.Inc()
logger.Errorf("cannot force sync secret content: %s", err)
}
case item := <-k.events:
s := item.obj
lastSecret = *s
logger.Infof("get k8s sync event type: %s, for secret: %s", item.op, item.obj.Name)

if err := updateSecret(s); err != nil {
if errors.Is(err, errNotModified) {
continue
}
contentUpdateErrosTotal.Inc()
logger.Errorf("cannot sync secret content: %s", err)
}
case <-ctx.Done():
Expand Down
24 changes: 24 additions & 0 deletions internal/config-reloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/metrics"
"github.com/pires/go-proxyproto"
)

Expand All @@ -37,6 +38,16 @@ var (
reloadURL = flag.String("reload-url", "http://127.0.0.1:8429/-/reload", "reload URL to trigger config reload")
listenAddr = flag.String("http.listenAddr", ":8435", "http server listen addr")
useProxyProtocolClient = flag.Bool("reload-use-proxy-protocol", false, "enables proxy-protocol for reload connections.")
resyncInternal = flag.Duration("resync-interval", 0, "interval for force resync of the last configuration")
)

var (
configLastOkReloadTime = metrics.NewCounter(`configreloader_last_reload_success_timestamp_seconds`)
configLastReloadSuccess = metrics.NewCounter(`configreloader_last_reload_successful`)
configReloadErrorsTotal = metrics.NewCounter(`configreloader_last_reload_errors_total`)
configReloadsTotal = metrics.NewCounter(`configreloader_config_last_reload_total`)
k8sAPIWatchErrorsTotal = metrics.NewCounter(`configreloader_k8s_watch_errors_total`)
contentUpdateErrosTotal = metrics.NewCounter(`configreloader_secret_content_update_errors_total`)
)

func main() {
Expand Down Expand Up @@ -140,6 +151,7 @@ type reloader struct {
}

func (r *reloader) reload(ctx context.Context) error {
configReloadsTotal.Inc()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, *reloadURL, nil)
if err != nil {
return fmt.Errorf("cannot build request for reload api: %w", err)
Expand Down Expand Up @@ -174,8 +186,12 @@ func (c *cfgWatcher) start(ctx context.Context) {
}
if err := c.reloader(ctx); err != nil {
logger.Errorf("cannot trigger api reload: %s", err.Error())
configLastReloadSuccess.Set(0)
configReloadErrorsTotal.Inc()
return
}
configLastReloadSuccess.Set(1)
configLastOkReloadTime.Set(uint64(time.Now().UnixMilli()))
logger.Infof("reload config ok.")
}()

Expand Down Expand Up @@ -258,5 +274,13 @@ func writeNewContent(data []byte) error {
}

func requestHandler(w http.ResponseWriter, r *http.Request) bool {
switch r.URL.Path {
case "/metrics":
w.WriteHeader(http.StatusOK)
metrics.WritePrometheus(w, true)
case "/health":
w.WriteHeader(http.StatusOK)
w.Write([]byte(`OK`))
}
return false
}

0 comments on commit b0443f5

Please sign in to comment.