Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dry-pets-knock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#changed Support dynamic config updates in TriggerSubscriber
186 changes: 120 additions & 66 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,27 @@ type launcher struct {
myPeerID p2ptypes.PeerID
peerWrapper p2ptypes.PeerWrapper
dispatcher remotetypes.Dispatcher
cachedShims cachedShims
registry *Registry
subServices []services.Service
workflowDonNotifier donNotifier
don2donSharedPeer p2ptypes.SharedPeer
p2pStreamConfig p2ptypes.StreamConfig
}

// For V2 capabilities, shims are created once and their config is updated dynamically.
type cachedShims struct {
combinedClients map[string]remote.CombinedClient
triggerSubscribers map[string]remote.TriggerSubscriber
executableClients map[string]executable.Client

// TODO(CRE-942): add trigger publishers and executable servers
}

func shimKey(capID string, donID uint32, method string) string {
return fmt.Sprintf("%s:%d:%s", capID, donID, method)
}

type donNotifier interface {
NotifyDonSet(don capabilities.DON)
}
Expand Down Expand Up @@ -85,9 +99,14 @@ func NewLauncher(
}
}
return &launcher{
lggr: logger.Named(lggr, "CapabilitiesLauncher"),
peerWrapper: peerWrapper,
dispatcher: dispatcher,
lggr: logger.Named(lggr, "CapabilitiesLauncher"),
peerWrapper: peerWrapper,
dispatcher: dispatcher,
cachedShims: cachedShims{
combinedClients: make(map[string]remote.CombinedClient),
triggerSubscribers: make(map[string]remote.TriggerSubscriber),
executableClients: make(map[string]executable.Client),
},
registry: registry,
subServices: []services.Service{},
workflowDonNotifier: workflowDonNotifier,
Expand Down Expand Up @@ -394,6 +413,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
return nil, err
}

// deprecated pre-LLO Mercury aggregator
aggregator = triggers.NewMercuryRemoteAggregator(
codec,
signers,
Expand Down Expand Up @@ -424,23 +444,21 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
aggregator = aggregation.NewDefaultModeAggregator(uint32(remoteDON.F) + 1)
}

