Skip to content

Commit

Permalink
Add missing context to api ingest pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
mrodm committed Oct 1, 2024
1 parent 1b74d57 commit 6943fb8
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
2 changes: 1 addition & 1 deletion internal/benchrunner/runners/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *runner) SetUp(ctx context.Context) error {
return errors.New("data stream root not found")
}

r.entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(r.options.API, dataStreamPath)
r.entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(ctx, r.options.API, dataStreamPath)
if err != nil {
return fmt.Errorf("installing ingest pipelines failed: %w", err)
}
Expand Down
30 changes: 17 additions & 13 deletions internal/elasticsearch/ingest/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package ingest

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -45,7 +46,7 @@ type RerouteProcessor struct {
Namespace []string `yaml:"namespace"`
}

func InstallDataStreamPipelines(api *elasticsearch.API, dataStreamPath string) (string, []Pipeline, error) {
func InstallDataStreamPipelines(ctx context.Context, api *elasticsearch.API, dataStreamPath string) (string, []Pipeline, error) {
dataStreamManifest, err := packages.ReadDataStreamManifest(filepath.Join(dataStreamPath, packages.DataStreamManifestFile))
if err != nil {
return "", nil, fmt.Errorf("reading data stream manifest failed: %w", err)
Expand All @@ -59,7 +60,7 @@ func InstallDataStreamPipelines(api *elasticsearch.API, dataStreamPath string) (
return "", nil, fmt.Errorf("loading ingest pipeline files failed: %w", err)
}

err = installPipelinesInElasticsearch(api, pipelines)
err = installPipelinesInElasticsearch(ctx, api, pipelines)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -232,9 +233,9 @@ func convertValue(value interface{}, label string) ([]string, error) {
}
}

func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []Pipeline) error {
func installPipelinesInElasticsearch(ctx context.Context, api *elasticsearch.API, pipelines []Pipeline) error {
for _, p := range pipelines {
if err := installPipeline(api, p); err != nil {
if err := installPipeline(ctx, api, p); err != nil {
return err
}
}
Expand All @@ -251,20 +252,22 @@ func pipelineError(err error, pipeline Pipeline, format string, args ...interfac
return fmt.Errorf("%s: %w", errorStr, err)
}

func installPipeline(api *elasticsearch.API, pipeline Pipeline) error {
if err := putIngestPipeline(api, pipeline); err != nil {
func installPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error {
if err := putIngestPipeline(ctx, api, pipeline); err != nil {
return err
}
// Just to be sure the pipeline has been uploaded.
return getIngestPipeline(api, pipeline)
return getIngestPipeline(ctx, api, pipeline)
}

func putIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error {
func putIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error {
source, err := pipeline.MarshalJSON()
if err != nil {
return err
}
r, err := api.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(source))
r, err := api.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(source),
api.Ingest.PutPipeline.WithContext(ctx),
)
if err != nil {
return pipelineError(err, pipeline, "PutPipeline API call failed")
}
Expand All @@ -283,10 +286,11 @@ func putIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error {
return nil
}

func getIngestPipeline(api *elasticsearch.API, pipeline Pipeline) error {
r, err := api.Ingest.GetPipeline(func(request *elasticsearch.IngestGetPipelineRequest) {
request.PipelineID = pipeline.Name
})
func getIngestPipeline(ctx context.Context, api *elasticsearch.API, pipeline Pipeline) error {
r, err := api.Ingest.GetPipeline(
api.Ingest.GetPipeline.WithContext(ctx),
api.Ingest.GetPipeline.WithPipelineID(pipeline.Name),
)
if err != nil {
return pipelineError(err, pipeline, "GetPipeline API call failed")
}
Expand Down
4 changes: 1 addition & 3 deletions internal/testrunner/runners/pipeline/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (r *tester) run(ctx context.Context) ([]testrunner.TestResult, error) {

startTesting := time.Now()
var entryPipeline string
entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(r.esAPI, dataStreamPath)
entryPipeline, r.pipelines, err = ingest.InstallDataStreamPipelines(ctx, r.esAPI, dataStreamPath)
if err != nil {
return nil, fmt.Errorf("installing ingest pipelines failed: %w", err)
}
Expand Down Expand Up @@ -278,7 +278,6 @@ func (r *tester) checkElasticsearchLogs(ctx context.Context, startTesting time.T

return nil
})

if err != nil {
return nil, fmt.Errorf("error at parsing logs of elasticseach: %w", err)
}
Expand All @@ -297,7 +296,6 @@ func (r *tester) checkElasticsearchLogs(ctx context.Context, startTesting time.T
}

return []testrunner.TestResult{tr}, nil

}

func (r *tester) runTestCase(ctx context.Context, testCaseFile string, dsPath string, dsType string, pipeline string, validatorOptions []fields.ValidatorOption) ([]testrunner.TestResult, error) {
Expand Down

0 comments on commit 6943fb8

Please sign in to comment.