Skip to content

Commit

Permalink
Merge branch 'develop' into variable-update-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
SagarRajput-7 authored Dec 18, 2024
2 parents c6c2e1f + cd9f27a commit 1abf809
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 4 deletions.
7 changes: 3 additions & 4 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4075,10 +4075,9 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
zap.L().Warn("found no pipelines in the http request, this will delete all the pipelines")
}

for _, p := range postable {
if err := p.IsValid(); err != nil {
return nil, model.BadRequestStr(err.Error())
}
validationErr := aH.LogsParsingPipelineController.ValidatePipelines(ctx, postable)
if validationErr != nil {
return nil, validationErr
}

return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable)
Expand Down
39 changes: 39 additions & 0 deletions pkg/query-service/app/logparsingpipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,45 @@ func (ic *LogParsingPipelineController) ApplyPipelines(
return ic.GetPipelinesByVersion(ctx, cfg.Version)
}

func (ic *LogParsingPipelineController) ValidatePipelines(
ctx context.Context,
postedPipelines []PostablePipeline,
) *model.ApiError {
for _, p := range postedPipelines {
if err := p.IsValid(); err != nil {
return model.BadRequestStr(err.Error())
}
}

// Also run a collector simulation to ensure config is fit
// for e2e use with a collector
pipelines := []Pipeline{}
for _, pp := range postedPipelines {
pipelines = append(pipelines, Pipeline{
Id: uuid.New().String(),
OrderId: pp.OrderId,
Enabled: pp.Enabled,
Name: pp.Name,
Alias: pp.Alias,
Description: &pp.Description,
Filter: pp.Filter,
Config: pp.Config,
})
}

sampleLogs := []model.SignozLog{{Body: ""}}
_, _, simulationErr := SimulatePipelinesProcessing(
ctx, pipelines, sampleLogs,
)
if simulationErr != nil {
return model.BadRequest(fmt.Errorf(
"invalid pipelines config: %w", simulationErr.ToError(),
))
}

return nil
}

// Returns effective list of pipelines including user created
// pipelines and pipelines for installed integrations
func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion(
Expand Down
21 changes: 21 additions & 0 deletions pkg/query-service/tests/integration/logparsingpipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,27 @@ func TestLogPipelinesValidation(t *testing.T) {
},
},
ExpectedResponseStatusCode: 400,
}, {
Name: "Invalid from field path",
Pipeline: logparsingpipeline.PostablePipeline{
OrderId: 1,
Name: "pipeline 1",
Alias: "pipeline1",
Enabled: true,
Filter: validPipelineFilterSet,
Config: []logparsingpipeline.PipelineOperator{
{
OrderId: 1,
ID: "move",
Type: "move",
From: `attributes.temp_parsed_body."@l"`,
To: "attributes.test",
Enabled: true,
Name: "test move",
},
},
},
ExpectedResponseStatusCode: 400,
},
}

Expand Down

0 comments on commit 1abf809

Please sign in to comment.