Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Remove the need for external tools for managing ElasticSearch #6291

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions cmd/anonymizer/app/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -77,6 +78,7 @@ func newTestServer(t *testing.T) *testServer {
assert.NoError(t, server.Serve(lis))
exited.Done()
}()
time.Sleep(time.Second)
t.Cleanup(func() {
server.Stop()
exited.Wait() // don't allow test to finish before server exits
Expand Down
26 changes: 26 additions & 0 deletions cmd/jaeger/internal/index_cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package internal

import (
"log"

"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/spf13/viper"
)

// StartCleaner runs the index cleaner if enabled and handles config loading
func StartCleaner(v *viper.Viper) {
var cleanerConfig config.Cleaner
if err := v.UnmarshalKey("cleaner", &cleanerConfig); err != nil {
log.Printf("Error loading cleaner configuration: %v", err)
return
}

Check warning on line 16 in cmd/jaeger/internal/index_cleaner.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/index_cleaner.go#L11-L16

Added lines #L11 - L16 were not covered by tests

cfg := &config.Configuration{
Cleaner: cleanerConfig,
}

// If cleaner is enabled, run the cleaner
if cfg.Cleaner.Enabled {
go cfg.RunCleaner()
}

Check warning on line 25 in cmd/jaeger/internal/index_cleaner.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/index_cleaner.go#L18-L25

Added lines #L18 - L25 were not covered by tests
}
2 changes: 2 additions & 0 deletions cmd/jaeger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func main() {
command,
)

internal.StartCleaner(v)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's enabled by config option, no need to "start" anything expicitly


if err := command.Execute(); err != nil {
log.Fatal(err)
}
Expand Down
218 changes: 218 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
"bufio"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -37,6 +40,22 @@
IndexPrefixSeparator = "-"
)

// Cleaner struct for configuring index cleaning
type Cleaner struct {
Enabled bool `mapstructure:"enabled"`
Frequency time.Duration `mapstructure:"frequency"`
IncludeArchive bool `mapstructure:"include_archive"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in v2 archive storage is a separate instance, this property won't make sense

}

// Validation function for Cleaner struct
func (c *Cleaner) Validate() error {
if c.Frequency <= 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validation can be declared in the field tags

return fmt.Errorf("frequency must be greater than 0")
}

return nil
}

// IndexOptions describes the index format and rollover frequency
type IndexOptions struct {
// Priority contains the priority of index template (ESv8 only).
Expand Down Expand Up @@ -136,6 +155,10 @@
Tags TagsAsFields `mapstructure:"tags_as_fields"`
// Enabled, if set to true, enables the namespace for storage pointed to by this configuration.
Enabled bool `mapstructure:"-"`

// ---- ES-specific config ----
// Cleaner holds the configuration for the Elasticsearch index Cleaner.
Cleaner Cleaner `mapstructure:"cleaner"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Cleaner Cleaner `mapstructure:"cleaner"`
IndexCleaner IndexCleaner `mapstructure:"index_cleaner"`

}

// TagsAsFields holds configuration for tag schema.
Expand Down Expand Up @@ -585,3 +608,198 @@
_, err := govalidator.ValidateStruct(c)
return err
}

// Run starts the cleaner that runs periodically based on the Frequency field.
func (cfg *Configuration) RunCleaner() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config package is for configuration, not for business logic. This should be somewhere in plugin/storage/es

if !cfg.Cleaner.Enabled {
log.Println("Cleaner is disabled, skipping...")
return
}

Check warning on line 617 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L613-L617

Added lines #L613 - L617 were not covered by tests

ticker := time.NewTicker(cfg.Cleaner.Frequency)
defer ticker.Stop()

// Use for range to iterate over ticker.C channel
for range ticker.C {
log.Println("Running index cleaner...")
err := cfg.cleanIndices()
if err != nil {
log.Printf("Error cleaning indices: %v", err)
}

Check warning on line 628 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L619-L628

Added lines #L619 - L628 were not covered by tests
}
}

