Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-4447 Promwrapper for OCR3 plugins and factories #15521

Merged
merged 16 commits into from
Dec 6, 2024
5 changes: 5 additions & 0 deletions .changeset/fuzzy-hairs-appear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Prometheus observability layer added to OCR3 Reporting Plugins #internal
9 changes: 9 additions & 0 deletions core/capabilities/ccip/oraclecreator/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
cctypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr3/promwrapper"

"github.com/smartcontractkit/libocr/commontypes"
libocr3 "github.com/smartcontractkit/libocr/offchainreporting2plus"
Expand Down Expand Up @@ -229,6 +230,12 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter(
) (ocr3types.ReportingPluginFactory[[]byte], ocr3types.ContractTransmitter[[]byte], error) {
var factory ocr3types.ReportingPluginFactory[[]byte]
var transmitter ocr3types.ContractTransmitter[[]byte]

chainID, err := chainsel.GetChainIDFromSelector(uint64(config.Config.ChainSelector))
if err != nil {
return nil, nil, fmt.Errorf("unsupported chain selector %d %w", config.Config.ChainSelector, err)
}

if config.Config.PluginType == uint8(cctypes.PluginTypeCCIPCommit) {
if !i.peerWrapper.IsStarted() {
return nil, nil, fmt.Errorf("peer wrapper is not started")
Expand Down Expand Up @@ -263,6 +270,7 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter(
rmnPeerClient,
rmnCrypto,
)
factory = promwrapper.NewReportingPluginFactory[[]byte](factory, chainID, "CCIPCommit")
transmitter = ocrimpls.NewCommitContractTransmitter[[]byte](destChainWriter,
ocrtypes.Account(destFromAccounts[0]),
hexutil.Encode(config.Config.OfframpAddress), // TODO: this works for evm only, how about non-evm?
Expand All @@ -283,6 +291,7 @@ func (i *pluginOracleCreator) createFactoryAndTransmitter(
contractReaders,
chainWriters,
)
factory = promwrapper.NewReportingPluginFactory[[]byte](factory, chainID, "CCIPExec")
transmitter = ocrimpls.NewExecContractTransmitter[[]byte](destChainWriter,
ocrtypes.Account(destFromAccounts[0]),
hexutil.Encode(config.Config.OfframpAddress), // TODO: this works for evm only, how about non-evm?
Expand Down
42 changes: 42 additions & 0 deletions core/services/ocr3/promwrapper/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package promwrapper

import (
"context"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
)

var _ ocr3types.ReportingPluginFactory[any] = &ReportingPluginFactory[any]{}

type ReportingPluginFactory[RI any] struct {
origin ocr3types.ReportingPluginFactory[RI]
chainID string
plugin string
}

func NewReportingPluginFactory[RI any](
origin ocr3types.ReportingPluginFactory[RI],
chainID string,
plugin string,
) *ReportingPluginFactory[RI] {
return &ReportingPluginFactory[RI]{
origin: origin,
chainID: chainID,
plugin: plugin,
}
}

func (r ReportingPluginFactory[RI]) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[RI], ocr3types.ReportingPluginInfo, error) {
plugin, info, err := r.origin.NewReportingPlugin(ctx, config)
if err != nil {
return nil, ocr3types.ReportingPluginInfo{}, err
}
wrapped := newReportingPlugin(
plugin,
r.chainID,
r.plugin,
promOCR3ReportsGenerated,
promOCR3Durations,
)
return wrapped, info, err
}
41 changes: 41 additions & 0 deletions core/services/ocr3/promwrapper/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package promwrapper

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

func Test_WrapperFactory(t *testing.T) {
validFactory := NewReportingPluginFactory(fakeFactory[uint]{}, "solana", "plugin")
failingFactory := NewReportingPluginFactory(fakeFactory[uint]{err: errors.New("error")}, "123", "plugin")

plugin, _, err := validFactory.NewReportingPlugin(tests.Context(t), ocr3types.ReportingPluginConfig{})
require.NoError(t, err)

_, err = plugin.Outcome(tests.Context(t), ocr3types.OutcomeContext{}, nil, nil)
require.NoError(t, err)

require.Equal(t, 1, counterFromHistogramByLabels(t, promOCR3Durations, "solana", "plugin", "outcome", "true"))
require.Equal(t, 0, counterFromHistogramByLabels(t, promOCR3Durations, "solana", "plugin", "outcome", "false"))

_, _, err = failingFactory.NewReportingPlugin(tests.Context(t), ocr3types.ReportingPluginConfig{})
require.Error(t, err)
}

type fakeFactory[RI any] struct {
err error
}

func (f fakeFactory[RI]) NewReportingPlugin(context.Context, ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[RI], ocr3types.ReportingPluginInfo, error) {
if f.err != nil {
return nil, ocr3types.ReportingPluginInfo{}, f.err
}
return fakePlugin[RI]{}, ocr3types.ReportingPluginInfo{}, nil
}
122 changes: 122 additions & 0 deletions core/services/ocr3/promwrapper/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package promwrapper

import (
"context"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
)

var _ ocr3types.ReportingPlugin[any] = &reportingPlugin[any]{}

type reportingPlugin[RI any] struct {
ocr3types.ReportingPlugin[RI]
chainID string
plugin string

// Prometheus components for tracking metrics
reportsGenerated *prometheus.CounterVec
durations *prometheus.HistogramVec
}

func newReportingPlugin[RI any](
origin ocr3types.ReportingPlugin[RI],
chainID string,
plugin string,
reportsGenerated *prometheus.CounterVec,
durations *prometheus.HistogramVec,
) *reportingPlugin[RI] {
return &reportingPlugin[RI]{
ReportingPlugin: origin,
chainID: chainID,
plugin: plugin,
reportsGenerated: reportsGenerated,
durations: durations,
}
}

func (p *reportingPlugin[RI]) Query(ctx context.Context, outctx ocr3types.OutcomeContext) (ocrtypes.Query, error) {
return withObservedExecution(p, query, func() (ocrtypes.Query, error) {
return p.ReportingPlugin.Query(ctx, outctx)
})
}

func (p *reportingPlugin[RI]) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query) (ocrtypes.Observation, error) {
return withObservedExecution(p, observation, func() (ocrtypes.Observation, error) {
return p.ReportingPlugin.Observation(ctx, outctx, query)
})
}

func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query, ao ocrtypes.AttributedObservation) error {
_, err := withObservedExecution(p, validateObservation, func() (any, error) {
err := p.ReportingPlugin.ValidateObservation(ctx, outctx, query, ao)
return nil, err
})
return err
}

func (p *reportingPlugin[RI]) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query, aos []ocrtypes.AttributedObservation) (ocr3types.Outcome, error) {
return withObservedExecution(p, outcome, func() (ocr3types.Outcome, error) {
return p.ReportingPlugin.Outcome(ctx, outctx, query, aos)
})
}

func (p *reportingPlugin[RI]) Reports(ctx context.Context, seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportPlus[RI], error) {
result, err := withObservedExecution(p, reports, func() ([]ocr3types.ReportPlus[RI], error) {
return p.ReportingPlugin.Reports(ctx, seqNr, outcome)
})
p.trackReports(reports, len(result))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean that we are tracking report even when we have an error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reports should be nil or empty in that case

return result, err
}

func (p *reportingPlugin[RI]) ShouldAcceptAttestedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[RI]) (bool, error) {
result, err := withObservedExecution(p, shouldAccept, func() (bool, error) {
return p.ReportingPlugin.ShouldAcceptAttestedReport(ctx, seqNr, reportWithInfo)
})
p.trackReports(shouldAccept, boolToInt(result))
return result, err
}

func (p *reportingPlugin[RI]) ShouldTransmitAcceptedReport(ctx context.Context, seqNr uint64, reportWithInfo ocr3types.ReportWithInfo[RI]) (bool, error) {
result, err := withObservedExecution(p, shouldTransmit, func() (bool, error) {
return p.ReportingPlugin.ShouldTransmitAcceptedReport(ctx, seqNr, reportWithInfo)
})
p.trackReports(shouldTransmit, boolToInt(result))
return result, err
}

func (p *reportingPlugin[RI]) trackReports(
function functionType,
count int,
) {
p.reportsGenerated.
WithLabelValues(p.chainID, p.plugin, string(function)).
Add(float64(count))
}

func boolToInt(arg bool) int {
if arg {
return 1
}
return 0
}

func withObservedExecution[RI, R any](
p *reportingPlugin[RI],
function functionType,
exec func() (R, error),
) (R, error) {
start := time.Now()
result, err := exec()

success := err == nil

p.durations.
WithLabelValues(p.chainID, p.plugin, string(function), strconv.FormatBool(success)).
Observe(float64(time.Since(start)))

return result, err
}
Loading
Loading