Skip to content

Commit

Permalink
Add es-index-cleaner golang implementation (#3192)
Browse files Browse the repository at this point in the history
* Add golang implementation for es-index-cleaner

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* review comments

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>
  • Loading branch information
pavolloffay authored Aug 12, 2021
1 parent d316c68 commit 94bc987
Show file tree
Hide file tree
Showing 7 changed files with 1,027 additions and 0 deletions.
69 changes: 69 additions & 0 deletions cmd/es-index-cleaner/app/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"flag"

"github.com/spf13/viper"
)

const (
indexPrefix = "index-prefix"
archive = "archive"
rollover = "rollover"
timeout = "timeout"
indexDateSeparator = "index-date-separator"
username = "es.username"
password = "es.password"
)

// Config holds configuration for index cleaner binary.
type Config struct {
IndexPrefix string
Archive bool
Rollover bool
MasterNodeTimeoutSeconds int
IndexDateSeparator string
Username string
Password string
TLSEnabled bool
}

// AddFlags adds flags for TLS to the FlagSet.
func (c *Config) AddFlags(flags *flag.FlagSet) {
flags.String(indexPrefix, "", "Index prefix")
flags.Bool(archive, false, "Whether to remove archive indices")
flags.Bool(rollover, false, "Whether to remove indices created by rollover")
flags.Int(timeout, 120, "Number of seconds to wait for master node response")
flags.String(indexDateSeparator, "-", "Index date separator")
flags.String(username, "", "The username required by storage")
flags.String(password, "", "The password required by storage")
}

// InitFromViper initializes config from viper.Viper.
func (c *Config) InitFromViper(v *viper.Viper) {
c.IndexPrefix = v.GetString(indexPrefix)
if c.IndexPrefix != "" {
c.IndexPrefix += "-"
}

c.Archive = v.GetBool(archive)
c.Rollover = v.GetBool(rollover)
c.MasterNodeTimeoutSeconds = v.GetInt(timeout)
c.IndexDateSeparator = v.GetString(indexDateSeparator)
c.Username = v.GetString(username)
c.Password = v.GetString(password)
}
55 changes: 55 additions & 0 deletions cmd/es-index-cleaner/app/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"flag"
"testing"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBindFlags(t *testing.T) {
v := viper.New()
c := &Config{}
command := cobra.Command{}
flags := &flag.FlagSet{}
c.AddFlags(flags)
command.PersistentFlags().AddGoFlagSet(flags)
v.BindPFlags(command.PersistentFlags())

err := command.ParseFlags([]string{
"--index-prefix=tenant1",
"--rollover=true",
"--archive=true",
"--timeout=150",
"--index-date-separator=@",
"--es.username=admin",
"--es.password=admin",
})
require.NoError(t, err)

c.InitFromViper(v)
assert.Equal(t, "tenant1-", c.IndexPrefix)
assert.Equal(t, true, c.Rollover)
assert.Equal(t, true, c.Archive)
assert.Equal(t, 150, c.MasterNodeTimeoutSeconds)
assert.Equal(t, "@", c.IndexDateSeparator)
assert.Equal(t, "admin", c.Username)
assert.Equal(t, "admin", c.Password)
}
144 changes: 144 additions & 0 deletions cmd/es-index-cleaner/app/index_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"time"
)

// Index represents ES index.
type Index struct {
// Index name.
Index string
// Index creation time.
CreationTime time.Time
// Aliases
Aliases map[string]bool
}

// IndicesClient is a client used to manipulate indices.
type IndicesClient struct {
// Http client.
Client *http.Client
// ES server endpoint.
Endpoint string
// ES master_timeout parameter.
MasterTimeoutSeconds int
BasicAuth string
}

// GetJaegerIndices queries all Jaeger indices including the archive and rollover.
// Jaeger daily indices are:
// jaeger-span-2019-01-01, jaeger-service-2019-01-01, jaeger-dependencies-2019-01-01
// jaeger-span-archive
// Rollover indices:
// aliases: jaeger-span-read, jaeger-span-write, jaeger-service-read, jaeger-service-write
// indices: jaeger-span-000001, jaeger-service-000001 etc.
// aliases: jaeger-span-archive-read, jaeger-span-archive-write
// indices: jaeger-span-archive-000001
func (i *IndicesClient) GetJaegerIndices(prefix string) ([]Index, error) {
prefix += "jaeger-*"
r, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s?flat_settings=true&filter_path=*.aliases,*.settings", i.Endpoint, prefix), nil)
if err != nil {
return nil, err
}
i.setAuthorization(r)
res, err := i.Client.Do(r)
if err != nil {
return nil, fmt.Errorf("failed to query indices: %w", err)
}

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to query indices: %w", handleFailedRequest(res))
}

body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("failed to query indices and read response body: %w", err)
}

type indexInfo struct {
Aliases map[string]interface{} `json:"aliases"`
Settings map[string]string `json:"settings"`
}
var indicesInfo map[string]indexInfo
if err = json.Unmarshal(body, &indicesInfo); err != nil {
return nil, fmt.Errorf("failed to query indices and unmarshall response body: %q: %w", body, err)
}

var indices []Index
for k, v := range indicesInfo {
aliases := map[string]bool{}
for alias := range v.Aliases {
aliases[alias] = true
}
// ignoring error, ES should return valid date
creationDate, _ := strconv.ParseInt(v.Settings["index.creation_date"], 10, 64)

indices = append(indices, Index{
Index: k,
CreationTime: time.Unix(0, int64(time.Millisecond)*creationDate),
Aliases: aliases,
})
}
return indices, nil
}

// DeleteIndices deletes specified set of indices.
func (i *IndicesClient) DeleteIndices(indices []Index) error {
concatIndices := ""
for _, i := range indices {
concatIndices += i.Index
concatIndices += ","
}

r, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/%s?master_timeout=%ds", i.Endpoint, concatIndices, i.MasterTimeoutSeconds), nil)
if err != nil {
return err
}
i.setAuthorization(r)

res, err := i.Client.Do(r)
if err != nil {
return fmt.Errorf("failed to delete indices: %w", err)
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to delete indices: %s, %w", concatIndices, handleFailedRequest(res))
}
return nil
}

func handleFailedRequest(res *http.Response) error {
var body string
if res.Body != nil {
bodyBytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("request failed and failed to read response body, status code: %d, %w", res.StatusCode, err)
}
body = string(bodyBytes)
}
return fmt.Errorf("request failed, status code: %d, body: %s", res.StatusCode, body)
}

func (i *IndicesClient) setAuthorization(r *http.Request) {
if i.BasicAuth != "" {
r.Header.Add("Authorization", fmt.Sprintf("Basic %s", i.BasicAuth))
}
}
Loading

0 comments on commit 94bc987

Please sign in to comment.