Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Remove the need for external tools for managing ElasticSearch #6291

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions cmd/anonymizer/app/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -77,6 +78,7 @@ func newTestServer(t *testing.T) *testServer {
assert.NoError(t, server.Serve(lis))
exited.Done()
}()
time.Sleep(time.Second)
t.Cleanup(func() {
server.Stop()
exited.Wait() // don't allow test to finish before server exits
Expand Down
Binary file added cmd/es-index-cleaner/es-index-cleaner
Binary file not shown.
8 changes: 4 additions & 4 deletions cmd/jaeger/config-elasticsearch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ extensions:
index_prefix: "jaeger-main"
spans:
date_layout: "2006-01-02"
rollover_frequency: "day"
rollover_frequency: "1day"
Copy link
Member

@yurishkuro yurishkuro Dec 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you create another PR (to merge before this one) that changes the RolloverFrequency field type to time.Duration ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay!

shards: 5
replicas: 1
services:
date_layout: "2006-01-02"
rollover_frequency: "day"
rollover_frequency: "1day"
shards: 5
replicas: 1
dependencies:
date_layout: "2006-01-02"
rollover_frequency: "day"
rollover_frequency: "1day"
shards: 5
replicas: 1
sampling:
date_layout: "2006-01-02"
rollover_frequency: "day"
rollover_frequency: "1day"
shards: 5
replicas: 1
another_storage:
Expand Down
3 changes: 3 additions & 0 deletions cmd/jaeger/internal/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/otelcol"

"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage/es"
)

//go:embed all-in-one.yaml
Expand Down Expand Up @@ -59,6 +60,8 @@ func Command() *cobra.Command {
return checkConfigAndRun(cmd, args, yamlAllInOne.ReadFile, otelRunE)
}

cmd.AddCommand(es.NewEsMappingsCommand())

cmd.Short = description
cmd.Long = description

Expand Down
18 changes: 18 additions & 0 deletions cmd/jaeger/internal/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ func TestCommand(t *testing.T) {

cmd.ParseFlags([]string{"--config", "bad-file-name"})
err := cmd.Execute()
require.Error(t, err)
require.ErrorContains(t, err, "bad-file-name")

t.Run("Should have es-mappings command", func(t *testing.T) {
foundCmd, _, _ := cmd.Find([]string{"es-mappings"})
assert.NotNil(t, foundCmd, "es-mappings command should be present")
assert.Equal(t, "es-mappings", foundCmd.Use, "Found command should match 'es-mappings'")
})
}

func TestCheckConfigAndRun_DefaultConfig(t *testing.T) {
Expand All @@ -50,4 +57,15 @@ func TestCheckConfigAndRun_DefaultConfig(t *testing.T) {
}
err = checkConfigAndRun(cmd, nil, getCfgErr, runE)
require.ErrorIs(t, err, errGetCfg)

t.Run("Should load custom config if provided", func(t *testing.T) {
cmd.Flags().Set("config", "custom-config.yaml")
getCfg := func(name string) ([]byte, error) {
assert.Equal(t, "custom-config.yaml", name)
return []byte("custom-config"), nil
}

err := checkConfigAndRun(cmd, nil, getCfg, runE)
require.NoError(t, err)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ func TestXYZsearch(t *testing.T) {
Elasticsearch: &esCfg.Configuration{
Servers: []string{server.URL},
LogLevel: "error",
Rollover: esCfg.Rollover{
Enabled: true,
},
},
})
})
Expand All @@ -232,6 +235,9 @@ func TestXYZsearch(t *testing.T) {
Opensearch: &esCfg.Configuration{
Servers: []string{server.URL},
LogLevel: "error",
Rollover: esCfg.Rollover{
Enabled: true,
},
},
})
})
Expand Down
23 changes: 23 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ const (
IndexPrefixSeparator = "-"
)

