Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion pkg/capabilities/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import (
"fmt"
"iter"
"regexp"
"strconv"
"strings"
"time"

p2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/smartcontractkit/chainlink-common/pkg/contexts"
"github.com/smartcontractkit/chainlink-protos/cre/go/values"

"github.com/smartcontractkit/chainlink-common/pkg/contexts"
)

// CapabilityType is an enum for the type of capability.
Expand Down Expand Up @@ -180,6 +182,35 @@ func ParseID(id string) (name string, labels iter.Seq2[string, string], version
return
}

// ChainSelectorLabel returns a chain selector value from the labels if one is present.
// It supports both a normal key/value pair, and sequential keys for historical reasons.
func ChainSelectorLabel(labels iter.Seq2[string, string]) (*uint64, error) {
const key = "ChainSelector"
var next bool
for k, v := range labels {
if next {
cs, err := strconv.ParseUint(k, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid chain selector: %s", v)
}
return &cs, nil
}
if k == key {
if v != "" {
cs, err := strconv.ParseUint(v, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid chain selector: %s", v)
}
return &cs, nil
} else {
// empty value means it will be in the next key
next = true
}
}
}
return nil, nil
}

type RegisterToWorkflowRequest struct {
Metadata RegistrationMetadata
Config *values.Map
Expand Down
29 changes: 29 additions & 0 deletions pkg/capabilities/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,32 @@ func TestParseID(t *testing.T) {
})
}
}

func TestChainSelectorLabel(t *testing.T) {
for _, tc := range []struct {
id string
cs *uint64
errMsg string
}{
{"none@v1.0.0", nil, ""},
{"kv:ChainSelector_1@v1.0.0", ptr[uint64](1), ""},
{"kk:ChainSelector:1@v1.0.0", ptr[uint64](1), ""},
{"kv-others:k_v:ChainSelector_1@v1.0.0", ptr[uint64](1), ""},
{"kk-others:k_v:ChainSelector:1@v1.0.0", ptr[uint64](1), ""},

{"kv:ChainSelector_foo@v1.0.0", ptr[uint64](1), "invalid chain selector"},
{"kk:ChainSelector:bar@v1.0.0", ptr[uint64](1), "invalid chain selector"},
} {
t.Run(tc.id, func(t *testing.T) {
_, labels, _ := ParseID(tc.id)
cs, err := ChainSelectorLabel(labels)
if tc.errMsg != "" {
require.ErrorContains(t, err, tc.errMsg)
} else {
require.Equal(t, tc.cs, cs)
}
})
}
}

func ptr[T any](v T) *T { return &v }
23 changes: 23 additions & 0 deletions pkg/contexts/chains.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package contexts

import (
"context"
"errors"
)

const chainSelectorCtxKey key = "chainSelectorCtx"

// WithChainSelector returns a new context that includes the chain selector.
// Use ChainSelectorValue to get the value.
func WithChainSelector(ctx context.Context, cs uint64) context.Context {
return context.WithValue(ctx, chainSelectorCtxKey, cs)
}

// ChainSelectorValue returns the chain selector, if one was set via WithChainSelector.
func ChainSelectorValue(ctx context.Context) (uint64, error) {
val := Value[uint64](ctx, chainSelectorCtxKey)
if val == 0 {
return 0, errors.New("context missing chain selector")
}
return val, nil
}
7 changes: 7 additions & 0 deletions pkg/settings/cresettings/defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
"ConsensusCallsLimit": "2000",
"LogLineLimit": "1kb",
"LogEventLimit": "1000",
"ChainAllowed": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we care to name it something like AdditionalChain instead of ChainAllowed since the latter seems to imply that the empty value forbids them all, whereas it is just meant that "secret" chains are not allowed yet.

Copy link
Contributor Author

@jmank88 jmank88 Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty does forbid them all. We don't have any pre-existing notion of public or private to build on - we just have the chain selector ID.

