Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mighty-trains-heal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#added Support capabilities that are both Triggers and Executables
175 changes: 175 additions & 0 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/streams"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/config"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
Expand Down Expand Up @@ -354,6 +355,15 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
return fmt.Errorf("could not unmarshal capability config for id %s", cid)
}

methodConfig := capabilityConfig.CapabilityMethodConfig
if methodConfig != nil { // v2 capability - handle via CombinedClient
errAdd := w.addRemoteCapabilityV2(ctx, capability.ID, methodConfig, myDON, remoteDON)
if errAdd != nil {
return fmt.Errorf("failed to add remote v2 capability %s: %w", capability.ID, errAdd)
}
continue
}

switch capability.CapabilityType {
case capabilities.CapabilityTypeTrigger:
newTriggerFn := func(info capabilities.CapabilityInfo) (capabilityService, error) {
Expand Down Expand Up @@ -417,6 +427,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
myDON.DON,
w.dispatcher,
aggregator,
"", // empty method name for v1
w.lggr,
)
return triggerCap, nil
Expand All @@ -433,6 +444,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
w.dispatcher,
defaultTargetRequestTimeout,
nil, // V1 capabilities read transmission schedule from every request
"", // empty method name for v1
w.lggr,
)
return client, nil
Expand All @@ -452,6 +464,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
w.dispatcher,
defaultTargetRequestTimeout,
nil, // V1 capabilities read transmission schedule from every request
"", // empty method name for v1
w.lggr,
)
return client, nil
Expand Down Expand Up @@ -543,6 +556,15 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
return fmt.Errorf("could not unmarshal capability config for id %s", cid)
}

methodConfig := capabilityConfig.CapabilityMethodConfig
if methodConfig != nil { // v2 capability
errExpose := w.exposeCapabilityV2(ctx, cid, methodConfig, myPeerID, don, idsToDONs)
if errExpose != nil {
return fmt.Errorf("failed to expose v2 capability remotely %s: %w", cid, errExpose)
}
continue
}

switch capability.CapabilityType {
case capabilities.CapabilityTypeTrigger:
newTriggerPublisher := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
Expand All @@ -558,6 +580,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
don.DON,
idsToDONs,
w.dispatcher,
"", // empty method name for v1
w.lggr,
)
return publisher, nil
Expand Down Expand Up @@ -591,6 +614,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
defaultTargetRequestTimeout,
defaultMaxParallelCapabilityExecuteRequests,
nil, // TODO: create a capability-specific hasher
"", // empty method name for v1
w.lggr,
), nil
}
Expand Down Expand Up @@ -625,6 +649,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
defaultTargetRequestTimeout,
defaultMaxParallelCapabilityExecuteRequests,
nil, // TODO: create a capability-specific hasher
"", // empty method name for v1
w.lggr,
), nil
}
Expand Down Expand Up @@ -697,3 +722,153 @@ func signersFor(don registrysyncer.DON, state *registrysyncer.LocalRegistry) ([]

return s, nil
}

// Add a V2 capability with multiple methods, using CombinedClient.
func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, methodConfig map[string]capabilities.CapabilityMethodConfig, myDON registrysyncer.DON, remoteDON registrysyncer.DON) error {
info, err := capabilities.NewRemoteCapabilityInfo(
capID,
capabilities.CapabilityTypeCombined,
"Remote Capability for "+capID,
&myDON.DON,
)
if err != nil {
return fmt.Errorf("failed to create remote capability info: %w", err)
}
cc := remote.NewCombinedClient(info)

for method, config := range methodConfig {
var receiver remotetypes.Receiver
if config.RemoteTriggerConfig != nil {
// TODO(CRE-590): add support for SignedReportAggregator (needed by LLO Streams Trigger V2)
aggregator := aggregation.NewDefaultModeAggregator(config.RemoteTriggerConfig.MinResponsesToAggregate)
sub := remote.NewTriggerSubscriber(
config.RemoteTriggerConfig,
info,
remoteDON.DON,
myDON.DON,
w.dispatcher,
aggregator,
method,
w.lggr,
)
receiver = sub
cc.AddTriggerSubscriber(method, sub)
}
if receiver == nil && config.RemoteExecutableConfig != nil {
client := executable.NewClient(
info,
myDON.DON,
w.dispatcher,
config.RemoteExecutableConfig.RequestTimeout,
&transmission.TransmissionConfig{
Schedule: transmission.EnumToString(config.RemoteExecutableConfig.TransmissionSchedule),
DeltaStage: config.RemoteExecutableConfig.DeltaStage,
},
method,
w.lggr,
)
receiver = client
cc.AddExecutableClient(method, client)
}
if receiver == nil {
return fmt.Errorf("no remote config found for method %s of capability %s", method, capID)
}
if err = w.dispatcher.SetReceiverForMethod(capID, myDON.ID, method, receiver); err != nil {
return fmt.Errorf("failed to register receiver for capability %s, method %s: %w", capID, method, err)
}
}

err = w.registry.Add(ctx, cc)
if err != nil {
return fmt.Errorf("failed to add CombinedClient for capability %s to registry: %w", capID, err)
}

return nil
}