// IndexCleaner struct for configuring index cleaning
type IndexCleaner struct {
// Enabled specifies whether the index cleaner functionality is enabled.
// When set to true, the index cleaner will periodically run based on the specified frequency.
Enabled bool `mapstructure:"enabled"`
// Frequency defines the interval at which the index cleaner runs.
Frequency time.Duration `mapstructure:"frequency" validate:"gt=0"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comment describing the arg

}

// Rollover holds the settings for the periodic index rollover job.
type Rollover struct {
// Enabled specifies whether the rollover functionality is enabled.
Enabled bool `mapstructure:"enabled"`
// Frequency specifies the interval at which the rollover process runs.
// The validate tag requires it to be greater than zero.
Frequency time.Duration `mapstructure:"frequency" validate:"gt=0"`
}

// IndexOptions describes the index format and rollover frequency
type IndexOptions struct {
// Priority contains the priority of index template (ESv8 only).
Expand Down Expand Up @@ -136,6 +153,12 @@ type Configuration struct {
Tags TagsAsFields `mapstructure:"tags_as_fields"`
// Enabled, if set to true, enables the namespace for storage pointed to by this configuration.
Enabled bool `mapstructure:"-"`

// ---- ES-specific config ----
// IndexCleaner holds the configuration for the Elasticsearch index Cleaner.
IndexCleaner IndexCleaner `mapstructure:"index_cleaner"`
// Rollover holds the configuration for the Elasticsearch rollover.
Rollover Rollover `mapstructure:"rollover"`
}

// TagsAsFields holds configuration for tag schema.
Expand Down
3 changes: 3 additions & 0 deletions pkg/es/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,9 @@ func TestValidate(t *testing.T) {
name: "All valid input are set",
config: &Configuration{
Servers: []string{"localhost:8000/dummyserver"},
Rollover: Rollover{
Enabled: true,
},
},
expectedError: false,
},
Expand Down
36 changes: 36 additions & 0 deletions plugin/storage/es/es_mapping_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package es

import (
"fmt"
"log"
"os"
"os/exec"

"github.com/spf13/cobra"
)

// NewEsMappingsCommand defines the new CLI command to run esmapping-generator
func NewEsMappingsCommand() *cobra.Command {
return &cobra.Command{
Use: "es-mappings",
Short: "Print Elasticsearch index mappings to stdout",
Run: func(_ *cobra.Command, _ []string) {
if _, err := os.Stat("./cmd/esmapping-generator"); os.IsNotExist(err) {
log.Fatalf("esmapping-generator not found: %v", err)
}

Check warning on line 23 in plugin/storage/es/es_mapping_generator.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/es_mapping_generator.go#L16-L23

Added lines #L16 - L23 were not covered by tests

// Run the esmapping-generator command
execCmd := exec.Command("go", "run", "./cmd/esmapping-generator")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you imagine this working in production if someone runs Jaeger as a container image? Where would they get go toolchain and Jaeger source code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you guide me here little

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you guide me here little

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first, I suggest you focus on one tool at a time, start with the cleaner only. The logic of the cleaner is already implemented in Go, we can call it directly as a library from the ES implementation. You can reuse all the sub-commands defined in cmd/es-cleaner, but you may have to move them out of main() into a reusable package (that would be a standalone refactoring and should be its own PR).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more thing - I think the easiest tool to start with is es-mapping, because it's purely an on-demand command run with no need to set up periodic jobs.

I was wrong in the comment above - I don't think sub-commands will be needed once the cleaner/rollover are integrated into the main binary, because the only reason those sub-commands exist today is to allow the script to be run as externally managed cron job, so it needs to do different things. But if it's run as internal periodic job, it's one combined functionality.

output, err := execCmd.CombinedOutput()
if err != nil {
log.Printf("Error executing esmapping-generator: %v\nOutput: %s", err, output)
return
}
fmt.Println("esmapping-generator executed successfully")
fmt.Printf("%s\n", output)

Check warning on line 33 in plugin/storage/es/es_mapping_generator.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/es_mapping_generator.go#L26-L33

Added lines #L26 - L33 were not covered by tests
},
}
}
10 changes: 10 additions & 0 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,13 @@ func TestESStorageFactoryWithConfig(t *testing.T) {
w.Write(mockEsServerResponse)
}))
defer server.Close()

cfg := escfg.Configuration{
Servers: []string{server.URL},
LogLevel: "error",
Rollover: escfg.Rollover{
Enabled: true,
},
}
factory, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
Expand All @@ -286,6 +290,9 @@ func TestConfigurationValidation(t *testing.T) {
name: "valid configuration",
cfg: escfg.Configuration{
Servers: []string{"http://localhost:9200"},
Rollover: escfg.Rollover{
Enabled: true,
},
},
wantErr: false,
},
Expand Down Expand Up @@ -315,6 +322,9 @@ func TestESStorageFactoryWithConfigError(t *testing.T) {
cfg := escfg.Configuration{
Servers: []string{"http://127.0.0.1:65535"},
LogLevel: "error",
Rollover: escfg.Rollover{
Enabled: true,
},
}
_, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.ErrorContains(t, err, "failed to create primary Elasticsearch client")
Expand Down
55 changes: 55 additions & 0 deletions plugin/storage/es/index_cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package es

import (
"errors"
"fmt"
"log"
"os/exec"
"time"

"github.com/jaegertracing/jaeger/pkg/es/config"
)

// CleanerConfigWrapper embeds the Elasticsearch Configuration so that the
// index-cleaner methods in this package can be defined on it and read its
// fields (e.g. IndexCleaner, Servers) directly.
type CleanerConfigWrapper struct {
*config.Configuration
}

func (cfg *CleanerConfigWrapper) RunCleaner() {
if !cfg.IndexCleaner.Enabled {
log.Println("Cleaner is disabled, skipping...")
return
}

Check warning on line 24 in plugin/storage/es/index_cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/index_cleaner.go#L20-L24

Added lines #L20 - L24 were not covered by tests

ticker := time.NewTicker(cfg.IndexCleaner.Frequency)
defer ticker.Stop()

for range ticker.C {
log.Println("Running index cleaner...")

// Call for existing cmd/es-index-cleaner logic here
err := cfg.runESIndexCleaner()
if err != nil {
log.Printf("Error running es-index-cleaner: %v", err)
}

Check warning on line 36 in plugin/storage/es/index_cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/index_cleaner.go#L26-L36

Added lines #L26 - L36 were not covered by tests
}
}

// runESIndexCleaner runs the cmd/es-index-cleaner command.
func (cfg *CleanerConfigWrapper) runESIndexCleaner() error {
if len(cfg.Servers) == 0 {
return errors.New("elasticsearch server URL is missing in configuration")
}

Check warning on line 44 in plugin/storage/es/index_cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/index_cleaner.go#L41-L44

Added lines #L41 - L44 were not covered by tests

cmd := exec.Command("./cmd/es-index-cleaner")

err := cmd.Run()
if err != nil {
return fmt.Errorf("failed to run es-index-cleaner: %w", err)
}

Check warning on line 51 in plugin/storage/es/index_cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/index_cleaner.go#L46-L51

Added lines #L46 - L51 were not covered by tests

log.Println("Index cleaner executed successfully.")
return nil

Check warning on line 54 in plugin/storage/es/index_cleaner.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/index_cleaner.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}
40 changes: 40 additions & 0 deletions plugin/storage/es/index_rollover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package es

import (
"log"
"os/exec"
"time"

"github.com/jaegertracing/jaeger/pkg/es/config"
)

// RolloverConfigWrapper embeds the Elasticsearch Configuration so that the
// rollover methods in this package can be defined on it and read its
// Rollover settings directly.
type RolloverConfigWrapper struct {
*config.Configuration
}

// RunRollover schedules and runs the rollover process based on the configured frequency.
func (cfg *RolloverConfigWrapper) RunRollover() {
if !cfg.Rollover.Enabled {
log.Println("Rollover is disabled, skipping...")
return
}

Check warning on line 23 in plugin/storage/es/index_rollover.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/index_rollover.go#L19-L23

Added lines #L19 - L23 were not covered by tests

ticker := time.NewTicker(cfg.Rollover.Frequency)
defer ticker.Stop()

for range ticker.C {
log.Println("Running index rollover...")

cmd := exec.Command("./cmd/es-rollover")
err := cmd.Run()

if err != nil {
log.Printf("Error running es-rollover: %v", err)
} else {
log.Println("Index rollover executed successfully.")
}

Check warning on line 38 in plugin/storage/es/index_rollover.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/index_rollover.go#L25-L38

Added lines #L25 - L38 were not covered by tests
}
}
Loading