diff --git a/.changeset/mighty-trains-heal.md b/.changeset/mighty-trains-heal.md new file mode 100644 index 00000000000..9c9e93ae68c --- /dev/null +++ b/.changeset/mighty-trains-heal.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#added Support capabilities that are both Triggers and Executables diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index e8c0e9ddc26..7c0795621b2 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -24,6 +24,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" "github.com/smartcontractkit/chainlink/v2/core/config" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" @@ -354,6 +355,15 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync return fmt.Errorf("could not unmarshal capability config for id %s", cid) } + methodConfig := capabilityConfig.CapabilityMethodConfig + if methodConfig != nil { // v2 capability - handle via CombinedClient + errAdd := w.addRemoteCapabilityV2(ctx, capability.ID, methodConfig, myDON, remoteDON) + if errAdd != nil { + return fmt.Errorf("failed to add remote v2 capability %s: %w", capability.ID, errAdd) + } + continue + } + switch capability.CapabilityType { case capabilities.CapabilityTypeTrigger: newTriggerFn := func(info capabilities.CapabilityInfo) (capabilityService, error) { @@ -417,6 +427,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync myDON.DON, w.dispatcher, aggregator, + "", // empty method name for v1 w.lggr, ) return triggerCap, nil @@ -433,6 +444,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync w.dispatcher, defaultTargetRequestTimeout, nil, // V1 capabilities read transmission schedule from every request + "", // empty method name for v1 w.lggr, ) return client, nil @@ -452,6 +464,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync w.dispatcher, defaultTargetRequestTimeout, nil, // V1 capabilities read transmission schedule from every request + "", // empty method name for v1 w.lggr, ) return client, nil @@ -543,6 +556,15 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee return fmt.Errorf("could not unmarshal capability config for id %s", cid) } + methodConfig := capabilityConfig.CapabilityMethodConfig + if methodConfig != nil { // v2 capability + errExpose := w.exposeCapabilityV2(ctx, cid, methodConfig, myPeerID, don, idsToDONs) + if errExpose != nil { + return fmt.Errorf("failed to expose v2 capability remotely %s: %w", cid, errExpose) + } + continue + } + switch capability.CapabilityType { case capabilities.CapabilityTypeTrigger: newTriggerPublisher := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) { @@ -558,6 +580,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee don.DON, idsToDONs, w.dispatcher, + "", // empty method name for v1 w.lggr, ) return publisher, nil @@ -591,6 +614,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee defaultTargetRequestTimeout, defaultMaxParallelCapabilityExecuteRequests, nil, // TODO: create a capability-specific hasher + "", // empty method name for v1 w.lggr, ), nil } @@ -625,6 +649,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee defaultTargetRequestTimeout, defaultMaxParallelCapabilityExecuteRequests, nil, // TODO: create a capability-specific hasher + "", // empty method name for v1 w.lggr, ), nil } @@ -697,3 +722,153 @@ func signersFor(don registrysyncer.DON, state *registrysyncer.LocalRegistry) ([] return s, nil } + +// Add a V2 capability with multiple methods, using CombinedClient. +func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, methodConfig map[string]capabilities.CapabilityMethodConfig, myDON registrysyncer.DON, remoteDON registrysyncer.DON) error { + info, err := capabilities.NewRemoteCapabilityInfo( + capID, + capabilities.CapabilityTypeCombined, + "Remote Capability for "+capID, + &myDON.DON, + ) + if err != nil { + return fmt.Errorf("failed to create remote capability info: %w", err) + } + cc := remote.NewCombinedClient(info) + + for method, config := range methodConfig { + var receiver remotetypes.Receiver + if config.RemoteTriggerConfig != nil { + // TODO(CRE-590): add support for SignedReportAggregator (needed by LLO Streams Trigger V2) + aggregator := aggregation.NewDefaultModeAggregator(config.RemoteTriggerConfig.MinResponsesToAggregate) + sub := remote.NewTriggerSubscriber( + config.RemoteTriggerConfig, + info, + remoteDON.DON, + myDON.DON, + w.dispatcher, + aggregator, + method, + w.lggr, + ) + receiver = sub + cc.AddTriggerSubscriber(method, sub) + } + if receiver == nil && config.RemoteExecutableConfig != nil { + client := executable.NewClient( + info, + myDON.DON, + w.dispatcher, + config.RemoteExecutableConfig.RequestTimeout, + &transmission.TransmissionConfig{ + Schedule: transmission.EnumToString(config.RemoteExecutableConfig.TransmissionSchedule), + DeltaStage: config.RemoteExecutableConfig.DeltaStage, + }, + method, + w.lggr, + ) + receiver = client + cc.AddExecutableClient(method, client) + } + if receiver == nil { + return fmt.Errorf("no remote config found for method %s of capability %s", method, capID) + } + if err = w.dispatcher.SetReceiverForMethod(capID, myDON.ID, method, receiver); err != nil { + return fmt.Errorf("failed to register receiver for capability %s, method %s: %w", capID, method, err) + } + } + + err = w.registry.Add(ctx, cc) + if err != nil { + return fmt.Errorf("failed to add CombinedClient for capability %s to registry: %w", capID, err) + } + + return nil +} + +func (w *launcher) exposeCapabilityV2(ctx context.Context, capID string, methodConfig map[string]capabilities.CapabilityMethodConfig, myPeerID p2ptypes.PeerID, myDON registrysyncer.DON, idsToDONs map[uint32]capabilities.DON) error { + info, err := capabilities.NewRemoteCapabilityInfo( + capID, + capabilities.CapabilityTypeCombined, + "Remote Capability for "+capID, + &myDON.DON, + ) + if err != nil { + return fmt.Errorf("failed to create remote capability info: %w", err) + } + underlying, err := w.registry.Get(ctx, capID) + if err != nil { + return fmt.Errorf("failed to get capability %s from registry: %w", capID, err) + } + for method, config := range methodConfig { + var receiver remotetypes.ReceiverService + if config.RemoteTriggerConfig != nil { + underlyingTriggerCapability, ok := (underlying).(capabilities.TriggerCapability) + if !ok { + return fmt.Errorf("capability %s does not implement TriggerCapability", capID) + } + receiver = remote.NewTriggerPublisher( + config.RemoteTriggerConfig, + underlyingTriggerCapability, + info, + myDON.DON, + idsToDONs, + w.dispatcher, + method, + w.lggr, + ) + } + if receiver == nil && config.RemoteExecutableConfig != nil { + underlyingExecutableCapability, ok := (underlying).(capabilities.ExecutableCapability) + if !ok { + return fmt.Errorf("capability %s does not implement ExecutableCapability", capID) + } + var requestHasher remotetypes.MessageHasher + switch config.RemoteExecutableConfig.RequestHasherType { + case capabilities.RequestHasherType_Simple: + requestHasher = executable.NewSimpleHasher() + case capabilities.RequestHasherType_WriteReportExcludeSignatures: + requestHasher = executable.NewWriteReportExcludeSignaturesHasher() + default: + requestHasher = executable.NewSimpleHasher() + } + receiver = executable.NewServer( + config.RemoteExecutableConfig, + myPeerID, + underlyingExecutableCapability, + info, + myDON.DON, + idsToDONs, + w.dispatcher, + config.RemoteExecutableConfig.RequestTimeout, + int(config.RemoteExecutableConfig.ServerMaxParallelRequests), + requestHasher, + method, + w.lggr, + ) + } + if receiver == nil { + return fmt.Errorf("no remote config found for method %s of capability %s", method, capID) + } + + w.lggr.Debugw("Enabling external access for capability method", "id", capID, "method", method, "donID", myDON.ID) + err := w.dispatcher.SetReceiverForMethod(capID, myDON.ID, method, receiver) + if errors.Is(err, remote.ErrReceiverExists) { + // If a receiver already exists, let's log the error for debug purposes, but + // otherwise short-circuit here. We've handled this capability in a previous iteration. + // TODO(CRE-788) support dynamic changes to config and underlying capability + w.lggr.Debugw("receiver already exists", "capabilityID", capID, "donID", myDON.ID, "method", method, "error", err) + return nil + } else if err != nil { + return fmt.Errorf("failed to set receiver for capability %s, method %s: %w", capID, method, err) + } + + err = receiver.Start(ctx) + if err != nil { + return fmt.Errorf("failed to start receiver for capability %s, method %s: %w", capID, method, err) + } + + w.subServices = append(w.subServices, receiver) + } + return nil +} diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go index 0d104ab3372..1f8f12ec04c 100644 --- a/core/capabilities/launcher_test.go +++ b/core/capabilities/launcher_test.go @@ -1314,3 +1314,233 @@ func TestLauncher_DonPairsToUpdate(t *testing.T) { require.Equal(t, p2ptypes.DonPair{localRegistry.IDsToDONs[wfDONID].DON, localRegistry.IDsToDONs[mixedDONID].DON}, res[1]) require.Equal(t, p2ptypes.DonPair{localRegistry.IDsToDONs[capDONID].DON, localRegistry.IDsToDONs[mixedDONID].DON}, res[2]) } + +func TestLauncher_CreateCombinedClientForV2Capabilities(t *testing.T) { + lggr := logger.Test(t) + registry := NewRegistry(lggr) + dispatcher := remoteMocks.NewDispatcher(t) + + workflowDonNodes := newNodes(4) + capabilityDonNodes := newNodes(4) + + fullTriggerCapID := "streams-trigger@1.0.0" + fullExecutableCapID := "evm@1.0.0" + triggerCapID := RandomUTF8BytesWord() + executableCapID := RandomUTF8BytesWord() + wfDonID := uint32(1) + capDonID := uint32(2) + rtc := &capabilities.RemoteTriggerConfig{} + rtc.ApplyDefaults() + + triggerCfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{ + MethodConfigs: map[string]*capabilitiespb.CapabilityMethodConfig{ + "StreamsTrigger": { + RemoteConfig: &capabilitiespb.CapabilityMethodConfig_RemoteTriggerConfig{ + RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{ + RegistrationRefresh: durationpb.New(1 * time.Second), + MinResponsesToAggregate: 3, + }, + }, + }, + }, + }) + require.NoError(t, err) + + execCfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{ + MethodConfigs: map[string]*capabilitiespb.CapabilityMethodConfig{ + "Write": { + RemoteConfig: &capabilitiespb.CapabilityMethodConfig_RemoteExecutableConfig{ + RemoteExecutableConfig: &capabilitiespb.RemoteExecutableConfig{ + RequestTimeout: durationpb.New(30 * time.Second), + DeltaStage: durationpb.New(1 * time.Second), + }, + }, + }, + }, + }) + require.NoError(t, err) + + localRegistry := buildLocalRegistry() + addDON(localRegistry, wfDonID, 0, 1, true, true, workflowDonNodes, 1, nil) + addDON(localRegistry, capDonID, 0, 1, true, false, capabilityDonNodes, 1, [][32]byte{triggerCapID, executableCapID}) + addCapabilityToDON(localRegistry, capDonID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, triggerCfg) + addCapabilityToDON(localRegistry, capDonID, fullExecutableCapID, capabilities.CapabilityTypeTarget, execCfg) + + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(workflowDonNodes[0]) + peer.On("IsBootstrap").Return(false) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + launcher := NewLauncher( + lggr, + wrapper, + nil, + nil, + dispatcher, + registry, + &mockDonNotifier{}, + ) + + dispatcher.On("SetReceiverForMethod", fullTriggerCapID, wfDonID, "StreamsTrigger", mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) + dispatcher.On("SetReceiverForMethod", fullExecutableCapID, wfDonID, "Write", mock.AnythingOfType("*executable.client")).Return(nil) + + err = launcher.OnNewRegistry(t.Context(), localRegistry) + require.NoError(t, err) + + _, err = registry.Get(t.Context(), fullTriggerCapID) + require.NoError(t, err) + + executableCap, err := registry.Get(t.Context(), fullExecutableCapID) + require.NoError(t, err) + + _, ok := executableCap.(capabilities.ExecutableAndTriggerCapability) + assert.True(t, ok, "expected executableCap to be of type capabilities.ExecutableAndTriggerCapability") +} + +func TestLauncher_ExposeV2CapabilitiesRemotely(t *testing.T) { + lggr := logger.Test(t) + registry := NewRegistry(lggr) + fullTriggerCapID := "streams-trigger@1.0.0" + mt := newMockTrigger(capabilities.MustNewCapabilityInfo( + fullTriggerCapID, + capabilities.CapabilityTypeTrigger, + "streams trigger", + )) + require.NoError(t, registry.Add(t.Context(), mt)) + + fullExecutableCapID := "evm@1.0.0" + mtarg := &mockCapability{ + CapabilityInfo: capabilities.MustNewCapabilityInfo( + fullExecutableCapID, + capabilities.CapabilityTypeTarget, + "evm", + ), + } + require.NoError(t, registry.Add(t.Context(), mtarg)) + + dispatcher := remoteMocks.NewDispatcher(t) + + workflowDonNodes := newNodes(4) + capabilityDonNodes := newNodes(4) + + triggerCapID := RandomUTF8BytesWord() + executableCapID := RandomUTF8BytesWord() + wfDonID := uint32(1) + capDonID := uint32(2) + rtc := &capabilities.RemoteTriggerConfig{} + rtc.ApplyDefaults() + + triggerCfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{ + MethodConfigs: map[string]*capabilitiespb.CapabilityMethodConfig{ + "StreamsTrigger": { + RemoteConfig: &capabilitiespb.CapabilityMethodConfig_RemoteTriggerConfig{ + RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{ + RegistrationRefresh: durationpb.New(1 * time.Second), + MinResponsesToAggregate: 3, + }, + }, + }, + }, + }) + require.NoError(t, err) + + execCfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{ + MethodConfigs: map[string]*capabilitiespb.CapabilityMethodConfig{ + "Write": { + RemoteConfig: &capabilitiespb.CapabilityMethodConfig_RemoteExecutableConfig{ + RemoteExecutableConfig: &capabilitiespb.RemoteExecutableConfig{ + RequestTimeout: durationpb.New(30 * time.Second), + DeltaStage: durationpb.New(1 * time.Second), + }, + }, + }, + }, + }) + require.NoError(t, err) + + localRegistry := buildLocalRegistry() + addDON(localRegistry, wfDonID, 0, 1, true, true, workflowDonNodes, 1, nil) + addDON(localRegistry, capDonID, 0, 1, true, false, capabilityDonNodes, 1, [][32]byte{triggerCapID, executableCapID}) + addCapabilityToDON(localRegistry, capDonID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, triggerCfg) + addCapabilityToDON(localRegistry, capDonID, fullExecutableCapID, capabilities.CapabilityTypeTarget, execCfg) + + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(capabilityDonNodes[0]) + peer.On("IsBootstrap").Return(false) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + launcher := NewLauncher( + lggr, + wrapper, + nil, + nil, + dispatcher, + registry, + &mockDonNotifier{}, + ) + + dispatcher.On("SetReceiverForMethod", fullTriggerCapID, capDonID, "StreamsTrigger", mock.AnythingOfType("*remote.triggerPublisher")).Return(nil) + dispatcher.On("SetReceiverForMethod", fullExecutableCapID, capDonID, "Write", mock.AnythingOfType("*executable.server")).Return(nil) + + err = launcher.OnNewRegistry(t.Context(), localRegistry) + require.NoError(t, err) +} + +// Helper functions for building LocalRegistry +func newNodes(count int) []ragetypes.PeerID { + nodes := make([]ragetypes.PeerID, count) + for i := range count { + nodes[i] = RandomUTF8BytesWord() + } + return nodes +} + +func buildLocalRegistry() *registrysyncer.LocalRegistry { + return ®istrysyncer.LocalRegistry{ + IDsToDONs: make(map[registrysyncer.DonID]registrysyncer.DON), + IDsToCapabilities: make(map[string]registrysyncer.Capability), + IDsToNodes: make(map[ragetypes.PeerID]registrysyncer.NodeInfo), + } +} + +func addDON(registry *registrysyncer.LocalRegistry, donID uint32, configVersion uint32, f uint8, isPublic bool, acceptsWorkflows bool, members []ragetypes.PeerID, operatorID uint32, hashedCapabilityIDs [][32]byte) { + registry.IDsToDONs[registrysyncer.DonID(donID)] = registrysyncer.DON{ + DON: capabilities.DON{ + ID: donID, + ConfigVersion: configVersion, + F: f, + IsPublic: isPublic, + AcceptsWorkflows: acceptsWorkflows, + Members: members, + }, + CapabilityConfigurations: make(map[string]registrysyncer.CapabilityConfiguration), + } + + // Add each member node to the registry + for _, peerID := range members { + registry.IDsToNodes[peerID] = registrysyncer.NodeInfo{ + NodeOperatorID: operatorID, + Signer: RandomUTF8BytesWord(), + P2pID: peerID, + EncryptionPublicKey: RandomUTF8BytesWord(), + HashedCapabilityIDs: hashedCapabilityIDs, + } + } +} + +func addCapabilityToDON(registry *registrysyncer.LocalRegistry, donID uint32, capabilityID string, capabilityType capabilities.CapabilityType, config []byte) { + don := registry.IDsToDONs[registrysyncer.DonID(donID)] + don.CapabilityConfigurations[capabilityID] = registrysyncer.CapabilityConfiguration{ + Config: config, + } + registry.IDsToDONs[registrysyncer.DonID(donID)] = don + + registry.IDsToCapabilities[capabilityID] = registrysyncer.Capability{ + ID: capabilityID, + CapabilityType: capabilityType, + } +} diff --git a/core/capabilities/remote/combined_client.go b/core/capabilities/remote/combined_client.go new file mode 100644 index 00000000000..62e928805bd --- /dev/null +++ b/core/capabilities/remote/combined_client.go @@ -0,0 +1,71 @@ +package remote + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" +) + +// CombinedClient represents a remote capability V2 accessed from a local node (by the Engine). +// The capability can have multiple methods, each one being a trigger or an executable. +// The CombinedClient holds method-specific shims for each method and forwards capability API calls +// to them. Responses are passed directly to method-specific shims from the Dispatcher. +type combinedClient struct { + info capabilities.CapabilityInfo + triggerSubscribers map[string]capabilities.TriggerCapability + executableClients map[string]capabilities.ExecutableCapability +} + +var _ capabilities.ExecutableAndTriggerCapability = &combinedClient{} + +func (c *combinedClient) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { + return c.info, nil +} + +func (c *combinedClient) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { + if _, ok := c.triggerSubscribers[request.Method]; !ok { + return nil, fmt.Errorf("method %s not defined", request.Method) + } + return c.triggerSubscribers[request.Method].RegisterTrigger(ctx, request) +} + +func (c *combinedClient) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error { + if _, ok := c.triggerSubscribers[request.Method]; !ok { + return fmt.Errorf("method %s not defined", request.Method) + } + return c.triggerSubscribers[request.Method].UnregisterTrigger(ctx, request) +} + +func (c *combinedClient) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { + return errors.New("RegisterToWorkflow is not supported by remote capabilities") +} + +func (c *combinedClient) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error { + return errors.New("UnregisterFromWorkflow is not supported by remote capabilities") +} + +func (c *combinedClient) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { + if _, ok := c.executableClients[request.Method]; !ok { + return capabilities.CapabilityResponse{}, fmt.Errorf("method %s not defined", request.Method) + } + return c.executableClients[request.Method].Execute(ctx, request) +} + +func NewCombinedClient(info capabilities.CapabilityInfo) *combinedClient { + return &combinedClient{ + info: info, + triggerSubscribers: make(map[string]capabilities.TriggerCapability), + executableClients: make(map[string]capabilities.ExecutableCapability), + } +} + +func (c *combinedClient) AddTriggerSubscriber(method string, subscriber capabilities.TriggerCapability) { + c.triggerSubscribers[method] = subscriber +} + +func (c *combinedClient) AddExecutableClient(method string, client capabilities.ExecutableCapability) { + c.executableClients[method] = client +} diff --git a/core/capabilities/remote/combined_client_test.go b/core/capabilities/remote/combined_client_test.go new file mode 100644 index 00000000000..5f8ef56c61e --- /dev/null +++ b/core/capabilities/remote/combined_client_test.go @@ -0,0 +1,401 @@ +package remote_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/mocks" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" +) + +func TestCombinedClient_Info(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-capability-info", commoncap.CapabilityTypeAction) + client := remote.NewCombinedClient(info) + + returnedInfo, err := client.Info(ctx) + require.NoError(t, err) + assert.Equal(t, info, returnedInfo) +} + +func TestCombinedClient_RegisterTrigger_Success(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-trigger", commoncap.CapabilityTypeTrigger) + client := remote.NewCombinedClient(info) + + mockTrigger := &mocks.TriggerCapability{} + method := "test-method" + + client.AddTriggerSubscriber(method, mockTrigger) + + request := commoncap.TriggerRegistrationRequest{ + TriggerID: "test-trigger-id", + Method: method, + } + + responseChan := make(<-chan commoncap.TriggerResponse, 1) + mockTrigger.On("RegisterTrigger", ctx, request).Return(responseChan, nil) + + result, err := client.RegisterTrigger(ctx, request) + require.NoError(t, err) + assert.Equal(t, responseChan, result) +} + +func TestCombinedClient_RegisterTrigger_MethodNotDefined(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-trigger", commoncap.CapabilityTypeTrigger) + client := remote.NewCombinedClient(info) + + request := commoncap.TriggerRegistrationRequest{ + TriggerID: "test-trigger-id", + Method: "undefined-method", + } + + result, err := client.RegisterTrigger(ctx, request) + require.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "method undefined-method not defined") +} + +func TestCombinedClient_RegisterTrigger_ErrorFromSubscriber(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-trigger", commoncap.CapabilityTypeTrigger) + client := remote.NewCombinedClient(info) + + mockTrigger := &mocks.TriggerCapability{} + method := "test-method" + + client.AddTriggerSubscriber(method, mockTrigger) + + request := commoncap.TriggerRegistrationRequest{ + TriggerID: "test-trigger-id", + Method: method, + } + + expectedError := errors.New("registration failed") + mockTrigger.On("RegisterTrigger", ctx, request).Return((<-chan commoncap.TriggerResponse)(nil), expectedError) + + result, err := client.RegisterTrigger(ctx, request) + require.Error(t, err) + assert.Nil(t, result) + assert.Equal(t, expectedError, err) +} + +func TestCombinedClient_UnregisterTrigger_Success(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-trigger", commoncap.CapabilityTypeTrigger) + client := remote.NewCombinedClient(info) + + mockTrigger := &mocks.TriggerCapability{} + method := "test-method" + + client.AddTriggerSubscriber(method, mockTrigger) + + request := commoncap.TriggerRegistrationRequest{ + TriggerID: "test-trigger-id", + Method: method, + } + + mockTrigger.On("UnregisterTrigger", ctx, request).Return(nil) + err := client.UnregisterTrigger(ctx, request) + require.NoError(t, err) +} + +func TestCombinedClient_UnregisterTrigger_MethodNotDefined(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-trigger", commoncap.CapabilityTypeTrigger) + client := remote.NewCombinedClient(info) + + request := commoncap.TriggerRegistrationRequest{ + TriggerID: "test-trigger-id", + Method: "undefined-method", + } + + err := client.UnregisterTrigger(ctx, request) + require.Error(t, err) + assert.Contains(t, err.Error(), "method undefined-method not defined") +} + +func TestCombinedClient_UnregisterTrigger_ErrorFromSubscriber(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-trigger", commoncap.CapabilityTypeTrigger) + client := remote.NewCombinedClient(info) + + mockTrigger := &mocks.TriggerCapability{} + method := "test-method" + + client.AddTriggerSubscriber(method, mockTrigger) + + request := commoncap.TriggerRegistrationRequest{ + TriggerID: "test-trigger-id", + Method: method, + } + + expectedError := errors.New("unregistration failed") + mockTrigger.On("UnregisterTrigger", ctx, request).Return(expectedError) + + err := client.UnregisterTrigger(ctx, request) + require.Error(t, err) + assert.Equal(t, expectedError, err) +} + +func TestCombinedClient_RegisterToWorkflow_NotSupported(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-capability", commoncap.CapabilityTypeTrigger) + client := remote.NewCombinedClient(info) + + request := commoncap.RegisterToWorkflowRequest{ + Metadata: commoncap.RegistrationMetadata{ + WorkflowID: "test-workflow", + }, + } + + err := client.RegisterToWorkflow(ctx, request) + require.Error(t, err) + assert.Contains(t, err.Error(), "RegisterToWorkflow is not supported by remote capabilities") +} + +func TestCombinedClient_UnregisterFromWorkflow_NotSupported(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-capability", commoncap.CapabilityTypeTrigger) + client := remote.NewCombinedClient(info) + + request := commoncap.UnregisterFromWorkflowRequest{ + Metadata: commoncap.RegistrationMetadata{ + WorkflowID: "test-workflow", + }, + } + + err := client.UnregisterFromWorkflow(ctx, request) + require.Error(t, err) + assert.Contains(t, err.Error(), "UnregisterFromWorkflow is not supported by remote capabilities") +} + +func TestCombinedClient_Execute_Success(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-executable", commoncap.CapabilityTypeAction) + client := remote.NewCombinedClient(info) + + mockExecutable := &mocks.ExecutableCapability{} + method := "test-execute-method" + + client.AddExecutableClient(method, mockExecutable) + + request := commoncap.CapabilityRequest{ + Method: method, + Config: nil, + Inputs: nil, + Metadata: commoncap.RequestMetadata{ + WorkflowID: "test-workflow", + WorkflowExecutionID: "test-execution", + }, + } + + expectedResponse := commoncap.CapabilityResponse{ + Value: nil, + } + + mockExecutable.On("Execute", ctx, request).Return(expectedResponse, nil) + + result, err := client.Execute(ctx, request) + require.NoError(t, err) + assert.Equal(t, expectedResponse, result) +} + +func TestCombinedClient_Execute_MethodNotDefined(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-executable", commoncap.CapabilityTypeAction) + client := remote.NewCombinedClient(info) + + request := commoncap.CapabilityRequest{ + Method: "undefined-method", + Config: nil, + Inputs: nil, + Metadata: commoncap.RequestMetadata{ + WorkflowID: "test-workflow", + WorkflowExecutionID: "test-execution", + }, + } + + result, err := client.Execute(ctx, request) + require.Error(t, err) + assert.Equal(t, commoncap.CapabilityResponse{}, result) + assert.Contains(t, err.Error(), "method undefined-method not defined") +} + +func TestCombinedClient_Execute_ErrorFromExecutable(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-executable", commoncap.CapabilityTypeAction) + client := remote.NewCombinedClient(info) + + // Create mock executable capability + mockExecutable := &mocks.ExecutableCapability{} + method := "test-execute-method" + + client.AddExecutableClient(method, mockExecutable) + + request := commoncap.CapabilityRequest{ + Method: method, + Config: nil, + Inputs: nil, + Metadata: commoncap.RequestMetadata{ + WorkflowID: "test-workflow", + WorkflowExecutionID: "test-execution", + }, + } + + expectedError := errors.New("execution failed") + mockExecutable.On("Execute", ctx, request).Return(commoncap.CapabilityResponse{}, expectedError) + + result, err := client.Execute(ctx, request) + require.Error(t, err) + assert.Equal(t, commoncap.CapabilityResponse{}, result) + assert.Equal(t, expectedError, err) +} + +func TestCombinedClient_AddTriggerSubscriber(t *testing.T) { + info := createTestCapabilityInfo("test-capability", commoncap.CapabilityTypeTrigger) + + client := remote.NewCombinedClient(info) + mockTrigger := &mocks.TriggerCapability{} + method := "test-method" + + client.AddTriggerSubscriber(method, mockTrigger) + + ctx := testutils.Context(t) + request := commoncap.TriggerRegistrationRequest{ + TriggerID: "test-trigger-id", + Method: method, + } + + responseChan := make(chan commoncap.TriggerResponse, 1) + mockTrigger.On("RegisterTrigger", ctx, request).Return((<-chan commoncap.TriggerResponse)(responseChan), nil) + + _, err := client.RegisterTrigger(ctx, request) + require.NoError(t, err) +} + +func TestCombinedClient_AddExecutableClient(t *testing.T) { + info := createTestCapabilityInfo("test-capability", commoncap.CapabilityTypeAction) + + client := remote.NewCombinedClient(info) + mockExecutable := &mocks.ExecutableCapability{} + method := "test-method" + + client.AddExecutableClient(method, mockExecutable) + + ctx := testutils.Context(t) + request := commoncap.CapabilityRequest{ + Method: method, + Config: nil, + Inputs: nil, + Metadata: commoncap.RequestMetadata{ + WorkflowID: "test-workflow", + WorkflowExecutionID: "test-execution", + }, + } + + expectedResponse := commoncap.CapabilityResponse{ + Value: nil, + } + + mockExecutable.On("Execute", ctx, request).Return(expectedResponse, nil) + + _, err := client.Execute(ctx, request) + require.NoError(t, err) +} + +func TestCombinedClient_MultipleMethodsAndCapabilities(t *testing.T) { + ctx := testutils.Context(t) + info := createTestCapabilityInfo("test-multi-capability", commoncap.CapabilityTypeAction) + client := remote.NewCombinedClient(info) + + // Add multiple trigger subscribers + mockTrigger1 := &mocks.TriggerCapability{} + mockTrigger2 := &mocks.TriggerCapability{} + triggerMethod1 := "trigger-method-1" + triggerMethod2 := "trigger-method-2" + + client.AddTriggerSubscriber(triggerMethod1, mockTrigger1) + client.AddTriggerSubscriber(triggerMethod2, mockTrigger2) + + // Add multiple executable clients + mockExecutable1 := &mocks.ExecutableCapability{} + mockExecutable2 := &mocks.ExecutableCapability{} + execMethod1 := "exec-method-1" + execMethod2 := "exec-method-2" + + client.AddExecutableClient(execMethod1, mockExecutable1) + client.AddExecutableClient(execMethod2, mockExecutable2) + + // Test trigger method 1 + triggerRequest1 := commoncap.TriggerRegistrationRequest{ + TriggerID: "trigger-1", + Method: triggerMethod1, + } + responseChan1 := make(chan commoncap.TriggerResponse, 1) + mockTrigger1.On("RegisterTrigger", ctx, triggerRequest1).Return((<-chan commoncap.TriggerResponse)(responseChan1), nil) + + _, err := client.RegisterTrigger(ctx, triggerRequest1) + require.NoError(t, err) + + // Test trigger method 2 + triggerRequest2 := commoncap.TriggerRegistrationRequest{ + TriggerID: "trigger-2", + Method: triggerMethod2, + } + responseChan2 := make(chan commoncap.TriggerResponse, 1) + mockTrigger2.On("RegisterTrigger", ctx, triggerRequest2).Return((<-chan commoncap.TriggerResponse)(responseChan2), nil) + + _, err = client.RegisterTrigger(ctx, triggerRequest2) + require.NoError(t, err) + + // Test executable method 1 + execRequest1 := commoncap.CapabilityRequest{ + Method: execMethod1, + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflow-1", + WorkflowExecutionID: "execution-1", + }, + } + execResponse1 := commoncap.CapabilityResponse{Value: nil} + mockExecutable1.On("Execute", ctx, execRequest1).Return(execResponse1, nil) + + _, err = client.Execute(ctx, execRequest1) + require.NoError(t, err) + + // Test executable method 2 + execRequest2 := commoncap.CapabilityRequest{ + Method: execMethod2, + Metadata: commoncap.RequestMetadata{ + WorkflowID: "workflow-2", + WorkflowExecutionID: "execution-2", + }, + } + execResponse2 := commoncap.CapabilityResponse{Value: nil} + mockExecutable2.On("Execute", ctx, execRequest2).Return(execResponse2, nil) + + _, err = client.Execute(ctx, execRequest2) + require.NoError(t, err) + + // Assert all expectations + mockTrigger1.AssertExpectations(t) + mockTrigger2.AssertExpectations(t) + mockExecutable1.AssertExpectations(t) + mockExecutable2.AssertExpectations(t) +} + +func createTestCapabilityInfo(id string, capType commoncap.CapabilityType) commoncap.CapabilityInfo { + return commoncap.CapabilityInfo{ + ID: id, + CapabilityType: capType, + Description: "Test capability", + DON: nil, + IsLocal: false, + } +} diff --git a/core/capabilities/remote/executable/client.go b/core/capabilities/remote/executable/client.go index 782f617f830..1c81f820c4e 100644 --- a/core/capabilities/remote/executable/client.go +++ b/core/capabilities/remote/executable/client.go @@ -34,6 +34,7 @@ type client struct { requestTimeout time.Duration // Has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request. transmissionConfig *transmission.TransmissionConfig + capMethodName string requestIDToCallerRequest map[string]*request.ClientRequest mutex sync.Mutex @@ -54,7 +55,7 @@ var ( // TransmissionConfig has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request. func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher, - requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, lggr logger.Logger) *client { + requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, capMethodName string, lggr logger.Logger) *client { return &client{ lggr: logger.Named(lggr, "ExecutableCapabilityClient"), remoteCapabilityInfo: remoteCapabilityInfo, @@ -62,6 +63,7 @@ func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commo dispatcher: dispatcher, requestTimeout: requestTimeout, transmissionConfig: transmissionConfig, + capMethodName: capMethodName, requestIDToCallerRequest: make(map[string]*request.ClientRequest), stopCh: make(services.StopChan), } @@ -166,7 +168,7 @@ func (c *client) UnregisterFromWorkflow(ctx context.Context, unregisterRequest c func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) { req, err := request.NewClientExecuteRequest(ctx, c.lggr, capReq, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher, - c.requestTimeout, c.transmissionConfig) + c.requestTimeout, c.transmissionConfig, c.capMethodName) if err != nil { return commoncap.CapabilityResponse{}, fmt.Errorf("failed to create client request: %w", err) } diff --git a/core/capabilities/remote/executable/client_test.go b/core/capabilities/remote/executable/client_test.go index d5d8f334c87..d2eceea6259 100644 --- a/core/capabilities/remote/executable/client_test.go +++ b/core/capabilities/remote/executable/client_test.go @@ -242,7 +242,7 @@ func testClient(t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout for i := 0; i < numWorkflowPeers; i++ { workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) - caller := executable.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout, nil, lggr) + caller := executable.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeResponseTimeout, nil, "", lggr) servicetest.Run(t, caller) broker.RegisterReceiverNode(workflowPeers[i], caller) callers[i] = caller diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index fa2ed8ae7b6..73c29cec2f5 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ b/core/capabilities/remote/executable/endtoend_test.go @@ -294,7 +294,7 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin capabilityPeer := capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) capabilityNode := executable.NewServer(&commoncap.RemoteExecutableConfig{RequestHashExcludedAttributes: []string{}}, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, - capabilityNodeResponseTimeout, 10, nil, lggr) + capabilityNodeResponseTimeout, 10, nil, "", lggr) servicetest.Run(t, capabilityNode) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) capabilityNodes[i] = capabilityNode @@ -303,7 +303,7 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin workflowNodes := make([]commoncap.ExecutableCapability, numWorkflowPeers) for i := 0; i < numWorkflowPeers; i++ { workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeers[i]) - workflowNode := executable.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout, nil, lggr) + workflowNode := executable.NewClient(capInfo, workflowDonInfo, workflowPeerDispatcher, workflowNodeTimeout, nil, "", lggr) servicetest.Run(t, workflowNode) broker.RegisterReceiverNode(workflowPeers[i], workflowNode) workflowNodes[i] = workflowNode diff --git a/core/capabilities/remote/executable/request/client_request.go b/core/capabilities/remote/executable/request/client_request.go index 957fa9c7654..73e0e8c090a 100644 --- a/core/capabilities/remote/executable/request/client_request.go +++ b/core/capabilities/remote/executable/request/client_request.go @@ -57,7 +57,7 @@ type ClientRequest struct { // TransmissionConfig has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request. func NewClientExecuteRequest(ctx context.Context, lggr logger.Logger, req commoncap.CapabilityRequest, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher, - requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig) (*ClientRequest, error) { + requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, capMethodName string) (*ClientRequest, error) { rawRequest, err := proto.MarshalOptions{Deterministic: true}.Marshal(pb.CapabilityRequestToProto(req)) if err != nil { return nil, fmt.Errorf("failed to marshal capability request: %w", err) @@ -83,7 +83,7 @@ func NewClientExecuteRequest(ctx context.Context, lggr logger.Logger, req common } lggr = logger.With(lggr, "requestId", requestID, "capabilityID", remoteCapabilityInfo.ID) - return newClientRequest(ctx, lggr, requestID, remoteCapabilityInfo, localDonInfo, dispatcher, requestTimeout, tc, types.MethodExecute, rawRequest, workflowExecutionID, req.Metadata.ReferenceID) + return newClientRequest(ctx, lggr, requestID, remoteCapabilityInfo, localDonInfo, dispatcher, requestTimeout, tc, types.MethodExecute, rawRequest, workflowExecutionID, req.Metadata.ReferenceID, capMethodName) } var ( @@ -92,7 +92,7 @@ var ( func newClientRequest(ctx context.Context, lggr logger.Logger, requestID string, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, - tc transmission.TransmissionConfig, methodType string, rawRequest []byte, workflowExecutionID string, stepRef string) (*ClientRequest, error) { + tc transmission.TransmissionConfig, methodType string, rawRequest []byte, workflowExecutionID string, stepRef string, capMethodName string) (*ClientRequest, error) { remoteCapabilityDonInfo := remoteCapabilityInfo.DON if remoteCapabilityDonInfo == nil { return nil, errors.New("remote capability info missing DON") @@ -163,12 +163,13 @@ func newClientRequest(ctx context.Context, lggr logger.Logger, requestID string, go func(innerCtx context.Context, peerID ragep2ptypes.PeerID, delay time.Duration) { defer wg.Done() message := &types.MessageBody{ - CapabilityId: remoteCapabilityInfo.ID, - CapabilityDonId: remoteCapabilityDonInfo.ID, - CallerDonId: localDonInfo.ID, - Method: methodType, - Payload: rawRequest, - MessageId: []byte(requestID), + CapabilityId: remoteCapabilityInfo.ID, + CapabilityDonId: remoteCapabilityDonInfo.ID, + CallerDonId: localDonInfo.ID, + Method: methodType, + Payload: rawRequest, + MessageId: []byte(requestID), + CapabilityMethod: capMethodName, } select { diff --git a/core/capabilities/remote/executable/request/client_request_test.go b/core/capabilities/remote/executable/request/client_request_test.go index dc035fde48d..f9d55f8a9dd 100644 --- a/core/capabilities/remote/executable/request/client_request_test.go +++ b/core/capabilities/remote/executable/request/client_request_test.go @@ -84,7 +84,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo, - workflowDonInfo, dispatcher, 10*time.Minute, nil) + workflowDonInfo, dispatcher, 10*time.Minute, nil, "") defer req.Cancel(errors.New("test end")) require.NoError(t, err) @@ -135,7 +135,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo, - workflowDonInfo, dispatcher, 10*time.Minute, nil) + workflowDonInfo, dispatcher, 10*time.Minute, nil, "") require.NoError(t, err) defer req.Cancel(errors.New("test end")) @@ -169,7 +169,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo, - workflowDonInfo, dispatcher, 10*time.Minute, nil) + workflowDonInfo, dispatcher, 10*time.Minute, nil, "") require.NoError(t, err) defer req.Cancel(errors.New("test end")) @@ -200,7 +200,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo, - workflowDonInfo, dispatcher, 10*time.Minute, nil) + workflowDonInfo, dispatcher, 10*time.Minute, nil, "") require.NoError(t, err) defer req.Cancel(errors.New("test end")) @@ -238,7 +238,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo, - workflowDonInfo, dispatcher, 10*time.Minute, nil) + workflowDonInfo, dispatcher, 10*time.Minute, nil, "") require.NoError(t, err) defer req.Cancel(errors.New("test end")) @@ -299,7 +299,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo, - workflowDonInfo, dispatcher, 10*time.Minute, nil) + workflowDonInfo, dispatcher, 10*time.Minute, nil, "") require.NoError(t, err) defer req.Cancel(errors.New("test end")) @@ -357,6 +357,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { dispatcher, 10*time.Minute, nil, + "", ) require.NoError(t, err) defer req.Cancel(errors.New("test end")) @@ -477,6 +478,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { dispatcher, 10*time.Minute, nil, + "", ) require.NoError(t, err) defer req.Cancel(errors.New("test end")) @@ -569,7 +571,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo, - workflowDonInfo, dispatcher, 10*time.Minute, nil) + workflowDonInfo, dispatcher, 10*time.Minute, nil, "") require.NoError(t, err) defer req.Cancel(errors.New("test end")) @@ -633,6 +635,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { Schedule: transmission.Schedule_OneAtATime, DeltaStage: 1000 * time.Millisecond, }, + "", ) require.NoError(t, err) defer req.Cancel(errors.New("test end")) diff --git a/core/capabilities/remote/executable/request/server_request.go b/core/capabilities/remote/executable/request/server_request.go index b4bcf735478..aa0320637ff 100644 --- a/core/capabilities/remote/executable/request/server_request.go +++ b/core/capabilities/remote/executable/request/server_request.go @@ -127,6 +127,7 @@ type ServerRequest struct { requestMessageID string method string requestTimeout time.Duration + capMethodName string mux sync.Mutex lggr logger.Logger @@ -137,7 +138,7 @@ type ServerRequest struct { func NewServerRequest(capability capabilities.ExecutableCapability, method string, capabilityID string, capabilityDonID uint32, capabilityPeerID p2ptypes.PeerID, callingDon capabilities.DON, requestID string, - dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) (*ServerRequest, error) { + dispatcher types.Dispatcher, requestTimeout time.Duration, capMethodName string, lggr logger.Logger) (*ServerRequest, error) { lggr = logger.Sugared(lggr).Named("ServerRequest").With("requestID", requestID, "capabilityID", capabilityID) m, err := newSrMetrics(capabilityID, callingDon.ID) @@ -158,6 +159,7 @@ func NewServerRequest(capability capabilities.ExecutableCapability, method strin requestMessageID: requestID, method: method, requestTimeout: requestTimeout, + capMethodName: capMethodName, lggr: lggr, metrics: m, }, nil @@ -295,13 +297,14 @@ func (e *ServerRequest) sendResponses(ctx context.Context) error { func (e *ServerRequest) sendResponse(ctx context.Context, requester p2ptypes.PeerID) error { responseMsg := types.MessageBody{ - CapabilityId: e.capabilityID, - CapabilityDonId: e.capabilityDonID, - CallerDonId: e.callingDon.ID, - Method: types.MethodExecute, - MessageId: []byte(e.requestMessageID), - Sender: e.capabilityPeerID[:], - Receiver: requester[:], + CapabilityId: e.capabilityID, + CapabilityDonId: e.capabilityDonID, + CallerDonId: e.callingDon.ID, + Method: types.MethodExecute, + MessageId: []byte(e.requestMessageID), + Sender: e.capabilityPeerID[:], + Receiver: requester[:], + CapabilityMethod: e.capMethodName, } if e.response.error != types.Error_OK { diff --git a/core/capabilities/remote/executable/request/server_request_test.go b/core/capabilities/remote/executable/request/server_request_test.go index 9741221864c..634a33a66ac 100644 --- a/core/capabilities/remote/executable/request/server_request_test.go +++ b/core/capabilities/remote/executable/request/server_request_test.go @@ -59,7 +59,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Send duplicate message", func(t *testing.T) { req, err := request.NewServerRequest(capability, types.MethodExecute, "capabilityID", 2, - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, "", lggr) require.NoError(t, err) err = sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) @@ -70,7 +70,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Send message with non calling don peer", func(t *testing.T) { req, err := request.NewServerRequest(capability, types.MethodExecute, "capabilityID", 2, - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, "", lggr) require.NoError(t, err) err = sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) @@ -94,7 +94,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Send message invalid payload", func(t *testing.T) { req, err := request.NewServerRequest(capability, types.MethodExecute, "capabilityID", 2, - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, "", lggr) require.NoError(t, err) err = sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) @@ -120,7 +120,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Send second valid request when capability errors", func(t *testing.T) { dispatcher := &testDispatcher{} req, err := request.NewServerRequest(TestErrorCapability{err: errors.New("an error")}, types.MethodExecute, "capabilityID", 2, - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, "", lggr) require.NoError(t, err) err = sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) @@ -148,7 +148,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Reportable errors are returned to the caller", func(t *testing.T) { dispatcher := &testDispatcher{} req, err := request.NewServerRequest(TestErrorCapability{err: commoncap.NewRemoteReportableError(errors.New("error details"))}, types.MethodExecute, "capabilityID", 2, - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, "", lggr) require.NoError(t, err) err = sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) @@ -176,7 +176,7 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { t.Run("Execute capability", func(t *testing.T) { dispatcher := &testDispatcher{} req, err := request.NewServerRequest(capability, types.MethodExecute, "capabilityID", 2, - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) + capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, "", lggr) require.NoError(t, err) err = sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) diff --git a/core/capabilities/remote/executable/server.go b/core/capabilities/remote/executable/server.go index f257dde9635..bcaac9dc773 100644 --- a/core/capabilities/remote/executable/server.go +++ b/core/capabilities/remote/executable/server.go @@ -40,6 +40,7 @@ type server struct { requestIDToRequest map[string]requestAndMsgID requestTimeout time.Duration + capMethodName string // Used to detect messages with the same message id but different payloads messageIDToRequestIDsCount map[string]map[string]int @@ -62,7 +63,7 @@ type requestAndMsgID struct { func NewServer(remoteExecutableConfig *commoncap.RemoteExecutableConfig, peerID p2ptypes.PeerID, underlying commoncap.ExecutableCapability, capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, - maxParallelRequests int, messageHasher types.MessageHasher, + maxParallelRequests int, messageHasher types.MessageHasher, capMethodName string, lggr logger.Logger) *server { if remoteExecutableConfig == nil { lggr.Info("no remote config provided, using default values") @@ -84,6 +85,7 @@ func NewServer(remoteExecutableConfig *commoncap.RemoteExecutableConfig, peerID requestIDToRequest: map[string]requestAndMsgID{}, messageIDToRequestIDsCount: map[string]map[string]int{}, requestTimeout: requestTimeout, + capMethodName: capMethodName, lggr: logger.Named(lggr, "ExecutableCapabilityServer"), stopCh: make(services.StopChan), @@ -202,7 +204,7 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { } sr, ierr := request.NewServerRequest(r.underlying, msg.Method, r.capInfo.ID, r.localDonInfo.ID, r.peerID, - callingDon, messageID, r.dispatcher, r.requestTimeout, r.lggr) + callingDon, messageID, r.dispatcher, r.requestTimeout, r.capMethodName, r.lggr) if ierr != nil { r.lggr.Errorw("failed to instantiate server request", "err", ierr) return diff --git a/core/capabilities/remote/executable/server_test.go b/core/capabilities/remote/executable/server_test.go index a7ac04845df..80d3aebd294 100644 --- a/core/capabilities/remote/executable/server_test.go +++ b/core/capabilities/remote/executable/server_test.go @@ -362,7 +362,7 @@ func testRemoteExecutableCapabilityServer(ctx context.Context, t *testing.T, capabilityPeer := capabilityPeers[i] capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer) capabilityNode := executable.NewServer(config, capabilityPeer, underlying, capInfo, capDonInfo, workflowDONs, capabilityDispatcher, - capabilityNodeResponseTimeout, 10, messageHasher, lggr) + capabilityNodeResponseTimeout, 10, messageHasher, "", lggr) require.NoError(t, capabilityNode.Start(ctx)) broker.RegisterReceiverNode(capabilityPeer, capabilityNode) capabilityNodes[i] = capabilityNode diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index e3e9fffe541..00cbb983aaf 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -33,6 +33,7 @@ type triggerPublisher struct { workflowDONs map[uint32]commoncap.DON membersCache map[uint32]map[p2ptypes.PeerID]bool dispatcher types.Dispatcher + capMethodName string messageCache *messagecache.MessageCache[registrationKey, p2ptypes.PeerID] registrations map[registrationKey]*pubRegState mu sync.RWMutex // protects messageCache and registrations @@ -66,7 +67,7 @@ var _ types.ReceiverService = &triggerPublisher{} const minAllowedBatchCollectionPeriod = 10 * time.Millisecond -func NewTriggerPublisher(config *commoncap.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher { +func NewTriggerPublisher(config *commoncap.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, capMethodName string, lggr logger.Logger) *triggerPublisher { if config == nil { lggr.Info("no config provided, using default values") config = &commoncap.RemoteTriggerConfig{} @@ -88,6 +89,7 @@ func NewTriggerPublisher(config *commoncap.RemoteTriggerConfig, underlying commo workflowDONs: workflowDONs, membersCache: membersCache, dispatcher: dispatcher, + capMethodName: capMethodName, messageCache: messagecache.NewMessageCache[registrationKey, p2ptypes.PeerID](), registrations: make(map[registrationKey]*pubRegState), batchingQueue: make(map[[32]byte]*batchedResponse), @@ -301,6 +303,7 @@ func (p *triggerPublisher) sendBatch(resp *batchedResponse) { TriggerEventId: resp.triggerEventID, }, }, + CapabilityMethod: p.capMethodName, } // NOTE: send to all nodes by default, introduce different strategies later (KS-76) for _, peerID := range p.workflowDONs[resp.callerDonID].Members { diff --git a/core/capabilities/remote/trigger_publisher_test.go b/core/capabilities/remote/trigger_publisher_test.go index 6b49483a86f..4a3bdacb4e3 100644 --- a/core/capabilities/remote/trigger_publisher_test.go +++ b/core/capabilities/remote/trigger_publisher_test.go @@ -138,7 +138,7 @@ func newServices(t *testing.T, capabilityDONID uint32, workflowDONID uint32, max registrationsCh: make(chan commoncap.TriggerRegistrationRequest, 2), eventCh: make(chan commoncap.TriggerResponse, 2), } - publisher := remote.NewTriggerPublisher(config, underlying, capInfo, capDonInfo, workflowDONs, dispatcher, lggr) + publisher := remote.NewTriggerPublisher(config, underlying, capInfo, capDonInfo, workflowDONs, dispatcher, "", lggr) require.NoError(t, publisher.Start(ctx)) return underlying, publisher, dispatcher, peers } diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index c96adf523b4..24c010e7342 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -31,6 +31,7 @@ type triggerSubscriber struct { capDonMembers map[p2ptypes.PeerID]struct{} localDonInfo commoncap.DON dispatcher types.Dispatcher + capMethodName string aggregator types.Aggregator messageCache *messagecache.MessageCache[triggerEventKey, p2ptypes.PeerID] registeredWorkflows map[string]*subRegState @@ -65,7 +66,7 @@ const ( maxBatchedWorkflowIDs = 1000 ) -func NewTriggerSubscriber(config *commoncap.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, localDonInfo commoncap.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber { +func NewTriggerSubscriber(config *commoncap.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, localDonInfo commoncap.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, capMethodName string, lggr logger.Logger) *triggerSubscriber { if aggregator == nil { lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID) aggregator = aggregation.NewDefaultModeAggregator(uint32(capDonInfo.F + 1)) @@ -86,6 +87,7 @@ func NewTriggerSubscriber(config *commoncap.RemoteTriggerConfig, capInfo commonc capDonMembers: capDonMembers, localDonInfo: localDonInfo, dispatcher: dispatcher, + capMethodName: capMethodName, aggregator: aggregator, messageCache: messagecache.NewMessageCache[triggerEventKey, p2ptypes.PeerID](), registeredWorkflows: make(map[string]*subRegState), @@ -151,11 +153,12 @@ func (s *triggerSubscriber) registrationLoop() { // NOTE: send to all by default, introduce different strategies later (KS-76) for _, peerID := range s.capDonInfo.Members { m := &types.MessageBody{ - CapabilityId: s.capInfo.ID, - CapabilityDonId: s.capDonInfo.ID, - CallerDonId: s.localDonInfo.ID, - Method: types.MethodRegisterTrigger, - Payload: registration.rawRequest, + CapabilityId: s.capInfo.ID, + CapabilityDonId: s.capDonInfo.ID, + CallerDonId: s.localDonInfo.ID, + Method: types.MethodRegisterTrigger, + Payload: registration.rawRequest, + CapabilityMethod: s.capMethodName, } err := s.dispatcher.Send(peerID, m) if err != nil { diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index 73f79ba4d21..a1fa6fabb01 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -48,7 +48,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { MinResponsesToAggregate: 1, MessageExpiry: 100 * time.Second, } - subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, lggr) + subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, "", lggr) require.NoError(t, subscriber.Start(t.Context())) req := commoncap.TriggerRegistrationRequest{ @@ -95,7 +95,7 @@ func TestTriggerSubscriber_CorrectEventExpiryCheck(t *testing.T) { MinResponsesToAggregate: 2, MessageExpiry: 10 * time.Second, } - subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, lggr) + subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, "", lggr) require.NoError(t, subscriber.Start(t.Context())) regReq := commoncap.TriggerRegistrationRequest{ diff --git a/core/capabilities/streams/trigger_test.go b/core/capabilities/streams/trigger_test.go index 5c81bfe7173..dbc8abac695 100644 --- a/core/capabilities/streams/trigger_test.go +++ b/core/capabilities/streams/trigger_test.go @@ -90,7 +90,7 @@ func TestStreamsTrigger(t *testing.T) { config := &capabilities.RemoteTriggerConfig{ MinResponsesToAggregate: uint32(F + 1), } - subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, capabilities.DON{}, nil, agg, lggr) + subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, capabilities.DON{}, nil, agg, "", lggr) // register trigger req := capabilities.TriggerRegistrationRequest{ diff --git a/core/capabilities/transmission/transmission.go b/core/capabilities/transmission/transmission.go index 096b34c30dd..1e673904065 100644 --- a/core/capabilities/transmission/transmission.go +++ b/core/capabilities/transmission/transmission.go @@ -56,6 +56,17 @@ func ExtractTransmissionConfig(config *values.Map) (TransmissionConfig, error) { }, nil } +func EnumToString(t capabilities.TransmissionSchedule) string { + switch t { + case capabilities.Schedule_AllAtOnce: + return Schedule_AllAtOnce + case capabilities.Schedule_OneAtATime: + return Schedule_OneAtATime + default: + return "unknown" + } +} + // GetPeerIDToTransmissionDelay returns a map of PeerID to the time.Duration that the node with that PeerID should wait // before transmitting the capability request. If a node is not in the map, it should not transmit. func GetPeerIDToTransmissionDelay(donPeerIDs []types.PeerID, req capabilities.CapabilityRequest) (map[types.PeerID]time.Duration, error) { diff --git a/core/services/registrysyncer/local_registry.go b/core/services/registrysyncer/local_registry.go index ffd595666d6..06fe79f628c 100644 --- a/core/services/registrysyncer/local_registry.go +++ b/core/services/registrysyncer/local_registry.go @@ -63,12 +63,54 @@ func (c CapabilityConfiguration) Unmarshal() (capabilities.CapabilityConfigurati return capabilities.CapabilityConfiguration{}, fmt.Errorf("failed to unmarshal capability configuration: %w", err) } + var methodConfigs map[string]capabilities.CapabilityMethodConfig + if cconf.MethodConfigs != nil { + methodConfigs = make(map[string]capabilities.CapabilityMethodConfig, len(cconf.MethodConfigs)) + for method, methodConfig := range cconf.MethodConfigs { + var config capabilities.CapabilityMethodConfig + switch remoteCfg := methodConfig.RemoteConfig.(type) { + case *capabilitiespb.CapabilityMethodConfig_RemoteTriggerConfig: + config = capabilities.CapabilityMethodConfig{ + RemoteTriggerConfig: &capabilities.RemoteTriggerConfig{ + RegistrationRefresh: remoteCfg.RemoteTriggerConfig.RegistrationRefresh.AsDuration(), + RegistrationExpiry: remoteCfg.RemoteTriggerConfig.RegistrationExpiry.AsDuration(), + MinResponsesToAggregate: remoteCfg.RemoteTriggerConfig.MinResponsesToAggregate, + MessageExpiry: remoteCfg.RemoteTriggerConfig.MessageExpiry.AsDuration(), + MaxBatchSize: remoteCfg.RemoteTriggerConfig.MaxBatchSize, + BatchCollectionPeriod: remoteCfg.RemoteTriggerConfig.BatchCollectionPeriod.AsDuration(), + }, + } + case *capabilitiespb.CapabilityMethodConfig_RemoteExecutableConfig: + config = capabilities.CapabilityMethodConfig{ + RemoteExecutableConfig: &capabilities.RemoteExecutableConfig{ + TransmissionSchedule: capabilities.TransmissionSchedule(remoteCfg.RemoteExecutableConfig.TransmissionSchedule), + DeltaStage: remoteCfg.RemoteExecutableConfig.DeltaStage.AsDuration(), + RequestTimeout: remoteCfg.RemoteExecutableConfig.RequestTimeout.AsDuration(), + ServerMaxParallelRequests: remoteCfg.RemoteExecutableConfig.ServerMaxParallelRequests, + RequestHasherType: capabilities.RequestHasherType(remoteCfg.RemoteExecutableConfig.RequestHasherType), + }, + } + default: + return capabilities.CapabilityConfiguration{}, fmt.Errorf("unknown method config type for method %s", method) + } + + if methodConfig.AggregatorConfig != nil { + config.AggregatorConfig = &capabilities.AggregatorConfig{ + AggregatorType: capabilities.AggregatorType(methodConfig.AggregatorConfig.AggregatorType), + } + } + + methodConfigs[method] = config + } + } + return capabilities.CapabilityConfiguration{ - DefaultConfig: dc, - RestrictedKeys: cconf.RestrictedKeys, - RestrictedConfig: rc, - RemoteTriggerConfig: remoteTriggerConfig, - RemoteTargetConfig: remoteTargetConfig, + DefaultConfig: dc, + RestrictedKeys: cconf.RestrictedKeys, + RestrictedConfig: rc, + RemoteTriggerConfig: remoteTriggerConfig, + RemoteTargetConfig: remoteTargetConfig, + CapabilityMethodConfig: methodConfigs, }, nil }