We could include a default set of public chains here, but I was concerned that is an implementation detail, and regardless it will evolve over time and different between environments and zones, so it can be stale.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, so how one will keep it up to date - will we have a file with all chainIDs somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the deployments repo

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, so how deployments repo is better than a set of public chains here? Is it because the deployments repo is a "source of truth" and is better maintained? Also, currently where would such list of public chainIDs be here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make it a separate entry following the capability name like "evm"? Or anything outside "PerWorkflow"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well we have chain agnostic selectors rather than per-family chain IDs, and we want this to affect reads and writes. Being under PerWorkflow is what allows us to override and grant access at the workflow level.

"Default": "false",
"Values": {
"12922642891491394802": "true",
"3379446385462418246": "true"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only these two?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could include more chains, but it would be assuming implementation details about the deployed system, and it can't serve as the source of truth because we need to be able to roll out new chains without making code changes here. I originally had it empty, but we use these dev/testnets (EVM chain ID 1337 and 2337) in tests, so including them seemed worthwhile.

}
},
"CRONTrigger": {
"FastestScheduleInterval": "30s",
"RateLimit": "every30s:1"
Expand Down
7 changes: 7 additions & 0 deletions pkg/settings/cresettings/defaults.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ ConsensusCallsLimit = '2000'
LogLineLimit = '1kb'
LogEventLimit = '1000'

[PerWorkflow.ChainAllowed]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't it cause downtime until we push new jobs with limits allowing eth and other chains?
Can we push a job with ChainAllowed fields before we release the version that can read them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll update the settings before updating the nodes: https://github.com/smartcontractkit/chainlink-deployments/pull/8514

Default = 'false'

[PerWorkflow.ChainAllowed.Values]
12922642891491394802 = 'true'
3379446385462418246 = 'true'

[PerWorkflow.CRONTrigger]
FastestScheduleInterval = '30s'
RateLimit = 'every30s:1'
Expand Down
8 changes: 8 additions & 0 deletions pkg/settings/cresettings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ var Default = Schema{
ConsensusCallsLimit: Int(2000),
LogLineLimit: Size(config.KByte),
LogEventLimit: Int(1_000),
ChainAllowed: PerChainSelector(Bool(false), map[string]bool{
// geth-testnet
"3379446385462418246": true,
// geth-devnet2
"12922642891491394802": true,
}),

CRONTrigger: cronTrigger{
FastestScheduleInterval: Duration(30 * time.Second),
Expand Down Expand Up @@ -210,6 +216,8 @@ type Workflows struct {
LogLineLimit Setting[config.Size]
LogEventLimit Setting[int] `unit:"{log}"`

ChainAllowed SettingMap[bool]

CRONTrigger cronTrigger
HTTPTrigger httpTrigger
LogTrigger logTrigger
Expand Down
95 changes: 90 additions & 5 deletions pkg/settings/cresettings/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/contexts"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/settings/limits"
)

var update = flag.Bool("update", false, "update the golden files of this test")
Expand Down Expand Up @@ -74,6 +76,12 @@ func TestSchema_Unmarshal(t *testing.T) {
},
"PerWorkflow": {
"WASMMemoryLimit": "250mb",
"ChainAllowed": {
"Default": "false",
"Values": {
"1": "true"
}
},
"CRONTrigger": {
"RateLimit": "every10s:5"
},
Expand Down Expand Up @@ -103,6 +111,10 @@ func TestSchema_Unmarshal(t *testing.T) {
assert.Equal(t, 48*time.Hour, cfg.PerOrg.ZeroBalancePruningTimeout.DefaultValue)
assert.Equal(t, 99, cfg.PerOwner.WorkflowExecutionConcurrencyLimit.DefaultValue)
assert.Equal(t, 250*config.MByte, cfg.PerWorkflow.WASMMemoryLimit.DefaultValue)
assert.Equal(t, false, cfg.PerWorkflow.ChainAllowed.Default.DefaultValue)
assert.Equal(t, "true", cfg.PerWorkflow.ChainAllowed.Values["1"])
assert.NotNil(t, cfg.PerWorkflow.ChainAllowed.Default.Parse)
assert.NotNil(t, cfg.PerWorkflow.ChainAllowed.KeyFromCtx)
assert.Equal(t, config.Rate{Limit: rate.Every(10 * time.Second), Burst: 5}, cfg.PerWorkflow.CRONTrigger.RateLimit.DefaultValue)
assert.Equal(t, config.Rate{Limit: rate.Every(30 * time.Second), Burst: 3}, cfg.PerWorkflow.HTTPTrigger.RateLimit.DefaultValue)
assert.Equal(t, config.Rate{Limit: rate.Every(13 * time.Second), Burst: 6}, cfg.PerWorkflow.LogTrigger.EventRateLimit.DefaultValue)
Expand Down Expand Up @@ -142,11 +154,6 @@ func TestDefaultGetter(t *testing.T) {
}`)
reinit() // set default vars

_ = `
[workflow.test-wf-id]
PerWorkflow.HTTPAction.CallLimit = 20
`

// Default unchanged
got, err = limit.GetOrDefault(ctx, DefaultGetter)
require.NoError(t, err)
Expand All @@ -158,3 +165,81 @@ PerWorkflow.HTTPAction.CallLimit = 20
require.Equal(t, 20, got)

}

func TestDefaultGetter_SettingMap(t *testing.T) {
limit := Default.PerWorkflow.ChainAllowed

ctx := contexts.WithCRE(t.Context(), contexts.CRE{Owner: "owner-id", Workflow: "foo"})
ctx = contexts.WithChainSelector(ctx, 1234)
overrideCtx := contexts.WithCRE(t.Context(), contexts.CRE{Owner: "owner-id", Workflow: "test-wf-id"})
overrideCtx = contexts.WithChainSelector(overrideCtx, 1234)

// None allowed by default
got, err := limit.GetOrDefault(ctx, DefaultGetter)
require.NoError(t, err)
require.False(t, got)
got, err = limit.GetOrDefault(overrideCtx, DefaultGetter)
require.NoError(t, err)
require.False(t, got)

t.Cleanup(reinit) // restore default vars

// Org override to allow
t.Setenv(envNameSettings, `{
"workflow": {
"test-wf-id": {
"PerWorkflow": {
"ChainAllowed": {
"Values": {
"1234": "true"
}
}
}
}
}
}`)
reinit() // set default vars
got, err = limit.GetOrDefault(ctx, DefaultGetter)
require.NoError(t, err)
require.False(t, got)
got, err = limit.GetOrDefault(overrideCtx, DefaultGetter)
require.NoError(t, err)
require.True(t, got)

// Org override to allow by default, but disallow some
t.Setenv(envNameSettings, `{
"workflow": {
"test-wf-id": {
"PerWorkflow": {
"ChainAllowed": {
"Default": true,
"Values": {
"1234": "false"
}
}
}
}
}
}`)
reinit() // set default vars
got, err = limit.GetOrDefault(ctx, DefaultGetter)
require.NoError(t, err)
require.False(t, got)
got, err = limit.GetOrDefault(overrideCtx, DefaultGetter)
require.NoError(t, err)
require.False(t, got)
got, err = limit.GetOrDefault(contexts.WithChainSelector(overrideCtx, 42), DefaultGetter)
require.NoError(t, err)
require.True(t, got)
}

func TestChainAllows(t *testing.T) {
gl, err := limits.MakeGateLimiter(limits.Factory{Logger: logger.Test(t)}, Default.PerWorkflow.ChainAllowed)
require.NoError(t, err)

ctx := contexts.WithCRE(t.Context(), contexts.CRE{Owner: "owner-id", Workflow: "foo"})

assert.NoError(t, gl.AllowErr(contexts.WithChainSelector(ctx, 3379446385462418246)))
assert.NoError(t, gl.AllowErr(contexts.WithChainSelector(ctx, 12922642891491394802)))
assert.ErrorIs(t, gl.AllowErr(contexts.WithChainSelector(ctx, 1234)), limits.ErrorNotAllowed{})
}
28 changes: 18 additions & 10 deletions pkg/settings/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import (
"io/fs"
"maps"
"slices"
"strconv"
"strings"
)

// CombineJSONFiles reads a set of JSON config files and combines them in to one file. The expected inputs are:
// - global.json
// - org/*.json
// - owner/*.json
// - workflow/*.json
// - global.json
// - org/*.json
// - owner/*.json
// - workflow/*.json
//
// The directory and file names translate to keys in the JSON structure, while the file extensions are discarded.
// For example: owner/0x1234.json:Foo.Bar becomes owner.0x1234.Foo.Bar
func CombineJSONFiles(files fs.FS) ([]byte, error) {
Expand Down Expand Up @@ -152,11 +154,17 @@ func (s *jsonSettings) get(key string) (string, error) {
}

field := parts[len(parts)-1]
switch t := m[field].(type) {
case string:
return t, nil
case json.Number:
return t.String(), nil
if val, ok := m[field]; ok {
switch t := val.(type) {
case string:
return t, nil
case json.Number:
return t.String(), nil
case bool:
return strconv.FormatBool(t), nil
default:
return "", fmt.Errorf("non-string value: %s: %t(%v)", key, val, val)
}
}
return "", nil // no value
}
Expand All @@ -166,7 +174,7 @@ type jsonGetter struct {
}

// NewJSONGetter returns a static Getter backed by the given JSON.
//TODO https://smartcontract-it.atlassian.net/browse/CAPPL-775
// TODO https://smartcontract-it.atlassian.net/browse/CAPPL-775
// NewJSONRegistry with polling & subscriptions
func NewJSONGetter(b []byte) (Getter, error) {
s, err := newJSONSettings(b)
Expand Down
3 changes: 2 additions & 1 deletion pkg/settings/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ func (s Scope) rawKeys(ctx context.Context, key string) (keys []string, err erro
if i.IsTenantRequired() {
err = errors.Join(err, fmt.Errorf("empty %s key", i))
}
} else {
keys = append(keys, i.String()+"."+tenant+"."+key)
}
keys = append(keys, i.String()+"."+tenant+"."+key)
}
keys = append(keys, ScopeGlobal.String()+"."+key) // ScopeGlobal
return
Expand Down
6 changes: 2 additions & 4 deletions pkg/settings/limits/bound.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ func newBoundLimiter[N Number](f Factory, bound settings.Setting[N]) (BoundLimit
updater: newUpdater[N](nil, func(ctx context.Context) (N, error) {
return bound.GetOrDefault(ctx, f.Settings)
}, nil),
defaultBound: bound.DefaultValue,
key: bound.Key,
scope: bound.Scope,
key: bound.Key,
scope: bound.Scope,
}
b.updater.recordLimit = func(ctx context.Context, n N) { b.recordBound(ctx, n) }

Expand Down Expand Up @@ -115,7 +114,6 @@ func newBoundLimiter[N Number](f Factory, bound settings.Setting[N]) (BoundLimit

type boundLimiter[N Number] struct {
*updater[N]
defaultBound N

key string // optional
scope settings.Scope
Expand Down
21 changes: 21 additions & 0 deletions pkg/settings/limits/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,24 @@ func (e ErrorQueueFull) Error() string {
}

var ErrQueueEmpty = fmt.Errorf("queue is empty")

type ErrorNotAllowed struct {
Key string

Scope settings.Scope
Tenant string
}

func (e ErrorNotAllowed) GRPCStatus() *status.Status {
return status.New(codes.PermissionDenied, e.Error())
}

func (e ErrorNotAllowed) Is(target error) bool {
_, ok := target.(ErrorNotAllowed) //nolint:errcheck // implementing errors.Is
return ok
}

func (e ErrorNotAllowed) Error() string {
which, who := errArgs(e.Key, e.Scope, e.Tenant)
return fmt.Sprintf("%slimited%s: not allowed", which, who)
}
Loading
Loading