// cleanIndices connects to ElasticSearch and deletes the indices based on the configuration.
func (cfg *Configuration) cleanIndices() error {
log.Println("Cleaning Elasticsearch indices...")

// Delete old indices (older than max span age)
err := cfg.deleteOldIndices()
if err != nil {
return fmt.Errorf("failed to delete old indices: %w", err)
}

Check warning on line 640 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L633-L640

Added lines #L633 - L640 were not covered by tests

// If IncludeArchive is true, also delete archived indices
if cfg.Cleaner.IncludeArchive {
err = cfg.deleteArchivedIndices()
if err != nil {
return fmt.Errorf("failed to delete archived indices: %w", err)
}

Check warning on line 647 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L643-L647

Added lines #L643 - L647 were not covered by tests
}

return nil

Check warning on line 650 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L650

Added line #L650 was not covered by tests
}

// deleteOldIndices deletes indices older than the configured MaxSpanAge.
func (cfg *Configuration) deleteOldIndices() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this logic already exist in in cmd/es-index-cleaner? Why do we need to duplicate it?

esClient, err := esV8.NewClient(
esV8.Config{Addresses: cfg.Servers},
)
if err != nil {
return fmt.Errorf("failed to create Elasticsearch client: %w", err)
}

Check warning on line 660 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L654-L660

Added lines #L654 - L660 were not covered by tests

indexNames, err := fetchAllIndexNames(esClient)
if err != nil {
return fmt.Errorf("failed to fetch indices: %w", err)
}

Check warning on line 665 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L662-L665

Added lines #L662 - L665 were not covered by tests

for _, indexName := range indexNames {
log.Printf("Checking index: %s", indexName)
creationDate, err := fetchIndexCreationDate(cfg.Servers[0], indexName)
if err != nil {
log.Printf("Failed to get creation date for index %s: %v", indexName, err)
continue

Check warning on line 672 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L667-L672

Added lines #L667 - L672 were not covered by tests
}

if time.Since(creationDate) > cfg.MaxSpanAge {
log.Printf("Index %s is older than MaxSpanAge, deleting...", indexName)
if err := deleteIndex(esClient, indexName); err != nil {
log.Printf("Failed to delete index: %s, error: %v", indexName, err)
} else {
log.Printf("Successfully deleted index: %s", indexName)
}

Check warning on line 681 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L675-L681

Added lines #L675 - L681 were not covered by tests
}
}

return nil

Check warning on line 685 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L685

Added line #L685 was not covered by tests
}

// fetchAllIndexNames retrieves all index names from the Elasticsearch cluster.
func fetchAllIndexNames(esClient *esV8.Client) ([]string, error) {
res, err := esClient.Indices.GetAlias()
if err != nil {
return nil, err
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}

Check warning on line 699 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L689-L699

Added lines #L689 - L699 were not covered by tests

var indicesMap map[string]interface{}
if err := json.Unmarshal(body, &indicesMap); err != nil {
return nil, err
}

Check warning on line 704 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L701-L704

Added lines #L701 - L704 were not covered by tests

var indexNames []string
for indexName := range indicesMap {
indexNames = append(indexNames, indexName)
}

Check warning on line 709 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L706-L709

Added lines #L706 - L709 were not covered by tests

return indexNames, nil

Check warning on line 711 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L711

Added line #L711 was not covered by tests
}

