Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export K6_ELASTICSEARCH_INSECURE_SKIP_VERIFY=true
./k6 run ./examples/script.js -o output-elasticsearch
```

The metrics are stored in the index `k6-metrics` which will be automatically created by this extension. See the [mapping](pkg/esoutput/mapping.json) for details.
The metrics are stored in the index `k6-metrics` by default which will be automatically created by this extension. See the [mapping](pkg/esoutput/mapping.json) for details. The index name can be customized with the environment variable `K6_ELASTICSEARCH_INDEX_NAME`.

## Docker Compose

Expand Down Expand Up @@ -105,7 +105,7 @@ Clone the repo to get started and follow these steps:

5. Visit http://localhost:5601/ to view results in Kibana (default credentials are `elastic` / `changeme`).

- Create a [Data View](https://www.elastic.co/guide/en/kibana/current/data-views.html) for the index `k6-metrics`.
- Create a [Data View](https://www.elastic.co/guide/en/kibana/current/data-views.html) for the index `k6-metrics` or the index name in `K6_ELASTICSEARCH_INDEX_NAME` if it is set.
![Kibana Data View](./images/kibana-data-view.png)
- Go to [Discover](https://www.elastic.co/guide/en/kibana/current/discover.html) to start exploring the metrics.
![Kibana Discover](./images/kibana-discover.png)
Expand Down
12 changes: 12 additions & 0 deletions pkg/esoutput/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

const (
defaultFlushPeriod = time.Second
defaultIndexName = "k6-metrics"
)

type Config struct {
Expand All @@ -50,6 +51,7 @@ type Config struct {
ServiceAccountToken null.String `json:"serviceAccountToken" envconfig:"K6_ELASTICSEARCH_SERVICE_ACCOUNT_TOKEN"`

FlushPeriod types.NullDuration `json:"flushPeriod" envconfig:"K6_ELASTICSEARCH_FLUSH_PERIOD"`
IndexName null.String `json:"indexName" envconfig:"K6_ELASTICSEARCH_INDEX_NAME"`
}

func NewConfig() Config {
Expand All @@ -63,6 +65,7 @@ func NewConfig() Config {
Password: null.NewString("", false),
ServiceAccountToken: null.NewString("", false),
FlushPeriod: types.NullDurationFrom(defaultFlushPeriod),
IndexName: null.StringFrom(defaultIndexName),
}
}

Expand Down Expand Up @@ -101,6 +104,9 @@ func (base Config) Apply(applied Config) Config {
if applied.FlushPeriod.Valid {
base.FlushPeriod = applied.FlushPeriod
}
if applied.IndexName.Valid {
base.IndexName = applied.IndexName
}

return base
}
Expand Down Expand Up @@ -148,6 +154,9 @@ func ParseArg(arg string) (Config, error) {
return c, err
}
}
if v, ok := params["indexName"].(string); ok {
c.IndexName = null.StringFrom(v)
}

return c, nil
}
Expand Down Expand Up @@ -215,6 +224,9 @@ func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, a
if serviceAccountToken, defined := env["K6_ELASTICSEARCH_SERVICE_ACCOUNT_TOKEN"]; defined {
result.ServiceAccountToken = null.StringFrom(serviceAccountToken)
}
if indexName, defined := env["K6_ELASTICSEARCH_INDEX_NAME"]; defined {
result.IndexName = null.StringFrom(indexName)
}

if arg != "" {
argConf, err := ParseArg(arg)
Expand Down
16 changes: 8 additions & 8 deletions pkg/esoutput/esoutput.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ const hasPrivilegesBody = `{
"index": [
{
"names": [
"k6-metrics"
"%s"
],
"privileges": [
"write", "create_index"
Expand Down Expand Up @@ -137,21 +137,20 @@ func New(params output.Params) (output.Output, error) {
// security is configured on this cluster. Therefore, we call the has privilege API that is guaranteed to work
//for every user.
if info.StatusCode == 403 {
priv, err := client.Security.HasPrivileges(strings.NewReader(hasPrivilegesBody))
priv, err := client.Security.HasPrivileges(strings.NewReader(fmt.Sprintf(hasPrivilegesBody, config.IndexName.String)))
if err != nil {
return nil, err
}
if priv.StatusCode != 200 {
return nil, fmt.Errorf("cannot connect to Elasticsearch (status code %d)", priv.StatusCode)
}

} else {
return nil, fmt.Errorf("cannot connect to Elasticsearch (status code %d)", info.StatusCode)
}
}

bulkIndexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: "k6-metrics",
Index: config.IndexName.String,
Client: client,
OnError: func(ctx context.Context, err error) {
// this happens usually due to permission issues
Expand All @@ -175,17 +174,18 @@ func (*Output) Description() string {
}

func (o *Output) Start() error {
res, err := o.client.Indices.Create("k6-metrics", o.client.Indices.Create.WithBody(bytes.NewReader(mapping)))
indexName := o.config.IndexName.String
res, err := o.client.Indices.Create(indexName, o.client.Indices.Create.WithBody(bytes.NewReader(mapping)))
if err != nil {
return err
}
// 400 usually happens when the index already exists, which is ok for our purposes.
if res.StatusCode > 400 {
body, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("could not read response after failure to create index: %v", err)
return fmt.Errorf("could not read response after failure to create index %s: %v", indexName, err)
}
return fmt.Errorf("could not create index k6-metrics: %s", body)
return fmt.Errorf("could not create index %s: %s", indexName, body)
}
res.Body.Close()

Expand All @@ -194,7 +194,7 @@ func (o *Output) Start() error {
} else {
o.periodicFlusher = periodicFlusher
}
o.logger.Debug("Elasticsearch: starting writing to index k6-metrics")
o.logger.Debugf("Elasticsearch: starting writing to index %s", indexName)

return nil
}
Expand Down