Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/wild-wombats-ring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added Mix and Max pipeline tasks.
6 changes: 6 additions & 0 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,11 @@ const (
TaskTypeLessThan TaskType = "lessthan"
TaskTypeLookup TaskType = "lookup"
TaskTypeLowercase TaskType = "lowercase"
TaskTypeMax TaskType = "max"
TaskTypeMean TaskType = "mean"
TaskTypeMedian TaskType = "median"
TaskTypeMerge TaskType = "merge"
TaskTypeMin TaskType = "min"
TaskTypeMode TaskType = "mode"
TaskTypeMultiply TaskType = "multiply"
TaskTypeSum TaskType = "sum"
Expand Down Expand Up @@ -373,10 +375,14 @@ func UnmarshalTaskFromMap(taskType TaskType, taskMap any, ID int, dotID string)
task = &HTTPTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
case TaskTypeBridge:
task = &BridgeTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
case TaskTypeMax:
task = &MaxTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
case TaskTypeMean:
task = &MeanTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
case TaskTypeMedian:
task = &MedianTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
case TaskTypeMin:
task = &MinTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
case TaskTypeMode:
task = &ModeTask{BaseTask: BaseTask{id: ID, dotID: dotID}}
case TaskTypeSum:
Expand Down
2 changes: 2 additions & 0 deletions core/services/pipeline/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ func TestUnmarshalTaskFromMap(t *testing.T) {
}{
{pipeline.TaskTypeHTTP, &pipeline.HTTPTask{}},
{pipeline.TaskTypeBridge, &pipeline.BridgeTask{}},
{pipeline.TaskTypeMax, &pipeline.MaxTask{}},
{pipeline.TaskTypeMean, &pipeline.MeanTask{}},
{pipeline.TaskTypeMedian, &pipeline.MedianTask{}},
{pipeline.TaskTypeMin, &pipeline.MinTask{}},
{pipeline.TaskTypeMode, &pipeline.ModeTask{}},
{pipeline.TaskTypeSum, &pipeline.SumTask{}},
{pipeline.TaskTypeMultiply, &pipeline.MultiplyTask{}},
Expand Down
79 changes: 79 additions & 0 deletions core/services/pipeline/task.max.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package pipeline

import (
"context"
stderrors "errors"

"github.com/pkg/errors"
"github.com/shopspring/decimal"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

// Return types:
//
// *decimal.Decimal
type MaxTask struct {
BaseTask `mapstructure:",squash"`
Values string `json:"values"`
AllowedFaults string `json:"allowedFaults"`
// Lax when disabled (default) will return an error if there are no input values or if the input includes nil values.
// Lax when enabled will return nil with no error if there are no valid input values. If the input includes nil values, they will be excluded from the calculation and do not count as a fault.
Lax string
}

var _ Task = (*MaxTask)(nil)

func (t *MaxTask) Type() TaskType {
return TaskTypeMax
}

func (t *MaxTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) {
var (
maybeAllowedFaults MaybeUint64Param
valuesAndErrs SliceParam
decimalValues DecimalSliceParam
allowedFaults int
lax BoolParam
)
err := stderrors.Join(
errors.Wrap(ResolveParam(&maybeAllowedFaults, From(t.AllowedFaults)), "allowedFaults"),
errors.Wrap(ResolveParam(&valuesAndErrs, From(VarExpr(t.Values, vars), JSONWithVarExprs(t.Values, vars, true), Inputs(inputs))), "values"),
errors.Wrap(ResolveParam(&lax, From(NonemptyString(t.Lax), false)), "lax"),
)
if err != nil {
return Result{Error: err}, runInfo
}

// if lax is enabled, filter out nil values
// nil values are not included in the fault calculations
if lax {
valuesAndErrs, _ = valuesAndErrs.FilterNils()
}

if allowed, isSet := maybeAllowedFaults.Uint64(); isSet {
allowedFaults = int(allowed) //nolint:gosec // G115: it will not exceed int64
} else {
allowedFaults = max(len(valuesAndErrs)-1, 0)
}

values, faults := valuesAndErrs.FilterErrors()
if faults > allowedFaults {
return Result{Error: errors.Wrapf(ErrTooManyErrors, "Number of faulty inputs %v to max task > number allowed faults %v", faults, allowedFaults)}, runInfo
}
if len(values) == 0 {
if lax {
return Result{}, runInfo // if lax is enabled, return nil result with no error
}
return Result{Error: errors.Wrap(ErrWrongInputCardinality, "no values to maxize")}, runInfo
}

err = decimalValues.UnmarshalPipelineParam(values)
if err != nil {
return Result{Error: errors.Wrapf(ErrBadInput, "values: %v", err)}, runInfo
}

maxVal := decimal.Max(decimalValues[0], decimalValues[1:]...)

return Result{Value: maxVal}, runInfo
}
Loading
Loading