diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index 6b2d78e33c6..361deb815ed 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -61,9 +61,9 @@ type launcher struct { type cachedShims struct { combinedClients map[string]remote.CombinedClient triggerSubscribers map[string]remote.TriggerSubscriber + triggerPublishers map[string]remote.TriggerPublisher executableClients map[string]executable.Client - - // TODO(CRE-942): add trigger publishers and executable servers + executableServers map[string]executable.Server } func shimKey(capID string, donID uint32, method string) string { @@ -109,7 +109,9 @@ func NewLauncher( cachedShims: cachedShims{ combinedClients: make(map[string]remote.CombinedClient), triggerSubscribers: make(map[string]remote.TriggerSubscriber), + triggerPublishers: make(map[string]remote.TriggerPublisher), executableClients: make(map[string]executable.Client), + executableServers: make(map[string]executable.Server), }, registry: registry, subServices: []services.Service{}, @@ -482,16 +484,22 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, c regist } case capabilities.CapabilityTypeAction: newActionFn := func(info capabilities.CapabilityInfo) (capabilityService, error) { - client := executable.NewClient( - info, - myDON.DON, - w.dispatcher, - defaultTargetRequestTimeout, - nil, // V1 capabilities read transmission schedule from every request - "", // empty method name for v1 - w.lggr, - ) - return client, nil + shimKey := shimKey(capability.ID, remoteDON.ID, "") // empty method name for V1 + execCap, alreadyExists := w.cachedShims.executableClients[shimKey] + if !alreadyExists { + execCap = executable.NewClient( + info.ID, + "", // empty method name for v1 + w.dispatcher, + w.lggr, + ) + w.cachedShims.executableClients[shimKey] = execCap + } + // V1 capabilities read transmission schedule from every request + if errCfg := execCap.SetConfig(info, myDON.DON, 
defaultTargetRequestTimeout, nil); errCfg != nil { + return nil, fmt.Errorf("failed to set trigger config: %w", errCfg) + } + return execCap.(capabilityService), nil } err := w.addToRegistryAndSetDispatcher(ctx, capability, remoteDON, newActionFn) @@ -502,16 +510,22 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, c regist // nothing to do; we don't support remote consensus capabilities for now case capabilities.CapabilityTypeTarget: newTargetFn := func(info capabilities.CapabilityInfo) (capabilityService, error) { - client := executable.NewClient( - info, - myDON.DON, - w.dispatcher, - defaultTargetRequestTimeout, - nil, // V1 capabilities read transmission schedule from every request - "", // empty method name for v1 - w.lggr, - ) - return client, nil + shimKey := shimKey(capability.ID, remoteDON.ID, "") // empty method name for V1 + execCap, alreadyExists := w.cachedShims.executableClients[shimKey] + if !alreadyExists { + execCap = executable.NewClient( + info.ID, + "", // empty method name for v1 + w.dispatcher, + w.lggr, + ) + w.cachedShims.executableClients[shimKey] = execCap + } + // V1 capabilities read transmission schedule from every request + if errCfg := execCap.SetConfig(info, myDON.DON, defaultTargetRequestTimeout, nil); errCfg != nil { + return nil, fmt.Errorf("failed to set trigger config: %w", errCfg) + } + return execCap.(capabilityService), nil } err := w.addToRegistryAndSetDispatcher(ctx, capability, remoteDON, newTargetFn) @@ -580,7 +594,7 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability var ( // TODO: make this configurable defaultTargetRequestTimeout = 8 * time.Minute - defaultMaxParallelCapabilityExecuteRequests = 1000 + defaultMaxParallelCapabilityExecuteRequests = uint32(1000) ) // serveCapabilities exposes capabilities that are available on this node, as part of the given DON. 
@@ -630,17 +644,20 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy if !ok { return nil, errors.New("capability does not implement TriggerCapability") } - - publisher := remote.NewTriggerPublisher( - capabilityConfig.RemoteTriggerConfig, - triggerCapability, - info, - don.DON, - idsToDONs, - w.dispatcher, - "", // empty method name for v1 - w.lggr, - ) + shimKey := shimKey(capability.ID, don.ID, "") // empty method name for V1 + publisher, alreadyExists := w.cachedShims.triggerPublishers[shimKey] + if !alreadyExists { + publisher = remote.NewTriggerPublisher( + capability.ID, + "", // empty method name for v1 + w.dispatcher, + w.lggr, + ) + w.cachedShims.triggerPublishers[shimKey] = publisher + } + if errCfg := publisher.SetConfig(capabilityConfig.RemoteTriggerConfig, triggerCapability, don.DON, idsToDONs); errCfg != nil { + return nil, fmt.Errorf("failed to set config for trigger publisher: %w", errCfg) + } return publisher, nil } @@ -653,26 +670,40 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy if !ok { return nil, errors.New("capability does not implement ActionCapability") } + shimKey := shimKey(capability.ID, don.ID, "") // empty method name for V1 + server, alreadyExists := w.cachedShims.executableServers[shimKey] + if !alreadyExists { + server = executable.NewServer( + info.ID, + "", // empty method name for v1 + myPeerID, + w.dispatcher, + w.lggr, + ) + w.cachedShims.executableServers[shimKey] = server + } - remoteConfig := &capabilities.RemoteExecutableConfig{} + remoteConfig := &capabilities.RemoteExecutableConfig{ + // deprecated defaults - v2 reads these from onchain config + RequestTimeout: defaultTargetRequestTimeout, + ServerMaxParallelRequests: defaultMaxParallelCapabilityExecuteRequests, + } if capabilityConfig.RemoteTargetConfig != nil { remoteConfig.RequestHashExcludedAttributes = capabilityConfig.RemoteTargetConfig.RequestHashExcludedAttributes } - - return 
executable.NewServer( + errCfg := server.SetConfig( remoteConfig, - myPeerID, actionCapability, info, don.DON, idsToDONs, - w.dispatcher, - defaultTargetRequestTimeout, - defaultMaxParallelCapabilityExecuteRequests, - nil, // TODO: create a capability-specific hasher - "", // empty method name for v1 - w.lggr, - ), nil + nil, + ) + if errCfg != nil { + return nil, fmt.Errorf("failed to set server config: %w", errCfg) + } + + return server, nil } if err = w.addReceiver(ctx, capability, don, newActionServer); err != nil { @@ -687,25 +718,40 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy return nil, errors.New("capability does not implement TargetCapability") } - remoteConfig := &capabilities.RemoteExecutableConfig{} + shimKey := shimKey(capability.ID, don.ID, "") // empty method name for V1 + server, alreadyExists := w.cachedShims.executableServers[shimKey] + if !alreadyExists { + server = executable.NewServer( + info.ID, + "", // empty method name for v1 + myPeerID, + w.dispatcher, + w.lggr, + ) + w.cachedShims.executableServers[shimKey] = server + } + + remoteConfig := &capabilities.RemoteExecutableConfig{ + // deprecated defaults - v2 reads these from onchain config + RequestTimeout: defaultTargetRequestTimeout, + ServerMaxParallelRequests: defaultMaxParallelCapabilityExecuteRequests, + } if capabilityConfig.RemoteTargetConfig != nil { remoteConfig.RequestHashExcludedAttributes = capabilityConfig.RemoteTargetConfig.RequestHashExcludedAttributes } - - return executable.NewServer( + errCfg := server.SetConfig( remoteConfig, - myPeerID, targetCapability, info, don.DON, idsToDONs, - w.dispatcher, - defaultTargetRequestTimeout, - defaultMaxParallelCapabilityExecuteRequests, - nil, // TODO: create a capability-specific hasher - "", // empty method name for v1 - w.lggr, - ), nil + nil, + ) + if errCfg != nil { + return nil, fmt.Errorf("failed to set server config: %w", errCfg) + } + + return server, nil } if err = w.addReceiver(ctx, 
capability, don, newTargetServer); err != nil { @@ -801,6 +847,7 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth if !alreadyExists { sub = remote.NewTriggerSubscriber(capID, method, w.dispatcher, w.lggr) cc.SetTriggerSubscriber(method, sub) + // add to cachedShims later, only after startNewShim succeeds } // TODO(CRE-590): add support for SignedReportAggregator (needed by LLO Streams Trigger V2) agg := aggregation.NewDefaultModeAggregator(config.RemoteTriggerConfig.MinResponsesToAggregate) @@ -820,22 +867,20 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth } else { // executable client, alreadyExists := w.cachedShims.executableClients[shimKey] if !alreadyExists { - client = executable.NewClient( - info, - myDON.DON, - w.dispatcher, - config.RemoteExecutableConfig.RequestTimeout, - &transmission.TransmissionConfig{ - Schedule: transmission.EnumToString(config.RemoteExecutableConfig.TransmissionSchedule), - DeltaStage: config.RemoteExecutableConfig.DeltaStage, - }, - method, - w.lggr, - ) + client = executable.NewClient(info.ID, method, w.dispatcher, w.lggr) cc.SetExecutableClient(method, client) + // add to cachedShims later, only after startNewShim succeeds + } + // Update existing client config + transmissionConfig := &transmission.TransmissionConfig{ + Schedule: transmission.EnumToString(config.RemoteExecutableConfig.TransmissionSchedule), + DeltaStage: config.RemoteExecutableConfig.DeltaStage, + } + err := client.SetConfig(info, myDON.DON, config.RemoteExecutableConfig.RequestTimeout, transmissionConfig) + if err != nil { + w.lggr.Errorw("failed to update client config", "capID", capID, "method", method, "error", err) + continue } - - // TODO(CRE-941): implement setters for executable client config if !alreadyExists { if err2 := w.startNewShim(ctx, client.(remotetypes.ReceiverService), capID, remoteDON.ID, method); err2 != nil { @@ -857,15 +902,17 @@ func (w *launcher) 
addRemoteCapabilityV2(ctx context.Context, capID string, meth return nil } -func (w *launcher) startNewShim(ctx context.Context, receiver remotetypes.ReceiverService, capID string, remoteDonID uint32, method string) error { +func (w *launcher) startNewShim(ctx context.Context, receiver remotetypes.ReceiverService, capID string, donID uint32, method string) error { + w.lggr.Debugw("Starting new remote shim for capability method", "id", capID, "method", method, "donID", donID) if err := receiver.Start(ctx); err != nil { return fmt.Errorf("failed to start receiver for capability %s, method %s: %w", capID, method, err) } - if err := w.dispatcher.SetReceiverForMethod(capID, remoteDonID, method, receiver); err != nil { + if err := w.dispatcher.SetReceiverForMethod(capID, donID, method, receiver); err != nil { _ = receiver.Close() return fmt.Errorf("failed to register receiver for capability %s, method %s: %w", capID, method, err) } w.subServices = append(w.subServices, receiver) + w.lggr.Debugw("New remote shim started successfully for capability method", "id", capID, "method", method, "donID", donID) return nil } @@ -884,28 +931,54 @@ func (w *launcher) exposeCapabilityV2(ctx context.Context, capID string, methodC return fmt.Errorf("failed to get capability %s from registry: %w", capID, err) } for method, config := range methodConfig { - var receiver remotetypes.ReceiverService - if config.RemoteTriggerConfig != nil { + if config.RemoteTriggerConfig != nil { // trigger underlyingTriggerCapability, ok := (underlying).(capabilities.TriggerCapability) if !ok { return fmt.Errorf("capability %s does not implement TriggerCapability", capID) } - receiver = remote.NewTriggerPublisher( - config.RemoteTriggerConfig, - underlyingTriggerCapability, - info, - myDON.DON, - idsToDONs, - w.dispatcher, - method, - w.lggr, - ) - } - if receiver == nil && config.RemoteExecutableConfig != nil { + shimKey := shimKey(capID, myDON.ID, method) + publisher, alreadyExists := 
w.cachedShims.triggerPublishers[shimKey] + if !alreadyExists { + publisher = remote.NewTriggerPublisher( + capID, + method, + w.dispatcher, + w.lggr, + ) + // add to cachedShims later, only after startNewShim succeeds + } + if errCfg := publisher.SetConfig(config.RemoteTriggerConfig, underlyingTriggerCapability, myDON.DON, idsToDONs); errCfg != nil { + return fmt.Errorf("failed to set config for trigger publisher: %w", errCfg) + } + + if !alreadyExists { + if err2 := w.startNewShim(ctx, publisher.(remotetypes.ReceiverService), capID, myDON.ID, method); err2 != nil { + // TODO CRE-1021 metrics + w.lggr.Errorw("failed to start receiver", "capID", capID, "method", method, "error", err2) + continue + } + w.cachedShims.triggerPublishers[shimKey] = publisher + w.lggr.Infow("added new remote trigger publisher", "capID", capID, "method", method) + } + } else { // executable underlyingExecutableCapability, ok := (underlying).(capabilities.ExecutableCapability) if !ok { return fmt.Errorf("capability %s does not implement ExecutableCapability", capID) } + + shimKey := shimKey(capID, myDON.ID, method) + server, alreadyExists := w.cachedShims.executableServers[shimKey] + if !alreadyExists { + server = executable.NewServer( + info.ID, + method, + myPeerID, + w.dispatcher, + w.lggr, + ) + // add to cachedShims later, only after startNewShim succeeds + } + var requestHasher remotetypes.MessageHasher switch config.RemoteExecutableConfig.RequestHasherType { case capabilities.RequestHasherType_Simple: @@ -915,43 +988,29 @@ func (w *launcher) exposeCapabilityV2(ctx context.Context, capID string, methodC default: requestHasher = executable.NewSimpleHasher() } - receiver = executable.NewServer( + + err := server.SetConfig( config.RemoteExecutableConfig, - myPeerID, underlyingExecutableCapability, info, myDON.DON, idsToDONs, - w.dispatcher, - config.RemoteExecutableConfig.RequestTimeout, - int(config.RemoteExecutableConfig.ServerMaxParallelRequests), requestHasher, - method, - w.lggr, ) 
- } - if receiver == nil { - return fmt.Errorf("no remote config found for method %s of capability %s", method, capID) - } - - w.lggr.Debugw("Enabling external access for capability method", "id", capID, "method", method, "donID", myDON.ID) - err := w.dispatcher.SetReceiverForMethod(capID, myDON.ID, method, receiver) - if errors.Is(err, remote.ErrReceiverExists) { - // If a receiver already exists, let's log the error for debug purposes, but - // otherwise short-circuit here. We've handled this capability in a previous iteration. - // TODO(CRE-788) support dynamic changes to config and underlying capability - w.lggr.Debugw("receiver already exists", "capabilityID", capID, "donID", myDON.ID, "method", method, "error", err) - return nil - } else if err != nil { - return fmt.Errorf("failed to set receiver for capability %s, method %s: %w", capID, method, err) - } + if err != nil { + return fmt.Errorf("failed to set server config: %w", err) + } - err = receiver.Start(ctx) - if err != nil { - return fmt.Errorf("failed to start receiver for capability %s, method %s: %w", capID, method, err) + if !alreadyExists { + if err2 := w.startNewShim(ctx, server.(remotetypes.ReceiverService), capID, myDON.ID, method); err2 != nil { + // TODO CRE-1021 metrics + w.lggr.Errorw("failed to start receiver", "capID", capID, "method", method, "error", err2) + continue + } + w.cachedShims.executableServers[shimKey] = server + w.lggr.Infow("added new remote execcutable server", "capID", capID, "method", method) + } } - - w.subServices = append(w.subServices, receiver) } return nil } diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go index d540f326757..5ccd8906fd2 100644 --- a/core/capabilities/launcher_test.go +++ b/core/capabilities/launcher_test.go @@ -77,9 +77,10 @@ func TestLauncher(t *testing.T) { dispatcher := remoteMocks.NewDispatcher(t) nodes := newNodes(4) + capabilityDonNodes := newNodes(4) peer := mocks.NewPeer(t) peer.On("UpdateConnections", 
mock.Anything).Return(nil) - peer.On("ID").Return(nodes[0]) + peer.On("ID").Return(capabilityDonNodes[0]) peer.On("IsBootstrap").Return(false) wrapper := mocks.NewPeerWrapper(t) wrapper.On("GetPeer").Return(peer) @@ -108,12 +109,14 @@ func TestLauncher(t *testing.T) { fullMissingTargetID := "super-duper-target@6.6.6" missingTargetCapID := RandomUTF8BytesWord() dID := uint32(1) + capDonID := uint32(2) localRegistry := buildLocalRegistry() addDON(localRegistry, dID, uint32(0), uint8(1), true, true, nodes, 1, [][32]byte{triggerCapID, targetCapID, missingTargetCapID}) - addCapabilityToDON(localRegistry, dID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil) - addCapabilityToDON(localRegistry, dID, fullTargetID, capabilities.CapabilityTypeTarget, nil) - addCapabilityToDON(localRegistry, dID, fullMissingTargetID, capabilities.CapabilityTypeTarget, nil) + addDON(localRegistry, capDonID, uint32(0), uint8(1), true, false, capabilityDonNodes, 1, [][32]byte{triggerCapID, targetCapID}) + addCapabilityToDON(localRegistry, capDonID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil) + addCapabilityToDON(localRegistry, capDonID, fullTargetID, capabilities.CapabilityTypeTarget, nil) + addCapabilityToDON(localRegistry, capDonID, fullMissingTargetID, capabilities.CapabilityTypeTarget, nil) launcher := NewLauncher( lggr, @@ -127,8 +130,8 @@ func TestLauncher(t *testing.T) { require.NoError(t, launcher.Start(t.Context())) defer launcher.Close() - dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil) - dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*executable.server")).Return(nil) + dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil) + dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*executable.server")).Return(nil) require.NoError(t, launcher.OnNewRegistry(t.Context(), localRegistry)) }) @@ -885,8 
+888,9 @@ func TestLauncher_V2CapabilitiesExposeRemotely(t *testing.T) { "Write": { RemoteConfig: &capabilitiespb.CapabilityMethodConfig_RemoteExecutableConfig{ RemoteExecutableConfig: &capabilitiespb.RemoteExecutableConfig{ - RequestTimeout: durationpb.New(30 * time.Second), - DeltaStage: durationpb.New(1 * time.Second), + RequestTimeout: durationpb.New(30 * time.Second), + ServerMaxParallelRequests: 10, + DeltaStage: durationpb.New(1 * time.Second), }, }, }, diff --git a/core/capabilities/remote/executable/client.go b/core/capabilities/remote/executable/client.go index daaf003867f..893e2e77492 100644 --- a/core/capabilities/remote/executable/client.go +++ b/core/capabilities/remote/executable/client.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -27,14 +28,11 @@ import ( // client communicates with corresponding server on remote nodes. type client struct { services.StateMachine - lggr logger.Logger - remoteCapabilityInfo commoncap.CapabilityInfo - localDONInfo commoncap.DON - dispatcher types.Dispatcher - requestTimeout time.Duration - // Has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request. - transmissionConfig *transmission.TransmissionConfig - capMethodName string + capabilityID string + capMethodName string + dispatcher types.Dispatcher + cfg atomic.Pointer[dynamicConfig] + lggr logger.Logger requestIDToCallerRequest map[string]*request.ClientRequest mutex sync.Mutex @@ -42,40 +40,86 @@ type client struct { wg sync.WaitGroup } +type dynamicConfig struct { + remoteCapabilityInfo commoncap.CapabilityInfo + localDONInfo commoncap.DON + requestTimeout time.Duration + // Has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request. 
+ transmissionConfig *transmission.TransmissionConfig +} + type Client interface { commoncap.ExecutableCapability Receive(ctx context.Context, msg *types.MessageBody) + SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig) error } var _ Client = &client{} var _ types.Receiver = &client{} var _ services.Service = &client{} -const expiryCheckInterval = 30 * time.Second +const defaultExpiryCheckInterval = 30 * time.Second var ( ErrRequestExpired = errors.New("request expired by executable client") ErrContextDoneBeforeResponseQuorum = errors.New("context done before remote client received a quorum of responses") ) -// TransmissionConfig has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request. -func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher, - requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, capMethodName string, lggr logger.Logger) *client { +func NewClient(capabilityID string, capMethodName string, dispatcher types.Dispatcher, lggr logger.Logger) *client { return &client{ - lggr: logger.Named(lggr, "ExecutableCapabilityClient"), - remoteCapabilityInfo: remoteCapabilityInfo, - localDONInfo: localDonInfo, - dispatcher: dispatcher, - requestTimeout: requestTimeout, - transmissionConfig: transmissionConfig, + capabilityID: capabilityID, capMethodName: capMethodName, + dispatcher: dispatcher, + lggr: logger.Named(lggr, "ExecutableCapabilityClient"), requestIDToCallerRequest: make(map[string]*request.ClientRequest), stopCh: make(services.StopChan), } } +// SetConfig sets the remote capability configuration dynamically +// TransmissionConfig has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request. 
+func (c *client) SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig) error { + if remoteCapabilityInfo.ID == "" || remoteCapabilityInfo.ID != c.capabilityID { + return fmt.Errorf("capability info provided does not match the client's capabilityID: %s != %s", remoteCapabilityInfo.ID, c.capabilityID) + } + if len(localDonInfo.Members) == 0 { + return errors.New("empty localDonInfo provided") + } + if requestTimeout <= 0 { + return errors.New("requestTimeout must be positive") + } + + // always replace the whole dynamicConfig object to avoid inconsistent state + c.cfg.Store(&dynamicConfig{ + remoteCapabilityInfo: remoteCapabilityInfo, + localDONInfo: localDonInfo, + requestTimeout: requestTimeout, + transmissionConfig: transmissionConfig, + }) + return nil +} + func (c *client) Start(ctx context.Context) error { return c.StartOnce(c.Name(), func() error { + cfg := c.cfg.Load() + + // Validate that all required fields are set before starting + if cfg == nil { + return errors.New("config not set - call SetConfig() before Start()") + } + if cfg.remoteCapabilityInfo.ID == "" { + return errors.New("remote capability info not set - call SetConfig() before Start()") + } + if len(cfg.localDONInfo.Members) == 0 { + return errors.New("local DON info not set - call SetConfig() before Start()") + } + if cfg.requestTimeout <= 0 { + return errors.New("request timeout not set - call SetConfig() before Start()") + } + if c.dispatcher == nil { + return errors.New("dispatcher set to nil, cannot start client") + } + c.wg.Add(1) go func() { defer c.wg.Done() @@ -118,22 +162,26 @@ func (c *client) checkDispatcherReady() { } func (c *client) checkForExpiredRequests() { - tickerInterval := expiryCheckInterval - if c.requestTimeout < tickerInterval { - tickerInterval = c.requestTimeout - } - ticker := time.NewTicker(tickerInterval) + ticker := 
time.NewTicker(getClientTickerInterval(c.cfg.Load())) defer ticker.Stop() for { select { case <-c.stopCh: return case <-ticker.C: + ticker.Reset(getClientTickerInterval(c.cfg.Load())) c.expireRequests() } } } +func getClientTickerInterval(cfg *dynamicConfig) time.Duration { + if cfg != nil && cfg.requestTimeout > 0 { + return cfg.requestTimeout + } + return defaultExpiryCheckInterval +} + func (c *client) expireRequests() { c.mutex.Lock() defer c.mutex.Unlock() @@ -160,7 +208,11 @@ func (c *client) cancelAllRequests(err error) { } func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { - return c.remoteCapabilityInfo, nil + cfg := c.cfg.Load() + if cfg == nil { + return commoncap.CapabilityInfo{}, errors.New("config not set - call SetConfig() before Info()") + } + return cfg.remoteCapabilityInfo, nil } func (c *client) RegisterToWorkflow(ctx context.Context, registerRequest commoncap.RegisterToWorkflowRequest) error { @@ -172,8 +224,13 @@ func (c *client) UnregisterFromWorkflow(ctx context.Context, unregisterRequest c } func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) { - req, err := request.NewClientExecuteRequest(ctx, c.lggr, capReq, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher, - c.requestTimeout, c.transmissionConfig, c.capMethodName) + cfg := c.cfg.Load() + if cfg == nil { + return commoncap.CapabilityResponse{}, errors.New("config not set - call SetConfig() before Execute()") + } + + req, err := request.NewClientExecuteRequest(ctx, c.lggr, capReq, cfg.remoteCapabilityInfo, cfg.localDONInfo, c.dispatcher, + cfg.requestTimeout, cfg.transmissionConfig, c.capMethodName) if err != nil { return commoncap.CapabilityResponse{}, fmt.Errorf("failed to create client request: %w", err) } diff --git a/core/capabilities/remote/executable/client_test.go b/core/capabilities/remote/executable/client_test.go index d2eceea6259..ec564b49001 100644 --- 
a/core/capabilities/remote/executable/client_test.go +++ b/core/capabilities/remote/executable/client_test.go @@ -242,7 +242,9 @@ func testClient(t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout for i := 0; i < numWorkflowPeers; i++ { workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) - caller := executable.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout, nil, "", lggr) + caller := executable.NewClient(capInfo.ID, "", workflowPeerDispatcher, lggr) + err := caller.SetConfig(capInfo, workflowDonInfo, workflowNodeResponseTimeout, nil) + require.NoError(t, err) servicetest.Run(t, caller) broker.RegisterReceiverNode(workflowPeers[i], caller) callers[i] = caller @@ -368,3 +370,151 @@ func (t *clientTestServer) sendResponse(messageID string, responseErr error, } } } + +func TestClient_SetConfig(t *testing.T) { + lggr := logger.Test(t) + capabilityID := "test_capability@1.0.0" + + // Create broker and dispatcher like other tests + broker := newTestAsyncMessageBroker(t, 100) + peerID := NewP2PPeerID(t) + dispatcher := broker.NewDispatcherForNode(peerID) + client := executable.NewClient(capabilityID, "execute", dispatcher, lggr) + + // Create valid test data + validCapInfo := commoncap.CapabilityInfo{ + ID: capabilityID, + CapabilityType: commoncap.CapabilityTypeAction, + Description: "Test capability", + } + + validDonInfo := commoncap.DON{ + ID: 1, + Members: []p2ptypes.PeerID{NewP2PPeerID(t)}, + F: 0, + } + + validTimeout := 30 * time.Second + + t.Run("successful config set", func(t *testing.T) { + transmissionConfig := &transmission.TransmissionConfig{ + Schedule: transmission.Schedule_OneAtATime, + DeltaStage: 10 * time.Millisecond, + } + + err := client.SetConfig(validCapInfo, validDonInfo, validTimeout, transmissionConfig) + require.NoError(t, err) + + // Verify config was set + info, err := client.Info(context.Background()) + require.NoError(t, err) + assert.Equal(t, validCapInfo.ID, info.ID) 
+ }) + + t.Run("mismatched capability ID", func(t *testing.T) { + invalidCapInfo := commoncap.CapabilityInfo{ + ID: "different_capability@1.0.0", + CapabilityType: commoncap.CapabilityTypeAction, + } + + err := client.SetConfig(invalidCapInfo, validDonInfo, validTimeout, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "capability info provided does not match the client's capabilityID") + assert.Contains(t, err.Error(), "different_capability@1.0.0 != test_capability@1.0.0") + }) + + t.Run("empty DON members", func(t *testing.T) { + invalidDonInfo := commoncap.DON{ + ID: 1, + Members: []p2ptypes.PeerID{}, + F: 0, + } + + err := client.SetConfig(validCapInfo, invalidDonInfo, validTimeout, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "empty localDonInfo provided") + }) + + t.Run("successful config update", func(t *testing.T) { + // Set initial config + initialTimeout := 10 * time.Second + err := client.SetConfig(validCapInfo, validDonInfo, initialTimeout, nil) + require.NoError(t, err) + + // Replace with new config + newTimeout := 60 * time.Second + newDonInfo := commoncap.DON{ + ID: 2, + Members: []p2ptypes.PeerID{NewP2PPeerID(t), NewP2PPeerID(t)}, + F: 1, + } + + err = client.SetConfig(validCapInfo, newDonInfo, newTimeout, nil) + require.NoError(t, err) + + // Verify the config was completely replaced + info, err := client.Info(context.Background()) + require.NoError(t, err) + assert.Equal(t, validCapInfo.ID, info.ID) + }) +} + +func TestClient_SetConfig_StartClose(t *testing.T) { + ctx := testutils.Context(t) + lggr := logger.Test(t) + capabilityID := "test_capability@1.0.0" + + // Create broker and dispatcher like other tests + broker := newTestAsyncMessageBroker(t, 100) + peerID := NewP2PPeerID(t) + dispatcher := broker.NewDispatcherForNode(peerID) + client := executable.NewClient(capabilityID, "execute", dispatcher, lggr) + + validCapInfo := commoncap.CapabilityInfo{ + ID: capabilityID, + CapabilityType: 
commoncap.CapabilityTypeAction, + Description: "Test capability", + } + + validDonInfo := commoncap.DON{ + ID: 1, + Members: []p2ptypes.PeerID{NewP2PPeerID(t)}, + F: 0, + } + + validTimeout := 30 * time.Second + + t.Run("start fails without config", func(t *testing.T) { + clientWithoutConfig := executable.NewClient(capabilityID, "execute", dispatcher, lggr) + err := clientWithoutConfig.Start(ctx) + require.Error(t, err) + assert.Contains(t, err.Error(), "config not set - call SetConfig() before Start()") + }) + + t.Run("start succeeds after config set", func(t *testing.T) { + require.NoError(t, client.SetConfig(validCapInfo, validDonInfo, validTimeout, nil)) + require.NoError(t, client.Start(ctx)) + require.NoError(t, client.Close()) + }) + + t.Run("config can be updated after start", func(t *testing.T) { + // Create a fresh client for this test since services can only be started once + freshClient := executable.NewClient(capabilityID, "execute", dispatcher, lggr) + + // Set initial config and start + require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil)) + require.NoError(t, freshClient.Start(ctx)) + + // Update config while running + validCapInfo.Description = "new description" + require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil)) + + // Verify config was updated + info, err := freshClient.Info(ctx) + require.NoError(t, err) + assert.Equal(t, validCapInfo.Description, info.Description) + + // Clean up + require.NoError(t, freshClient.Close()) + }) +} diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index 73c29cec2f5..c8c9e777553 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ b/core/capabilities/remote/executable/endtoend_test.go @@ -293,8 +293,13 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin for i := 0; i < numCapabilityPeers; i++ { capabilityPeer := 
capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) - capabilityNode := executable.NewServer(&commoncap.RemoteExecutableConfig{RequestHashExcludedAttributes: []string{}}, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, - capabilityNodeResponseTimeout, 10, nil, "", lggr) + capabilityNode := executable.NewServer(capInfo.ID, "", capabilityPeer, capabilityDispatcher, lggr) + cfg := &commoncap.RemoteExecutableConfig{ + RequestHashExcludedAttributes: []string{}, + RequestTimeout: capabilityNodeResponseTimeout, + ServerMaxParallelRequests: 10, + } + require.NoError(t, capabilityNode.SetConfig(cfg, underlying, capInfo, capDonInfo, workflowDONs, nil)) servicetest.Run(t, capabilityNode) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) capabilityNodes[i] = capabilityNode @@ -303,7 +308,9 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin workflowNodes := make([]commoncap.ExecutableCapability, numWorkflowPeers) for i := 0; i < numWorkflowPeers; i++ { workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) - workflowNode := executable.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout, nil, "", lggr) + workflowNode := executable.NewClient(capInfo.ID, "", workflowPeerDispatcher, lggr) + err := workflowNode.SetConfig(capInfo, workflowDonInfo, workflowNodeTimeout, nil) + require.NoError(t, err) servicetest.Run(t, workflowNode) broker.RegisterReceiverNode(workflowPeers[i], workflowNode) workflowNodes[i] = workflowNode diff --git a/core/capabilities/remote/executable/server.go b/core/capabilities/remote/executable/server.go index bcaac9dc773..55160023c00 100644 --- a/core/capabilities/remote/executable/server.go +++ b/core/capabilities/remote/executable/server.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -27,20 +28,14 @@ 
import ( // server communicates with corresponding client on remote nodes. type server struct { services.StateMachine - lggr logger.Logger - - config *commoncap.RemoteExecutableConfig - hasher types.MessageHasher - peerID p2ptypes.PeerID - underlying commoncap.ExecutableCapability - capInfo commoncap.CapabilityInfo - localDonInfo commoncap.DON - workflowDONs map[uint32]commoncap.DON - dispatcher types.Dispatcher + capabilityID string + capMethodName string + peerID p2ptypes.PeerID + dispatcher types.Dispatcher + cfg atomic.Pointer[dynamicServerConfig] + lggr logger.Logger requestIDToRequest map[string]requestAndMsgID - requestTimeout time.Duration - capMethodName string // Used to detect messages with the same message id but different payloads messageIDToRequestIDsCount map[string]map[string]int @@ -52,6 +47,24 @@ type server struct { parallelExecutor *parallelExecutor } +type dynamicServerConfig struct { + remoteExecutableConfig *commoncap.RemoteExecutableConfig + hasher types.MessageHasher + underlying commoncap.ExecutableCapability + capInfo commoncap.CapabilityInfo + localDonInfo commoncap.DON + workflowDONs map[uint32]commoncap.DON +} + +type Server interface { + types.Receiver + services.Service + SetConfig(remoteExecutableConfig *commoncap.RemoteExecutableConfig, underlying commoncap.ExecutableCapability, + capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, + messageHasher types.MessageHasher) error +} + +var _ Server = &server{} var _ types.Receiver = &server{} var _ services.Service = &server{} @@ -60,47 +73,105 @@ type requestAndMsgID struct { messageID string } -func NewServer(remoteExecutableConfig *commoncap.RemoteExecutableConfig, peerID p2ptypes.PeerID, underlying commoncap.ExecutableCapability, - capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, - workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, - maxParallelRequests int, messageHasher 
types.MessageHasher, capMethodName string, - lggr logger.Logger) *server { +func NewServer(capabilityID, methodName string, peerID p2ptypes.PeerID, dispatcher types.Dispatcher, lggr logger.Logger) *server { + return &server{ + capabilityID: capabilityID, + capMethodName: methodName, + peerID: peerID, + dispatcher: dispatcher, + lggr: logger.Named(lggr, "ExecutableCapabilityServer"), + requestIDToRequest: map[string]requestAndMsgID{}, + messageIDToRequestIDsCount: map[string]map[string]int{}, + stopCh: make(services.StopChan), + } +} + +// SetConfig sets the remote server configuration dynamically +func (r *server) SetConfig(remoteExecutableConfig *commoncap.RemoteExecutableConfig, underlying commoncap.ExecutableCapability, + capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, messageHasher types.MessageHasher) error { + currCfg := r.cfg.Load() if remoteExecutableConfig == nil { - lggr.Info("no remote config provided, using default values") + r.lggr.Info("no remote config provided, using default values") remoteExecutableConfig = &commoncap.RemoteExecutableConfig{} } if messageHasher == nil { + r.lggr.Warn("no message hasher provided, using default V1 hasher") messageHasher = NewV1Hasher(remoteExecutableConfig.RequestHashExcludedAttributes) } - return &server{ - config: remoteExecutableConfig, - hasher: messageHasher, - underlying: underlying, - peerID: peerID, - capInfo: capInfo, - localDonInfo: localDonInfo, - workflowDONs: workflowDONs, - dispatcher: dispatcher, - - requestIDToRequest: map[string]requestAndMsgID{}, - messageIDToRequestIDsCount: map[string]map[string]int{}, - requestTimeout: requestTimeout, - capMethodName: capMethodName, - - lggr: logger.Named(lggr, "ExecutableCapabilityServer"), - stopCh: make(services.StopChan), + if capInfo.ID == "" || capInfo.ID != r.capabilityID { + return fmt.Errorf("capability info provided does not match the server's capabilityID: %s != %s", capInfo.ID, r.capabilityID) + } + 
if underlying == nil { + return errors.New("underlying capability cannot be nil") + } + if len(localDonInfo.Members) == 0 { + return errors.New("empty localDonInfo provided") + } + if len(workflowDONs) == 0 { + return errors.New("empty workflowDONs provided") + } + if remoteExecutableConfig.RequestTimeout <= 0 { + return errors.New("cfg.RequestTimeout must be positive") + } + if remoteExecutableConfig.ServerMaxParallelRequests <= 0 { + return errors.New("cfg.ServerMaxParallelRequests must be positive") + } - parallelExecutor: newParallelExecutor(maxParallelRequests), + if currCfg != nil && currCfg.remoteExecutableConfig != nil && + currCfg.remoteExecutableConfig.ServerMaxParallelRequests > 0 && + remoteExecutableConfig.ServerMaxParallelRequests != currCfg.remoteExecutableConfig.ServerMaxParallelRequests { + r.lggr.Warn("ServerMaxParallelRequests changed but it won't be applied until node restart") } + + // always replace the whole dynamicServerConfig object to avoid inconsistent state + r.cfg.Store(&dynamicServerConfig{ + remoteExecutableConfig: remoteExecutableConfig, + hasher: messageHasher, + underlying: underlying, + capInfo: capInfo, + localDonInfo: localDonInfo, + workflowDONs: workflowDONs, + }) + return nil } func (r *server) Start(ctx context.Context) error { return r.StartOnce(r.Name(), func() error { + cfg := r.cfg.Load() + + // Validate that all required fields are set before starting + if cfg == nil { + return errors.New("config not set - call SetConfig() before Start()") + } + if cfg.remoteExecutableConfig == nil { + return errors.New("remote executable config not set - call SetConfig() before Start()") + } + if cfg.underlying == nil { + return errors.New("underlying capability not set - call SetConfig() before Start()") + } + if cfg.capInfo.ID == "" { + return errors.New("capability info not set - call SetConfig() before Start()") + } + if len(cfg.localDonInfo.Members) == 0 { + return errors.New("local DON info not set - call SetConfig() before 
Start()")
+		}
+		if cfg.remoteExecutableConfig.RequestTimeout <= 0 {
+			return errors.New("cfg.RequestTimeout not set - call SetConfig() before Start()")
+		}
+		if cfg.remoteExecutableConfig.ServerMaxParallelRequests <= 0 {
+			return errors.New("cfg.ServerMaxParallelRequests not set - call SetConfig() before Start()")
+		}
+		if r.dispatcher == nil {
+			return errors.New("dispatcher set to nil, cannot start server")
+		}
+
+		// Initialize parallel executor with the configured max parallel requests
+		r.parallelExecutor = newParallelExecutor(int(cfg.remoteExecutableConfig.ServerMaxParallelRequests))
+
 		r.wg.Add(1)
 		go func() {
 			defer r.wg.Done()
-			tickerInterval := min(r.requestTimeout, expiryCheckInterval)
-			ticker := time.NewTicker(tickerInterval)
+			ticker := time.NewTicker(getServerTickerInterval(cfg))
 			defer ticker.Stop()
 
 			r.lggr.Info("executable capability server started")
@@ -109,6 +180,7 @@ func (r *server) Start(ctx context.Context) error {
 			case <-r.stopCh:
 				return
 			case <-ticker.C:
+				ticker.Reset(getServerTickerInterval(r.cfg.Load()))
 				r.expireRequests()
 			}
 		}
@@ -122,13 +194,22 @@
 	})
 }
 
+func getServerTickerInterval(cfg *dynamicServerConfig) time.Duration {
+	if cfg.remoteExecutableConfig.RequestTimeout > 0 {
+		return min(cfg.remoteExecutableConfig.RequestTimeout, defaultExpiryCheckInterval)
+	}
+	return defaultExpiryCheckInterval
+}
+
 func (r *server) Close() error {
 	return r.StopOnce(r.Name(), func() error {
 		close(r.stopCh)
 		r.wg.Wait()
 
-		err := r.parallelExecutor.Close()
-		if err != nil {
-			return fmt.Errorf("failed to close parallel executor: %w", err)
+		if r.parallelExecutor != nil {
+			err := r.parallelExecutor.Close()
+			if err != nil {
+				return fmt.Errorf("failed to close parallel executor: %w", err)
+			}
 		}
 
 		r.lggr.Info("executable capability server closed")
@@ -155,6 +236,12 @@ func (r *server) expireRequests() {
 }
 
 func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
+	cfg := r.cfg.Load()
+	if cfg == nil {
+		r.lggr.Errorw("config not set, 
cannot process request") + return + } + r.receiveLock.Lock() defer r.receiveLock.Unlock() @@ -171,7 +258,7 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { return } - msgHash, err := r.hasher.Hash(msg) + msgHash, err := cfg.hasher.Hash(msg) if err != nil { r.lggr.Errorw("failed to get message hash", "err", err) return @@ -197,14 +284,14 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { } if _, ok := r.requestIDToRequest[requestID]; !ok { - callingDon, ok := r.workflowDONs[msg.CallerDonId] + callingDon, ok := cfg.workflowDONs[msg.CallerDonId] if !ok { r.lggr.Errorw("received request from unregistered don", "donId", msg.CallerDonId) return } - sr, ierr := request.NewServerRequest(r.underlying, msg.Method, r.capInfo.ID, r.localDonInfo.ID, r.peerID, - callingDon, messageID, r.dispatcher, r.requestTimeout, r.capMethodName, r.lggr) + sr, ierr := request.NewServerRequest(cfg.underlying, msg.Method, cfg.capInfo.ID, cfg.localDonInfo.ID, r.peerID, + callingDon, messageID, r.dispatcher, cfg.remoteExecutableConfig.RequestTimeout, r.capMethodName, r.lggr) if ierr != nil { r.lggr.Errorw("failed to instantiate server request", "err", ierr) return diff --git a/core/capabilities/remote/executable/server_test.go b/core/capabilities/remote/executable/server_test.go index 80d3aebd294..57f4ea23dca 100644 --- a/core/capabilities/remote/executable/server_test.go +++ b/core/capabilities/remote/executable/server_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strconv" + "sync" "testing" "time" @@ -315,6 +316,12 @@ func testRemoteExecutableCapabilityServer(ctx context.Context, t *testing.T, numCapabilityPeers int, capabilityDonF uint8, capabilityNodeResponseTimeout time.Duration, messageHasher remotetypes.MessageHasher) ([]*serverTestClient, []services.Service) { lggr := logger.Test(t) + if config.RequestTimeout == 0 { + config.RequestTimeout = capabilityNodeResponseTimeout + } + if config.ServerMaxParallelRequests == 0 { + 
config.ServerMaxParallelRequests = 10 + } capabilityPeers := make([]p2ptypes.PeerID, numCapabilityPeers) for i := 0; i < numCapabilityPeers; i++ { @@ -361,8 +368,8 @@ func testRemoteExecutableCapabilityServer(ctx context.Context, t *testing.T, for i := 0; i < numCapabilityPeers; i++ { capabilityPeer := capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) - capabilityNode := executable.NewServer(config, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, - capabilityNodeResponseTimeout, 10, messageHasher, "", lggr) + capabilityNode := executable.NewServer(capInfo.ID, "", capabilityPeer, capabilityDispatcher, lggr) + require.NoError(t, capabilityNode.SetConfig(config, underlying, capInfo, capDonInfo, workflowDONs, messageHasher)) require.NoError(t, capabilityNode.Start(ctx)) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) capabilityNodes[i] = capabilityNode @@ -435,3 +442,561 @@ func (r *serverTestClient) Execute(ctx context.Context, req commoncap.Capability return nil, nil } + +func Test_Server_SetConfig(t *testing.T) { + lggr := logger.Test(t) + peerID := NewP2PPeerID(t) + + // Create broker and dispatcher + broker := newTestAsyncMessageBroker(t, 100) + dispatcher := broker.NewDispatcherForNode(peerID) + + // Create server instance + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + + // Create test data + capInfo := commoncap.CapabilityInfo{ + ID: "test-capability-id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Test capability", + } + + localDonInfo := commoncap.DON{ + ID: 1, + Members: []p2ptypes.PeerID{peerID}, + F: 0, + } + + workflowDONs := map[uint32]commoncap.DON{ + 2: { + ID: 2, + Members: []p2ptypes.PeerID{NewP2PPeerID(t)}, + F: 0, + }, + } + + underlying := &TestCapability{} + requestTimeout := 10 * time.Second + maxParallelRequests := uint32(5) + + t.Run("valid config should succeed", func(t *testing.T) { + 
config := &commoncap.RemoteExecutableConfig{ + RequestHashExcludedAttributes: []string{"test"}, + RequestTimeout: requestTimeout, + ServerMaxParallelRequests: maxParallelRequests, + } + + err := server.SetConfig(config, underlying, capInfo, localDonInfo, workflowDONs, nil) + require.NoError(t, err) + }) + + t.Run("mismatched capability ID should return error", func(t *testing.T) { + invalidCapInfo := commoncap.CapabilityInfo{ + ID: "different-capability-id", + CapabilityType: commoncap.CapabilityTypeTarget, + } + + err := server.SetConfig(&commoncap.RemoteExecutableConfig{}, underlying, invalidCapInfo, + localDonInfo, workflowDONs, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "capability info provided does not match") + }) + + t.Run("nil underlying capability should return error", func(t *testing.T) { + err := server.SetConfig(&commoncap.RemoteExecutableConfig{}, nil, capInfo, + localDonInfo, workflowDONs, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "underlying capability cannot be nil") + }) + + t.Run("empty local DON members should fail", func(t *testing.T) { + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + emptyLocalDon := commoncap.DON{ + ID: 1, + Members: []p2ptypes.PeerID{}, + F: 0, + } + config := &commoncap.RemoteExecutableConfig{ + RequestTimeout: 10 * time.Second, + ServerMaxParallelRequests: 5, + } + err := server.SetConfig(config, underlying, capInfo, emptyLocalDon, workflowDONs, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "empty localDonInfo provided") + }) + + t.Run("nil message hasher should use default", func(t *testing.T) { + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + config := &commoncap.RemoteExecutableConfig{ + RequestTimeout: 10 * time.Second, + ServerMaxParallelRequests: 5, + } + err := server.SetConfig(config, underlying, capInfo, localDonInfo, workflowDONs, nil) + require.NoError(t, 
err) + }) + + t.Run("zero timeout should fail", func(t *testing.T) { + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + config := &commoncap.RemoteExecutableConfig{ + RequestTimeout: 0, + ServerMaxParallelRequests: 5, + } + err := server.SetConfig(config, underlying, capInfo, localDonInfo, workflowDONs, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "RequestTimeout must be positive") + }) + + t.Run("zero max parallel requests should fail", func(t *testing.T) { + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + config := &commoncap.RemoteExecutableConfig{ + RequestTimeout: 10 * time.Second, + ServerMaxParallelRequests: 0, + } + err := server.SetConfig(config, underlying, capInfo, localDonInfo, workflowDONs, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "ServerMaxParallelRequests must be positive") + }) + + t.Run("empty workflow DONs should fail", func(t *testing.T) { + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + emptyWorkflowDONs := map[uint32]commoncap.DON{} + config := &commoncap.RemoteExecutableConfig{ + RequestTimeout: 10 * time.Second, + ServerMaxParallelRequests: 5, + } + err := server.SetConfig(config, underlying, capInfo, localDonInfo, emptyWorkflowDONs, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "empty workflowDONs provided") + }) +} + +func Test_Server_SetConfig_ConfigReplacement(t *testing.T) { + lggr := logger.Test(t) + peerID := NewP2PPeerID(t) + broker := newTestAsyncMessageBroker(t, 100) + dispatcher := broker.NewDispatcherForNode(peerID) + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + + capInfo := commoncap.CapabilityInfo{ + ID: "test-capability-id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Test capability", + } + + localDonInfo := commoncap.DON{ + ID: 1, + Members: 
[]p2ptypes.PeerID{peerID}, + F: 0, + } + + workflowDONs := map[uint32]commoncap.DON{ + 2: { + ID: 2, + Members: []p2ptypes.PeerID{NewP2PPeerID(t)}, + F: 0, + }, + } + + underlying := &TestCapability{} + + // Set initial config + config1 := &commoncap.RemoteExecutableConfig{ + RequestHashExcludedAttributes: []string{"attr1"}, + RequestTimeout: 5 * time.Second, + ServerMaxParallelRequests: 3, + } + err := server.SetConfig(config1, underlying, capInfo, localDonInfo, workflowDONs, nil) + require.NoError(t, err) + + // Verify server can start with valid config + ctx := testutils.Context(t) + err = server.Start(ctx) + require.NoError(t, err) + + // Replace with new config + config2 := &commoncap.RemoteExecutableConfig{ + RequestHashExcludedAttributes: []string{"attr2", "attr3"}, + RequestTimeout: 10 * time.Second, + ServerMaxParallelRequests: 5, + } + err = server.SetConfig(config2, underlying, capInfo, localDonInfo, workflowDONs, nil) + require.NoError(t, err) + + // Clean up + err = server.Close() + require.NoError(t, err) +} + +func Test_Server_SetConfig_StartValidation(t *testing.T) { + ctx := testutils.Context(t) + + t.Run("Start without SetConfig should fail", func(t *testing.T) { + lggr := logger.Test(t) + peerID := NewP2PPeerID(t) + broker := newTestAsyncMessageBroker(t, 100) + dispatcher := broker.NewDispatcherForNode(peerID) + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + + err := server.Start(ctx) + require.Error(t, err) + require.Contains(t, err.Error(), "config not set - call SetConfig() before Start()") + }) + + t.Run("Start with valid config should succeed", func(t *testing.T) { + lggr := logger.Test(t) + peerID := NewP2PPeerID(t) + broker := newTestAsyncMessageBroker(t, 100) + dispatcher := broker.NewDispatcherForNode(peerID) + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + + // Set valid config + capInfo := commoncap.CapabilityInfo{ + ID: 
"test-capability-id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Test capability", + } + + localDonInfo := commoncap.DON{ + ID: 1, + Members: []p2ptypes.PeerID{peerID}, + F: 0, + } + + workflowDONs := map[uint32]commoncap.DON{ + 2: { + ID: 2, + Members: []p2ptypes.PeerID{NewP2PPeerID(t)}, + F: 0, + }, + } + + underlying := &TestCapability{} + cfg := &commoncap.RemoteExecutableConfig{ + RequestTimeout: 10 * time.Second, + ServerMaxParallelRequests: 5, + } + err := server.SetConfig(cfg, underlying, capInfo, + localDonInfo, workflowDONs, nil) + require.NoError(t, err) + + err = server.Start(ctx) + require.NoError(t, err) + + // Clean up + err = server.Close() + require.NoError(t, err) + }) +} + +func Test_Server_SetConfig_DONMembershipChange(t *testing.T) { + ctx := testutils.Context(t) + lggr := logger.Test(t) + peerID := NewP2PPeerID(t) + broker := newTestAsyncMessageBroker(t, 100) + dispatcher := broker.NewDispatcherForNode(peerID) + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + + capInfo := commoncap.CapabilityInfo{ + ID: "test-capability-id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Test capability", + } + + localDonInfo := commoncap.DON{ + ID: 1, + Members: []p2ptypes.PeerID{peerID}, + F: 0, + } + + workflowPeer1 := NewP2PPeerID(t) + workflowPeer2 := NewP2PPeerID(t) + workflowDONs := map[uint32]commoncap.DON{ + 2: { + ID: 2, + Members: []p2ptypes.PeerID{workflowPeer1}, + F: 0, + }, + } + + underlying := &TestSlowExecutionCapability{ + workflowIDToPause: map[string]time.Duration{ + workflowID1: 1 * time.Second, + }, + } + + config := &commoncap.RemoteExecutableConfig{ + RequestTimeout: 10 * time.Second, + ServerMaxParallelRequests: 5, + } + err := server.SetConfig(config, underlying, capInfo, localDonInfo, workflowDONs, nil) + require.NoError(t, err) + + // Set up workflow node before starting servers + workflowDispatcher := broker.NewDispatcherForNode(workflowPeer1) 
+ workflowNode := newServerTestClient(workflowPeer1, localDonInfo, workflowDispatcher) + broker.RegisterReceiverNode(workflowPeer1, workflowNode) + broker.RegisterReceiverNode(peerID, server) + + err = server.Start(ctx) + require.NoError(t, err) + err = broker.Start(ctx) + require.NoError(t, err) + + // Start a request + _, err = workflowNode.Execute(context.Background(), commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, + }, + }) + require.NoError(t, err) + + // Change DON membership while request is in flight + time.Sleep(100 * time.Millisecond) + newWorkflowDONs := map[uint32]commoncap.DON{ + 2: { + ID: 2, + Members: []p2ptypes.PeerID{workflowPeer1, workflowPeer2}, + F: 0, + }, + } + err = server.SetConfig(config, underlying, capInfo, localDonInfo, newWorkflowDONs, nil) + require.NoError(t, err) + + // Original request should still complete + select { + case msg := <-workflowNode.receivedMessages: + assert.NotNil(t, msg) + case <-time.After(5 * time.Second): + t.Fatal("request did not complete after DON change") + } + + // Clean up + require.NoError(t, server.Close()) + require.NoError(t, broker.Close()) +} + +func Test_Server_SetConfig_ShutdownRaces(t *testing.T) { + ctx := testutils.Context(t) + lggr := logger.Test(t) + peerID := NewP2PPeerID(t) + broker := newTestAsyncMessageBroker(t, 100) + dispatcher := broker.NewDispatcherForNode(peerID) + server := executable.NewServer("test-capability-id", "test-method", peerID, dispatcher, lggr) + + capInfo := commoncap.CapabilityInfo{ + ID: "test-capability-id", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Test capability", + } + + localDonInfo := commoncap.DON{ + ID: 1, + Members: []p2ptypes.PeerID{peerID}, + F: 0, + } + + workflowDONs := map[uint32]commoncap.DON{ + 2: { + ID: 2, + Members: []p2ptypes.PeerID{NewP2PPeerID(t)}, + F: 0, + }, + } + + underlying := &TestCapability{} + config := 
&commoncap.RemoteExecutableConfig{ + RequestTimeout: 10 * time.Second, + ServerMaxParallelRequests: 5, + } + + err := server.SetConfig(config, underlying, capInfo, localDonInfo, workflowDONs, nil) + require.NoError(t, err) + err = server.Start(ctx) + require.NoError(t, err) + + // Concurrently call SetConfig and Close + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for i := range 50 { + newConfig := &commoncap.RemoteExecutableConfig{ + RequestTimeout: time.Duration(5+i) * time.Millisecond, + ServerMaxParallelRequests: 5, + } + _ = server.SetConfig(newConfig, underlying, capInfo, localDonInfo, workflowDONs, nil) + time.Sleep(1 * time.Millisecond) + } + }() + + go func() { + defer wg.Done() + time.Sleep(25 * time.Millisecond) + _ = server.Close() + }() + + wg.Wait() + // Test passes if no panic occurs +} + +func Test_Server_Execute_WithConcurrentSetConfig(t *testing.T) { + ctx := testutils.Context(t) + lggr := logger.Test(t) + numWorkflowPeers := 4 + + peerID := NewP2PPeerID(t) + capabilityPeers := []p2ptypes.PeerID{peerID} + + capDonInfo := commoncap.DON{ + ID: 1, + Members: capabilityPeers, + F: 0, + } + + capInfo := commoncap.CapabilityInfo{ + ID: "cap_id@1.0.0", + CapabilityType: commoncap.CapabilityTypeTarget, + Description: "Remote Target", + DON: &capDonInfo, + } + + workflowPeers := make([]p2ptypes.PeerID, numWorkflowPeers) + for i := range numWorkflowPeers { + workflowPeers[i] = NewP2PPeerID(t) + } + + workflowDonInfo := commoncap.DON{ + Members: workflowPeers, + ID: 2, + F: 1, + } + + broker := newTestAsyncMessageBroker(t, 1000) + err := broker.Start(context.Background()) + require.NoError(t, err) + defer broker.Close() + + workflowDONs := map[uint32]commoncap.DON{ + workflowDonInfo.ID: workflowDonInfo, + } + + // Create and set up server + dispatcher := broker.NewDispatcherForNode(peerID) + server := executable.NewServer(capInfo.ID, "", peerID, dispatcher, lggr) + + underlying := &TestSlowExecutionCapability{ + workflowIDToPause: 
map[string]time.Duration{ + workflowID1: 50 * time.Millisecond, + }, + } + + initialConfig := &commoncap.RemoteExecutableConfig{ + RequestTimeout: 10 * time.Second, + ServerMaxParallelRequests: 10, + } + err = server.SetConfig(initialConfig, underlying, capInfo, capDonInfo, workflowDONs, nil) + require.NoError(t, err) + + err = server.Start(ctx) + require.NoError(t, err) + defer server.Close() + + broker.RegisterReceiverNode(peerID, server) + + // Create workflow nodes (callers) + workflowNodes := make([]*serverTestClient, numWorkflowPeers) + for i := range numWorkflowPeers { + workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) + workflowNode := newServerTestClient(workflowPeers[i], capDonInfo, workflowPeerDispatcher) + broker.RegisterReceiverNode(workflowPeers[i], workflowNode) + workflowNodes[i] = workflowNode + } + + var wg sync.WaitGroup + numExecuteCalls := 20 + numSetConfigCalls := 10 + + // Track successful responses + responseCount := sync.Map{} + + // Start goroutine for concurrent SetConfig calls with randomized delays + wg.Add(1) + go func() { + defer wg.Done() + for i := range numSetConfigCalls { + // Random delay between 5-50ms + delay := time.Duration(5+i*2) * time.Millisecond + time.Sleep(delay) + + newConfig := &commoncap.RemoteExecutableConfig{ + RequestTimeout: time.Duration(10+i) * time.Second, + ServerMaxParallelRequests: uint32(5), + } + assert.NoError(t, server.SetConfig(newConfig, underlying, capInfo, capDonInfo, workflowDONs, nil)) + } + }() + + // Start multiple goroutines for concurrent Execute calls with randomized delays + for callerIdx, caller := range workflowNodes { + for execIdx := range numExecuteCalls { + wg.Add(1) + go func(callerID int, execID int, node *serverTestClient) { + defer wg.Done() + + // Random delay between 0-100ms + delay := time.Duration(execID*5) * time.Millisecond + time.Sleep(delay) + + workflowExecutionID := fmt.Sprintf("exec-%d", execID) + _, err := node.Execute(context.Background(), + 
commoncap.CapabilityRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID, + }, + }) + if err != nil { + t.Logf("Execute error for caller %d exec %d: %v", callerID, execID, err) + } + }(callerIdx, execIdx, caller) + } + } + + // Collect responses + wg.Add(1) + go func() { + defer wg.Done() + expectedResponses := numWorkflowPeers * numExecuteCalls + + for i := range expectedResponses { + // Try to receive from all callers + for _, caller := range workflowNodes { + select { + case msg := <-caller.receivedMessages: + if msg.Error == remotetypes.Error_OK { + count, _ := responseCount.LoadOrStore("success", 0) + responseCount.Store("success", count.(int)+1) + } else { + count, _ := responseCount.LoadOrStore("error", 0) + responseCount.Store("error", count.(int)+1) + } + case <-time.After(15 * time.Second): + t.Logf("Timeout waiting for response %d/%d", i+1, expectedResponses) + return + } + } + } + }() + + wg.Wait() + + // Verify we received responses (most should succeed) + successCount := 0 + if val, ok := responseCount.Load("success"); ok { + successCount = val.(int) + } + expectedResponses := numWorkflowPeers * numExecuteCalls + require.Equal(t, expectedResponses, successCount) +} diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index 00cbb983aaf..261bcdd4ef4 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -4,7 +4,9 @@ import ( "context" "crypto/sha256" "encoding/binary" + "errors" "sync" + "sync/atomic" "time" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -26,23 +28,28 @@ import ( // // TriggerPublisher communicates with corresponding TriggerSubscribers on remote nodes. 
type triggerPublisher struct { - config *commoncap.RemoteTriggerConfig + capabilityID string + capMethodName string + dispatcher types.Dispatcher + cfg atomic.Pointer[dynamicPublisherConfig] + + messageCache *messagecache.MessageCache[registrationKey, p2ptypes.PeerID] + registrations map[registrationKey]*pubRegState + mu sync.RWMutex // protects messageCache and registrations + batchingQueue map[[32]byte]*batchedResponse + bqMu sync.Mutex // protects batchingQueue + stopCh services.StopChan + wg sync.WaitGroup + lggr logger.Logger +} + +type dynamicPublisherConfig struct { + remoteConfig *commoncap.RemoteTriggerConfig underlying commoncap.TriggerCapability - capInfo commoncap.CapabilityInfo capDonInfo commoncap.DON workflowDONs map[uint32]commoncap.DON membersCache map[uint32]map[p2ptypes.PeerID]bool - dispatcher types.Dispatcher - capMethodName string - messageCache *messagecache.MessageCache[registrationKey, p2ptypes.PeerID] - registrations map[registrationKey]*pubRegState - mu sync.RWMutex // protects messageCache and registrations - batchingQueue map[[32]byte]*batchedResponse batchingEnabled bool - bqMu sync.Mutex // protects batchingQueue - stopCh services.StopChan - wg sync.WaitGroup - lggr logger.Logger } type registrationKey struct { @@ -63,16 +70,47 @@ type batchedResponse struct { workflowIDs []string } +type TriggerPublisher interface { + types.ReceiverService + SetConfig(config *commoncap.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON) error +} + +var _ TriggerPublisher = &triggerPublisher{} var _ types.ReceiverService = &triggerPublisher{} const minAllowedBatchCollectionPeriod = 10 * time.Millisecond -func NewTriggerPublisher(config *commoncap.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, capMethodName string, lggr logger.Logger) 
*triggerPublisher { +func NewTriggerPublisher(capID, method string, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher { + return &triggerPublisher{ + capabilityID: capID, + capMethodName: method, + dispatcher: dispatcher, + messageCache: messagecache.NewMessageCache[registrationKey, p2ptypes.PeerID](), + registrations: make(map[registrationKey]*pubRegState), + batchingQueue: make(map[[32]byte]*batchedResponse), + stopCh: make(services.StopChan), + lggr: logger.Named(lggr, "TriggerPublisher"), + } +} + +// SetConfig sets the remote trigger configuration, capability info, and DON information dynamically +func (p *triggerPublisher) SetConfig(config *commoncap.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON) error { if config == nil { - lggr.Info("no config provided, using default values") + p.lggr.Info("SetConfig called with nil config, using defaults") config = &commoncap.RemoteTriggerConfig{} } config.ApplyDefaults() + if underlying == nil { + return errors.New("underlying trigger capability cannot be nil") + } + if capDonInfo.ID == 0 || len(capDonInfo.Members) == 0 { + return errors.New("empty capDonInfo provided") + } + if workflowDONs == nil { + workflowDONs = make(map[uint32]commoncap.DON) + } + + // Build the members cache membersCache := make(map[uint32]map[p2ptypes.PeerID]bool) for id, don := range workflowDONs { cache := make(map[p2ptypes.PeerID]bool) @@ -81,36 +119,55 @@ func NewTriggerPublisher(config *commoncap.RemoteTriggerConfig, underlying commo } membersCache[id] = cache } - return &triggerPublisher{ - config: config, + + // always replace the whole dynamicPublisherConfig object to avoid inconsistent state + p.cfg.Store(&dynamicPublisherConfig{ + remoteConfig: config, underlying: underlying, - capInfo: capInfo, capDonInfo: capDonInfo, workflowDONs: workflowDONs, membersCache: membersCache, - dispatcher: dispatcher, - capMethodName: capMethodName, - 
messageCache: messagecache.NewMessageCache[registrationKey, p2ptypes.PeerID](), - registrations: make(map[registrationKey]*pubRegState), - batchingQueue: make(map[[32]byte]*batchedResponse), batchingEnabled: config.MaxBatchSize > 1 && config.BatchCollectionPeriod >= minAllowedBatchCollectionPeriod, - stopCh: make(services.StopChan), - lggr: logger.Named(lggr, "TriggerPublisher"), - } + }) + + return nil } func (p *triggerPublisher) Start(ctx context.Context) error { + cfg := p.cfg.Load() + + // Validate that all required fields are set before starting + if cfg == nil { + return errors.New("config not set - call SetConfig() before Start()") + } + if cfg.remoteConfig == nil { + return errors.New("remoteConfig not set - call SetConfig() before Start()") + } + if cfg.underlying == nil { + return errors.New("underlying trigger capability not set - call SetConfig() before Start()") + } + if len(cfg.capDonInfo.Members) == 0 { + return errors.New("capability DON info not set - call SetConfig() before Start()") + } + if p.dispatcher == nil { + return errors.New("dispatcher set to nil, cannot start triggerPublisher") + } + p.wg.Add(1) go p.registrationCleanupLoop() - if p.batchingEnabled { - p.wg.Add(1) - go p.batchingLoop() - } + p.wg.Add(1) + go p.batchingLoop() p.lggr.Info("TriggerPublisher started") return nil } func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { + cfg := p.cfg.Load() + if cfg == nil { + p.lggr.Errorw("received message but config is not set") + return + } + sender, err := ToPeerID(msg.Sender) if err != nil { p.lggr.Errorw("failed to convert message sender to PeerID", "err", err) @@ -126,23 +183,23 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { case types.MethodRegisterTrigger: req, err := pb.UnmarshalTriggerRegistrationRequest(msg.Payload) if err != nil { - p.lggr.Errorw("failed to unmarshal trigger registration request", "capabilityId", p.capInfo.ID, "err", err) + p.lggr.Errorw("failed to 
unmarshal trigger registration request", "capabilityId", p.capabilityID, "err", err) return } - callerDon, ok := p.workflowDONs[msg.CallerDonId] + callerDon, ok := cfg.workflowDONs[msg.CallerDonId] if !ok { - p.lggr.Errorw("received a message from unsupported workflow DON", "capabilityId", p.capInfo.ID, "callerDonId", msg.CallerDonId) + p.lggr.Errorw("received a message from unsupported workflow DON", "capabilityId", p.capabilityID, "callerDonId", msg.CallerDonId) return } - if !p.membersCache[msg.CallerDonId][sender] { - p.lggr.Errorw("sender not a member of its workflow DON", "capabilityId", p.capInfo.ID, "callerDonId", msg.CallerDonId, "sender", sender) + if !cfg.membersCache[msg.CallerDonId][sender] { + p.lggr.Errorw("sender not a member of its workflow DON", "capabilityId", p.capabilityID, "callerDonId", msg.CallerDonId, "sender", sender) return } if err = validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil { - p.lggr.Errorw("received trigger request with invalid workflow ID", "capabilityId", p.capInfo.ID, "workflowId", SanitizeLogString(req.Metadata.WorkflowID), "err", err) + p.lggr.Errorw("received trigger request with invalid workflow ID", "capabilityId", p.capabilityID, "workflowId", SanitizeLogString(req.Metadata.WorkflowID), "err", err) return } - p.lggr.Debugw("received trigger registration", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "sender", sender) + p.lggr.Debugw("received trigger registration", "capabilityId", p.capabilityID, "workflowId", req.Metadata.WorkflowID, "sender", sender) key := registrationKey{msg.CallerDonId, req.Metadata.WorkflowID} nowMs := time.Now().UnixMilli() p.mu.Lock() @@ -150,28 +207,28 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { p.messageCache.Insert(key, sender, nowMs, msg.Payload) _, exists := p.registrations[key] if exists { - p.lggr.Debugw("trigger registration already exists", "capabilityId", p.capInfo.ID, "workflowId", 
req.Metadata.WorkflowID) + p.lggr.Debugw("trigger registration already exists", "capabilityId", p.capabilityID, "workflowId", req.Metadata.WorkflowID) return } // NOTE: require 2F+1 by default, introduce different strategies later (KS-76) minRequired := uint32(2*callerDon.F + 1) - ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-p.config.RegistrationExpiry.Milliseconds(), false) + ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-cfg.remoteConfig.RegistrationExpiry.Milliseconds(), false) if !ready { - p.lggr.Debugw("not ready to aggregate yet", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "minRequired", minRequired) + p.lggr.Debugw("not ready to aggregate yet", "capabilityId", p.capabilityID, "workflowId", req.Metadata.WorkflowID, "minRequired", minRequired) return } aggregated, err := aggregation.AggregateModeRaw(payloads, uint32(callerDon.F+1)) if err != nil { - p.lggr.Errorw("failed to aggregate trigger registrations", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err) + p.lggr.Errorw("failed to aggregate trigger registrations", "capabilityId", p.capabilityID, "workflowId", req.Metadata.WorkflowID, "err", err) return } unmarshaled, err := pb.UnmarshalTriggerRegistrationRequest(aggregated) if err != nil { - p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capInfo.ID, "err", err) + p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capabilityID, "err", err) return } ctx, cancel := p.stopCh.NewCtx() - callbackCh, err := p.underlying.RegisterTrigger(ctx, unmarshaled) + callbackCh, err := cfg.underlying.RegisterTrigger(ctx, unmarshaled) if err == nil { p.registrations[key] = &pubRegState{ callback: callbackCh, @@ -180,10 +237,10 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { } p.wg.Add(1) go p.triggerEventLoop(callbackCh, key) - p.lggr.Debugw("updated trigger registration", "capabilityId", p.capInfo.ID, "workflowId", 
req.Metadata.WorkflowID) + p.lggr.Debugw("updated trigger registration", "capabilityId", p.capabilityID, "workflowId", req.Metadata.WorkflowID) } else { cancel() - p.lggr.Errorw("failed to register trigger", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err) + p.lggr.Errorw("failed to register trigger", "capabilityId", p.capabilityID, "workflowId", req.Metadata.WorkflowID, "err", err) } case types.MethodTriggerEvent: p.lggr.Errorw("trigger request failed with error", @@ -196,25 +253,40 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { func (p *triggerPublisher) registrationCleanupLoop() { defer p.wg.Done() - ticker := time.NewTicker(p.config.RegistrationExpiry) + + // Get initial config for ticker setup + firstCfg := p.cfg.Load() + if firstCfg == nil { + p.lggr.Errorw("registrationCleanupLoop started but config not set") + return + } + cleanupInterval := firstCfg.remoteConfig.MessageExpiry + ticker := time.NewTicker(cleanupInterval) defer ticker.Stop() for { select { case <-p.stopCh: return case <-ticker.C: + cfg := p.cfg.Load() + // Update cleanup interval if config has changed + if cfg.remoteConfig.MessageExpiry != cleanupInterval { + cleanupInterval = cfg.remoteConfig.MessageExpiry + ticker.Reset(cleanupInterval) + } now := time.Now().UnixMilli() + p.mu.Lock() for key, req := range p.registrations { - callerDon := p.workflowDONs[key.callerDonID] - ready, _ := p.messageCache.Ready(key, uint32(2*callerDon.F+1), now-p.config.RegistrationExpiry.Milliseconds(), false) + callerDon := cfg.workflowDONs[key.callerDonID] + ready, _ := p.messageCache.Ready(key, uint32(2*callerDon.F+1), now-cfg.remoteConfig.RegistrationExpiry.Milliseconds(), false) if !ready { - p.lggr.Infow("trigger registration expired", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonID, "workflowId", key.workflowID) + p.lggr.Infow("trigger registration expired", "capabilityId", p.capabilityID, "callerDonID", key.callerDonID, 
"workflowId", key.workflowID) ctx, cancel := p.stopCh.NewCtx() - err := p.underlying.UnregisterTrigger(ctx, req.request) + err := cfg.underlying.UnregisterTrigger(ctx, req.request) cancel() p.registrations[key].cancel() // Cancel context on register trigger - p.lggr.Infow("unregistered trigger", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonID, "workflowId", key.workflowID, "err", err) + p.lggr.Infow("unregistered trigger", "capabilityId", p.capabilityID, "callerDonID", key.callerDonID, "workflowId", key.workflowID, "err", err) // after calling UnregisterTrigger, the underlying trigger will not send any more events to the channel delete(p.registrations, key) p.messageCache.Delete(key) @@ -233,18 +305,20 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.TriggerR return case response, ok := <-callbackCh: if !ok { - p.lggr.Infow("triggerEventLoop channel closed", "capabilityId", p.capInfo.ID, "workflowId", key.workflowID) + p.lggr.Infow("triggerEventLoop channel closed", "capabilityId", p.capabilityID, "workflowId", key.workflowID) return } + triggerEvent := response.Event - p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowID, "triggerEventID", triggerEvent.ID) + p.lggr.Debugw("received trigger event", "capabilityId", p.capabilityID, "workflowId", key.workflowID, "triggerEventID", triggerEvent.ID) marshaledResponse, err := pb.MarshalTriggerResponse(response) if err != nil { p.lggr.Debugw("can't marshal trigger event", "err", err) break } - if p.batchingEnabled { + cfg := p.cfg.Load() + if cfg.batchingEnabled { p.enqueueForBatching(marshaledResponse, key, triggerEvent.ID) } else { // a single-element "batch" @@ -283,17 +357,23 @@ func (p *triggerPublisher) enqueueForBatching(rawResponse []byte, key registrati } func (p *triggerPublisher) sendBatch(resp *batchedResponse) { + cfg := p.cfg.Load() + if cfg == nil { + p.lggr.Errorw("config not set during sendBatch") + return + } + for 
len(resp.workflowIDs) > 0 { idBatch := resp.workflowIDs - if p.batchingEnabled && int64(len(idBatch)) > int64(p.config.MaxBatchSize) { - idBatch = idBatch[:p.config.MaxBatchSize] - resp.workflowIDs = resp.workflowIDs[p.config.MaxBatchSize:] + if cfg.batchingEnabled && int64(len(idBatch)) > int64(cfg.remoteConfig.MaxBatchSize) { + idBatch = idBatch[:cfg.remoteConfig.MaxBatchSize] + resp.workflowIDs = resp.workflowIDs[cfg.remoteConfig.MaxBatchSize:] } else { resp.workflowIDs = nil } msg := &types.MessageBody{ - CapabilityId: p.capInfo.ID, - CapabilityDonId: p.capDonInfo.ID, + CapabilityId: p.capabilityID, + CapabilityDonId: cfg.capDonInfo.ID, CallerDonId: resp.callerDonID, Method: types.MethodTriggerEvent, Payload: resp.rawResponse, @@ -306,10 +386,10 @@ func (p *triggerPublisher) sendBatch(resp *batchedResponse) { CapabilityMethod: p.capMethodName, } // NOTE: send to all nodes by default, introduce different strategies later (KS-76) - for _, peerID := range p.workflowDONs[resp.callerDonID].Members { + for _, peerID := range cfg.workflowDONs[resp.callerDonID].Members { err := p.dispatcher.Send(peerID, msg) if err != nil { - p.lggr.Errorw("failed to send trigger event", "capabilityId", p.capInfo.ID, "peerID", peerID, "err", err) + p.lggr.Errorw("failed to send trigger event", "capabilityId", p.capabilityID, "peerID", peerID, "err", err) } } } @@ -317,13 +397,29 @@ func (p *triggerPublisher) sendBatch(resp *batchedResponse) { func (p *triggerPublisher) batchingLoop() { defer p.wg.Done() - ticker := time.NewTicker(p.config.BatchCollectionPeriod) + + // Get initial config for ticker setup + firstCfg := p.cfg.Load() + if firstCfg == nil { + p.lggr.Errorw("batchingLoop started but config not set") + return + } + interval := firstCfg.remoteConfig.BatchCollectionPeriod + ticker := time.NewTicker(interval) + defer ticker.Stop() for { select { case <-p.stopCh: return case <-ticker.C: + cfg := p.cfg.Load() + // Update cleanup interval if config has changed + if 
cfg.remoteConfig.BatchCollectionPeriod != interval { + interval = cfg.remoteConfig.BatchCollectionPeriod + ticker.Reset(interval) + } + p.bqMu.Lock() queue := p.batchingQueue p.batchingQueue = make(map[[32]byte]*batchedResponse) diff --git a/core/capabilities/remote/trigger_publisher_test.go b/core/capabilities/remote/trigger_publisher_test.go index 0480462f103..bad3b4cd905 100644 --- a/core/capabilities/remote/trigger_publisher_test.go +++ b/core/capabilities/remote/trigger_publisher_test.go @@ -99,6 +99,111 @@ func TestTriggerPublisher_ReceiveTriggerEvents_BatchingEnabled(t *testing.T) { require.NoError(t, publisher.Close()) } +func TestTriggerPublisher_SetConfig_Basic(t *testing.T) { + t.Parallel() + lggr := logger.Test(t) + capInfo := commoncap.CapabilityInfo{ + ID: capID, + CapabilityType: commoncap.CapabilityTypeTrigger, + Description: "Remote Trigger", + } + peers := make([]p2ptypes.PeerID, 2) + require.NoError(t, peers[0].UnmarshalText([]byte(peerID1))) + require.NoError(t, peers[1].UnmarshalText([]byte(peerID2))) + capDonInfo := commoncap.DON{ + ID: 1, + Members: []p2ptypes.PeerID{peers[0]}, + F: 0, + } + workflowDonInfo := commoncap.DON{ + ID: 2, + Members: []p2ptypes.PeerID{peers[1]}, + F: 0, + } + workflowDONs := map[uint32]commoncap.DON{ + workflowDonInfo.ID: workflowDonInfo, + } + underlying := &testTrigger{ + info: capInfo, + registrationsCh: make(chan commoncap.TriggerRegistrationRequest, 2), + eventCh: make(chan commoncap.TriggerResponse, 2), + } + + t.Run("returns error when underlying trigger capability is nil", func(t *testing.T) { + dispatcher := mocks.NewDispatcher(t) + publisher := remote.NewTriggerPublisher(capInfo.ID, "method", dispatcher, lggr) + config := &commoncap.RemoteTriggerConfig{} + err := publisher.SetConfig(config, nil, capDonInfo, workflowDONs) + require.Error(t, err) + require.Contains(t, err.Error(), "underlying trigger capability cannot be nil") + }) + + t.Run("handles nil config", func(t *testing.T) { + dispatcher := 
mocks.NewDispatcher(t) + publisher := remote.NewTriggerPublisher(capInfo.ID, "method", dispatcher, lggr) + // Set config as nil - should use defaults + err := publisher.SetConfig(nil, underlying, capDonInfo, workflowDONs) + require.NoError(t, err) + + // Verify config works + ctx := testutils.Context(t) + require.NoError(t, publisher.Start(ctx)) + require.NoError(t, publisher.Close()) + }) + + t.Run("handles nil workflowDONs", func(t *testing.T) { + dispatcher := mocks.NewDispatcher(t) + publisher := remote.NewTriggerPublisher(capInfo.ID, "method", dispatcher, lggr) + config := &commoncap.RemoteTriggerConfig{ + RegistrationRefresh: 100 * time.Millisecond, + RegistrationExpiry: 100 * time.Second, + MinResponsesToAggregate: 1, + MessageExpiry: 100 * time.Second, + } + // Set workflowDONs as nil - should create empty map + err := publisher.SetConfig(config, underlying, capDonInfo, nil) + require.NoError(t, err) + + // Verify config works + ctx := testutils.Context(t) + require.NoError(t, publisher.Start(ctx)) + require.NoError(t, publisher.Close()) + }) + + t.Run("updates existing config", func(t *testing.T) { + dispatcher := mocks.NewDispatcher(t) + publisher := remote.NewTriggerPublisher(capInfo.ID, "method", dispatcher, lggr) + // Set initial config + initialConfig := &commoncap.RemoteTriggerConfig{ + RegistrationRefresh: 100 * time.Millisecond, + RegistrationExpiry: 100 * time.Second, + MinResponsesToAggregate: 1, + MessageExpiry: 100 * time.Second, + MaxBatchSize: 1, + BatchCollectionPeriod: 100 * time.Millisecond, + } + err := publisher.SetConfig(initialConfig, underlying, capDonInfo, workflowDONs) + require.NoError(t, err) + + // Update with new config + newConfig := &commoncap.RemoteTriggerConfig{ + RegistrationRefresh: 500 * time.Millisecond, + RegistrationExpiry: 500 * time.Second, + MinResponsesToAggregate: 3, + MessageExpiry: 500 * time.Second, + MaxBatchSize: 5, + BatchCollectionPeriod: 500 * time.Millisecond, + } + err = publisher.SetConfig(newConfig, 
underlying, capDonInfo, workflowDONs) + require.NoError(t, err) + + // Verify updated config works + ctx := testutils.Context(t) + require.NoError(t, publisher.Start(ctx)) + require.NoError(t, publisher.Close()) + }) +} + func newServices(t *testing.T, capabilityDONID uint32, workflowDONID uint32, maxBatchSize uint32) (*testTrigger, remotetypes.ReceiverService, *mocks.Dispatcher, []p2ptypes.PeerID) { lggr := logger.Test(t) ctx := testutils.Context(t) @@ -138,7 +243,8 @@ func newServices(t *testing.T, capabilityDONID uint32, workflowDONID uint32, max registrationsCh: make(chan commoncap.TriggerRegistrationRequest, 2), eventCh: make(chan commoncap.TriggerResponse, 2), } - publisher := remote.NewTriggerPublisher(config, underlying, capInfo, capDonInfo, workflowDONs, dispatcher, "", lggr) + publisher := remote.NewTriggerPublisher(capInfo.ID, "", dispatcher, lggr) + require.NoError(t, publisher.SetConfig(config, underlying, capDonInfo, workflowDONs)) require.NoError(t, publisher.Start(ctx)) return underlying, publisher, dispatcher, peers }