func (w *launcher) exposeCapabilityV2(ctx context.Context, capID string, methodConfig map[string]capabilities.CapabilityMethodConfig, myPeerID p2ptypes.PeerID, myDON registrysyncer.DON, idsToDONs map[uint32]capabilities.DON) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@george-dorin uncovered a very hairy bug (ticket pending...) where it seem like if the loop crashes the in-memory registry is corrupted and never recovers despite a new loop process starting.

how does this code get alerted/triggered by, say, a looop restart?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't. And it didn't even before this change :( I opened a separate epic to handle dynamic updates to underlying capabilities and config, which should include LOOPP restarts (CRE-788)

info, err := capabilities.NewRemoteCapabilityInfo(
capID,
capabilities.CapabilityTypeCombined,
"Remote Capability for "+capID,
&myDON.DON,
)
if err != nil {
return fmt.Errorf("failed to create remote capability info: %w", err)
}
underlying, err := w.registry.Get(ctx, capID)
if err != nil {
return fmt.Errorf("failed to get capability %s from registry: %w", capID, err)
}
for method, config := range methodConfig {
var receiver remotetypes.ReceiverService
if config.RemoteTriggerConfig != nil {
underlyingTriggerCapability, ok := (underlying).(capabilities.TriggerCapability)
if !ok {
return fmt.Errorf("capability %s does not implement TriggerCapability", capID)
}
receiver = remote.NewTriggerPublisher(
config.RemoteTriggerConfig,
underlyingTriggerCapability,
info,
myDON.DON,
idsToDONs,
w.dispatcher,
method,
w.lggr,
)
}
if receiver == nil && config.RemoteExecutableConfig != nil {
underlyingExecutableCapability, ok := (underlying).(capabilities.ExecutableCapability)
if !ok {
return fmt.Errorf("capability %s does not implement ExecutableCapability", capID)
}
var requestHasher remotetypes.MessageHasher
switch config.RemoteExecutableConfig.RequestHasherType {
case capabilities.RequestHasherType_Simple:
requestHasher = executable.NewSimpleHasher()
case capabilities.RequestHasherType_WriteReportExcludeSignatures:
requestHasher = executable.NewWriteReportExcludeSignaturesHasher()
default:
requestHasher = executable.NewSimpleHasher()
}
receiver = executable.NewServer(
config.RemoteExecutableConfig,
myPeerID,
underlyingExecutableCapability,
info,
myDON.DON,
idsToDONs,
w.dispatcher,
config.RemoteExecutableConfig.RequestTimeout,
int(config.RemoteExecutableConfig.ServerMaxParallelRequests),
requestHasher,
method,
w.lggr,
)
}
if receiver == nil {
return fmt.Errorf("no remote config found for method %s of capability %s", method, capID)
}

w.lggr.Debugw("Enabling external access for capability method", "id", capID, "method", method, "donID", myDON.ID)
err := w.dispatcher.SetReceiverForMethod(capID, myDON.ID, method, receiver)
if errors.Is(err, remote.ErrReceiverExists) {
// If a receiver already exists, let's log the error for debug purposes, but
// otherwise short-circuit here. We've handled this capability in a previous iteration.
// TODO(CRE-788) support dynamic changes to config and underlying capability
w.lggr.Debugw("receiver already exists", "capabilityID", capID, "donID", myDON.ID, "method", method, "error", err)
return nil
} else if err != nil {
return fmt.Errorf("failed to set receiver for capability %s, method %s: %w", capID, method, err)
}

err = receiver.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start receiver for capability %s, method %s: %w", capID, method, err)
}

w.subServices = append(w.subServices, receiver)
}
return nil
}
Loading
Loading