Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 173 additions & 114 deletions core/capabilities/launcher.go

Large diffs are not rendered by default.

20 changes: 12 additions & 8 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ func TestLauncher(t *testing.T) {
dispatcher := remoteMocks.NewDispatcher(t)

nodes := newNodes(4)
capabilityDonNodes := newNodes(4)
peer := mocks.NewPeer(t)
peer.On("UpdateConnections", mock.Anything).Return(nil)
peer.On("ID").Return(nodes[0])
peer.On("ID").Return(capabilityDonNodes[0])
peer.On("IsBootstrap").Return(false)
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)
Expand Down Expand Up @@ -108,12 +109,14 @@ func TestLauncher(t *testing.T) {
fullMissingTargetID := "super-duper-target@6.6.6"
missingTargetCapID := RandomUTF8BytesWord()
dID := uint32(1)
capDonID := uint32(2)

localRegistry := buildLocalRegistry()
addDON(localRegistry, dID, uint32(0), uint8(1), true, true, nodes, 1, [][32]byte{triggerCapID, targetCapID, missingTargetCapID})
addCapabilityToDON(localRegistry, dID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil)
addCapabilityToDON(localRegistry, dID, fullTargetID, capabilities.CapabilityTypeTarget, nil)
addCapabilityToDON(localRegistry, dID, fullMissingTargetID, capabilities.CapabilityTypeTarget, nil)
addDON(localRegistry, capDonID, uint32(0), uint8(1), true, false, capabilityDonNodes, 1, [][32]byte{triggerCapID, targetCapID})
addCapabilityToDON(localRegistry, capDonID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil)
addCapabilityToDON(localRegistry, capDonID, fullTargetID, capabilities.CapabilityTypeTarget, nil)
addCapabilityToDON(localRegistry, capDonID, fullMissingTargetID, capabilities.CapabilityTypeTarget, nil)

launcher := NewLauncher(
lggr,
Expand All @@ -127,8 +130,8 @@ func TestLauncher(t *testing.T) {
require.NoError(t, launcher.Start(t.Context()))
defer launcher.Close()

dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*executable.server")).Return(nil)
dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil)
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*executable.server")).Return(nil)

require.NoError(t, launcher.OnNewRegistry(t.Context(), localRegistry))
})
Expand Down Expand Up @@ -885,8 +888,9 @@ func TestLauncher_V2CapabilitiesExposeRemotely(t *testing.T) {
"Write": {
RemoteConfig: &capabilitiespb.CapabilityMethodConfig_RemoteExecutableConfig{
RemoteExecutableConfig: &capabilitiespb.RemoteExecutableConfig{
RequestTimeout: durationpb.New(30 * time.Second),
DeltaStage: durationpb.New(1 * time.Second),
RequestTimeout: durationpb.New(30 * time.Second),
ServerMaxParallelRequests: 10,
DeltaStage: durationpb.New(1 * time.Second),
},
},
},
Expand Down
109 changes: 83 additions & 26 deletions core/capabilities/remote/executable/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
Expand All @@ -27,55 +28,98 @@ import (
// client communicates with corresponding server on remote nodes.
type client struct {
services.StateMachine
lggr logger.Logger
remoteCapabilityInfo commoncap.CapabilityInfo
localDONInfo commoncap.DON
dispatcher types.Dispatcher
requestTimeout time.Duration
// Has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request.
transmissionConfig *transmission.TransmissionConfig
capMethodName string
capabilityID string
capMethodName string
dispatcher types.Dispatcher
cfg atomic.Pointer[dynamicConfig]
lggr logger.Logger

requestIDToCallerRequest map[string]*request.ClientRequest
mutex sync.Mutex
stopCh services.StopChan
wg sync.WaitGroup
}

type dynamicConfig struct {
Copy link
Contributor

@mchain0 mchain0 Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe some sort of "version" tag field would make debugging and reading logs easier, since we would know which config we are currently dealing with application-wide?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't store any version onchain so it's not really possible to map it to anything sensible here.
Launcher calls SetConfig() on every iteration, even if values didn't change so an update timestamp also doesn't make too much sense...

remoteCapabilityInfo commoncap.CapabilityInfo
localDONInfo commoncap.DON
requestTimeout time.Duration
// Has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request.
transmissionConfig *transmission.TransmissionConfig
}

// Client is the interface of the executable capability client: it embeds
// commoncap.ExecutableCapability and adds message receipt plus dynamic configuration.
type Client interface {
commoncap.ExecutableCapability
// Receive accepts a message body addressed to this client.
// NOTE(review): presumably called by the dispatcher with responses from remote nodes — confirm.
Receive(ctx context.Context, msg *types.MessageBody)
// SetConfig supplies the remote capability info, local DON info, request timeout and
// transmission config the client should use; it returns an error if the values are rejected.
// transmissionConfig has to be set only for V2 capabilities. V1 capabilities read the
// transmission schedule from every request.
SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig) error
}

var _ Client = &client{}
var _ types.Receiver = &client{}
var _ services.Service = &client{}

const expiryCheckInterval = 30 * time.Second
const defaultExpiryCheckInterval = 30 * time.Second

// Sentinel errors defined by the executable client.
var (
// ErrRequestExpired reports that a request was expired by the executable client.
ErrRequestExpired = errors.New("request expired by executable client")
// ErrContextDoneBeforeResponseQuorum reports that the context was done before the
// remote client received a quorum of responses.
ErrContextDoneBeforeResponseQuorum = errors.New("context done before remote client received a quorum of responses")
)

// TransmissionConfig has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request.
func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, capMethodName string, lggr logger.Logger) *client {
func NewClient(capabilityID string, capMethodName string, dispatcher types.Dispatcher, lggr logger.Logger) *client {
return &client{
lggr: logger.Named(lggr, "ExecutableCapabilityClient"),
remoteCapabilityInfo: remoteCapabilityInfo,
localDONInfo: localDonInfo,
dispatcher: dispatcher,
requestTimeout: requestTimeout,
transmissionConfig: transmissionConfig,
capabilityID: capabilityID,
capMethodName: capMethodName,
dispatcher: dispatcher,
lggr: logger.Named(lggr, "ExecutableCapabilityClient"),
requestIDToCallerRequest: make(map[string]*request.ClientRequest),
stopCh: make(services.StopChan),
}
}

// SetConfig validates the supplied configuration and, if it is acceptable,
// installs it as the client's current dynamic configuration.
//
// It returns an error, leaving the stored configuration untouched, when:
//   - remoteCapabilityInfo.ID is empty or differs from the client's capabilityID,
//   - localDonInfo has no members,
//   - requestTimeout is not positive.
//
// transmissionConfig has to be set only for V2 capabilities. V1 capabilities
// read the transmission schedule from every request.
func (c *client) SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig) error {
	switch {
	case remoteCapabilityInfo.ID == "" || remoteCapabilityInfo.ID != c.capabilityID:
		return fmt.Errorf("capability info provided does not match the client's capabilityID: %s != %s", remoteCapabilityInfo.ID, c.capabilityID)
	case len(localDonInfo.Members) == 0:
		return errors.New("empty localDonInfo provided")
	case requestTimeout <= 0:
		return errors.New("requestTimeout must be positive")
	}

	// Build a brand-new snapshot and swap the pointer in one step, so readers
	// never observe a mix of old and new field values.
	next := &dynamicConfig{
		remoteCapabilityInfo: remoteCapabilityInfo,
		localDONInfo:         localDonInfo,
		requestTimeout:       requestTimeout,
		transmissionConfig:   transmissionConfig,
	}
	c.cfg.Store(next)
	return nil
}