// TODO: We need to implement a custom, Mercury-specific
// aggregator here, because there is no guarantee that
// all trigger events in the workflow will have the same
// payloads. As a workaround, we validate the signatures.
// When this is solved, we can move to a generic aggregator
// and remove this.
triggerCap := remote.NewTriggerSubscriber(
capabilityConfig.RemoteTriggerConfig,
info,
remoteDON.DON,
myDON.DON,
w.dispatcher,
aggregator,
"", // empty method name for v1
w.lggr,
)
return triggerCap, nil
shimKey := shimKey(capability.ID, remoteDON.ID, "") // empty method name for V1
triggerCap, alreadyExists := w.cachedShims.triggerSubscribers[shimKey]
if !alreadyExists {
triggerCap = remote.NewTriggerSubscriber(
capability.ID,
"", // empty method name for v1
w.dispatcher,
w.lggr,
)
w.cachedShims.triggerSubscribers[shimKey] = triggerCap
}
if errCfg := triggerCap.SetConfig(capabilityConfig.RemoteTriggerConfig, info, myDON.ID, remoteDON.DON, aggregator); errCfg != nil {
return nil, fmt.Errorf("failed to set trigger config: %w", errCfg)
}
return triggerCap.(capabilityService), nil
}
err := w.addToRegistryAndSetDispatcher(ctx, capability, remoteDON, newTriggerFn)
if err != nil {
Expand Down Expand Up @@ -744,60 +762,84 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth
if err != nil {
return fmt.Errorf("failed to create remote capability info: %w", err)
}
cc := remote.NewCombinedClient(info)

cc, isNewCC := w.getCombinedClient(info)
for method, config := range methodConfig {
var receiver remotetypes.ReceiverService
if config.RemoteTriggerConfig != nil {
// TODO(CRE-590): add support for SignedReportAggregator (needed by LLO Streams Trigger V2)
aggregator := aggregation.NewDefaultModeAggregator(config.RemoteTriggerConfig.MinResponsesToAggregate)
sub := remote.NewTriggerSubscriber(
config.RemoteTriggerConfig,
info,
remoteDON.DON,
myDON.DON,
w.dispatcher,
aggregator,
method,
w.lggr,
)
receiver = sub
cc.AddTriggerSubscriber(method, sub)
}
if receiver == nil && config.RemoteExecutableConfig != nil {
client := executable.NewClient(
info,
myDON.DON,
w.dispatcher,
config.RemoteExecutableConfig.RequestTimeout,
&transmission.TransmissionConfig{
Schedule: transmission.EnumToString(config.RemoteExecutableConfig.TransmissionSchedule),
DeltaStage: config.RemoteExecutableConfig.DeltaStage,
},
method,
w.lggr,
)
receiver = client
cc.AddExecutableClient(method, client)
}
if receiver == nil {
return fmt.Errorf("no remote config found for method %s of capability %s", method, capID)
}
if err = w.dispatcher.SetReceiverForMethod(capID, remoteDON.ID, method, receiver); err != nil {
return fmt.Errorf("failed to register receiver for capability %s, method %s: %w", capID, method, err)
w.lggr.Infow("addRemoteCapabilityV2", "capID", capID, "method", method)
if config.RemoteTriggerConfig == nil && config.RemoteExecutableConfig == nil {
w.lggr.Errorw("no remote config found", "method", method, "capID", capID)
continue
}
err = receiver.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start receiver for capability %s, method %s: %w", capID, method, err)

shimKey := shimKey(capID, remoteDON.ID, method)
if config.RemoteTriggerConfig != nil { // trigger
sub, alreadyExists := w.cachedShims.triggerSubscribers[shimKey]
if !alreadyExists {
sub = remote.NewTriggerSubscriber(capID, method, w.dispatcher, w.lggr)
cc.SetTriggerSubscriber(method, sub)
}
// TODO(CRE-590): add support for SignedReportAggregator (needed by LLO Streams Trigger V2)
agg := aggregation.NewDefaultModeAggregator(config.RemoteTriggerConfig.MinResponsesToAggregate)
if errCfg := sub.SetConfig(config.RemoteTriggerConfig, info, myDON.ID, remoteDON.DON, agg); errCfg != nil {
return fmt.Errorf("failed to set trigger config: %w", errCfg)
}

if !alreadyExists {
if err2 := w.startNewShim(ctx, sub.(remotetypes.ReceiverService), capID, remoteDON.ID, method); err2 != nil {
w.lggr.Errorw("failed to start receiver", "capID", capID, "method", method, "error", err2)
continue
}
w.cachedShims.triggerSubscribers[shimKey] = sub
w.lggr.Infow("added new remote trigger subscriber", "capID", capID, "method", method)
}
} else { // executable
client, alreadyExists := w.cachedShims.executableClients[shimKey]
if !alreadyExists {
client = executable.NewClient(
info,
myDON.DON,
w.dispatcher,
config.RemoteExecutableConfig.RequestTimeout,
&transmission.TransmissionConfig{
Schedule: transmission.EnumToString(config.RemoteExecutableConfig.TransmissionSchedule),
DeltaStage: config.RemoteExecutableConfig.DeltaStage,
},
method,
w.lggr,
)
cc.SetExecutableClient(method, client)
}

// TODO(CRE-941): implement setters for executable client config

if !alreadyExists {
if err2 := w.startNewShim(ctx, client.(remotetypes.ReceiverService), capID, remoteDON.ID, method); err2 != nil {
w.lggr.Errorw("failed to start receiver", "capID", capID, "method", method, "error", err2)
continue
}
w.cachedShims.executableClients[shimKey] = client
w.lggr.Infow("added new remote executable client", "capID", capID, "method", method)
}
}
w.subServices = append(w.subServices, receiver)
}

err = w.registry.Add(ctx, cc)
if err != nil {
return fmt.Errorf("failed to add CombinedClient for capability %s to registry: %w", capID, err)
if isNewCC { // add new CombinedClient to registry, only after all methods are configured
if err2 := w.registry.Add(ctx, cc); err2 != nil {
return fmt.Errorf("failed to add CombinedClient for capability %s to registry: %w", capID, err2)
}
}
return nil
}

func (w *launcher) startNewShim(ctx context.Context, receiver remotetypes.ReceiverService, capID string, remoteDonID uint32, method string) error {
if err := receiver.Start(ctx); err != nil {
return fmt.Errorf("failed to start receiver for capability %s, method %s: %w", capID, method, err)
}
if err := w.dispatcher.SetReceiverForMethod(capID, remoteDonID, method, receiver); err != nil {
_ = receiver.Close()
return fmt.Errorf("failed to register receiver for capability %s, method %s: %w", capID, method, err)
}
w.subServices = append(w.subServices, receiver)
return nil
}

Expand Down Expand Up @@ -887,3 +929,15 @@ func (w *launcher) exposeCapabilityV2(ctx context.Context, capID string, methodC
}
return nil
}

// retrieve or create a CombinedClient for the given capability
func (w *launcher) getCombinedClient(info capabilities.CapabilityInfo) (remote.CombinedClient, bool) {
key := shimKey(info.ID, info.DON.ID, "") // empty method name - CombinedClient covers all methods
cc, exists := w.cachedShims.combinedClients[key]
if !exists { // create a new combined client and cache it
cc = remote.NewCombinedClient(info)
w.cachedShims.combinedClients[key] = cc
return cc, true
}
return cc, false
}
54 changes: 37 additions & 17 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,6 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) {
dID := uint32(1)
capDonID := uint32(2)

rtc := &capabilities.RemoteTriggerConfig{}
rtc.ApplyDefaults()
cfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{
RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{
RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{
Expand Down Expand Up @@ -442,8 +440,6 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) {
dID := uint32(1)
capDonID := uint32(2)

rtc := &capabilities.RemoteTriggerConfig{}
rtc.ApplyDefaults()
cfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{
RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{
RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{
Expand Down Expand Up @@ -504,15 +500,20 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie
dID := uint32(1)
triggerCapDonID := uint32(2)
targetCapDonID := uint32(3)
// The below state describes a Workflow DON (AcceptsWorkflows = true),
// which exposes the streams-trigger and write_chain capabilities.
// We expect receivers to be wired up and both capabilities to be added to the registry.

cfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{
RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{
RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{},
},
})
require.NoError(t, err)

localRegistry := buildLocalRegistry()
addDON(localRegistry, dID, uint32(0), uint8(1), true, true, workflowDonNodes, 1, nil)
addDON(localRegistry, triggerCapDonID, uint32(0), uint8(1), true, false, capabilityDonNodes, 1, [][32]byte{triggerCapID, targetCapID})
addCapabilityToDON(localRegistry, triggerCapDonID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil)
addCapabilityToDON(localRegistry, triggerCapDonID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, cfg)
addDON(localRegistry, targetCapDonID, uint32(0), uint8(1), false, false, capabilityDonNodes, 1, [][32]byte{triggerCapID, targetCapID})
addCapabilityToDON(localRegistry, targetCapDonID, fullTargetID, capabilities.CapabilityTypeTarget, nil)
addCapabilityToDON(localRegistry, targetCapDonID, fullTargetID, capabilities.CapabilityTypeTarget, cfg)

launcher := NewLauncher(
lggr,
Expand All @@ -527,8 +528,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie
defer launcher.Close()
dispatcher.On("SetReceiver", fullTriggerCapID, triggerCapDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)

err := launcher.OnNewRegistry(t.Context(), localRegistry)
require.NoError(t, err)
require.NoError(t, launcher.OnNewRegistry(t.Context(), localRegistry))

_, err = registry.Get(t.Context(), fullTriggerCapID)
require.NoError(t, err)
Expand Down Expand Up @@ -738,7 +738,7 @@ func TestLauncher_DonPairsToUpdate(t *testing.T) {
require.Equal(t, p2ptypes.DonPair{localRegistry.IDsToDONs[capDONID].DON, localRegistry.IDsToDONs[mixedDONID].DON}, res[2])
}

func TestLauncher_CreateCombinedClientForV2Capabilities(t *testing.T) {
func TestLauncher_V2CapabilitiesAddViaCombinedClient(t *testing.T) {
lggr := logger.Test(t)
registry := NewRegistry(lggr)
dispatcher := remoteMocks.NewDispatcher(t)
Expand Down Expand Up @@ -805,20 +805,40 @@ func TestLauncher_CreateCombinedClientForV2Capabilities(t *testing.T) {
dispatcher.On("SetReceiverForMethod", fullTriggerCapID, capDonID, "StreamsTrigger", mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)
dispatcher.On("SetReceiverForMethod", fullExecutableCapID, capDonID, "Write", mock.AnythingOfType("*executable.client")).Return(nil)

// first test the initial CombinedClient creation
err = launcher.OnNewRegistry(t.Context(), localRegistry)
require.NoError(t, err)

_, err = registry.Get(t.Context(), fullTriggerCapID)
trigCap, err := registry.Get(t.Context(), fullTriggerCapID)
require.NoError(t, err)
trigCC, ok := trigCap.(remote.CombinedClient)
assert.True(t, ok, "expected CombinedClient object")
subscriber := trigCC.GetTriggerSubscriber("StreamsTrigger")
capInfo, err := subscriber.Info(t.Context())
require.NoError(t, err)
assert.Equal(t, fullTriggerCapID, capInfo.ID)
assert.Len(t, capInfo.DON.Members, 4)

executableCap, err := registry.Get(t.Context(), fullExecutableCapID)
execCap, err := registry.Get(t.Context(), fullExecutableCapID)
require.NoError(t, err)
execCC, ok := execCap.(remote.CombinedClient)
assert.True(t, ok, "expected CombinedClient object")
require.NotNil(t, execCC.GetExecutableClient("Write"))

// Now update config for one capability and verify it's propagated correctly (DON size)
capDon := localRegistry.IDsToDONs[registrysyncer.DonID(capDonID)]
capDon.Members = append(capDon.Members, ragetypes.PeerID(RandomUTF8BytesWord()))
localRegistry.IDsToDONs[registrysyncer.DonID(capDonID)] = capDon
err = launcher.OnNewRegistry(t.Context(), localRegistry)
require.NoError(t, err)

_, ok := executableCap.(capabilities.ExecutableAndTriggerCapability)
assert.True(t, ok, "expected executableCap to be of type capabilities.ExecutableAndTriggerCapability")
capInfo, err = subscriber.Info(t.Context())
require.NoError(t, err)
assert.Equal(t, fullTriggerCapID, capInfo.ID)
assert.Len(t, capInfo.DON.Members, 5)
}

func TestLauncher_ExposeV2CapabilitiesRemotely(t *testing.T) {
func TestLauncher_V2CapabilitiesExposeRemotely(t *testing.T) {
lggr := logger.Test(t)
registry := NewRegistry(lggr)
fullTriggerCapID := "streams-trigger@1.0.0"
Expand Down
Loading
Loading