diff --git a/.changeset/wild-wombats-ring.md b/.changeset/wild-wombats-ring.md new file mode 100644 index 00000000000..81bd77dbdfe --- /dev/null +++ b/.changeset/wild-wombats-ring.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#added Mix and Max pipeline tasks. diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index d4e358393d4..8394848fe34 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -329,9 +329,11 @@ const ( TaskTypeLessThan TaskType = "lessthan" TaskTypeLookup TaskType = "lookup" TaskTypeLowercase TaskType = "lowercase" + TaskTypeMax TaskType = "max" TaskTypeMean TaskType = "mean" TaskTypeMedian TaskType = "median" TaskTypeMerge TaskType = "merge" + TaskTypeMin TaskType = "min" TaskTypeMode TaskType = "mode" TaskTypeMultiply TaskType = "multiply" TaskTypeSum TaskType = "sum" @@ -373,10 +375,14 @@ func UnmarshalTaskFromMap(taskType TaskType, taskMap any, ID int, dotID string) task = &HTTPTask{BaseTask: BaseTask{id: ID, dotID: dotID}} case TaskTypeBridge: task = &BridgeTask{BaseTask: BaseTask{id: ID, dotID: dotID}} + case TaskTypeMax: + task = &MaxTask{BaseTask: BaseTask{id: ID, dotID: dotID}} case TaskTypeMean: task = &MeanTask{BaseTask: BaseTask{id: ID, dotID: dotID}} case TaskTypeMedian: task = &MedianTask{BaseTask: BaseTask{id: ID, dotID: dotID}} + case TaskTypeMin: + task = &MinTask{BaseTask: BaseTask{id: ID, dotID: dotID}} case TaskTypeMode: task = &ModeTask{BaseTask: BaseTask{id: ID, dotID: dotID}} case TaskTypeSum: diff --git a/core/services/pipeline/common_test.go b/core/services/pipeline/common_test.go index 156fb75fe52..94b1d1a2f49 100644 --- a/core/services/pipeline/common_test.go +++ b/core/services/pipeline/common_test.go @@ -143,8 +143,10 @@ func TestUnmarshalTaskFromMap(t *testing.T) { }{ {pipeline.TaskTypeHTTP, &pipeline.HTTPTask{}}, {pipeline.TaskTypeBridge, &pipeline.BridgeTask{}}, + {pipeline.TaskTypeMax, &pipeline.MaxTask{}}, {pipeline.TaskTypeMean, &pipeline.MeanTask{}}, {pipeline.TaskTypeMedian, &pipeline.MedianTask{}}, + {pipeline.TaskTypeMin, &pipeline.MinTask{}}, {pipeline.TaskTypeMode, &pipeline.ModeTask{}}, {pipeline.TaskTypeSum, &pipeline.SumTask{}}, {pipeline.TaskTypeMultiply, &pipeline.MultiplyTask{}}, diff --git a/core/services/pipeline/task.max.go b/core/services/pipeline/task.max.go new file mode 100644 index 00000000000..5c15d82169b --- /dev/null +++ b/core/services/pipeline/task.max.go @@ -0,0 +1,79 @@ +package pipeline + +import ( + "context" + stderrors "errors" + + "github.com/pkg/errors" + "github.com/shopspring/decimal" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +// Return types: +// +// *decimal.Decimal +type MaxTask struct { + BaseTask `mapstructure:",squash"` + Values string `json:"values"` + AllowedFaults string `json:"allowedFaults"` + // Lax when disabled (default) will return an error if there are no input values or if the input includes nil values. + // Lax when enabled will return nil with no error if there are no valid input values. If the input includes nil values, they will be excluded from the calculation and do not count as a fault. + Lax string +} + +var _ Task = (*MaxTask)(nil) + +func (t *MaxTask) Type() TaskType { + return TaskTypeMax +} + +func (t *MaxTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) { + var ( + maybeAllowedFaults MaybeUint64Param + valuesAndErrs SliceParam + decimalValues DecimalSliceParam + allowedFaults int + lax BoolParam + ) + err := stderrors.Join( + errors.Wrap(ResolveParam(&maybeAllowedFaults, From(t.AllowedFaults)), "allowedFaults"), + errors.Wrap(ResolveParam(&valuesAndErrs, From(VarExpr(t.Values, vars), JSONWithVarExprs(t.Values, vars, true), Inputs(inputs))), "values"), + errors.Wrap(ResolveParam(&lax, From(NonemptyString(t.Lax), false)), "lax"), + ) + if err != nil { + return Result{Error: err}, runInfo + } + + // if lax is enabled, filter out nil values + // nil values are not included in the fault calculations + if lax { + valuesAndErrs, _ = valuesAndErrs.FilterNils() + } + + if allowed, isSet := maybeAllowedFaults.Uint64(); isSet { + allowedFaults = int(allowed) //nolint:gosec // G115: it will not exceed int64 + } else { + allowedFaults = max(len(valuesAndErrs)-1, 0) + } + + values, faults := valuesAndErrs.FilterErrors() + if faults > allowedFaults { + return Result{Error: errors.Wrapf(ErrTooManyErrors, "Number of faulty inputs %v to max task > number allowed faults %v", faults, allowedFaults)}, runInfo + } + if len(values) == 0 { + if lax { + return Result{}, runInfo // if lax is enabled, return nil result with no error + } + return Result{Error: errors.Wrap(ErrWrongInputCardinality, "no values to maxize")}, runInfo + } + + err = decimalValues.UnmarshalPipelineParam(values) + if err != nil { + return Result{Error: errors.Wrapf(ErrBadInput, "values: %v", err)}, runInfo + } + + maxVal := decimal.Max(decimalValues[0], decimalValues[1:]...) + + return Result{Value: maxVal}, runInfo +} diff --git a/core/services/pipeline/task.max_test.go b/core/services/pipeline/task.max_test.go new file mode 100644 index 00000000000..fd25eaab705 --- /dev/null +++ b/core/services/pipeline/task.max_test.go @@ -0,0 +1,296 @@ +package pipeline_test + +import ( + "strconv" + "testing" + + "github.com/pkg/errors" + "github.com/shopspring/decimal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" +) + +func TestMaxTask(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + inputs []pipeline.Result + allowedFaults string + lax bool + want pipeline.Result + }{ + { + "happy", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Value: mustDecimal(t, "2")}, {Value: mustDecimal(t, "3")}}, + "1", + false, + pipeline.Result{Value: mustDecimal(t, "3")}, + }, + { + "happy (one input)", + []pipeline.Result{{Value: mustDecimal(t, "1")}}, + "0", + false, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "happy (with zero)", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Value: mustDecimal(t, "0")}}, + "1", + false, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "happy (with negative)", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Value: mustDecimal(t, "-1")}}, + "1", + false, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "happy (with fractional)", + []pipeline.Result{{Value: mustDecimal(t, "0.2")}, {Value: mustDecimal(t, "0.1")}}, + "1", + false, + pipeline.Result{Value: mustDecimal(t, "0.2")}, + }, + { + "nil and non-nil inputs", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {}}, + "1", + false, + pipeline.Result{Error: pipeline.ErrBadInput}, + }, + { + "only nil inputs", + []pipeline.Result{{}}, + "0", + false, + pipeline.Result{Error: pipeline.ErrBadInput}, + }, + { + "zero inputs", + []pipeline.Result{}, + "0", + false, + pipeline.Result{Error: pipeline.ErrWrongInputCardinality}, + }, + { + "fewer errors than threshold", + []pipeline.Result{{Error: errors.New("")}, {Value: mustDecimal(t, "2")}, {Value: mustDecimal(t, "3")}, {Value: mustDecimal(t, "4")}}, + "2", + false, + pipeline.Result{Value: mustDecimal(t, "4")}, + }, + { + "exactly threshold of errors", + []pipeline.Result{{Error: errors.New("")}, {Error: errors.New("")}, {Value: mustDecimal(t, "3")}, {Value: mustDecimal(t, "4")}}, + "2", + false, + pipeline.Result{Value: mustDecimal(t, "4")}, + }, + { + "more errors than threshold", + []pipeline.Result{{Error: errors.New("")}, {Error: errors.New("")}, {Error: errors.New("")}, {Value: mustDecimal(t, "4")}}, + "2", + false, + pipeline.Result{Error: pipeline.ErrTooManyErrors}, + }, + { + "(unspecified AllowedFaults) fewer errors than threshold", + []pipeline.Result{{Error: errors.New("")}, {Error: errors.New("")}, {Value: mustDecimal(t, "3")}, {Value: mustDecimal(t, "4")}}, + "", + false, + pipeline.Result{Value: mustDecimal(t, "4")}, + }, + { + "(unspecified AllowedFaults) exactly threshold of errors", + []pipeline.Result{{Error: errors.New("")}, {Error: errors.New("")}, {Error: errors.New("")}, {Value: mustDecimal(t, "4")}}, + "", + false, + pipeline.Result{Value: mustDecimal(t, "4")}, + }, + { + "(unspecified AllowedFaults) more errors than threshold", + []pipeline.Result{{Error: errors.New("")}, {Error: errors.New("")}, {Error: errors.New("")}}, + "", + false, + pipeline.Result{Error: pipeline.ErrTooManyErrors}, + }, + { + "lax with nil and non-nil inputs", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Value: mustDecimal(t, "2")}, {}}, + "1", + true, + pipeline.Result{Value: mustDecimal(t, "2")}, + }, + { + "lax with more nils than allowed faults", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {}, {}, {}}, + "3", + true, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "lax with nils and errors", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Error: errors.New("1")}, {Error: errors.New("2")}, {}}, + "2", + true, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "lax with nils and more errors than allowed faults", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Error: errors.New("1")}, {Error: errors.New("2")}, {}}, + "1", + true, + pipeline.Result{Error: pipeline.ErrTooManyErrors}, + }, + { + "lax with numbers and errors and unset allowed faults", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Error: errors.New("1")}, {Error: errors.New("2")}, {}}, + "", + true, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "lax with only errors and unset allowed faults", + []pipeline.Result{{Error: errors.New("1")}, {Error: errors.New("2")}, {}, {}}, + "", + true, + pipeline.Result{Error: pipeline.ErrTooManyErrors}, + }, + { + "lax with only nils", + []pipeline.Result{{}, {}, {}, {}}, + "1", + true, + pipeline.Result{}, + }, + { + "lax with only nils and unset allowed faults", + []pipeline.Result{{}, {}, {}, {}}, + "", + true, + pipeline.Result{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Run("without vars", func(t *testing.T) { + task := pipeline.MaxTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + AllowedFaults: test.allowedFaults, + Lax: strconv.FormatBool(test.lax), + } + output, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), pipeline.NewVarsFrom(nil), test.inputs) + assert.False(t, runInfo.IsPending) + assert.False(t, runInfo.IsRetryable) + + switch { + case test.want.Error != nil: + require.Equal(t, test.want.Error, errors.Cause(output.Error)) + require.Nil(t, output.Value) + case test.want.Value != nil: + require.Equal(t, test.want.Value.(*decimal.Decimal).String(), output.Value.(decimal.Decimal).String()) + require.NoError(t, output.Error) + default: + require.Nil(t, output.Value) + require.NoError(t, output.Error) + } + }) + + t.Run("with vars", func(t *testing.T) { + var inputs []any + for _, input := range test.inputs { + if input.Error != nil { + inputs = append(inputs, input.Error) + } else { + inputs = append(inputs, input.Value) + } + } + vars := pipeline.NewVarsFrom(map[string]any{ + "foo": map[string]any{"bar": inputs}, + }) + task := pipeline.MaxTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + Values: "$(foo.bar)", + AllowedFaults: test.allowedFaults, + Lax: strconv.FormatBool(test.lax), + } + output, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil) + assert.False(t, runInfo.IsPending) + assert.False(t, runInfo.IsRetryable) + + switch { + case test.want.Error != nil: + require.Equal(t, test.want.Error, errors.Cause(output.Error)) + require.Nil(t, output.Value) + case test.want.Value != nil: + require.Equal(t, test.want.Value.(*decimal.Decimal).String(), output.Value.(decimal.Decimal).String()) + require.NoError(t, output.Error) + default: + require.Nil(t, output.Value) + require.NoError(t, output.Error) + } + }) + + t.Run("with json vars", func(t *testing.T) { + var inputs []any + for _, input := range test.inputs { + if input.Error != nil { + inputs = append(inputs, input.Error) + } else { + inputs = append(inputs, input.Value) + } + } + var valuesParam string + var vars pipeline.Vars + switch len(inputs) { + case 0: + valuesParam = "[]" + vars = pipeline.NewVarsFrom(nil) + case 1: + valuesParam = "[ $(foo) ]" + vars = pipeline.NewVarsFrom(map[string]any{"foo": inputs[0]}) + case 2: + valuesParam = "[ $(foo), $(bar) ]" + vars = pipeline.NewVarsFrom(map[string]any{"foo": inputs[0], "bar": inputs[1]}) + case 3: + valuesParam = "[ $(foo), $(bar), $(chain) ]" + vars = pipeline.NewVarsFrom(map[string]any{"foo": inputs[0], "bar": inputs[1], "chain": inputs[2]}) + case 4: + valuesParam = "[ $(foo), $(bar), $(chain), $(link) ]" + vars = pipeline.NewVarsFrom(map[string]any{"foo": inputs[0], "bar": inputs[1], "chain": inputs[2], "link": inputs[3]}) + } + + task := pipeline.MaxTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + Values: valuesParam, + AllowedFaults: test.allowedFaults, + Lax: strconv.FormatBool(test.lax), + } + output, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil) + assert.False(t, runInfo.IsPending) + assert.False(t, runInfo.IsRetryable) + + switch { + case test.want.Error != nil: + require.Equal(t, test.want.Error, errors.Cause(output.Error)) + require.Nil(t, output.Value) + case test.want.Value != nil: + require.Equal(t, test.want.Value.(*decimal.Decimal).String(), output.Value.(decimal.Decimal).String()) + require.NoError(t, output.Error) + default: + require.Nil(t, output.Value) + require.NoError(t, output.Error) + } + }) + }) + } +} diff --git a/core/services/pipeline/task.min.go b/core/services/pipeline/task.min.go new file mode 100644 index 00000000000..4d1dfac1299 --- /dev/null +++ b/core/services/pipeline/task.min.go @@ -0,0 +1,79 @@ +package pipeline + +import ( + "context" + stderrors "errors" + + "github.com/pkg/errors" + "github.com/shopspring/decimal" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +// Return types: +// +// *decimal.Decimal +type MinTask struct { + BaseTask `mapstructure:",squash"` + Values string `json:"values"` + AllowedFaults string `json:"allowedFaults"` + // Lax when disabled (default) will return an error if there are no input values or if the input includes nil values. + // Lax when enabled will return nil with no error if there are no valid input values. If the input includes nil values, they will be excluded from the calculation and do not count as a fault. + Lax string +} + +var _ Task = (*MinTask)(nil) + +func (t *MinTask) Type() TaskType { + return TaskTypeMin +} + +func (t *MinTask) Run(_ context.Context, _ logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) { + var ( + maybeAllowedFaults MaybeUint64Param + valuesAndErrs SliceParam + decimalValues DecimalSliceParam + allowedFaults int + lax BoolParam + ) + err := stderrors.Join( + errors.Wrap(ResolveParam(&maybeAllowedFaults, From(t.AllowedFaults)), "allowedFaults"), + errors.Wrap(ResolveParam(&valuesAndErrs, From(VarExpr(t.Values, vars), JSONWithVarExprs(t.Values, vars, true), Inputs(inputs))), "values"), + errors.Wrap(ResolveParam(&lax, From(NonemptyString(t.Lax), false)), "lax"), + ) + if err != nil { + return Result{Error: err}, runInfo + } + + // if lax is enabled, filter out nil values + // nil values are not included in the fault calculations + if lax { + valuesAndErrs, _ = valuesAndErrs.FilterNils() + } + + if allowed, isSet := maybeAllowedFaults.Uint64(); isSet { + allowedFaults = int(allowed) //nolint:gosec // G115: it will not exceed int64 + } else { + allowedFaults = max(len(valuesAndErrs)-1, 0) + } + + values, faults := valuesAndErrs.FilterErrors() + if faults > allowedFaults { + return Result{Error: errors.Wrapf(ErrTooManyErrors, "Number of faulty inputs %v to min task > number allowed faults %v", faults, allowedFaults)}, runInfo + } + if len(values) == 0 { + if lax { + return Result{}, runInfo // if lax is enabled, return nil result with no error + } + return Result{Error: errors.Wrap(ErrWrongInputCardinality, "no values to minize")}, runInfo + } + + err = decimalValues.UnmarshalPipelineParam(values) + if err != nil { + return Result{Error: errors.Wrapf(ErrBadInput, "values: %v", err)}, runInfo + } + + minVal := decimal.Min(decimalValues[0], decimalValues[1:]...) + + return Result{Value: minVal}, runInfo +} diff --git a/core/services/pipeline/task.min_test.go b/core/services/pipeline/task.min_test.go new file mode 100644 index 00000000000..227e5bcd3a7 --- /dev/null +++ b/core/services/pipeline/task.min_test.go @@ -0,0 +1,296 @@ +package pipeline_test + +import ( + "strconv" + "testing" + + "github.com/pkg/errors" + "github.com/shopspring/decimal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" +) + +func TestMinTask(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + inputs []pipeline.Result + allowedFaults string + lax bool + want pipeline.Result + }{ + { + "happy", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Value: mustDecimal(t, "2")}, {Value: mustDecimal(t, "3")}}, + "1", + false, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "happy (one input)", + []pipeline.Result{{Value: mustDecimal(t, "1")}}, + "0", + false, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "happy (with zero)", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Value: mustDecimal(t, "0")}}, + "1", + false, + pipeline.Result{Value: mustDecimal(t, "0")}, + }, + { + "happy (with negative)", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Value: mustDecimal(t, "-1")}}, + "1", + false, + pipeline.Result{Value: mustDecimal(t, "-1")}, + }, + { + "happy (with fractional)", + []pipeline.Result{{Value: mustDecimal(t, "0.2")}, {Value: mustDecimal(t, "0.1")}}, + "1", + false, + pipeline.Result{Value: mustDecimal(t, "0.1")}, + }, + { + "nil and non-nil inputs", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {}}, + "1", + false, + pipeline.Result{Error: pipeline.ErrBadInput}, + }, + { + "only nil inputs", + []pipeline.Result{{}}, + "0", + false, + pipeline.Result{Error: pipeline.ErrBadInput}, + }, + { + "zero inputs", + []pipeline.Result{}, + "0", + false, + pipeline.Result{Error: pipeline.ErrWrongInputCardinality}, + }, + { + "fewer errors than threshold", + []pipeline.Result{{Error: errors.New("")}, {Value: mustDecimal(t, "2")}, {Value: mustDecimal(t, "3")}, {Value: mustDecimal(t, "4")}}, + "2", + false, + pipeline.Result{Value: mustDecimal(t, "2")}, + }, + { + "exactly threshold of errors", + []pipeline.Result{{Error: errors.New("")}, {Error: errors.New("")}, {Value: mustDecimal(t, "3")}, {Value: mustDecimal(t, "4")}}, + "2", + false, + pipeline.Result{Value: mustDecimal(t, "3")}, + }, + { + "more errors than threshold", + []pipeline.Result{{Error: errors.New("")}, {Error: errors.New("")}, {Error: errors.New("")}, {Value: mustDecimal(t, "4")}}, + "2", + false, + pipeline.Result{Error: pipeline.ErrTooManyErrors}, + }, + { + "(unspecified AllowedFaults) fewer errors than threshold", + []pipeline.Result{{Error: errors.New("")}, {Error: errors.New("")}, {Value: mustDecimal(t, "3")}, {Value: mustDecimal(t, "4")}}, + "", + false, + pipeline.Result{Value: mustDecimal(t, "3")}, + }, + { + "(unspecified AllowedFaults) exactly threshold of errors", + []pipeline.Result{{Error: errors.New("")}, {Error: errors.New("")}, {Error: errors.New("")}, {Value: mustDecimal(t, "4")}}, + "", + false, + pipeline.Result{Value: mustDecimal(t, "4")}, + }, + { + "(unspecified AllowedFaults) more errors than threshold", + []pipeline.Result{{Error: errors.New("")}, {Error: errors.New("")}, {Error: errors.New("")}}, + "", + false, + pipeline.Result{Error: pipeline.ErrTooManyErrors}, + }, + { + "lax with nil and non-nil inputs", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Value: mustDecimal(t, "2")}, {}}, + "1", + true, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "lax with more nils than allowed faults", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {}, {}, {}}, + "3", + true, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "lax with nils and errors", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Error: errors.New("1")}, {Error: errors.New("2")}, {}}, + "2", + true, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "lax with nils and more errors than allowed faults", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Error: errors.New("1")}, {Error: errors.New("2")}, {}}, + "1", + true, + pipeline.Result{Error: pipeline.ErrTooManyErrors}, + }, + { + "lax with numbers and errors and unset allowed faults", + []pipeline.Result{{Value: mustDecimal(t, "1")}, {Error: errors.New("1")}, {Error: errors.New("2")}, {}}, + "", + true, + pipeline.Result{Value: mustDecimal(t, "1")}, + }, + { + "lax with only errors and unset allowed faults", + []pipeline.Result{{Error: errors.New("1")}, {Error: errors.New("2")}, {}, {}}, + "", + true, + pipeline.Result{Error: pipeline.ErrTooManyErrors}, + }, + { + "lax with only nils", + []pipeline.Result{{}, {}, {}, {}}, + "1", + true, + pipeline.Result{}, + }, + { + "lax with only nils and unset allowed faults", + []pipeline.Result{{}, {}, {}, {}}, + "", + true, + pipeline.Result{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Run("without vars", func(t *testing.T) { + task := pipeline.MinTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + AllowedFaults: test.allowedFaults, + Lax: strconv.FormatBool(test.lax), + } + output, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), pipeline.NewVarsFrom(nil), test.inputs) + assert.False(t, runInfo.IsPending) + assert.False(t, runInfo.IsRetryable) + + switch { + case test.want.Error != nil: + require.Equal(t, test.want.Error, errors.Cause(output.Error)) + require.Nil(t, output.Value) + case test.want.Value != nil: + require.Equal(t, test.want.Value.(*decimal.Decimal).String(), output.Value.(decimal.Decimal).String()) + require.NoError(t, output.Error) + default: + require.Nil(t, output.Value) + require.NoError(t, output.Error) + } + }) + + t.Run("with vars", func(t *testing.T) { + var inputs []any + for _, input := range test.inputs { + if input.Error != nil { + inputs = append(inputs, input.Error) + } else { + inputs = append(inputs, input.Value) + } + } + vars := pipeline.NewVarsFrom(map[string]any{ + "foo": map[string]any{"bar": inputs}, + }) + task := pipeline.MinTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + Values: "$(foo.bar)", + AllowedFaults: test.allowedFaults, + Lax: strconv.FormatBool(test.lax), + } + output, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil) + assert.False(t, runInfo.IsPending) + assert.False(t, runInfo.IsRetryable) + + switch { + case test.want.Error != nil: + require.Equal(t, test.want.Error, errors.Cause(output.Error)) + require.Nil(t, output.Value) + case test.want.Value != nil: + require.Equal(t, test.want.Value.(*decimal.Decimal).String(), output.Value.(decimal.Decimal).String()) + require.NoError(t, output.Error) + default: + require.Nil(t, output.Value) + require.NoError(t, output.Error) + } + }) + + t.Run("with json vars", func(t *testing.T) { + var inputs []any + for _, input := range test.inputs { + if input.Error != nil { + inputs = append(inputs, input.Error) + } else { + inputs = append(inputs, input.Value) + } + } + var valuesParam string + var vars pipeline.Vars + switch len(inputs) { + case 0: + valuesParam = "[]" + vars = pipeline.NewVarsFrom(nil) + case 1: + valuesParam = "[ $(foo) ]" + vars = pipeline.NewVarsFrom(map[string]any{"foo": inputs[0]}) + case 2: + valuesParam = "[ $(foo), $(bar) ]" + vars = pipeline.NewVarsFrom(map[string]any{"foo": inputs[0], "bar": inputs[1]}) + case 3: + valuesParam = "[ $(foo), $(bar), $(chain) ]" + vars = pipeline.NewVarsFrom(map[string]any{"foo": inputs[0], "bar": inputs[1], "chain": inputs[2]}) + case 4: + valuesParam = "[ $(foo), $(bar), $(chain), $(link) ]" + vars = pipeline.NewVarsFrom(map[string]any{"foo": inputs[0], "bar": inputs[1], "chain": inputs[2], "link": inputs[3]}) + } + + task := pipeline.MinTask{ + BaseTask: pipeline.NewBaseTask(0, "task", nil, nil, 0), + Values: valuesParam, + AllowedFaults: test.allowedFaults, + Lax: strconv.FormatBool(test.lax), + } + output, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), vars, nil) + assert.False(t, runInfo.IsPending) + assert.False(t, runInfo.IsRetryable) + + switch { + case test.want.Error != nil: + require.Equal(t, test.want.Error, errors.Cause(output.Error)) + require.Nil(t, output.Value) + case test.want.Value != nil: + require.Equal(t, test.want.Value.(*decimal.Decimal).String(), output.Value.(decimal.Decimal).String()) + require.NoError(t, output.Error) + default: + require.Nil(t, output.Value) + require.NoError(t, output.Error) + } + }) + }) + } +}