// fetchIndexCreationDate fetches the creation date of the given index.
func fetchIndexCreationDate(serverURL, indexName string) (time.Time, error) {
url := fmt.Sprintf("%s/%s/_settings", serverURL, indexName)
resp, err := http.Get(url)
if err != nil {
return time.Time{}, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return time.Time{}, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

Check warning on line 725 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L715-L725

Added lines #L715 - L725 were not covered by tests

body, err := io.ReadAll(resp.Body)
if err != nil {
return time.Time{}, err
}

Check warning on line 730 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L727-L730

Added lines #L727 - L730 were not covered by tests

var settingsMap map[string]interface{}
if err := json.Unmarshal(body, &settingsMap); err != nil {
return time.Time{}, err
}

Check warning on line 735 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L732-L735

Added lines #L732 - L735 were not covered by tests

creationDateStr, ok := settingsMap[indexName].(map[string]interface{})["settings"].(map[string]interface{})["index.creation_date"].(string)
if !ok {
return time.Time{}, fmt.Errorf("creation date not found for index %s", indexName)
}

Check warning on line 740 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L737-L740

Added lines #L737 - L740 were not covered by tests

creationDateMillis, err := strconv.ParseInt(creationDateStr, 10, 64)
if err != nil {
return time.Time{}, err
}

Check warning on line 745 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L742-L745

Added lines #L742 - L745 were not covered by tests

return time.Unix(0, creationDateMillis*int64(time.Millisecond)), nil

Check warning on line 747 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L747

Added line #L747 was not covered by tests
}

// deleteIndex deletes the given index.
func deleteIndex(esClient *esV8.Client, indexName string) error {
_, err := esClient.Indices.Delete([]string{indexName})
return err

Check warning on line 753 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L751-L753

Added lines #L751 - L753 were not covered by tests
}

// deleteArchivedIndices simulates deletion of archived indices if includeArchive is true.
func (cfg *Configuration) deleteArchivedIndices() error {
if !cfg.Cleaner.IncludeArchive {
log.Printf("Skipping archived indices deletion because IncludeArchive is false.")
return nil
}

Check warning on line 761 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L757-L761

Added lines #L757 - L761 were not covered by tests

log.Println("Deleting archived indices...")

// Use the servers from the configuration
esClient, err := esV8.NewClient(esV8.Config{
Addresses: cfg.Servers, // Dynamically use configured servers
})
if err != nil {
return fmt.Errorf("failed to create Elasticsearch client: %w", err)
}

Check warning on line 771 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L763-L771

Added lines #L763 - L771 were not covered by tests

// Fetch all indices from Elasticsearch
res, err := esClient.Indices.GetAlias()
if err != nil {
return fmt.Errorf("failed to fetch indices: %w", err)
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}

Check warning on line 783 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L774-L783

Added lines #L774 - L783 were not covered by tests

var indicesMap map[string]interface{}
err = json.Unmarshal(body, &indicesMap)
if err != nil {
return fmt.Errorf("failed to unmarshal response body: %w", err)
}

Check warning on line 789 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L785-L789

Added lines #L785 - L789 were not covered by tests

for indexName := range indicesMap {
if len(indexName) > 8 && indexName[:8] == "archive_" {
log.Printf("Found archived index: %s", indexName)

_, err = esClient.Indices.Delete([]string{indexName})
if err != nil {
log.Printf("Failed to delete archived index: %s, error %v", indexName, err)
} else {
log.Printf("Successfully deleted archived index: %s", indexName)
}

Check warning on line 800 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L791-L800

Added lines #L791 - L800 were not covered by tests
}
}

return nil

Check warning on line 804 in pkg/es/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/config/config.go#L804

Added line #L804 was not covered by tests
}
30 changes: 30 additions & 0 deletions pkg/es/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,3 +799,33 @@ func TestApplyForIndexPrefix(t *testing.T) {
func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}

func TestCleanerValidate(t *testing.T) {
tests := []struct {
name string
cleaner *Cleaner
wantErr bool
}{
{
name: "Valid Cleaner",
cleaner: &Cleaner{Enabled: true, Frequency: time.Minute, IncludeArchive: true},
wantErr: false,
},
{
name: "Invalid Frequency",
cleaner: &Cleaner{Enabled: true, Frequency: 0, IncludeArchive: true},
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cleaner.Validate()
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}
Loading