Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MERC-5943 Surface EA error fields and track bid/ask violations #13785

Merged
merged 4 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/serious-apples-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal Mercury v3: Include telemetry if bid/ask violation is detected
1 change: 1 addition & 0 deletions GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ codecgen: $(codecgen) ## Install codecgen
protoc: ## Install protoc
core/scripts/install-protoc.sh 25.1 /
go install google.golang.org/protobuf/cmd/protoc-gen-go@`go list -m -json google.golang.org/protobuf | jq -r .Version`
go install github.com/smartcontractkit/wsrpc/cmd/protoc-gen-go-wsrpc@`go list -m -json github.com/smartcontractkit/wsrpc | jq -r .Version`

.PHONY: telemetry-protobuf
telemetry-protobuf: $(telemetry-protobuf) ## Generate telemetry protocol buffers.
Expand Down
33 changes: 16 additions & 17 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,20 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/common"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/libocr/commontypes"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/services"
v1types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1"
v2types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2"
v3types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"
"github.com/smartcontractkit/chainlink/v2/core/utils"

v1types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1"
v2types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2"
v3types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3"
)

type eaTelemetry struct {
Expand All @@ -41,15 +38,16 @@ type EnhancedTelemetryData struct {
}

type EnhancedTelemetryMercuryData struct {
V1Observation *v1types.Observation
V2Observation *v2types.Observation
V3Observation *v3types.Observation
TaskRunResults pipeline.TaskRunResults
RepTimestamp ocrtypes.ReportTimestamp
FeedVersion mercuryutils.FeedVersion
FetchMaxFinalizedTimestamp bool
IsLinkFeed bool
IsNativeFeed bool
V1Observation *v1types.Observation
V2Observation *v2types.Observation
V3Observation *v3types.Observation
TaskRunResults pipeline.TaskRunResults
RepTimestamp ocrtypes.ReportTimestamp
FeedVersion mercuryutils.FeedVersion
FetchMaxFinalizedTimestamp bool
IsLinkFeed bool
IsNativeFeed bool
DpInvariantViolationDetected bool
}

type EnhancedTelemetryService[T EnhancedTelemetryData | EnhancedTelemetryMercuryData] struct {
Expand Down Expand Up @@ -295,7 +293,7 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
var bt uint64
// v1+v2+v3 fields
bp := big.NewInt(0)
//v1+v3 fields
// v1+v3 fields
bid := big.NewInt(0)
ask := big.NewInt(0)
// v2+v3 fields
Expand Down Expand Up @@ -384,6 +382,7 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
DpBenchmarkPrice: benchmarkPrice,
DpBid: bidPrice,
DpAsk: askPrice,
DpInvariantViolationDetected: d.DpInvariantViolationDetected,
CurrentBlockNumber: bn,
CurrentBlockHash: bh,
CurrentBlockTimestamp: bt,
Expand Down
2 changes: 1 addition & 1 deletion core/services/pipeline/common_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func makeHTTPRequest(

if statusCode >= 400 {
maybeErr := bestEffortExtractError(responseBytes)
return nil, statusCode, respHeaders, 0, errors.Errorf("got error from %s: (status code %v) %s", url.String(), statusCode, maybeErr)
return responseBytes, statusCode, respHeaders, 0, errors.Errorf("got error from %s: (status code %v) %s", url.String(), statusCode, maybeErr)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helper is only used by the http and bridge tasks and in both cases it's fine to return the response on error.

}
return responseBytes, statusCode, respHeaders, elapsed, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eautils

import (
"encoding/json"
"fmt"
"net/http"
)

Expand Down Expand Up @@ -37,3 +38,28 @@ func BestEffortExtractEAStatus(responseBytes []byte) (code int, ok bool) {

return *status.StatusCode, true
}

// adapterErrorResponse is the subset of an external adapter (EA) response
// body that is inspected for a structured error: the object stored under the
// top-level "error" key.
type adapterErrorResponse struct {
	Error *AdapterError `json:"error"`
}

// AdapterError is the structured error object decoded from the "error" key of
// an adapter response. It implements the error interface, so callers can
// recover it from an error chain with errors.As and inspect Name.
type AdapterError struct {
	Name    string `json:"name"`
	Message string `json:"message"`
}

// Error implements the error interface. When both fields are set the result is
// "<Name>: <Message>"; when only one is set, that field is returned on its own
// so the text never carries a dangling ": " separator.
func (err *AdapterError) Error() string {
	switch {
	case err.Name == "":
		return err.Message
	case err.Message == "":
		return err.Name
	default:
		return fmt.Sprintf("%s: %s", err.Name, err.Message)
	}
}

// BestEffortExtractEAError tries to decode a structured adapter error from
// responseBytes.
//
// It returns a *AdapterError when the payload is valid JSON whose "error" key
// holds an object with a non-empty name or message. It returns nil in every
// other case: the payload is not valid JSON, "error" is absent, null, or not
// an object, or the object carries neither a name nor a message. Returning nil
// for an empty error object matters because callers replace their own error
// with the returned one; an empty AdapterError would discard that context.
func BestEffortExtractEAError(responseBytes []byte) error {
	var errorResponse adapterErrorResponse
	if err := json.Unmarshal(responseBytes, &errorResponse); err != nil {
		// Best effort: an undecodable body simply means "no adapter error".
		return nil
	}

	adapterErr := errorResponse.Error
	if adapterErr == nil || (adapterErr.Name == "" && adapterErr.Message == "") {
		// Return a literal nil so the error interface itself is nil rather
		// than wrapping a nil *AdapterError.
		return nil
	}
	return adapterErr
}
6 changes: 5 additions & 1 deletion core/services/pipeline/task.bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline/internal/eautils"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline/eautils"
)

// NOTE: These metrics generate a new label per bridge, this should be safe
Expand Down Expand Up @@ -178,6 +178,10 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp
}

if err != nil || statusCode != http.StatusOK {
if adapterErr := eautils.BestEffortExtractEAError(responseBytes); adapterErr != nil {
err = adapterErr
}

promBridgeErrors.WithLabelValues(t.Name).Inc()
if cacheTTL == 0 {
return Result{Error: err}, RunInfo{IsRetryable: isRetryableHTTPError(statusCode, err)}
Expand Down
74 changes: 66 additions & 8 deletions core/services/pipeline/task.bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand All @@ -32,7 +33,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline/internal/eautils"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline/eautils"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
Expand Down Expand Up @@ -1088,12 +1089,6 @@ func TestBridgeTask_AdapterResponseStatusFailure(t *testing.T) {
require.NoError(t, err)
task.HelperSetDependencies(cfg.JobPipeline(), cfg.WebServer(), orm, specID, uuid.UUID{}, c)

// Insert entry 1m in the past, stale value, should not be used in case of EA failure.
_, err = db.ExecContext(ctx, `INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at)
VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT bridge_last_value_pkey
DO UPDATE SET value = $3, finished_at = $4;`, task.DotID(), specID, big.NewInt(9700).Bytes(), time.Now())
require.NoError(t, err)

vars := pipeline.NewVarsFrom(
map[string]interface{}{
"jobRun": map[string]interface{}{
Expand All @@ -1104,9 +1099,27 @@ func TestBridgeTask_AdapterResponseStatusFailure(t *testing.T) {
},
)

testAdapterResponse.SetStatusCode(http.StatusInternalServerError)
testAdapterResponse.Error = map[string]interface{}{
"name": "AdapterLWBAError",
"message": "bid ask violation detected",
}
result, runInfo := task.Run(ctx, logger.TestLogger(t), vars, nil)

require.ErrorContains(t, result.Error, "AdapterLWBAError: bid ask violation detected")
require.Nil(t, result.Value)
require.True(t, runInfo.IsRetryable)
require.False(t, runInfo.IsPending)

// Insert entry 1m in the past, stale value, should not be used in case of EA failure.
_, err = db.ExecContext(ctx, `INSERT INTO bridge_last_value(dot_id, spec_id, value, finished_at)
VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT bridge_last_value_pkey
DO UPDATE SET value = $3, finished_at = $4;`, task.DotID(), specID, big.NewInt(9700).Bytes(), time.Now())
require.NoError(t, err)

// expect all external adapter response status failures to be served from the cache
testAdapterResponse.SetStatusCode(http.StatusBadRequest)
result, runInfo := task.Run(ctx, logger.TestLogger(t), vars, nil)
result, runInfo = task.Run(ctx, logger.TestLogger(t), vars, nil)

require.NoError(t, result.Error)
require.NotNil(t, result.Value)
Expand Down Expand Up @@ -1211,3 +1224,48 @@ func TestBridgeTask_AdapterTimeout(t *testing.T) {
require.False(t, runInfo.IsPending)
})
}

// TestBridgeTask_PipelineAdapterLWBAError runs a single-task bridge pipeline
// against a stub adapter that answers HTTP 500 with a structured
// {"error": {"name", "message"}} body, and checks that the task result carries
// the adapter's name and message as its error.
func TestBridgeTask_PipelineAdapterLWBAError(t *testing.T) {
t.Parallel()

// One bridge task whose name matches the bridge type registered below.
dag := `
ds [type=bridge name="adapter-error-bridge" timeout="50ms" requestData="{\"data\":{\"from\":\"ETH\",\"to\":\"USD\"}}"];
`

ctx := testutils.Context(t)
db := pgtest.NewSqlxDB(t)
cfg := configtest.NewTestGeneralConfig(t)
orm := bridges.NewORM(db)
r, _ := newRunner(t, db, orm, cfg)

// Stub adapter: verifies the request body, then replies with a 500 status
// and a structured error payload.
bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
b, herr := io.ReadAll(req.Body)
require.NoError(t, herr)
require.Equal(t, `{"data":{"from":"ETH","to":"USD"}}`, string(b))

res.WriteHeader(http.StatusInternalServerError)
resp := `{"error": {"name":"AdapterLWBAError", "message": "bid ask violation detected"}}`
_, herr = res.Write([]byte(resp))
require.NoError(t, herr)
}))
t.Cleanup(bridge.Close)
u, _ := url.Parse(bridge.URL)
require.NoError(t, orm.CreateBridgeType(ctx, &bridges.BridgeType{
Name: "adapter-error-bridge",
URL: models.WebURL(*u),
}))

spec := pipeline.Spec{DotDagSource: dag}
vars := pipeline.NewVarsFrom(nil)

lggr := logger.TestLogger(t)
_, trrs, err := r.ExecuteRun(ctx, spec, vars, lggr)

// ExecuteRun itself is expected to succeed; the adapter failure is asserted
// on the individual task result below.
require.NoError(t, err)
require.Len(t, trrs, 1)

finalResult := trrs[0]

require.ErrorContains(t, finalResult.Result.Error, "AdapterLWBAError: bid ask violation detected")
require.Nil(t, finalResult.Result.Value)
}
18 changes: 16 additions & 2 deletions core/services/relay/evm/mercury/v3/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,25 @@ import (
"sync"

pkgerrors "github.com/pkg/errors"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/types/mercury"
v3types "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3"
v3 "github.com/smartcontractkit/chainlink-data-streams/mercury/v3"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline/eautils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// adapterLWBAErrorName is the eautils.AdapterError name that Observe compares
// against when a pipeline run fails; on a match it enqueues enhanced telemetry
// with DpInvariantViolationDetected set.
const adapterLWBAErrorName = "AdapterLWBAError"

// Runner is the pipeline-execution dependency consumed by this package.
type Runner interface {
// ExecuteRun runs spec with the given vars and logger, returning the run, the
// per-task results, and any error.
ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error)
}
Expand Down Expand Up @@ -151,6 +152,19 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
cancel()

if pipelineExecutionErr != nil {
var adapterError *eautils.AdapterError
if errors.As(pipelineExecutionErr, &adapterError) && adapterError.Name == adapterLWBAErrorName {
ocrcommon.MaybeEnqueueEnhancedTelem(ds.jb, ds.chEnhancedTelem, ocrcommon.EnhancedTelemetryMercuryData{
V3Observation: &obs,
TaskRunResults: trrs,
RepTimestamp: repts,
FeedVersion: mercuryutils.REPORT_V3,
FetchMaxFinalizedTimestamp: fetchMaxFinalizedTimestamp,
IsLinkFeed: isLink,
IsNativeFeed: isNative,
DpInvariantViolationDetected: true,
})
}
return
}

Expand Down
Loading
Loading