diff --git a/.changeset/dry-pets-knock.md b/.changeset/dry-pets-knock.md new file mode 100644 index 00000000000..f21178a56ae --- /dev/null +++ b/.changeset/dry-pets-knock.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#changed Support dynamic config updates in TriggerSubscriber diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index d393184160e..d54ccbc7250 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -50,6 +50,7 @@ type launcher struct { myPeerID p2ptypes.PeerID peerWrapper p2ptypes.PeerWrapper dispatcher remotetypes.Dispatcher + cachedShims cachedShims registry *Registry subServices []services.Service workflowDonNotifier donNotifier @@ -57,6 +58,19 @@ type launcher struct { p2pStreamConfig p2ptypes.StreamConfig } +// For V2 capabilities, shims are created once and their config is updated dynamically. +type cachedShims struct { + combinedClients map[string]remote.CombinedClient + triggerSubscribers map[string]remote.TriggerSubscriber + executableClients map[string]executable.Client + + // TODO(CRE-942): add trigger publishers and executable servers +} + +func shimKey(capID string, donID uint32, method string) string { + return fmt.Sprintf("%s:%d:%s", capID, donID, method) +} + type donNotifier interface { NotifyDonSet(don capabilities.DON) } @@ -85,9 +99,14 @@ func NewLauncher( } } return &launcher{ - lggr: logger.Named(lggr, "CapabilitiesLauncher"), - peerWrapper: peerWrapper, - dispatcher: dispatcher, + lggr: logger.Named(lggr, "CapabilitiesLauncher"), + peerWrapper: peerWrapper, + dispatcher: dispatcher, + cachedShims: cachedShims{ + combinedClients: make(map[string]remote.CombinedClient), + triggerSubscribers: make(map[string]remote.TriggerSubscriber), + executableClients: make(map[string]executable.Client), + }, registry: registry, subServices: []services.Service{}, workflowDonNotifier: workflowDonNotifier, @@ -394,6 +413,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync return nil, err } + // deprecated pre-LLO Mercury aggregator aggregator = triggers.NewMercuryRemoteAggregator( codec, signers, @@ -424,23 +444,21 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync aggregator = aggregation.NewDefaultModeAggregator(uint32(remoteDON.F) + 1) } - // TODO: We need to implement a custom, Mercury-specific - // aggregator here, because there is no guarantee that - // all trigger events in the workflow will have the same - // payloads. As a workaround, we validate the signatures. - // When this is solved, we can move to a generic aggregator - // and remove this. - triggerCap := remote.NewTriggerSubscriber( - capabilityConfig.RemoteTriggerConfig, - info, - remoteDON.DON, - myDON.DON, - w.dispatcher, - aggregator, - "", // empty method name for v1 - w.lggr, - ) - return triggerCap, nil + shimKey := shimKey(capability.ID, remoteDON.ID, "") // empty method name for V1 + triggerCap, alreadyExists := w.cachedShims.triggerSubscribers[shimKey] + if !alreadyExists { + triggerCap = remote.NewTriggerSubscriber( + capability.ID, + "", // empty method name for v1 + w.dispatcher, + w.lggr, + ) + w.cachedShims.triggerSubscribers[shimKey] = triggerCap + } + if errCfg := triggerCap.SetConfig(capabilityConfig.RemoteTriggerConfig, info, myDON.ID, remoteDON.DON, aggregator); errCfg != nil { + return nil, fmt.Errorf("failed to set trigger config: %w", errCfg) + } + return triggerCap.(capabilityService), nil } err := w.addToRegistryAndSetDispatcher(ctx, capability, remoteDON, newTriggerFn) if err != nil { @@ -744,60 +762,84 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth if err != nil { return fmt.Errorf("failed to create remote capability info: %w", err) } - cc := remote.NewCombinedClient(info) + cc, isNewCC := w.getCombinedClient(info) for method, config := range methodConfig { - var receiver remotetypes.ReceiverService - if config.RemoteTriggerConfig != nil { - // TODO(CRE-590): add support for SignedReportAggregator (needed by LLO Streams Trigger V2) - aggregator := aggregation.NewDefaultModeAggregator(config.RemoteTriggerConfig.MinResponsesToAggregate) - sub := remote.NewTriggerSubscriber( - config.RemoteTriggerConfig, - info, - remoteDON.DON, - myDON.DON, - w.dispatcher, - aggregator, - method, - w.lggr, - ) - receiver = sub - cc.AddTriggerSubscriber(method, sub) - } - if receiver == nil && config.RemoteExecutableConfig != nil { - client := executable.NewClient( - info, - myDON.DON, - w.dispatcher, - config.RemoteExecutableConfig.RequestTimeout, - &transmission.TransmissionConfig{ - Schedule: transmission.EnumToString(config.RemoteExecutableConfig.TransmissionSchedule), - DeltaStage: config.RemoteExecutableConfig.DeltaStage, - }, - method, - w.lggr, - ) - receiver = client - cc.AddExecutableClient(method, client) - } - if receiver == nil { - return fmt.Errorf("no remote config found for method %s of capability %s", method, capID) - } - if err = w.dispatcher.SetReceiverForMethod(capID, remoteDON.ID, method, receiver); err != nil { - return fmt.Errorf("failed to register receiver for capability %s, method %s: %w", capID, method, err) + w.lggr.Infow("addRemoteCapabilityV2", "capID", capID, "method", method) + if config.RemoteTriggerConfig == nil && config.RemoteExecutableConfig == nil { + w.lggr.Errorw("no remote config found", "method", method, "capID", capID) + continue } - err = receiver.Start(ctx) - if err != nil { - return fmt.Errorf("failed to start receiver for capability %s, method %s: %w", capID, method, err) + + shimKey := shimKey(capID, remoteDON.ID, method) + if config.RemoteTriggerConfig != nil { // trigger + sub, alreadyExists := w.cachedShims.triggerSubscribers[shimKey] + if !alreadyExists { + sub = remote.NewTriggerSubscriber(capID, method, w.dispatcher, w.lggr) + cc.SetTriggerSubscriber(method, sub) + } + // TODO(CRE-590): add support for SignedReportAggregator (needed by LLO Streams Trigger V2) + agg := aggregation.NewDefaultModeAggregator(config.RemoteTriggerConfig.MinResponsesToAggregate) + if errCfg := sub.SetConfig(config.RemoteTriggerConfig, info, myDON.ID, remoteDON.DON, agg); errCfg != nil { + return fmt.Errorf("failed to set trigger config: %w", errCfg) + } + + if !alreadyExists { + if err2 := w.startNewShim(ctx, sub.(remotetypes.ReceiverService), capID, remoteDON.ID, method); err2 != nil { + w.lggr.Errorw("failed to start receiver", "capID", capID, "method", method, "error", err2) + continue + } + w.cachedShims.triggerSubscribers[shimKey] = sub + w.lggr.Infow("added new remote trigger subscriber", "capID", capID, "method", method) + } + } else { // executable + client, alreadyExists := w.cachedShims.executableClients[shimKey] + if !alreadyExists { + client = executable.NewClient( + info, + myDON.DON, + w.dispatcher, + config.RemoteExecutableConfig.RequestTimeout, + &transmission.TransmissionConfig{ + Schedule: transmission.EnumToString(config.RemoteExecutableConfig.TransmissionSchedule), + DeltaStage: config.RemoteExecutableConfig.DeltaStage, + }, + method, + w.lggr, + ) + cc.SetExecutableClient(method, client) + } + + // TODO(CRE-941): implement setters for executable client config + + if !alreadyExists { + if err2 := w.startNewShim(ctx, client.(remotetypes.ReceiverService), capID, remoteDON.ID, method); err2 != nil { + w.lggr.Errorw("failed to start receiver", "capID", capID, "method", method, "error", err2) + continue + } + w.cachedShims.executableClients[shimKey] = client + w.lggr.Infow("added new remote executable client", "capID", capID, "method", method) + } } - w.subServices = append(w.subServices, receiver) } - err = w.registry.Add(ctx, cc) - if err != nil { - return fmt.Errorf("failed to add CombinedClient for capability %s to registry: %w", capID, err) + if isNewCC { // add new CombinedClient to registry, only after all methods are configured + if err2 := w.registry.Add(ctx, cc); err2 != nil { + return fmt.Errorf("failed to add CombinedClient for capability %s to registry: %w", capID, err2) + } } + return nil +} +func (w *launcher) startNewShim(ctx context.Context, receiver remotetypes.ReceiverService, capID string, remoteDonID uint32, method string) error { + if err := receiver.Start(ctx); err != nil { + return fmt.Errorf("failed to start receiver for capability %s, method %s: %w", capID, method, err) + } + if err := w.dispatcher.SetReceiverForMethod(capID, remoteDonID, method, receiver); err != nil { + _ = receiver.Close() + return fmt.Errorf("failed to register receiver for capability %s, method %s: %w", capID, method, err) + } + w.subServices = append(w.subServices, receiver) return nil } @@ -887,3 +929,15 @@ func (w *launcher) exposeCapabilityV2(ctx context.Context, capID string, methodC } return nil } + +// retrieve or create a CombinedClient for the given capability +func (w *launcher) getCombinedClient(info capabilities.CapabilityInfo) (remote.CombinedClient, bool) { + key := shimKey(info.ID, info.DON.ID, "") // empty method name - CombinedClient covers all methods + cc, exists := w.cachedShims.combinedClients[key] + if !exists { // create a new combined client and cache it + cc = remote.NewCombinedClient(info) + w.cachedShims.combinedClients[key] = cc + return cc, true + } + return cc, false +} diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go index 77b92e27bf3..ec17abee4f5 100644 --- a/core/capabilities/launcher_test.go +++ b/core/capabilities/launcher_test.go @@ -293,8 +293,6 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) { dID := uint32(1) capDonID := uint32(2) - rtc := &capabilities.RemoteTriggerConfig{} - rtc.ApplyDefaults() cfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{ RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{ RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{ @@ -442,8 +440,6 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) { dID := uint32(1) capDonID := uint32(2) - rtc := &capabilities.RemoteTriggerConfig{} - rtc.ApplyDefaults() cfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{ RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{ RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{ @@ -504,15 +500,20 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie dID := uint32(1) triggerCapDonID := uint32(2) targetCapDonID := uint32(3) - // The below state describes a Workflow DON (AcceptsWorkflows = true), - // which exposes the streams-trigger and write_chain capabilities. - // We expect receivers to be wired up and both capabilities to be added to the registry. + + cfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{ + RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{ + RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{}, + }, + }) + require.NoError(t, err) + localRegistry := buildLocalRegistry() addDON(localRegistry, dID, uint32(0), uint8(1), true, true, workflowDonNodes, 1, nil) addDON(localRegistry, triggerCapDonID, uint32(0), uint8(1), true, false, capabilityDonNodes, 1, [][32]byte{triggerCapID, targetCapID}) - addCapabilityToDON(localRegistry, triggerCapDonID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil) + addCapabilityToDON(localRegistry, triggerCapDonID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, cfg) addDON(localRegistry, targetCapDonID, uint32(0), uint8(1), false, false, capabilityDonNodes, 1, [][32]byte{triggerCapID, targetCapID}) - addCapabilityToDON(localRegistry, targetCapDonID, fullTargetID, capabilities.CapabilityTypeTarget, nil) + addCapabilityToDON(localRegistry, targetCapDonID, fullTargetID, capabilities.CapabilityTypeTarget, cfg) launcher := NewLauncher( lggr, @@ -527,8 +528,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie defer launcher.Close() dispatcher.On("SetReceiver", fullTriggerCapID, triggerCapDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) - err := launcher.OnNewRegistry(t.Context(), localRegistry) - require.NoError(t, err) + require.NoError(t, launcher.OnNewRegistry(t.Context(), localRegistry)) _, err = registry.Get(t.Context(), fullTriggerCapID) require.NoError(t, err) @@ -738,7 +738,7 @@ func TestLauncher_DonPairsToUpdate(t *testing.T) { require.Equal(t, p2ptypes.DonPair{localRegistry.IDsToDONs[capDONID].DON, localRegistry.IDsToDONs[mixedDONID].DON}, res[2]) } -func TestLauncher_CreateCombinedClientForV2Capabilities(t *testing.T) { +func TestLauncher_V2CapabilitiesAddViaCombinedClient(t *testing.T) { lggr := logger.Test(t) registry := NewRegistry(lggr) dispatcher := remoteMocks.NewDispatcher(t) @@ -805,20 +805,40 @@ func TestLauncher_CreateCombinedClientForV2Capabilities(t *testing.T) { dispatcher.On("SetReceiverForMethod", fullTriggerCapID, capDonID, "StreamsTrigger", mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) dispatcher.On("SetReceiverForMethod", fullExecutableCapID, capDonID, "Write", mock.AnythingOfType("*executable.client")).Return(nil) + // first test the initial CombinedClient creation err = launcher.OnNewRegistry(t.Context(), localRegistry) require.NoError(t, err) - _, err = registry.Get(t.Context(), fullTriggerCapID) + trigCap, err := registry.Get(t.Context(), fullTriggerCapID) + require.NoError(t, err) + trigCC, ok := trigCap.(remote.CombinedClient) + assert.True(t, ok, "expected CombinedClient object") + subscriber := trigCC.GetTriggerSubscriber("StreamsTrigger") + capInfo, err := subscriber.Info(t.Context()) require.NoError(t, err) + assert.Equal(t, fullTriggerCapID, capInfo.ID) + assert.Len(t, capInfo.DON.Members, 4) - executableCap, err := registry.Get(t.Context(), fullExecutableCapID) + execCap, err := registry.Get(t.Context(), fullExecutableCapID) + require.NoError(t, err) + execCC, ok := execCap.(remote.CombinedClient) + assert.True(t, ok, "expected CombinedClient object") + require.NotNil(t, execCC.GetExecutableClient("Write")) + + // Now update config for one capability and verify it's propagated correctly (DON size) + capDon := localRegistry.IDsToDONs[registrysyncer.DonID(capDonID)] + capDon.Members = append(capDon.Members, ragetypes.PeerID(RandomUTF8BytesWord())) + localRegistry.IDsToDONs[registrysyncer.DonID(capDonID)] = capDon + err = launcher.OnNewRegistry(t.Context(), localRegistry) require.NoError(t, err) - _, ok := executableCap.(capabilities.ExecutableAndTriggerCapability) - assert.True(t, ok, "expected executableCap to be of type capabilities.ExecutableAndTriggerCapability") + capInfo, err = subscriber.Info(t.Context()) + require.NoError(t, err) + assert.Equal(t, fullTriggerCapID, capInfo.ID) + assert.Len(t, capInfo.DON.Members, 5) } -func TestLauncher_ExposeV2CapabilitiesRemotely(t *testing.T) { +func TestLauncher_V2CapabilitiesExposeRemotely(t *testing.T) { lggr := logger.Test(t) registry := NewRegistry(lggr) fullTriggerCapID := "streams-trigger@1.0.0" diff --git a/core/capabilities/remote/combined_client.go b/core/capabilities/remote/combined_client.go index 62e928805bd..0080e61e257 100644 --- a/core/capabilities/remote/combined_client.go +++ b/core/capabilities/remote/combined_client.go @@ -3,6 +3,7 @@ package remote import ( "context" "fmt" + "sync" "github.com/pkg/errors" @@ -13,30 +14,47 @@ import ( // The capability can have multiple methods, each one being a trigger or an executable. // The CombinedClient holds method-specific shims for each method and forwards capability API calls // to them. Responses are passed directly to method-specific shims from the Dispatcher. +type CombinedClient interface { + capabilities.ExecutableAndTriggerCapability + SetTriggerSubscriber(method string, subscriber capabilities.TriggerCapability) + SetExecutableClient(method string, client capabilities.ExecutableCapability) + GetTriggerSubscriber(method string) capabilities.TriggerCapability + GetExecutableClient(method string) capabilities.ExecutableCapability +} + type combinedClient struct { info capabilities.CapabilityInfo triggerSubscribers map[string]capabilities.TriggerCapability executableClients map[string]capabilities.ExecutableCapability + mu sync.RWMutex } -var _ capabilities.ExecutableAndTriggerCapability = &combinedClient{} +var _ CombinedClient = &combinedClient{} func (c *combinedClient) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { return c.info, nil } func (c *combinedClient) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { - if _, ok := c.triggerSubscribers[request.Method]; !ok { + c.mu.RLock() + subscriber, ok := c.triggerSubscribers[request.Method] + c.mu.RUnlock() + + if !ok { return nil, fmt.Errorf("method %s not defined", request.Method) } - return c.triggerSubscribers[request.Method].RegisterTrigger(ctx, request) + return subscriber.RegisterTrigger(ctx, request) } func (c *combinedClient) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error { - if _, ok := c.triggerSubscribers[request.Method]; !ok { + c.mu.RLock() + subscriber, ok := c.triggerSubscribers[request.Method] + c.mu.RUnlock() + + if !ok { return fmt.Errorf("method %s not defined", request.Method) } - return c.triggerSubscribers[request.Method].UnregisterTrigger(ctx, request) + return subscriber.UnregisterTrigger(ctx, request) } func (c *combinedClient) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { @@ -48,10 +66,14 @@ func (c *combinedClient) UnregisterFromWorkflow(ctx context.Context, request cap } func (c *combinedClient) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { - if _, ok := c.executableClients[request.Method]; !ok { + c.mu.RLock() + client, ok := c.executableClients[request.Method] + c.mu.RUnlock() + + if !ok { return capabilities.CapabilityResponse{}, fmt.Errorf("method %s not defined", request.Method) } - return c.executableClients[request.Method].Execute(ctx, request) + return client.Execute(ctx, request) } func NewCombinedClient(info capabilities.CapabilityInfo) *combinedClient { @@ -62,10 +84,26 @@ func NewCombinedClient(info capabilities.CapabilityInfo) *combinedClient { } } -func (c *combinedClient) AddTriggerSubscriber(method string, subscriber capabilities.TriggerCapability) { +func (c *combinedClient) SetTriggerSubscriber(method string, subscriber capabilities.TriggerCapability) { + c.mu.Lock() + defer c.mu.Unlock() c.triggerSubscribers[method] = subscriber } -func (c *combinedClient) AddExecutableClient(method string, client capabilities.ExecutableCapability) { +func (c *combinedClient) SetExecutableClient(method string, client capabilities.ExecutableCapability) { + c.mu.Lock() + defer c.mu.Unlock() c.executableClients[method] = client } + +func (c *combinedClient) GetTriggerSubscriber(method string) capabilities.TriggerCapability { + c.mu.RLock() + defer c.mu.RUnlock() + return c.triggerSubscribers[method] +} + +func (c *combinedClient) GetExecutableClient(method string) capabilities.ExecutableCapability { + c.mu.RLock() + defer c.mu.RUnlock() + return c.executableClients[method] +} diff --git a/core/capabilities/remote/combined_client_test.go b/core/capabilities/remote/combined_client_test.go index 5f8ef56c61e..02ebfa2d5cc 100644 --- a/core/capabilities/remote/combined_client_test.go +++ b/core/capabilities/remote/combined_client_test.go @@ -31,7 +31,7 @@ func TestCombinedClient_RegisterTrigger_Success(t *testing.T) { mockTrigger := &mocks.TriggerCapability{} method := "test-method" - client.AddTriggerSubscriber(method, mockTrigger) + client.SetTriggerSubscriber(method, mockTrigger) request := commoncap.TriggerRegistrationRequest{ TriggerID: "test-trigger-id", @@ -70,7 +70,7 @@ func TestCombinedClient_RegisterTrigger_ErrorFromSubscriber(t *testing.T) { mockTrigger := &mocks.TriggerCapability{} method := "test-method" - client.AddTriggerSubscriber(method, mockTrigger) + client.SetTriggerSubscriber(method, mockTrigger) request := commoncap.TriggerRegistrationRequest{ TriggerID: "test-trigger-id", @@ -94,7 +94,7 @@ func TestCombinedClient_UnregisterTrigger_Success(t *testing.T) { mockTrigger := &mocks.TriggerCapability{} method := "test-method" - client.AddTriggerSubscriber(method, mockTrigger) + client.SetTriggerSubscriber(method, mockTrigger) request := commoncap.TriggerRegistrationRequest{ TriggerID: "test-trigger-id", @@ -129,7 +129,7 @@ func TestCombinedClient_UnregisterTrigger_ErrorFromSubscriber(t *testing.T) { mockTrigger := &mocks.TriggerCapability{} method := "test-method" - client.AddTriggerSubscriber(method, mockTrigger) + client.SetTriggerSubscriber(method, mockTrigger) request := commoncap.TriggerRegistrationRequest{ TriggerID: "test-trigger-id", @@ -184,7 +184,7 @@ func TestCombinedClient_Execute_Success(t *testing.T) { mockExecutable := &mocks.ExecutableCapability{} method := "test-execute-method" - client.AddExecutableClient(method, mockExecutable) + client.SetExecutableClient(method, mockExecutable) request := commoncap.CapabilityRequest{ Method: method, @@ -237,7 +237,7 @@ func TestCombinedClient_Execute_ErrorFromExecutable(t *testing.T) { mockExecutable := &mocks.ExecutableCapability{} method := "test-execute-method" - client.AddExecutableClient(method, mockExecutable) + client.SetExecutableClient(method, mockExecutable) request := commoncap.CapabilityRequest{ Method: method, @@ -258,14 +258,14 @@ func TestCombinedClient_Execute_ErrorFromExecutable(t *testing.T) { assert.Equal(t, expectedError, err) } -func TestCombinedClient_AddTriggerSubscriber(t *testing.T) { +func TestCombinedClient_SetTriggerSubscriber(t *testing.T) { info := createTestCapabilityInfo("test-capability", commoncap.CapabilityTypeTrigger) client := remote.NewCombinedClient(info) mockTrigger := &mocks.TriggerCapability{} method := "test-method" - client.AddTriggerSubscriber(method, mockTrigger) + client.SetTriggerSubscriber(method, mockTrigger) ctx := testutils.Context(t) request := commoncap.TriggerRegistrationRequest{ @@ -280,14 +280,14 @@ func TestCombinedClient_AddTriggerSubscriber(t *testing.T) { require.NoError(t, err) } -func TestCombinedClient_AddExecutableClient(t *testing.T) { +func TestCombinedClient_SetExecutableClient(t *testing.T) { info := createTestCapabilityInfo("test-capability", commoncap.CapabilityTypeAction) client := remote.NewCombinedClient(info) mockExecutable := &mocks.ExecutableCapability{} method := "test-method" - client.AddExecutableClient(method, mockExecutable) + client.SetExecutableClient(method, mockExecutable) ctx := testutils.Context(t) request := commoncap.CapabilityRequest{ @@ -321,8 +321,11 @@ func TestCombinedClient_MultipleMethodsAndCapabilities(t *testing.T) { triggerMethod1 := "trigger-method-1" triggerMethod2 := "trigger-method-2" - client.AddTriggerSubscriber(triggerMethod1, mockTrigger1) - client.AddTriggerSubscriber(triggerMethod2, mockTrigger2) + client.SetTriggerSubscriber(triggerMethod1, mockTrigger1) + client.SetTriggerSubscriber(triggerMethod2, mockTrigger2) + + client.SetTriggerSubscriber(triggerMethod1, mockTrigger1) + client.SetTriggerSubscriber(triggerMethod2, mockTrigger2) // Add multiple executable clients mockExecutable1 := &mocks.ExecutableCapability{} @@ -330,8 +333,8 @@ func TestCombinedClient_MultipleMethodsAndCapabilities(t *testing.T) { execMethod1 := "exec-method-1" execMethod2 := "exec-method-2" - client.AddExecutableClient(execMethod1, mockExecutable1) - client.AddExecutableClient(execMethod2, mockExecutable2) + client.SetExecutableClient(execMethod1, mockExecutable1) + client.SetExecutableClient(execMethod2, mockExecutable2) // Test trigger method 1 triggerRequest1 := commoncap.TriggerRegistrationRequest{ diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go index fab8ef0bfc0..47a0167dd31 100644 --- a/core/capabilities/remote/dispatcher.go +++ b/core/capabilities/remote/dispatcher.go @@ -279,7 +279,7 @@ func (d *dispatcher) handleMessage(msg *p2ptypes.Message) { receiver, ok := d.receivers[k] d.mu.RUnlock() if !ok { - d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capID), "donId", k.donID) + d.lggr.Debugw("received message for unregistered capability or method", "capabilityId", SanitizeLogString(k.capID), "donId", k.donID, "method", k.methodName) d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND) return } diff --git a/core/capabilities/remote/executable/client.go b/core/capabilities/remote/executable/client.go index 1c81f820c4e..daaf003867f 100644 --- a/core/capabilities/remote/executable/client.go +++ b/core/capabilities/remote/executable/client.go @@ -42,7 +42,12 @@ type client struct { wg sync.WaitGroup } -var _ commoncap.ExecutableCapability = &client{} +type Client interface { + commoncap.ExecutableCapability + Receive(ctx context.Context, msg *types.MessageBody) +} + +var _ Client = &client{} var _ types.Receiver = &client{} var _ services.Service = &client{} diff --git a/core/capabilities/remote/executable/request/server_request.go b/core/capabilities/remote/executable/request/server_request.go index aa0320637ff..d9aa9c56a47 100644 --- a/core/capabilities/remote/executable/request/server_request.go +++ b/core/capabilities/remote/executable/request/server_request.go @@ -314,7 +314,7 @@ func (e *ServerRequest) sendResponse(ctx context.Context, requester p2ptypes.Pee responseMsg.Payload = e.response.response } - e.lggr.Debugw("Sending response", "receiver", requester) + e.lggr.Debugw("Sending response", "receiver", requester, "capabilityId", e.capabilityID, "donId", e.capabilityDonID, "method", e.capMethodName) err := e.dispatcher.Send(requester, &responseMsg) e.metrics.countExecutionResponse(ctx, e.response.error.String(), err != nil) if err != nil { diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index 24c010e7342..ff3febba719 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -3,7 +3,9 @@ package remote import ( "context" "errors" + "fmt" "sync" + "sync/atomic" "time" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -11,28 +13,24 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/messagecache" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) // TriggerSubscriber is a shim for remote trigger capabilities. -// It translatesd between capability API calls and network messages. +// It translates between capability API calls and network messages. // Its responsibilities are: // 1. Periodically refresh all registrations for remote triggers. // 2. Collect trigger events from remote nodes and aggregate responses via a customizable aggregator. // // TriggerSubscriber communicates with corresponding TriggerReceivers on remote nodes. type triggerSubscriber struct { - config *commoncap.RemoteTriggerConfig - capInfo commoncap.CapabilityInfo - capDonInfo commoncap.DON - capDonMembers map[p2ptypes.PeerID]struct{} - localDonInfo commoncap.DON - dispatcher types.Dispatcher - capMethodName string - aggregator types.Aggregator + capabilityID string + capMethodName string + dispatcher types.Dispatcher + cfg atomic.Pointer[dynamicConfig] + messageCache *messagecache.MessageCache[triggerEventKey, p2ptypes.PeerID] registeredWorkflows map[string]*subRegState mu sync.RWMutex // protects registeredWorkflows and messageCache @@ -41,6 +39,15 @@ type triggerSubscriber struct { lggr logger.Logger } +type dynamicConfig struct { + remoteConfig *commoncap.RemoteTriggerConfig + capInfo commoncap.CapabilityInfo + capDonInfo commoncap.DON + capDonMembers map[p2ptypes.PeerID]struct{} + localDonID uint32 + aggregator types.Aggregator +} + type triggerEventKey struct { triggerEventID string workflowID string @@ -54,41 +61,24 @@ type subRegState struct { type TriggerSubscriber interface { commoncap.TriggerCapability Receive(ctx context.Context, msg *types.MessageBody) + SetConfig(config *commoncap.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, localDONID uint32, remoteDON commoncap.DON, aggregator types.Aggregator) error } var _ commoncap.TriggerCapability = &triggerSubscriber{} var _ types.Receiver = &triggerSubscriber{} var _ services.Service = &triggerSubscriber{} -// TODO makes this configurable with a default const ( - defaultSendChannelBufferSize = 1000 - maxBatchedWorkflowIDs = 1000 + // Engine reads trigger events without blocking and applies its own limits + sendChannelBufferSize = 1000 + maxBatchedWorkflowIDs = 1000 ) -func NewTriggerSubscriber(config *commoncap.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, localDonInfo commoncap.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, capMethodName string, lggr logger.Logger) *triggerSubscriber { - if aggregator == nil { - lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID) - aggregator = aggregation.NewDefaultModeAggregator(uint32(capDonInfo.F + 1)) - } - if config == nil { - lggr.Info("no config provided, using default values") - config = &commoncap.RemoteTriggerConfig{} - } - config.ApplyDefaults() - capDonMembers := make(map[p2ptypes.PeerID]struct{}) - for _, member := range capDonInfo.Members { - capDonMembers[member] = struct{}{} - } +func NewTriggerSubscriber(capabilityID string, capMethodName string, dispatcher types.Dispatcher, lggr logger.Logger) *triggerSubscriber { return &triggerSubscriber{ - config: config, - capInfo: capInfo, - capDonInfo: capDonInfo, - capDonMembers: capDonMembers, - localDonInfo: localDonInfo, - dispatcher: dispatcher, + capabilityID: capabilityID, capMethodName: capMethodName, - aggregator: aggregator, + dispatcher: dispatcher, messageCache: messagecache.NewMessageCache[triggerEventKey, p2ptypes.PeerID](), registeredWorkflows: make(map[string]*subRegState), stopCh: make(services.StopChan), @@ -97,6 +87,33 @@ func NewTriggerSubscriber(config *commoncap.RemoteTriggerConfig, capInfo commonc } func (s *triggerSubscriber) Start(ctx context.Context) error { + s.mu.RLock() + defer s.mu.RUnlock() + cfg := s.cfg.Load() + + // Validate that all required fields are set before starting + if cfg == nil { + return errors.New("config not set - call SetConfig() before Start()") + } + if cfg.remoteConfig == nil { + return errors.New("remoteConfig not set - call SetConfig() before Start()") + } + if cfg.capInfo.ID == "" { + return errors.New("capability info not set - call SetConfig() before Start()") + } + if cfg.localDonID == 0 { + return errors.New("local DON ID not set - call SetConfig() before Start()") + } + if len(cfg.capDonInfo.Members) == 0 { + return errors.New("capability DON info not set - call SetConfig() before Start()") + } + if cfg.aggregator == nil { + return errors.New("aggregator not set - call SetAggregator() before Start()") + } + if s.dispatcher == nil { + return errors.New("dispatcher set to nil, cannot start triggerSubscriber") + } + s.wg.Add(2) go s.registrationLoop() go s.eventCleanupLoop() @@ -105,7 +122,11 @@ func (s *triggerSubscriber) Start(ctx context.Context) error { } func (s *triggerSubscriber) Info(ctx context.Context) (commoncap.CapabilityInfo, error) { - return s.capInfo, nil + cfg := s.cfg.Load() + if cfg == nil { + return commoncap.CapabilityInfo{}, errors.New("config not set - call SetConfig() before Info()") + } + return cfg.capInfo, nil } func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commoncap.TriggerRegistrationRequest) (<-chan commoncap.TriggerResponse, error) { @@ -116,20 +137,26 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commonc if request.Metadata.WorkflowID == "" { return nil, errors.New("empty workflowID") } + + cfg := s.cfg.Load() + if cfg == nil { + return nil, errors.New("config not set - call SetConfig() first") + } + capID, capDonID := cfg.capInfo.ID, cfg.capDonInfo.ID + s.mu.Lock() defer s.mu.Unlock() - - s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID) + s.lggr.Infow("RegisterTrigger called", "capabilityId", capID, "donId", capDonID, "workflowID", request.Metadata.WorkflowID) regState, ok := s.registeredWorkflows[request.Metadata.WorkflowID] if !ok { regState = &subRegState{ - callback: make(chan commoncap.TriggerResponse, defaultSendChannelBufferSize), + callback: make(chan commoncap.TriggerResponse, sendChannelBufferSize), rawRequest: rawRequest, } s.registeredWorkflows[request.Metadata.WorkflowID] = regState } else { regState.rawRequest = rawRequest - s.lggr.Warnw("RegisterTrigger re-registering trigger", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID) + s.lggr.Warnw("RegisterTrigger re-registering trigger", "capabilityId", capID, "donId", capDonID, "workflowID", request.Metadata.WorkflowID) } return regState.callback, nil @@ -137,32 +164,40 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commonc func (s *triggerSubscriber) registrationLoop() { defer s.wg.Done() - ticker := time.NewTicker(s.config.RegistrationRefresh) + cfg := s.cfg.Load() + tickerDuration := cfg.remoteConfig.RegistrationRefresh + ticker := time.NewTicker(tickerDuration) defer ticker.Stop() for { select { case <-s.stopCh: return case <-ticker.C: + cfg := s.cfg.Load() + if cfg.remoteConfig.RegistrationRefresh != tickerDuration { + tickerDuration = cfg.remoteConfig.RegistrationRefresh + ticker.Reset(tickerDuration) + } + s.mu.RLock() - s.lggr.Infow("register trigger for remote capability", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "nMembers", len(s.capDonInfo.Members), "nWorkflows", len(s.registeredWorkflows)) + s.lggr.Infow("register trigger for remote capability", "capabilityId", cfg.capInfo.ID, "donId", cfg.capDonInfo.ID, "nMembers", len(cfg.capDonInfo.Members), "nWorkflows", len(s.registeredWorkflows)) if len(s.registeredWorkflows) == 0 { s.lggr.Infow("no workflows to register") } + for _, registration := range s.registeredWorkflows { - // NOTE: send to all by default, introduce different strategies later (KS-76) - for _, peerID := range s.capDonInfo.Members { + for _, peerID := range cfg.capDonInfo.Members { m := &types.MessageBody{ - CapabilityId: s.capInfo.ID, - CapabilityDonId: s.capDonInfo.ID, - CallerDonId: s.localDonInfo.ID, + CapabilityId: cfg.capInfo.ID, + CapabilityDonId: cfg.capDonInfo.ID, + CallerDonId: cfg.localDonID, Method: types.MethodRegisterTrigger, Payload: registration.rawRequest, CapabilityMethod: s.capMethodName, } err := s.dispatcher.Send(peerID, m) if err != nil { - s.lggr.Errorw("failed to send message", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "peerId", peerID, "err", err) + s.lggr.Errorw("failed to send message", "capabilityId", cfg.capInfo.ID, "donId", cfg.capDonInfo.ID, "peerId", peerID, "err", err) } } } @@ -191,19 +226,24 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { s.lggr.Errorw("failed to convert message sender to PeerID", "err", err) return } - - if _, found := s.capDonMembers[sender]; !found { - s.lggr.Errorw("received message from unexpected node", "capabilityId", s.capInfo.ID, "sender", sender) + cfg := s.cfg.Load() + if cfg == nil { + s.lggr.Errorw("config not set - call SetConfig() first") return } + if _, found := cfg.capDonMembers[sender]; !found { + s.lggr.Errorw("received message from unexpected node", "capabilityId", cfg.capInfo.ID, "sender", sender) + return + } + if msg.Method == types.MethodTriggerEvent { meta := msg.GetTriggerEventMetadata() if meta == nil { - s.lggr.Errorw("received message with invalid trigger metadata", "capabilityId", s.capInfo.ID, "sender", sender) + s.lggr.Errorw("received message with invalid trigger metadata", "capabilityId", cfg.capInfo.ID, "sender", sender) return } if len(meta.WorkflowIds) > maxBatchedWorkflowIDs { - s.lggr.Errorw("received message with too many workflow IDs - truncating", "capabilityId", s.capInfo.ID, "nWorkflows", len(meta.WorkflowIds), "sender", sender) + s.lggr.Errorw("received message with too many workflow IDs - truncating", "capabilityId", cfg.capInfo.ID, "nWorkflows", len(meta.WorkflowIds), "sender", sender) meta.WorkflowIds = meta.WorkflowIds[:maxBatchedWorkflowIDs] } for _, workflowID := range meta.WorkflowIds { @@ -211,7 +251,7 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { registration, found := s.registeredWorkflows[workflowID] s.mu.RUnlock() if !found { - s.lggr.Errorw("received message for unregistered workflow", "capabilityId", s.capInfo.ID, "workflowID", SanitizeLogString(workflowID), "sender", sender) + s.lggr.Errorw("received message for unregistered workflow", "capabilityId", cfg.capInfo.ID, "workflowID", SanitizeLogString(workflowID), "sender", sender) continue } key := triggerEventKey{ @@ -221,16 +261,16 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { nowMs := time.Now().UnixMilli() s.mu.Lock() creationTs := s.messageCache.Insert(key, sender, nowMs, msg.Payload) - ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-s.config.MessageExpiry.Milliseconds(), true) + ready, payloads := s.messageCache.Ready(key, cfg.remoteConfig.MinResponsesToAggregate, nowMs-cfg.remoteConfig.MessageExpiry.Milliseconds(), true) s.mu.Unlock() - s.lggr.Debugw("trigger event received", "triggerEventId", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "sender", sender, "ready", ready, "nowTs", nowMs, "creationTs", creationTs, "minResponsesToAggregate", s.config.MinResponsesToAggregate) + s.lggr.Debugw("trigger event received", "triggerEventId", meta.TriggerEventId, "capabilityId", cfg.capInfo.ID, "workflowId", workflowID, "sender", sender, "ready", ready, "nowTs", nowMs, "creationTs", creationTs, "minResponsesToAggregate", cfg.remoteConfig.MinResponsesToAggregate) if ready { - aggregatedResponse, err := s.aggregator.Aggregate(meta.TriggerEventId, payloads) + aggregatedResponse, err := cfg.aggregator.Aggregate(meta.TriggerEventId, payloads) if err != nil { - s.lggr.Errorw("failed to aggregate responses", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "err", err) + s.lggr.Errorw("failed to aggregate responses", "triggerEventID", meta.TriggerEventId, "capabilityId", cfg.capInfo.ID, "workflowId", workflowID, "err", err) continue } - s.lggr.Infow("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID) + s.lggr.Infow("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "capabilityId", cfg.capInfo.ID, "workflowId", workflowID) registration.callback <- aggregatedResponse } } @@ -241,15 +281,24 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { func (s *triggerSubscriber) eventCleanupLoop() { defer s.wg.Done() - ticker := time.NewTicker(s.config.MessageExpiry) + cfg := s.cfg.Load() + cleanupInterval := cfg.remoteConfig.MessageExpiry + ticker := time.NewTicker(cleanupInterval) defer ticker.Stop() for { select { case <-s.stopCh: return case <-ticker.C: + freshCfg := s.cfg.Load() + remoteConfig := freshCfg.remoteConfig + // Update cleanup interval if config has changed + if remoteConfig.MessageExpiry != cleanupInterval { + cleanupInterval = remoteConfig.MessageExpiry + ticker.Reset(cleanupInterval) + } s.mu.Lock() - s.messageCache.DeleteOlderThan(time.Now().UnixMilli() - s.config.MessageExpiry.Milliseconds()) + s.messageCache.DeleteOlderThan(time.Now().UnixMilli() - remoteConfig.MessageExpiry.Milliseconds()) s.mu.Unlock() } } @@ -273,3 +322,39 @@ func (s *triggerSubscriber) HealthReport() map[string]error { func (s *triggerSubscriber) Name() string { return s.lggr.Name() } + +// SetConfig sets the remote trigger configuration, capability info, and DON information dynamically +func (s *triggerSubscriber) SetConfig(config *commoncap.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, localDONID uint32, remoteDON commoncap.DON, aggregator types.Aggregator) error { + if config == nil { + return errors.New("no config provided") + } + if capInfo.ID == "" || capInfo.ID != s.capabilityID { + return fmt.Errorf("capability info provided does not match the subscriber's capabilityID: %s != %s", capInfo.ID, s.capabilityID) + } + if localDONID == 0 { + return errors.New("localDONID=0 provided") + } + if remoteDON.ID == 0 || len(remoteDON.Members) == 0 { + return errors.New("empty remoteDON provided") + } + if aggregator == nil { + return errors.New("aggregator not set - call SetAggregator() before SetConfig()") + } + config.ApplyDefaults() + // Rebuild the capDonMembers map + capDonMembers := make(map[p2ptypes.PeerID]struct{}) + for _, member := range remoteDON.Members { + capDonMembers[member] = struct{}{} + } + + // always replace the whole dynamicConfig object to avoid inconsistent state + s.cfg.Store(&dynamicConfig{ + remoteConfig: config, + capInfo: capInfo, + capDonInfo: remoteDON, + capDonMembers: capDonMembers, + localDonID: localDONID, + aggregator: aggregator, + }) + return nil +} diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index a1fa6fabb01..a7dc813c45f 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -1,6 +1,7 @@ package remote_test import ( + "sync" "testing" "time" @@ -12,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-protos/cre/go/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" @@ -48,7 +50,9 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { MinResponsesToAggregate: 1, MessageExpiry: 100 * time.Second, } - subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, "", lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + agg := aggregation.NewDefaultModeAggregator(config.MinResponsesToAggregate) + require.NoError(t, subscriber.SetConfig(config, capInfo, workflowDon.ID, capDon, agg)) require.NoError(t, subscriber.Start(t.Context())) req := commoncap.TriggerRegistrationRequest{ @@ -95,7 +99,9 @@ func TestTriggerSubscriber_CorrectEventExpiryCheck(t *testing.T) { MinResponsesToAggregate: 2, MessageExpiry: 10 * time.Second, } - subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, "", lggr) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + agg := aggregation.NewDefaultModeAggregator(config.MinResponsesToAggregate) + require.NoError(t, subscriber.SetConfig(config, capInfo, workflowDon.ID, capDon, agg)) require.NoError(t, subscriber.Start(t.Context())) regReq := commoncap.TriggerRegistrationRequest{ @@ -138,6 +144,138 @@ func TestTriggerSubscriber_CorrectEventExpiryCheck(t *testing.T) { require.Equal(t, response.Event.Outputs, triggerEventValue) } +func TestTriggerSubscriber_SetConfig_Basic(t *testing.T) { + t.Parallel() + lggr := logger.Test(t) + capInfo, capDon, workflowDon := buildTwoTestDONs(t, 3, 1) + agg := aggregation.NewDefaultModeAggregator(1) + + t.Run("returns error when config is nil", func(t *testing.T) { + dispatcher := remoteMocks.NewDispatcher(t) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + err := subscriber.SetConfig(nil, capInfo, workflowDon.ID, capDon, agg) + require.Error(t, err) + require.Contains(t, err.Error(), "no config provided") + }) + + t.Run("returns error when capability info ID doesn't match subscriber's ID", func(t *testing.T) { + dispatcher := remoteMocks.NewDispatcher(t) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + config := &commoncap.RemoteTriggerConfig{} + mismatchedCapInfo := commoncap.CapabilityInfo{ID: "different_id", CapabilityType: commoncap.CapabilityTypeTrigger} + err := subscriber.SetConfig(config, mismatchedCapInfo, workflowDon.ID, capDon, agg) + require.Error(t, err) + require.Contains(t, err.Error(), "capability info provided does not match") + require.Contains(t, err.Error(), "different_id") + require.Contains(t, err.Error(), capInfo.ID) + }) + + t.Run("returns error when aggregator is nil", func(t *testing.T) { + dispatcher := remoteMocks.NewDispatcher(t) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + config := &commoncap.RemoteTriggerConfig{} + err := subscriber.SetConfig(config, capInfo, workflowDon.ID, capDon, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "aggregator not set") + }) + + t.Run("updates existing config", func(t *testing.T) { + dispatcher := remoteMocks.NewDispatcher(t) + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + // Set initial config + initialConfig := &commoncap.RemoteTriggerConfig{ + RegistrationRefresh: 100 * time.Millisecond, + MinResponsesToAggregate: 1, + MessageExpiry: 100 * time.Second, + } + err := subscriber.SetConfig(initialConfig, capInfo, workflowDon.ID, capDon, agg) + require.NoError(t, err) + + // Update with new config + newConfig := &commoncap.RemoteTriggerConfig{ + RegistrationRefresh: 500 * time.Millisecond, + MinResponsesToAggregate: 3, + MessageExpiry: 500 * time.Second, + } + err = subscriber.SetConfig(newConfig, capInfo, workflowDon.ID, capDon, agg) + require.NoError(t, err) + + // Verify updated config works + require.NoError(t, subscriber.Start(t.Context())) + require.NoError(t, subscriber.Close()) + }) +} + +func TestTriggerSubscriber_RegistrationLoopWithConfigUpdate(t *testing.T) { + t.Parallel() + lggr := logger.Test(t) + capInfo, capDon, _ := buildTwoTestDONs(t, 1, 1) + dispatcher := remoteMocks.NewDispatcher(t) + + var capturedMessages []*remotetypes.MessageBody + var messagesMu sync.Mutex + registrationMessageCh := make(chan struct{}) + + dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + messagesMu.Lock() + defer messagesMu.Unlock() + // append to capturedMessages and notify the channel without blockin + if msgBody, ok := args[1].(*remotetypes.MessageBody); ok { + capturedMessages = append(capturedMessages, msgBody) + } + select { + case registrationMessageCh <- struct{}{}: + default: + } + }) + + config := &commoncap.RemoteTriggerConfig{ + RegistrationRefresh: 100 * time.Millisecond, + RegistrationExpiry: 100 * time.Second, + MinResponsesToAggregate: 1, + MessageExpiry: 100 * time.Second, + } + subscriber := remote.NewTriggerSubscriber(capInfo.ID, "method", dispatcher, lggr) + agg := aggregation.NewDefaultModeAggregator(config.MinResponsesToAggregate) + + // Call SetConfig() with workflowDON ID = 1 and register trigger + require.NoError(t, subscriber.SetConfig(config, capInfo, 1, capDon, agg)) + require.NoError(t, subscriber.Start(t.Context())) + req := commoncap.TriggerRegistrationRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: workflowID1, + }, + } + _, err := subscriber.RegisterTrigger(t.Context(), req) + require.NoError(t, err) + + // Wait for first registration message and validate CallerDonId = 1 + <-registrationMessageCh + messagesMu.Lock() + require.NotEmpty(t, capturedMessages, "Expected at least one message to be sent") + lastMsg := capturedMessages[len(capturedMessages)-1] + require.Equal(t, uint32(1), lastMsg.CallerDonId, "First message should have CallerDonId = 1") + messagesMu.Unlock() + + // Change config to workflow ID = 4 + require.NoError(t, subscriber.SetConfig(config, capInfo, 4, capDon, agg)) + + // Wait until we receive a registration message with CallerDonId = 4 + for { + <-registrationMessageCh + messagesMu.Lock() + if len(capturedMessages) > 0 && capturedMessages[len(capturedMessages)-1].CallerDonId == 4 { + messagesMu.Unlock() + break + } + messagesMu.Unlock() + } + + // Gracefully shut down Trigger Subscriber + require.NoError(t, subscriber.UnregisterTrigger(t.Context(), req)) + require.NoError(t, subscriber.Close()) +} + func buildTwoTestDONs(t *testing.T, capDonSize int, workflowDonSize int) (commoncap.CapabilityInfo, commoncap.DON, commoncap.DON) { capInfo := commoncap.CapabilityInfo{ ID: "cap_id@1", diff --git a/core/capabilities/streams/trigger_test.go b/core/capabilities/streams/trigger_test.go index dbc8abac695..d9fc27f4ebe 100644 --- a/core/capabilities/streams/trigger_test.go +++ b/core/capabilities/streams/trigger_test.go @@ -68,7 +68,7 @@ func TestStreamsTrigger(t *testing.T) { feeds := newFeedsWithSignedReports(t, nodes, N, P, R) allowedSigners := make([][]byte, N) - for i := 0; i < N; i++ { + for i := range N { allowedSigners[i] = nodes[i].bundle.PublicKey() // bad name - see comment on evmKeyring.PublicKey } lggr := logger.Test(t) @@ -80,17 +80,19 @@ func TestStreamsTrigger(t *testing.T) { ID: triggerID, } capMembers := make([]p2ptypes.PeerID, N) - for i := 0; i < N; i++ { + for i := range N { capMembers[i] = nodes[i].peerID } capDonInfo := capabilities.DON{ + ID: 2, Members: capMembers, F: uint8(F), } config := &capabilities.RemoteTriggerConfig{ MinResponsesToAggregate: uint32(F + 1), } - subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, capabilities.DON{}, nil, agg, "", lggr) + subscriber := remote.NewTriggerSubscriber(triggerID, "method", nil, lggr) + require.NoError(t, subscriber.SetConfig(config, capInfo, 1, capDonInfo, agg)) // register trigger req := capabilities.TriggerRegistrationRequest{