diff --git a/cmd/anonymizer/app/query/query_test.go b/cmd/anonymizer/app/query/query_test.go index 86b7fd55ed8..ea6797e9e89 100644 --- a/cmd/anonymizer/app/query/query_test.go +++ b/cmd/anonymizer/app/query/query_test.go @@ -7,6 +7,7 @@ import ( "net" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -77,6 +78,7 @@ func newTestServer(t *testing.T) *testServer { assert.NoError(t, server.Serve(lis)) exited.Done() }() + time.Sleep(time.Second) t.Cleanup(func() { server.Stop() exited.Wait() // don't allow test to finish before server exits diff --git a/cmd/es-index-cleaner/es-index-cleaner b/cmd/es-index-cleaner/es-index-cleaner new file mode 100755 index 00000000000..3724b97993d Binary files /dev/null and b/cmd/es-index-cleaner/es-index-cleaner differ diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index 6fa4d247e20..2f1663a030c 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -36,22 +36,22 @@ extensions: index_prefix: "jaeger-main" spans: date_layout: "2006-01-02" - rollover_frequency: "day" + rollover_frequency: "1day" shards: 5 replicas: 1 services: date_layout: "2006-01-02" - rollover_frequency: "day" + rollover_frequency: "1day" shards: 5 replicas: 1 dependencies: date_layout: "2006-01-02" - rollover_frequency: "day" + rollover_frequency: "1day" shards: 5 replicas: 1 sampling: date_layout: "2006-01-02" - rollover_frequency: "day" + rollover_frequency: "1day" shards: 5 replicas: 1 another_storage: diff --git a/cmd/jaeger/internal/command.go b/cmd/jaeger/internal/command.go index 6ac217dadd6..1ecdc4d1137 100644 --- a/cmd/jaeger/internal/command.go +++ b/cmd/jaeger/internal/command.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/otelcol" "github.com/jaegertracing/jaeger/pkg/version" + "github.com/jaegertracing/jaeger/plugin/storage/es" ) //go:embed all-in-one.yaml @@ -59,6 +60,8 @@ func Command() *cobra.Command { return checkConfigAndRun(cmd, args, yamlAllInOne.ReadFile, otelRunE) } + cmd.AddCommand(es.NewEsMappingsCommand()) + cmd.Short = description cmd.Long = description diff --git a/cmd/jaeger/internal/command_test.go b/cmd/jaeger/internal/command_test.go index 396a4ad5a9a..ca42342ab86 100644 --- a/cmd/jaeger/internal/command_test.go +++ b/cmd/jaeger/internal/command_test.go @@ -23,7 +23,14 @@ func TestCommand(t *testing.T) { cmd.ParseFlags([]string{"--config", "bad-file-name"}) err := cmd.Execute() + require.Error(t, err) require.ErrorContains(t, err, "bad-file-name") + + t.Run("Should have es-mappings command", func(t *testing.T) { + foundCmd, _, _ := cmd.Find([]string{"es-mappings"}) + assert.NotNil(t, foundCmd, "es-mappings command should be present") + assert.Equal(t, "es-mappings", foundCmd.Use, "Found command should match 'es-mappings'") + }) } func TestCheckConfigAndRun_DefaultConfig(t *testing.T) { @@ -50,4 +57,15 @@ func TestCheckConfigAndRun_DefaultConfig(t *testing.T) { } err = checkConfigAndRun(cmd, nil, getCfgErr, runE) require.ErrorIs(t, err, errGetCfg) + + t.Run("Should load custom config if provided", func(t *testing.T) { + cmd.Flags().Set("config", "custom-config.yaml") + getCfg := func(name string) ([]byte, error) { + assert.Equal(t, "custom-config.yaml", name) + return []byte("custom-config"), nil + } + + err := checkConfigAndRun(cmd, nil, getCfg, runE) + require.NoError(t, err) + }) } diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go index 6213c912765..b535e9ed4fd 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go @@ -224,6 +224,9 @@ func TestXYZsearch(t *testing.T) { Elasticsearch: &esCfg.Configuration{ Servers: []string{server.URL}, LogLevel: "error", + Rollover: esCfg.Rollover{ + Enabled: true, + }, }, }) }) @@ -232,6 +235,9 @@ func TestXYZsearch(t *testing.T) { Opensearch: &esCfg.Configuration{ Servers: []string{server.URL}, LogLevel: "error", + Rollover: esCfg.Rollover{ + Enabled: true, + }, }, }) }) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index d0ae9c6ad3d..b1bd26dfa2a 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -37,6 +37,23 @@ const ( IndexPrefixSeparator = "-" ) +// IndexCleaner struct for configuring index cleaning +type IndexCleaner struct { + // Enabled specifies whether the index cleaner functionality is enabled. + // when set to true, the index cleaner will periodically run based on the specified frequency. + Enabled bool `mapstructure:"enabled"` + // Frequency defines the interval at which the index cleaner runs. + Frequency time.Duration `mapstructure:"frequency" validate:"gt=0"` +} + +// Rollover struct for configuring roll over +type Rollover struct { + // Enabled specifies whether the rollover functionality is enabled. + Enabled bool `mapstructure:"enabled"` + // Frequency specifies the interval at which the rollover process runs. + Frequency time.Duration `mapstructure:"frequency" validate:"gt=0"` +} + // IndexOptions describes the index format and rollover frequency type IndexOptions struct { // Priority contains the priority of index template (ESv8 only). @@ -136,6 +153,12 @@ type Configuration struct { Tags TagsAsFields `mapstructure:"tags_as_fields"` // Enabled, if set to true, enables the namespace for storage pointed to by this configuration. Enabled bool `mapstructure:"-"` + + // ---- ES-specific config ---- + // IndexCleaner holds the configuration for the Elasticsearch index Cleaner. + IndexCleaner IndexCleaner `mapstructure:"index_cleaner"` + // Rollover holds the configuration for the Elasticsearh roll over. + Rollover Rollover `mapstructure:"rollover"` } // TagsAsFields holds configuration for tag schema. diff --git a/pkg/es/config/config_test.go b/pkg/es/config/config_test.go index c295d37b70c..c6cca7c5699 100644 --- a/pkg/es/config/config_test.go +++ b/pkg/es/config/config_test.go @@ -733,6 +733,9 @@ func TestValidate(t *testing.T) { name: "All valid input are set", config: &Configuration{ Servers: []string{"localhost:8000/dummyserver"}, + Rollover: Rollover{ + Enabled: true, + }, }, expectedError: false, }, diff --git a/plugin/storage/es/es_mapping_generator.go b/plugin/storage/es/es_mapping_generator.go new file mode 100644 index 00000000000..050bc8f0085 --- /dev/null +++ b/plugin/storage/es/es_mapping_generator.go @@ -0,0 +1,36 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package es + +import ( + "fmt" + "log" + "os" + "os/exec" + + "github.com/spf13/cobra" +) + +// NewEsMappingsCommand defines the new CLI command to run esmapping-generator +func NewEsMappingsCommand() *cobra.Command { + return &cobra.Command{ + Use: "es-mappings", + Short: "Print Elasticsearch index mappings to stdout", + Run: func(_ *cobra.Command, _ []string) { + if _, err := os.Stat("./cmd/esmapping-generator"); os.IsNotExist(err) { + log.Fatalf("esmapping-generator not found: %v", err) + } + + // Run the esmapping-generator command + execCmd := exec.Command("go", "run", "./cmd/esmapping-generator") + output, err := execCmd.CombinedOutput() + if err != nil { + log.Printf("Error executing esmapping-generator: %v\nOutput: %s", err, output) + return + } + fmt.Println("esmapping-generator executed successfully") + fmt.Printf("%s\n", output) + }, + } +} diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 4da9a6c6f45..6ec1b4f5845 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -267,9 +267,13 @@ func TestESStorageFactoryWithConfig(t *testing.T) { w.Write(mockEsServerResponse) })) defer server.Close() + cfg := escfg.Configuration{ Servers: []string{server.URL}, LogLevel: "error", + Rollover: escfg.Rollover{ + Enabled: true, + }, } factory, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) @@ -286,6 +290,9 @@ func TestConfigurationValidation(t *testing.T) { name: "valid configuration", cfg: escfg.Configuration{ Servers: []string{"http://localhost:9200"}, + Rollover: escfg.Rollover{ + Enabled: true, + }, }, wantErr: false, }, @@ -315,6 +322,9 @@ func TestESStorageFactoryWithConfigError(t *testing.T) { cfg := escfg.Configuration{ Servers: []string{"http://127.0.0.1:65535"}, LogLevel: "error", + Rollover: escfg.Rollover{ + Enabled: true, + }, } _, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) require.ErrorContains(t, err, "failed to create primary Elasticsearch client") diff --git a/plugin/storage/es/index_cleaner.go b/plugin/storage/es/index_cleaner.go new file mode 100644 index 00000000000..cb07dbe22f8 --- /dev/null +++ b/plugin/storage/es/index_cleaner.go @@ -0,0 +1,55 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package es + +import ( + "errors" + "fmt" + "log" + "os/exec" + "time" + + "github.com/jaegertracing/jaeger/pkg/es/config" +) + +type CleanerConfigWrapper struct { + *config.Configuration +} + +func (cfg *CleanerConfigWrapper) RunCleaner() { + if !cfg.IndexCleaner.Enabled { + log.Println("Cleaner is disabled, skipping...") + return + } + + ticker := time.NewTicker(cfg.IndexCleaner.Frequency) + defer ticker.Stop() + + for range ticker.C { + log.Println("Running index cleaner...") + + // Call for existing cmd/es-index-cleaner logic here + err := cfg.runESIndexCleaner() + if err != nil { + log.Printf("Error running es-index-cleaner: %v", err) + } + } +} + +// runESIndexCleaner runs the cmd/es-index-cleaner command. +func (cfg *CleanerConfigWrapper) runESIndexCleaner() error { + if len(cfg.Servers) == 0 { + return errors.New("elasticsearch server URL is missing in configuration") + } + + cmd := exec.Command("./cmd/es-index-cleaner") + + err := cmd.Run() + if err != nil { + return fmt.Errorf("failed to run es-index-cleaner: %w", err) + } + + log.Println("Index cleaner executed successfully.") + return nil +} diff --git a/plugin/storage/es/index_rollover.go b/plugin/storage/es/index_rollover.go new file mode 100644 index 00000000000..313ef3ade10 --- /dev/null +++ b/plugin/storage/es/index_rollover.go @@ -0,0 +1,40 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package es + +import ( + "log" + "os/exec" + "time" + + "github.com/jaegertracing/jaeger/pkg/es/config" +) + +type RolloverConfigWrapper struct { + *config.Configuration +} + +// RunRollover schedules and runs the rollover process based on the configured frequency. +func (cfg *RolloverConfigWrapper) RunRollover() { + if !cfg.Rollover.Enabled { + log.Println("Rollover is disabled, skipping...") + return + } + + ticker := time.NewTicker(cfg.Rollover.Frequency) + defer ticker.Stop() + + for range ticker.C { + log.Println("Running index rollover...") + + cmd := exec.Command("./cmd/es-rollover") + err := cmd.Run() + + if err != nil { + log.Printf("Error running es-rollover: %v", err) + } else { + log.Println("Index rollover executed successfully.") + } + } +}