func (c *client) Start(ctx context.Context) error {
return c.StartOnce(c.Name(), func() error {
cfg := c.cfg.Load()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this is non-recoverable.

If you pass a bad config and call Start(), then call SetConfig() with a good config and call Start() again, does it start?

i don't think it will. is there a test?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, make a typed error for the caller to check, if you want them to handle it:

var ErrConfigNotSet = errors.New(...)
...
return fmt.Errorf("%w: empty remote capability", ErrConfigNotSet)

++ caller ++
err := x.Start()
if errors.Is(err, ErrConfigNotSet) {
x.SetConfig
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Launcher will cache the shim only when Start() succeeded. Otherwise it will be re-created on each iteration with fresh config. It wasn't implemented consistently for all 4 cases but I now fixed it and added comments to point it out. Thanks!


// Validate that all required fields are set before starting
if cfg == nil {
return errors.New("config not set - call SetConfig() before Start()")
}
if cfg.remoteCapabilityInfo.ID == "" {
return errors.New("remote capability info not set - call SetConfig() before Start()")
}
if len(cfg.localDONInfo.Members) == 0 {
return errors.New("local DON info not set - call SetConfig() before Start()")
}
if cfg.requestTimeout <= 0 {
return errors.New("request timeout not set - call SetConfig() before Start()")
}
if c.dispatcher == nil {
return errors.New("dispatcher set to nil, cannot start client")
}

c.wg.Add(1)
go func() {
defer c.wg.Done()
Expand Down Expand Up @@ -118,22 +162,26 @@ func (c *client) checkDispatcherReady() {
}

func (c *client) checkForExpiredRequests() {
tickerInterval := expiryCheckInterval
if c.requestTimeout < tickerInterval {
tickerInterval = c.requestTimeout
}
ticker := time.NewTicker(tickerInterval)
ticker := time.NewTicker(getClientTickerInterval(c.cfg.Load()))
defer ticker.Stop()
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
ticker.Reset(getClientTickerInterval(c.cfg.Load()))
c.expireRequests()
}
}
}

// getClientTickerInterval returns the period between expired-request sweeps.
//
// With no config, or a non-positive request timeout, it falls back to
// defaultExpiryCheckInterval. Otherwise it returns the configured request
// timeout as-is; the result is not capped by defaultExpiryCheckInterval.
func getClientTickerInterval(cfg *dynamicConfig) time.Duration {
	if cfg == nil || cfg.requestTimeout <= 0 {
		return defaultExpiryCheckInterval
	}
	return cfg.requestTimeout
}

func (c *client) expireRequests() {
c.mutex.Lock()
defer c.mutex.Unlock()
Expand All @@ -160,7 +208,11 @@ func (c *client) cancelAllRequests(err error) {
}

func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
return c.remoteCapabilityInfo, nil
cfg := c.cfg.Load()
if cfg == nil {
return commoncap.CapabilityInfo{}, errors.New("config not set - call SetConfig() before Info()")
}
return cfg.remoteCapabilityInfo, nil
}

func (c *client) RegisterToWorkflow(ctx context.Context, registerRequest commoncap.RegisterToWorkflowRequest) error {
Expand All @@ -172,8 +224,13 @@ func (c *client) UnregisterFromWorkflow(ctx context.Context, unregisterRequest c
}

func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
req, err := request.NewClientExecuteRequest(ctx, c.lggr, capReq, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout, c.transmissionConfig, c.capMethodName)
cfg := c.cfg.Load()
if cfg == nil {
return commoncap.CapabilityResponse{}, errors.New("config not set - call SetConfig() before Execute()")
}

req, err := request.NewClientExecuteRequest(ctx, c.lggr, capReq, cfg.remoteCapabilityInfo, cfg.localDONInfo, c.dispatcher,
cfg.requestTimeout, cfg.transmissionConfig, c.capMethodName)
if err != nil {
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to create client request: %w", err)
}
Expand Down
Loading
Loading