diff --git a/.changeset/floppy-parts-argue.md b/.changeset/floppy-parts-argue.md
new file mode 100644
index 00000000000..5a74047f903
--- /dev/null
+++ b/.changeset/floppy-parts-argue.md
@@ -0,0 +1,5 @@
+---
+"chainlink": minor
+---
+
+Added Bridge Status Reporter Service that polls external adapter status endpoints and emits telemetry events for operational monitoring #nops #added
diff --git a/core/config/app_config.go b/core/config/app_config.go
index a76544f2e5f..87d39a26ed2 100644
--- a/core/config/app_config.go
+++ b/core/config/app_config.go
@@ -62,6 +62,7 @@ type AppConfig interface {
Telemetry() Telemetry
CRE() CRE
Billing() Billing
+ BridgeStatusReporter() BridgeStatusReporter
}
type DatabaseBackupMode string
diff --git a/core/config/bridge_status_config.go b/core/config/bridge_status_config.go
new file mode 100644
index 00000000000..11895686d32
--- /dev/null
+++ b/core/config/bridge_status_config.go
@@ -0,0 +1,13 @@
+package config
+
+import "time"
+
+const MinimumPollingInterval = time.Minute
+
+type BridgeStatusReporter interface {
+ Enabled() bool
+ StatusPath() string
+ PollingInterval() time.Duration
+ IgnoreInvalidBridges() bool
+ IgnoreJoblessBridges() bool
+}
diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml
index abb467854ed..dc4571707e0 100644
--- a/core/config/docs/core.toml
+++ b/core/config/docs/core.toml
@@ -827,3 +827,16 @@ URL = '' # Default
URL = "localhost:4319" # Default
# TLSEnabled enables TLS to be used to secure communication with the billing service. This is enabled by default.
TLSEnabled = true # Default
+
+# BridgeStatusReporter holds settings for the Bridge Status Reporter service.
+[BridgeStatusReporter]
+# Enabled enables the Bridge Status Reporter service that polls bridge status endpoints.
+Enabled = false # Default
+# StatusPath is the path to append to bridge URLs for status polling.
+StatusPath = "/status" # Default
+# PollingInterval is how often to poll bridge status endpoints for status.
+PollingInterval = "5m" # Default
+# IgnoreInvalidBridges skips bridges that return HTTP errors or invalid responses.
+IgnoreInvalidBridges = true # Default
+# IgnoreJoblessBridges skips bridges that have no associated jobs.
+IgnoreJoblessBridges = false # Default
diff --git a/core/config/toml/types.go b/core/config/toml/types.go
index 6552a91ba99..213716f2b7c 100644
--- a/core/config/toml/types.go
+++ b/core/config/toml/types.go
@@ -39,30 +39,31 @@ type Core struct {
RootDir *string
ShutdownGracePeriod *commonconfig.Duration
- Feature Feature `toml:",omitempty"`
- Database Database `toml:",omitempty"`
- TelemetryIngress TelemetryIngress `toml:",omitempty"`
- AuditLogger AuditLogger `toml:",omitempty"`
- Log Log `toml:",omitempty"`
- WebServer WebServer `toml:",omitempty"`
- JobDistributor JobDistributor `toml:",omitempty"`
- JobPipeline JobPipeline `toml:",omitempty"`
- FluxMonitor FluxMonitor `toml:",omitempty"`
- OCR2 OCR2 `toml:",omitempty"`
- OCR OCR `toml:",omitempty"`
- P2P P2P `toml:",omitempty"`
- Keeper Keeper `toml:",omitempty"`
- AutoPprof AutoPprof `toml:",omitempty"`
- Pyroscope Pyroscope `toml:",omitempty"`
- Sentry Sentry `toml:",omitempty"`
- Insecure Insecure `toml:",omitempty"`
- Tracing Tracing `toml:",omitempty"`
- Mercury Mercury `toml:",omitempty"`
- Capabilities Capabilities `toml:",omitempty"`
- Telemetry Telemetry `toml:",omitempty"`
- Workflows Workflows `toml:",omitempty"`
- CRE CreConfig `toml:",omitempty"`
- Billing Billing `toml:",omitempty"`
+ Feature Feature `toml:",omitempty"`
+ Database Database `toml:",omitempty"`
+ TelemetryIngress TelemetryIngress `toml:",omitempty"`
+ AuditLogger AuditLogger `toml:",omitempty"`
+ Log Log `toml:",omitempty"`
+ WebServer WebServer `toml:",omitempty"`
+ JobDistributor JobDistributor `toml:",omitempty"`
+ JobPipeline JobPipeline `toml:",omitempty"`
+ FluxMonitor FluxMonitor `toml:",omitempty"`
+ OCR2 OCR2 `toml:",omitempty"`
+ OCR OCR `toml:",omitempty"`
+ P2P P2P `toml:",omitempty"`
+ Keeper Keeper `toml:",omitempty"`
+ AutoPprof AutoPprof `toml:",omitempty"`
+ Pyroscope Pyroscope `toml:",omitempty"`
+ Sentry Sentry `toml:",omitempty"`
+ Insecure Insecure `toml:",omitempty"`
+ Tracing Tracing `toml:",omitempty"`
+ Mercury Mercury `toml:",omitempty"`
+ Capabilities Capabilities `toml:",omitempty"`
+ Telemetry Telemetry `toml:",omitempty"`
+ Workflows Workflows `toml:",omitempty"`
+ CRE CreConfig `toml:",omitempty"`
+ Billing Billing `toml:",omitempty"`
+ BridgeStatusReporter BridgeStatusReporter `toml:",omitempty"`
}
// SetFrom updates c with any non-nil values from f. (currently TOML field only!)
@@ -107,6 +108,7 @@ func (c *Core) SetFrom(f *Core) {
c.Telemetry.setFrom(&f.Telemetry)
c.CRE.setFrom(&f.CRE)
c.Billing.setFrom(&f.Billing)
+ c.BridgeStatusReporter.setFrom(&f.BridgeStatusReporter)
}
func (c *Core) ValidateConfig() (err error) {
@@ -2273,6 +2275,64 @@ func (b *Billing) ValidateConfig() error {
return nil
}
+type BridgeStatusReporter struct {
+ Enabled *bool
+ StatusPath *string
+ PollingInterval *commonconfig.Duration
+ IgnoreInvalidBridges *bool
+ IgnoreJoblessBridges *bool
+}
+
+func (e *BridgeStatusReporter) setFrom(f *BridgeStatusReporter) {
+ if f.Enabled != nil {
+ e.Enabled = f.Enabled
+ }
+ if f.StatusPath != nil {
+ e.StatusPath = f.StatusPath
+ }
+ if f.PollingInterval != nil {
+ e.PollingInterval = f.PollingInterval
+ }
+ if f.IgnoreInvalidBridges != nil {
+ e.IgnoreInvalidBridges = f.IgnoreInvalidBridges
+ }
+ if f.IgnoreJoblessBridges != nil {
+ e.IgnoreJoblessBridges = f.IgnoreJoblessBridges
+ }
+}
+
+func (e *BridgeStatusReporter) ValidateConfig() error {
+ if e.Enabled == nil || !*e.Enabled {
+ return nil
+ }
+
+ // Default values when enabled
+ if e.StatusPath == nil || *e.StatusPath == "" {
+ defaultPath := "/status"
+ e.StatusPath = &defaultPath
+ }
+
+ if e.PollingInterval == nil {
+ return configutils.ErrInvalid{Name: "PollingInterval", Value: nil, Msg: "must be set"}
+ }
+
+ if e.PollingInterval.Duration() < config.MinimumPollingInterval {
+ return configutils.ErrInvalid{Name: "PollingInterval", Value: e.PollingInterval.Duration(), Msg: "must be greater than or equal to: " + config.MinimumPollingInterval.String()}
+ }
+
+ if e.IgnoreInvalidBridges == nil {
+ defaultIgnoreInvalid := true
+ e.IgnoreInvalidBridges = &defaultIgnoreInvalid
+ }
+
+ if e.IgnoreJoblessBridges == nil {
+ defaultIgnoreJobless := false
+ e.IgnoreJoblessBridges = &defaultIgnoreJobless
+ }
+
+ return nil
+}
+
type JobDistributor struct {
DisplayName *string
}
diff --git a/core/config/toml/types_test.go b/core/config/toml/types_test.go
index ccfde391cb4..15528fa5da6 100644
--- a/core/config/toml/types_test.go
+++ b/core/config/toml/types_test.go
@@ -6,6 +6,7 @@ import (
"net/url"
"strings"
"testing"
+ "time"
"github.com/pelletier/go-toml/v2"
"github.com/stretchr/testify/assert"
@@ -636,3 +637,166 @@ func TestEthKeys_SetFrom(t *testing.T) {
// ptr is a utility function for converting a value to a pointer to the value.
func ptr[T any](t T) *T { return &t }
+
+func TestBridgeStatusReporter_ValidateConfig(t *testing.T) {
+ testCases := []struct {
+ name string
+ config *BridgeStatusReporter
+ expectError bool
+ errorMsg string
+ }{
+ {
+ name: "disabled with nil fields",
+ config: &BridgeStatusReporter{
+ Enabled: ptr(false),
+ StatusPath: nil,
+ PollingInterval: nil,
+ IgnoreInvalidBridges: nil,
+ IgnoreJoblessBridges: nil,
+ },
+ expectError: false,
+ },
+ {
+ name: "disabled with empty fields",
+ config: &BridgeStatusReporter{
+ Enabled: ptr(false),
+ StatusPath: ptr(""),
+ PollingInterval: durationPtr(0),
+ IgnoreInvalidBridges: ptr(false),
+ IgnoreJoblessBridges: ptr(true),
+ },
+ expectError: false,
+ },
+ {
+ name: "disabled with valid fields",
+ config: &BridgeStatusReporter{
+ Enabled: ptr(false),
+ StatusPath: ptr("/status"),
+ PollingInterval: durationPtr(5 * time.Minute),
+ IgnoreInvalidBridges: ptr(true),
+ IgnoreJoblessBridges: ptr(false),
+ },
+ expectError: false,
+ },
+ {
+ name: "nil enabled (defaults to disabled)",
+ config: &BridgeStatusReporter{
+ Enabled: nil,
+ StatusPath: ptr("/status"),
+ PollingInterval: durationPtr(5 * time.Minute),
+ IgnoreInvalidBridges: ptr(true),
+ IgnoreJoblessBridges: ptr(false),
+ },
+ expectError: false,
+ },
+ // Enabled valid cases with auto-defaulting
+ {
+ name: "enabled with valid config",
+ config: &BridgeStatusReporter{
+ Enabled: ptr(true),
+ StatusPath: ptr("/status"),
+ PollingInterval: durationPtr(5 * time.Minute),
+ IgnoreInvalidBridges: ptr(true),
+ IgnoreJoblessBridges: ptr(false),
+ },
+ expectError: false,
+ },
+ {
+ name: "enabled with nil fields - should fail validation",
+ config: &BridgeStatusReporter{
+ Enabled: ptr(true),
+ StatusPath: nil,
+ PollingInterval: nil,
+ IgnoreInvalidBridges: nil,
+ IgnoreJoblessBridges: nil,
+ },
+ expectError: true,
+ errorMsg: "must be set",
+ },
+ {
+ name: "enabled with empty status path - should auto-default",
+ config: &BridgeStatusReporter{
+ Enabled: ptr(true),
+ StatusPath: ptr(""),
+ PollingInterval: durationPtr(5 * time.Minute),
+ IgnoreInvalidBridges: ptr(true),
+ IgnoreJoblessBridges: ptr(false),
+ },
+ expectError: false,
+ },
+ {
+ name: "enabled with zero polling interval - should fail validation",
+ config: &BridgeStatusReporter{
+ Enabled: ptr(true),
+ StatusPath: ptr("/status"),
+ PollingInterval: durationPtr(0),
+ IgnoreInvalidBridges: ptr(true),
+ IgnoreJoblessBridges: ptr(false),
+ },
+ expectError: true,
+ errorMsg: "must be greater than or equal to: 1m",
+ },
+ {
+ name: "enabled with polling interval less than 1 minute - should fail validation",
+ config: &BridgeStatusReporter{
+ Enabled: ptr(true),
+ StatusPath: ptr("/status"),
+ PollingInterval: durationPtr(30 * time.Second),
+ IgnoreInvalidBridges: ptr(true),
+ IgnoreJoblessBridges: ptr(false),
+ },
+ expectError: true,
+ errorMsg: "must be greater than or equal to: 1m",
+ },
+ {
+ name: "enabled with polling interval exactly 1 minute",
+ config: &BridgeStatusReporter{
+ Enabled: ptr(true),
+ StatusPath: ptr("/status"),
+ PollingInterval: durationPtr(1 * time.Minute),
+ IgnoreInvalidBridges: ptr(true),
+ IgnoreJoblessBridges: ptr(false),
+ },
+ expectError: false,
+ },
+ {
+ name: "enabled with all fields missing - should fail validation",
+ config: &BridgeStatusReporter{
+ Enabled: ptr(true),
+ StatusPath: ptr(""),
+ PollingInterval: durationPtr(0),
+ IgnoreInvalidBridges: nil,
+ IgnoreJoblessBridges: nil,
+ },
+ expectError: true,
+ errorMsg: "must be greater than or equal to: 1m",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ err := tc.config.ValidateConfig()
+ if tc.expectError {
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), tc.errorMsg)
+ } else {
+ assert.NoError(t, err)
+
+ // Verify defaults are set when enabled
+ if tc.config.Enabled != nil && *tc.config.Enabled {
+ assert.NotNil(t, tc.config.StatusPath)
+ assert.NotEmpty(t, *tc.config.StatusPath)
+ assert.NotNil(t, tc.config.PollingInterval)
+ assert.GreaterOrEqual(t, tc.config.PollingInterval.Duration(), time.Minute)
+ assert.NotNil(t, tc.config.IgnoreInvalidBridges)
+ assert.NotNil(t, tc.config.IgnoreJoblessBridges)
+ }
+ }
+ })
+ }
+}
+
+func durationPtr(d time.Duration) *commonconfig.Duration {
+ cd := *commonconfig.MustNewDuration(d)
+ return &cd
+}
diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go
index 5a3a3622861..0c6af8752dc 100644
--- a/core/services/chainlink/application.go
+++ b/core/services/chainlink/application.go
@@ -24,6 +24,7 @@ import (
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/credentials"
+ "github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/billing"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
@@ -64,6 +65,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keeper"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/retirement"
+ "github.com/smartcontractkit/chainlink/v2/core/services/nodestatusreporter/bridgestatus"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap"
@@ -649,6 +651,16 @@ func NewApplication(ctx context.Context, opts ApplicationOpts) (Application, err
globalLogger.Debug("Off-chain reporting v2 disabled")
}
+ bridgeStatusReporter := bridgestatus.NewBridgeStatusReporter(
+ cfg.BridgeStatusReporter(),
+ bridgeORM,
+ jobORM,
+ unrestrictedHTTPClient,
+ beholder.GetEmitter(),
+ globalLogger,
+ )
+ srvcs = append(srvcs, bridgeStatusReporter)
+
healthChecker := commonservices.NewChecker(static.Version, static.Sha)
var lbs []utils.DependentAwaiter
diff --git a/core/services/chainlink/config_bridge_status.go b/core/services/chainlink/config_bridge_status.go
new file mode 100644
index 00000000000..f83ade1ed51
--- /dev/null
+++ b/core/services/chainlink/config_bridge_status.go
@@ -0,0 +1,49 @@
+package chainlink
+
+import (
+ "time"
+
+ "github.com/smartcontractkit/chainlink/v2/core/config"
+ "github.com/smartcontractkit/chainlink/v2/core/config/toml"
+)
+
+var _ config.BridgeStatusReporter = (*bridgeStatusReporterConfig)(nil)
+
+type bridgeStatusReporterConfig struct {
+ c toml.BridgeStatusReporter
+}
+
+func (e *bridgeStatusReporterConfig) Enabled() bool {
+ if e.c.Enabled == nil {
+ return false
+ }
+ return *e.c.Enabled
+}
+
+func (e *bridgeStatusReporterConfig) StatusPath() string {
+ if e.c.StatusPath == nil {
+ return "/status"
+ }
+ return *e.c.StatusPath
+}
+
+func (e *bridgeStatusReporterConfig) PollingInterval() time.Duration {
+ if e.c.PollingInterval == nil {
+ return 5 * time.Minute
+ }
+ return e.c.PollingInterval.Duration()
+}
+
+func (e *bridgeStatusReporterConfig) IgnoreInvalidBridges() bool {
+ if e.c.IgnoreInvalidBridges == nil {
+ return true
+ }
+ return *e.c.IgnoreInvalidBridges
+}
+
+func (e *bridgeStatusReporterConfig) IgnoreJoblessBridges() bool {
+ if e.c.IgnoreJoblessBridges == nil {
+ return false
+ }
+ return *e.c.IgnoreJoblessBridges
+}
diff --git a/core/services/chainlink/config_general.go b/core/services/chainlink/config_general.go
index 9d5270dbb35..f6c4223c597 100644
--- a/core/services/chainlink/config_general.go
+++ b/core/services/chainlink/config_general.go
@@ -566,4 +566,8 @@ func (g *generalConfig) Billing() coreconfig.Billing {
return &billingConfig{t: g.c.Billing}
}
+func (g *generalConfig) BridgeStatusReporter() coreconfig.BridgeStatusReporter {
+ return &bridgeStatusReporterConfig{c: g.c.BridgeStatusReporter}
+}
+
var zeroSha256Hash = models.Sha256Hash{}
diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go
index 5e28bc956b3..b433aa3ca6d 100644
--- a/core/services/chainlink/config_test.go
+++ b/core/services/chainlink/config_test.go
@@ -599,6 +599,13 @@ func TestConfig_Marshal(t *testing.T) {
URL: ptr("localhost:4319"),
TLSEnabled: ptr(true),
}
+ full.BridgeStatusReporter = toml.BridgeStatusReporter{
+ Enabled: ptr(false),
+ StatusPath: ptr("/status"),
+ PollingInterval: commoncfg.MustNewDuration(5 * time.Minute),
+ IgnoreInvalidBridges: ptr(true),
+ IgnoreJoblessBridges: ptr(false),
+ }
full.JobDistributor = toml.JobDistributor{
DisplayName: ptr("test-node"),
}
diff --git a/core/services/chainlink/mocks/general_config.go b/core/services/chainlink/mocks/general_config.go
index 04cb9dcfc20..f23a0c5a553 100644
--- a/core/services/chainlink/mocks/general_config.go
+++ b/core/services/chainlink/mocks/general_config.go
@@ -312,6 +312,53 @@ func (_c *GeneralConfig_Billing_Call) RunAndReturn(run func() config.Billing) *G
return _c
}
+// BridgeStatusReporter provides a mock function with no fields
+func (_m *GeneralConfig) BridgeStatusReporter() config.BridgeStatusReporter {
+ ret := _m.Called()
+
+ if len(ret) == 0 {
+ panic("no return value specified for BridgeStatusReporter")
+ }
+
+ var r0 config.BridgeStatusReporter
+ if rf, ok := ret.Get(0).(func() config.BridgeStatusReporter); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(config.BridgeStatusReporter)
+ }
+ }
+
+ return r0
+}
+
+// GeneralConfig_BridgeStatusReporter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BridgeStatusReporter'
+type GeneralConfig_BridgeStatusReporter_Call struct {
+ *mock.Call
+}
+
+// BridgeStatusReporter is a helper method to define mock.On call
+func (_e *GeneralConfig_Expecter) BridgeStatusReporter() *GeneralConfig_BridgeStatusReporter_Call {
+ return &GeneralConfig_BridgeStatusReporter_Call{Call: _e.mock.On("BridgeStatusReporter")}
+}
+
+func (_c *GeneralConfig_BridgeStatusReporter_Call) Run(run func()) *GeneralConfig_BridgeStatusReporter_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run()
+ })
+ return _c
+}
+
+func (_c *GeneralConfig_BridgeStatusReporter_Call) Return(_a0 config.BridgeStatusReporter) *GeneralConfig_BridgeStatusReporter_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *GeneralConfig_BridgeStatusReporter_Call) RunAndReturn(run func() config.BridgeStatusReporter) *GeneralConfig_BridgeStatusReporter_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
// CRE provides a mock function with no fields
func (_m *GeneralConfig) CRE() config.CRE {
ret := _m.Called()
diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml
index 3f838ad1d93..57a0881fb4d 100644
--- a/core/services/chainlink/testdata/config-empty-effective.toml
+++ b/core/services/chainlink/testdata/config-empty-effective.toml
@@ -346,3 +346,10 @@ URL = ''
[Billing]
URL = 'localhost:4319'
TLSEnabled = true
+
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml
index 1b36397b2df..5ba1a790a87 100644
--- a/core/services/chainlink/testdata/config-full.toml
+++ b/core/services/chainlink/testdata/config-full.toml
@@ -361,6 +361,13 @@ URL = 'https://workflow.fetcher.url'
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
Enabled = false
diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml
index 7fc256608a7..38b78af0850 100644
--- a/core/services/chainlink/testdata/config-multi-chain-effective.toml
+++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml
@@ -347,6 +347,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
AutoCreateKey = true
diff --git a/core/services/nodestatusreporter/bridgestatus/bridge_status_reporter.go b/core/services/nodestatusreporter/bridgestatus/bridge_status_reporter.go
new file mode 100644
index 00000000000..6d2ed05e970
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/bridge_status_reporter.go
@@ -0,0 +1,321 @@
+package bridgestatus
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/url"
+ "sync"
+
+ "github.com/smartcontractkit/chainlink-common/pkg/beholder"
+ "github.com/smartcontractkit/chainlink-common/pkg/services"
+
+ "github.com/smartcontractkit/chainlink/v2/core/bridges"
+ "github.com/smartcontractkit/chainlink/v2/core/config"
+ "github.com/smartcontractkit/chainlink/v2/core/logger"
+ "github.com/smartcontractkit/chainlink/v2/core/services/job"
+ "github.com/smartcontractkit/chainlink/v2/core/services/nodestatusreporter/bridgestatus/events"
+)
+
+// Service polls Bridge status and pushes them to Beholder
+type Service struct {
+ services.Service
+ eng *services.Engine
+
+ config config.BridgeStatusReporter
+ bridgeORM bridges.ORM
+ jobORM job.ORM
+ httpClient *http.Client
+ emitter beholder.Emitter
+}
+
+const (
+ ServiceName = "BridgeStatusReporter"
+ bridgePollPageSize = 1_000
+)
+
+// NewBridgeStatusReporter creates a new Bridge Status Reporter Service
+func NewBridgeStatusReporter(
+ config config.BridgeStatusReporter,
+ bridgeORM bridges.ORM,
+ jobORM job.ORM,
+ httpClient *http.Client,
+ emitter beholder.Emitter,
+ lggr logger.Logger,
+) *Service {
+ s := &Service{
+ config: config,
+ bridgeORM: bridgeORM,
+ jobORM: jobORM,
+ httpClient: httpClient,
+ emitter: emitter,
+ }
+ s.Service, s.eng = services.Config{
+ Name: ServiceName,
+ Start: s.start,
+ }.NewServiceEngine(lggr)
+ return s
+}
+
+// start starts the Bridge Status Reporter Service
+func (s *Service) start(ctx context.Context) error {
+ if !s.config.Enabled() {
+ s.eng.Info("Bridge Status Reporter Service is disabled")
+ return nil
+ }
+
+ s.eng.Info("Starting Bridge Status Reporter Service")
+
+ // Start periodic polling using Engine's ticker support
+ ticker := services.NewTicker(s.config.PollingInterval())
+ s.eng.GoTick(ticker, s.pollAllBridges)
+
+ return nil
+}
+
+// HealthReport returns the service health
+func (s *Service) HealthReport() map[string]error {
+ return map[string]error{ServiceName: s.Ready()}
+}
+
+// pollAllBridges polls all registered bridges using pagination
+func (s *Service) pollAllBridges(ctx context.Context) {
+ var allBridges []bridges.BridgeType
+ var offset = 0
+
+ // Paginate through all bridges
+ for {
+ bridgeList, _, err := s.bridgeORM.BridgeTypes(ctx, offset, bridgePollPageSize)
+ if err != nil {
+ s.eng.Debugw("Failed to fetch bridges", "error", err, "offset", offset)
+ return
+ }
+
+ allBridges = append(allBridges, bridgeList...)
+
+ // If we got fewer than pageSize bridges, we've reached the end
+ if len(bridgeList) < bridgePollPageSize {
+ break
+ }
+
+ offset += bridgePollPageSize
+ }
+
+ if len(allBridges) == 0 {
+ s.eng.Debug("No bridges configured for Bridge Status Reporter polling")
+ return
+ }
+
+ s.eng.Debugw("Polling Bridge Status Reporter for all bridges", "count", len(allBridges))
+
+ // Poll each bridge concurrently and wait for completion
+ var wg sync.WaitGroup
+ for _, bridge := range allBridges {
+ wg.Add(1)
+ bridgeName := string(bridge.Name)
+ bridgeURL := bridge.URL.String()
+ go func(name, url string) {
+ defer wg.Done()
+ s.pollBridge(ctx, name, url)
+ }(bridgeName, bridgeURL)
+ }
+
+ wg.Wait()
+}
+
+// handleBridgeError handles errors during bridge polling, either skipping or emitting empty telemetry
+func (s *Service) handleBridgeError(ctx context.Context, bridgeName string, jobs []JobInfo, logMsg string, logFields ...any) {
+ s.eng.Debugw(logMsg, logFields...)
+ if s.config.IgnoreInvalidBridges() {
+ return
+ }
+ // If not ignoring invalid bridges, still emit empty telemetry
+ s.emitBridgeStatus(ctx, bridgeName, EAResponse{}, jobs)
+}
+
+// pollBridge polls a single bridge's status endpoint
+func (s *Service) pollBridge(ctx context.Context, bridgeName string, bridgeURL string) {
+ s.eng.Debugw("Polling bridge", "bridge", bridgeName, "url", bridgeURL)
+
+ // Look up jobs associated with this bridge first
+ jobs, err := s.findJobsForBridge(ctx, bridgeName)
+ if err != nil {
+ s.eng.Warnw("Failed to find jobs for bridge", "bridge", bridgeName, "error", err)
+ jobs = []JobInfo{}
+ }
+
+ // Skip bridge if it has no jobs and ignoreJoblessBridges is enabled
+ if s.config.IgnoreJoblessBridges() && len(jobs) == 0 {
+ s.eng.Debugw("Skipping bridge with no jobs", "bridge", bridgeName, "ignoreJoblessBridges", true)
+ return
+ }
+
+ // Parse bridge URL and construct status endpoint
+ parsedURL, err := url.Parse(bridgeURL)
+ if err != nil {
+ s.handleBridgeError(ctx, bridgeName, jobs, "Failed to parse bridge URL", "bridge", bridgeName, "url", bridgeURL, "error", err)
+ return
+ }
+
+ // Construct status endpoint URL (bridge::8080/status)
+ statusURL := &url.URL{
+ Scheme: parsedURL.Scheme,
+ Host: parsedURL.Host,
+ Path: s.config.StatusPath(),
+ }
+
+ // Make HTTP request
+ req, err := http.NewRequestWithContext(ctx, "GET", statusURL.String(), nil)
+ if err != nil {
+ s.handleBridgeError(ctx, bridgeName, jobs, "Failed to create request for Bridge Status Reporter status", "bridge", bridgeName, "url", statusURL.String(), "error", err)
+ return
+ }
+
+ resp, err := s.httpClient.Do(req)
+ if err != nil {
+ s.handleBridgeError(ctx, bridgeName, jobs, "Failed to fetch Bridge Status Reporter status", "bridge", bridgeName, "url", statusURL.String(), "error", err)
+ return
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ s.handleBridgeError(ctx, bridgeName, jobs, "Bridge Status Reporter status endpoint returned non-200 status", "bridge", bridgeName, "url", statusURL.String(), "status", resp.StatusCode)
+ return
+ }
+
+ // Parse response
+ var status EAResponse
+ if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
+ s.handleBridgeError(ctx, bridgeName, jobs, "Failed to decode Bridge Status Reporter status", "bridge", bridgeName, "url", statusURL.String(), "error", err)
+ return
+ }
+
+ s.eng.Debugw("Successfully fetched Bridge Status Reporter status", "bridge", bridgeName, "adapter", status.Adapter.Name, "version", status.Adapter.Version)
+
+ // Emit telemetry to Beholder
+ s.emitBridgeStatus(ctx, bridgeName, status, jobs)
+}
+
+// emitBridgeStatus sends Bridge Status Reporter data to Beholder
+func (s *Service) emitBridgeStatus(ctx context.Context, bridgeName string, status EAResponse, jobs []JobInfo) {
+ // Convert runtime info
+ runtime := &events.RuntimeInfo{
+ NodeVersion: status.Runtime.NodeVersion,
+ Platform: status.Runtime.Platform,
+ Architecture: status.Runtime.Architecture,
+ Hostname: status.Runtime.Hostname,
+ }
+
+ // Convert metrics info
+ metrics := &events.MetricsInfo{
+ Enabled: status.Metrics.Enabled,
+ }
+
+ // Convert endpoints
+ endpointsProto := make([]*events.EndpointInfo, len(status.Endpoints))
+ for i, endpoint := range status.Endpoints {
+ endpointsProto[i] = &events.EndpointInfo{
+ Name: endpoint.Name,
+ Aliases: endpoint.Aliases,
+ Transports: endpoint.Transports,
+ }
+ }
+
+ // Convert configuration including values
+ configProto := make([]*events.ConfigurationItem, len(status.Configuration))
+ for i, config := range status.Configuration {
+ // Helper function to convert values to strings, handling nil (ternary-style)
+ safeString := func(v any) string {
+ if v == nil {
+ return ""
+ }
+ return fmt.Sprintf("%v", v)
+ }
+
+ configProto[i] = &events.ConfigurationItem{
+ Name: config.Name,
+ Value: safeString(config.Value),
+ Type: config.Type,
+ Description: config.Description,
+ Required: config.Required,
+ DefaultValue: safeString(config.Default),
+ CustomSetting: config.CustomSetting,
+ EnvDefaultOverride: safeString(config.EnvDefaultOverride),
+ }
+ }
+
+ // Convert jobs to protobuf JobInfo structs
+ jobsProto := make([]*events.JobInfo, 0, len(jobs))
+ for _, job := range jobs {
+ jobsProto = append(jobsProto, &events.JobInfo{
+ ExternalJobId: job.ExternalJobID,
+ JobName: job.Name,
+ })
+ }
+
+ // Create the protobuf event
+ event := &events.BridgeStatusEvent{
+ BridgeName: bridgeName,
+ AdapterName: status.Adapter.Name,
+ AdapterVersion: status.Adapter.Version,
+ AdapterUptimeSeconds: status.Adapter.UptimeSeconds,
+ DefaultEndpoint: status.DefaultEndpoint,
+ Runtime: runtime,
+ Metrics: metrics,
+ Endpoints: endpointsProto,
+ Configuration: configProto,
+ Jobs: jobsProto,
+ }
+
+ // Emit the protobuf event through the configured emitter
+ if err := events.EmitBridgeStatusEvent(ctx, s.emitter, event); err != nil {
+ s.eng.Warnw("Failed to emit Bridge Status Reporter protobuf data to Beholder", "bridge", bridgeName, "error", err)
+ return
+ }
+
+ s.eng.Debugw("Successfully emitted Bridge Status Reporter protobuf data to Beholder",
+ "bridge", bridgeName,
+ "adapter", status.Adapter.Name,
+ "version", status.Adapter.Version,
+ )
+}
+
+// findJobsForBridge finds jobs associated with a bridge name
+func (s *Service) findJobsForBridge(ctx context.Context, bridgeName string) ([]JobInfo, error) {
+ // Find job IDs that use this bridge
+ jobIDs, err := s.jobORM.FindJobIDsWithBridge(ctx, bridgeName)
+ if err != nil {
+ return nil, fmt.Errorf("failed to find jobs with bridge %s: %w", bridgeName, err)
+ }
+
+ if len(jobIDs) == 0 {
+ s.eng.Debugw("No jobs found for bridge", "bridge", bridgeName)
+ return []JobInfo{}, nil
+ }
+
+ // Convert job IDs to job info
+ jobs := make([]JobInfo, 0, len(jobIDs))
+ for _, jobID := range jobIDs {
+ job, err := s.jobORM.FindJob(ctx, jobID)
+ if err != nil {
+ s.eng.Debugw("Failed to find job", "jobID", jobID, "bridge", bridgeName, "error", err)
+ continue
+ }
+
+ // Get job name, use a default if not set
+ jobName := "unknown"
+ if job.Name.Valid && job.Name.String != "" {
+ jobName = job.Name.String
+ }
+
+ jobs = append(jobs, JobInfo{
+ ExternalJobID: job.ExternalJobID.String(),
+ Name: jobName,
+ })
+ }
+
+ s.eng.Debugw("Found jobs for bridge", "bridge", bridgeName, "count", len(jobs))
+
+ return jobs, nil
+}
diff --git a/core/services/nodestatusreporter/bridgestatus/bridge_status_reporter_test.go b/core/services/nodestatusreporter/bridgestatus/bridge_status_reporter_test.go
new file mode 100644
index 00000000000..e51c822839d
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/bridge_status_reporter_test.go
@@ -0,0 +1,1011 @@
+package bridgestatus
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "os"
+ "path/filepath"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+ "go.uber.org/zap/zapcore"
+ "google.golang.org/protobuf/proto"
+ "gopkg.in/guregu/null.v4"
+
+ "github.com/smartcontractkit/chainlink/v2/core/bridges"
+ bridgeMocks "github.com/smartcontractkit/chainlink/v2/core/bridges/mocks"
+ "github.com/smartcontractkit/chainlink/v2/core/logger"
+ "github.com/smartcontractkit/chainlink/v2/core/services/job"
+ jobMocks "github.com/smartcontractkit/chainlink/v2/core/services/job/mocks"
+ "github.com/smartcontractkit/chainlink/v2/core/services/nodestatusreporter/bridgestatus/events"
+ "github.com/smartcontractkit/chainlink/v2/core/services/nodestatusreporter/bridgestatus/mocks"
+ "github.com/smartcontractkit/chainlink/v2/core/store/models"
+)
+
+// Test constants and fixtures
+const (
+ testStatusPath = "/status"
+ testPollingInterval = 5 * time.Minute
+ testBridgeName1 = "bridge1"
+ testBridgeName2 = "bridge2"
+ testBridgeURL1 = "http://bridge1.example.com"
+ testBridgeURL2 = "http://bridge2.example.com"
+)
+
+// loadFixture loads a JSON fixture file
+func loadFixture(t *testing.T, filename string) string {
+ t.Helper()
+
+ fixturePath := filepath.Join("fixtures", filename)
+ data, err := os.ReadFile(fixturePath)
+ require.NoError(t, err, "Failed to read fixture file: %s", fixturePath)
+
+ return string(data)
+}
+
+// loadFixtureAsEAResponse loads and unmarshals fixture data
+func loadFixtureAsEAResponse(t *testing.T, filename string) EAResponse {
+ fixtureData := loadFixture(t, filename)
+
+ var status EAResponse
+ err := json.Unmarshal([]byte(fixtureData), &status)
+ require.NoError(t, err, "Failed to unmarshal test fixture")
+
+ return status
+}
+
+// parseWebURL creates WebURL from string
+func parseWebURL(s string) models.WebURL {
+ u, err := url.Parse(s)
+ if err != nil {
+ panic(err)
+ }
+ return models.WebURL(*u)
+}
+
+// Test fixtures
+var (
+ testBridge1 = bridges.BridgeType{
+ Name: bridges.MustParseBridgeName(testBridgeName1),
+ URL: parseWebURL(testBridgeURL1),
+ }
+ testBridge2 = bridges.BridgeType{
+ Name: bridges.MustParseBridgeName(testBridgeName2),
+ URL: parseWebURL(testBridgeURL2),
+ }
+ testBridges = []bridges.BridgeType{testBridge1, testBridge2}
+)
+
+// setupTestService creates a test service with mocks
+func setupTestService(t *testing.T, enabled bool, pollingInterval time.Duration, httpClient *http.Client) (*Service, *bridgeMocks.ORM, *jobMocks.ORM, *mocks.BeholderEmitter) {
+ t.Helper()
+
+ bridgeStatusConfig := mocks.NewTestBridgeStatusReporterConfig(enabled, testStatusPath, pollingInterval)
+
+ bridgeORM := bridgeMocks.NewORM(t)
+ jobORM := jobMocks.NewORM(t)
+ emitter := mocks.NewBeholderEmitter()
+ lggr := logger.TestLogger(t)
+
+ // Reduce log noise
+ lggr.SetLogLevel(zapcore.ErrorLevel)
+
+ service := NewBridgeStatusReporter(bridgeStatusConfig, bridgeORM, jobORM, httpClient, emitter, lggr)
+
+ return service, bridgeORM, jobORM, emitter
+}
+
+// setupTestServiceWithIgnoreFlags creates a test service with custom ignore flag settings
+func setupTestServiceWithIgnoreFlags(t *testing.T, enabled bool, pollingInterval time.Duration, httpClient *http.Client, ignoreInvalidBridges, ignoreJoblessBridges bool) (*Service, *bridgeMocks.ORM, *jobMocks.ORM, *mocks.BeholderEmitter) {
+ t.Helper()
+
+ bridgeStatusConfig := mocks.NewTestBridgeStatusReporterConfigWithSkip(enabled, testStatusPath, pollingInterval, ignoreInvalidBridges, ignoreJoblessBridges)
+
+ bridgeORM := bridgeMocks.NewORM(t)
+ jobORM := jobMocks.NewORM(t)
+ emitter := mocks.NewBeholderEmitter()
+ lggr := logger.TestLogger(t)
+
+ // Reduce log noise
+ lggr.SetLogLevel(zapcore.ErrorLevel)
+
+ service := NewBridgeStatusReporter(bridgeStatusConfig, bridgeORM, jobORM, httpClient, emitter, lggr)
+
+ return service, bridgeORM, jobORM, emitter
+}
+
+func TestNewBridgeStatusReporter(t *testing.T) {
+ httpClient := &http.Client{}
+ service, _, _, _ := setupTestService(t, true, testPollingInterval, httpClient)
+
+ assert.NotNil(t, service)
+ assert.Equal(t, ServiceName, service.Name())
+}
+
+func TestService_Start_Disabled(t *testing.T) {
+ httpClient := &http.Client{}
+ service, _, _, _ := setupTestService(t, false, testPollingInterval, httpClient)
+
+ ctx := context.Background()
+ err := service.Start(ctx)
+ require.NoError(t, err)
+
+ err = service.Close()
+ require.NoError(t, err)
+}
+
+func TestService_Start_Enabled(t *testing.T) {
+ httpClient := &http.Client{}
+ service, bridgeORM, jobORM, emitter := setupTestService(t, true, 100*time.Millisecond, httpClient)
+
+ // Mock the calls that will be triggered by the polling ticker
+ bridgeORM.On("BridgeTypes", mock.Anything, mock.Anything, mock.Anything).Return([]bridges.BridgeType{}, 0, nil).Maybe()
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, mock.AnythingOfType("string")).Return([]int32{}, nil).Maybe()
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+
+ ctx := context.Background()
+ err := service.Start(ctx)
+ require.NoError(t, err)
+
+ err = service.Close()
+ require.NoError(t, err)
+}
+
+func TestService_HealthReport(t *testing.T) {
+ httpClient := &http.Client{}
+ service, _, _, _ := setupTestService(t, true, testPollingInterval, httpClient)
+
+ health := service.HealthReport()
+ assert.Contains(t, health, service.Name())
+}
+
+func TestService_pollAllBridges_NoBridges(t *testing.T) {
+ httpClient := &http.Client{}
+ service, bridgeORM, _, _ := setupTestService(t, true, testPollingInterval, httpClient)
+
+ bridgeORM.On("BridgeTypes", mock.Anything, 0, 1000).Return([]bridges.BridgeType{}, 0, nil)
+
+ ctx := context.Background()
+
+ // Should handle no bridges gracefully
+ assert.NotPanics(t, func() {
+ service.pollAllBridges(ctx)
+ })
+
+ bridgeORM.AssertExpectations(t)
+}
+
+func TestService_pollAllBridges_WithBridges(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient(loadFixture(t, "bridge_status_response.json"), http.StatusOK)
+ service, bridgeORM, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ bridgeORM.On("BridgeTypes", mock.Anything, 0, 1000).Return(testBridges, len(testBridges), nil)
+
+ // Mock job ORM calls for finding external job IDs
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, mock.AnythingOfType("string")).Return([]int32{}, nil)
+
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+ ctx := context.Background()
+ service.pollAllBridges(ctx)
+
+ bridgeORM.AssertExpectations(t)
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+func TestService_pollAllBridges_FetchError(t *testing.T) {
+ httpClient := &http.Client{}
+ service, bridgeORM, _, _ := setupTestService(t, true, testPollingInterval, httpClient)
+
+ bridgeORM.On("BridgeTypes", mock.Anything, 0, 1000).Return([]bridges.BridgeType{}, 0, assert.AnError)
+
+ ctx := context.Background()
+
+ // Should handle bridge ORM error gracefully
+ assert.NotPanics(t, func() {
+ service.pollAllBridges(ctx)
+ })
+
+ bridgeORM.AssertExpectations(t)
+}
+
+func TestService_pollBridge_Success(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient(loadFixture(t, "bridge_status_response.json"), http.StatusOK)
+ service, _, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Mock job ORM calls for finding external job IDs
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "test-bridge").Return([]int32{}, nil)
+
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "test-bridge", "http://example.com")
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+func TestService_pollBridge_HTTPError(t *testing.T) {
+ httpClient := &http.Client{}
+ service, _, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Mock job ORM call that now happens at the start of pollBridge
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "test-bridge").Return([]int32{}, nil)
+
+ ctx := context.Background()
+
+ // Should handle HTTP error gracefully
+ assert.NotPanics(t, func() {
+ service.pollBridge(ctx, "test-bridge", "http://invalid.invalid:8080")
+ })
+
+ emitter.AssertNotCalled(t, "Emit", mock.Anything, mock.Anything, mock.Anything)
+ emitter.AssertNotCalled(t, "With", mock.Anything)
+ jobORM.AssertExpectations(t)
+}
+
+func TestService_pollBridge_InvalidJSON(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient("invalid json", http.StatusOK)
+ service, _, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Mock job ORM call that now happens at the start of pollBridge
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "test-bridge").Return([]int32{}, nil)
+
+ ctx := context.Background()
+
+ assert.NotPanics(t, func() {
+ service.pollBridge(ctx, "test-bridge", "http://invalid.invalid:8080")
+ })
+ emitter.AssertNotCalled(t, "Emit", mock.Anything, mock.Anything, mock.Anything)
+ emitter.AssertNotCalled(t, "With", mock.Anything)
+
+ jobORM.AssertExpectations(t)
+}
+
+func TestService_pollBridge_InvalidURL(t *testing.T) {
+ httpClient := &http.Client{}
+ service, _, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Mock job ORM call that now happens at the start of pollBridge
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "test-bridge").Return([]int32{}, nil)
+
+ ctx := context.Background()
+
+ assert.NotPanics(t, func() {
+ service.pollBridge(ctx, "test-bridge", "://invalid-url")
+ })
+
+ emitter.AssertNotCalled(t, "Emit", mock.Anything, mock.Anything, mock.Anything)
+ emitter.AssertNotCalled(t, "With", mock.Anything)
+
+ jobORM.AssertExpectations(t)
+}
+
+func TestService_pollBridge_Non200Status(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient("Not Found", http.StatusNotFound)
+ service, _, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Mock job ORM call that now happens at the start of pollBridge
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "test-bridge").Return([]int32{}, nil)
+
+ ctx := context.Background()
+
+ assert.NotPanics(t, func() {
+ service.pollBridge(ctx, "test-bridge", "http://example.com")
+ })
+
+ emitter.AssertNotCalled(t, "Emit", mock.Anything, mock.Anything, mock.Anything)
+ emitter.AssertNotCalled(t, "With", mock.Anything)
+ jobORM.AssertExpectations(t)
+}
+
+func TestService_emitBridgeStatus_Success(t *testing.T) {
+ httpClient := &http.Client{}
+ service, _, _, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+ ctx := context.Background()
+ service.emitBridgeStatus(ctx, "test-bridge", loadFixtureAsEAResponse(t, "bridge_status_response.json"), []JobInfo{})
+
+ emitter.AssertExpectations(t)
+}
+
+func TestService_pollAllBridges_RefreshError(t *testing.T) {
+ httpClient := &http.Client{}
+ service, bridgeORM, _, _ := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Setup bridge ORM mock to return error
+ bridgeORM.On("BridgeTypes", mock.Anything, 0, 1000).Return([]bridges.BridgeType{}, 0, assert.AnError)
+
+ ctx := context.Background()
+
+ // Should handle bridge refresh error gracefully (no panic)
+ assert.NotPanics(t, func() {
+ service.pollAllBridges(ctx)
+ })
+
+ bridgeORM.AssertExpectations(t)
+}
+
+func TestService_pollAllBridges_MultipleBridges(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient(loadFixture(t, "bridge_status_response.json"), http.StatusOK)
+ service, bridgeORM, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Setup bridge ORM mock to return our test bridges
+ bridgeORM.On("BridgeTypes", mock.Anything, 0, 1000).Return(testBridges, len(testBridges), nil)
+
+ // Mock job ORM calls for finding external job IDs
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, mock.AnythingOfType("string")).Return([]int32{}, nil)
+
+ // Track emitted bridge names from protobuf events
+ var emittedBridgeNamesMutex sync.Mutex
+ emittedBridgeNames := []string{}
+
+ // Setup emitter mock to capture protobuf events and extract bridge names
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
+ // Unmarshal protobuf to extract bridge name
+ protobufBytes := args.Get(1).([]byte)
+ var event events.BridgeStatusEvent
+ if err := proto.Unmarshal(protobufBytes, &event); err == nil {
+ emittedBridgeNamesMutex.Lock()
+ emittedBridgeNames = append(emittedBridgeNames, event.BridgeName)
+ emittedBridgeNamesMutex.Unlock()
+ }
+ })
+
+ ctx := context.Background()
+ service.pollAllBridges(ctx)
+
+ bridgeORM.AssertExpectations(t)
+ jobORM.AssertExpectations(t)
+
+ // Verify we emitted events for both bridges
+ expectedBridgeNames := []string{testBridgeName1, testBridgeName2}
+ assert.ElementsMatch(t, expectedBridgeNames, emittedBridgeNames, "Should emit telemetry for each bridge")
+
+ emitter.AssertExpectations(t)
+}
+
+func TestService_emitBridgeStatus_CaptureOutput(t *testing.T) {
+ emitter := mocks.NewBeholderEmitter()
+ var capturedProtobufBytes []byte
+
+ // Capture protobuf metadata labels
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
+ capturedProtobufBytes = args.Get(1).([]byte)
+ })
+
+ config := mocks.NewTestBridgeStatusReporterConfig(true, "/status", 5*time.Minute)
+ service := NewBridgeStatusReporter(
+ config,
+ nil, // bridgeORM not needed for this test
+ nil, // jobORM not needed for this test
+ nil, // httpClient not needed for this test
+ emitter,
+ logger.TestLogger(t),
+ )
+
+ // Load fixture and emit
+ ctx := context.Background()
+ status := loadFixtureAsEAResponse(t, "bridge_status_response.json")
+ service.emitBridgeStatus(ctx, "test-bridge", status, []JobInfo{})
+
+ // Unmarshal and verify protobuf matches fixture values
+ require.NotEmpty(t, capturedProtobufBytes)
+ var event events.BridgeStatusEvent
+ err := proto.Unmarshal(capturedProtobufBytes, &event)
+ require.NoError(t, err)
+
+ // Verify key fields match fixture
+ assert.Equal(t, "test-bridge", event.BridgeName)
+ assert.Equal(t, status.Adapter.Name, event.AdapterName)
+ assert.Equal(t, status.Adapter.Version, event.AdapterVersion)
+ assert.InDelta(t, status.Adapter.UptimeSeconds, event.AdapterUptimeSeconds, 0.001)
+
+ // Verify Endpoints
+ for i, endpoint := range status.Endpoints {
+ assert.Equal(t, endpoint.Name, event.Endpoints[i].Name)
+ assert.Equal(t, endpoint.Aliases, event.Endpoints[i].Aliases)
+ assert.Equal(t, endpoint.Transports, event.Endpoints[i].Transports)
+ }
+
+ // Verify Default Endpoint
+ assert.Equal(t, status.DefaultEndpoint, event.DefaultEndpoint)
+
+ // Verify configuration
+ // Helper function to safely convert values to strings, handling nil (same as in production code)
+ safeString := func(v any) string {
+ if v == nil {
+ return ""
+ }
+ return fmt.Sprintf("%v", v)
+ }
+
+ for i, configuration := range status.Configuration {
+ assert.Equal(t, configuration.Name, event.Configuration[i].Name)
+ assert.Equal(t, safeString(configuration.Value), event.Configuration[i].Value) // Values are converted to strings
+ assert.Equal(t, configuration.Type, event.Configuration[i].Type)
+ assert.Equal(t, configuration.Description, event.Configuration[i].Description)
+ assert.Equal(t, configuration.Required, event.Configuration[i].Required)
+ assert.Equal(t, safeString(configuration.Default), event.Configuration[i].DefaultValue) // Defaults converted to strings
+ assert.Equal(t, configuration.CustomSetting, event.Configuration[i].CustomSetting)
+ assert.Equal(t, safeString(configuration.EnvDefaultOverride), event.Configuration[i].EnvDefaultOverride) // Overrides converted to strings
+ }
+
+ // Verify Runtime
+ assert.Equal(t, status.Runtime.NodeVersion, event.Runtime.NodeVersion)
+ assert.Equal(t, status.Runtime.Platform, event.Runtime.Platform)
+ assert.Equal(t, status.Runtime.Architecture, event.Runtime.Architecture)
+ assert.Equal(t, status.Runtime.Hostname, event.Runtime.Hostname)
+
+ // Verify Metrics
+ assert.Equal(t, status.Metrics.Enabled, event.Metrics.Enabled)
+
+ emitter.AssertExpectations(t)
+}
+
+func TestService_Start_AlreadyStarted(t *testing.T) {
+ httpClient := &http.Client{}
+ service, bridgeORM, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Mock the calls that will be triggered by the polling ticker
+ bridgeORM.On("BridgeTypes", mock.Anything, mock.Anything, mock.Anything).Return([]bridges.BridgeType{}, 0, nil).Maybe()
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, mock.AnythingOfType("string")).Return([]int32{}, nil).Maybe()
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+
+ ctx := context.Background()
+
+ err := service.Start(ctx)
+ require.NoError(t, err)
+ err = service.Start(ctx)
+ // services.StateMachine prevents double start, should return error
+ require.Error(t, err)
+
+ // Clean up
+ err = service.Close()
+ require.NoError(t, err)
+}
+
+func TestService_Close_AlreadyClosed(t *testing.T) {
+ httpClient := &http.Client{}
+ service, bridgeORM, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Mock the calls that will be triggered by the polling ticker
+ bridgeORM.On("BridgeTypes", mock.Anything, mock.Anything, mock.Anything).Return([]bridges.BridgeType{}, 0, nil).Maybe()
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, mock.AnythingOfType("string")).Return([]int32{}, nil).Maybe()
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+
+ ctx := context.Background()
+
+ err := service.Start(ctx)
+ require.NoError(t, err)
+
+ err = service.Close()
+ require.NoError(t, err)
+ err = service.Close()
+
+ // services.StateMachine prevents double close, should return error
+ require.Error(t, err)
+}
+
+func TestService_PollAllBridges_3000Bridges(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient(loadFixture(t, "bridge_status_response.json"), http.StatusOK)
+ service, mockORM, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ numBridges := 3000
+ var allBridges []bridges.BridgeType
+ for i := 0; i < numBridges; i++ {
+ u, _ := url.Parse(fmt.Sprintf("http://bridge%d.example.com", i))
+ bridge := bridges.BridgeType{
+ Name: bridges.MustParseBridgeName(fmt.Sprintf("bridge%d", i)),
+ URL: models.WebURL(*u),
+ }
+ allBridges = append(allBridges, bridge)
+ }
+
+ // Page 1: bridges 0-999 (1000 bridges)
+ page1 := allBridges[0:1000]
+ mockORM.On("BridgeTypes", mock.Anything, 0, bridgePollPageSize).Return(page1, 3000, nil).Once()
+
+ // Page 2: bridges 1000-1999 (1000 bridges)
+ page2 := allBridges[1000:2000]
+ mockORM.On("BridgeTypes", mock.Anything, 1000, bridgePollPageSize).Return(page2, 3000, nil).Once()
+
+ // Page 3: bridges 2000-2999 (1000 bridges)
+ page3 := allBridges[2000:3000]
+ mockORM.On("BridgeTypes", mock.Anything, 2000, bridgePollPageSize).Return(page3, 3000, nil).Once()
+
+ // Page 4: empty (end of results)
+ mockORM.On("BridgeTypes", mock.Anything, 3000, bridgePollPageSize).Return([]bridges.BridgeType{}, 3000, nil).Once()
+
+ // Mock job ORM calls for finding external job IDs for all bridges
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, mock.AnythingOfType("string")).Return([]int32{}, nil).Times(numBridges)
+
+ // Expect 3000 telemetry emissions
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(numBridges)
+
+ ctx := context.Background()
+
+ service.pollAllBridges(ctx)
+ mockORM.AssertExpectations(t)
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+func TestService_PollAllBridges_ContextTimeout(t *testing.T) {
+ httpClient := &http.Client{}
+ service, mockORM, jobORM, _ := setupTestService(t, true, testPollingInterval, httpClient)
+
+ numBridges := 5
+ var allBridges []bridges.BridgeType
+ for i := 0; i < numBridges; i++ {
+ u, _ := url.Parse(fmt.Sprintf("http://bridge%d.example.com", i))
+ bridge := bridges.BridgeType{
+ Name: bridges.MustParseBridgeName(fmt.Sprintf("bridge%d", i)),
+ URL: models.WebURL(*u),
+ }
+ allBridges = append(allBridges, bridge)
+ }
+
+ mockORM.On("BridgeTypes", mock.Anything, 0, bridgePollPageSize).Return(allBridges, numBridges, nil).Once()
+
+ // Mock job ORM calls for each bridge
+ for i := 0; i < numBridges; i++ {
+ bridgeName := fmt.Sprintf("bridge%d", i)
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, bridgeName).Return([]int32{}, nil)
+ }
+
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ assert.Fail(t, "HTTP handler should not complete due to context cancellation")
+ }))
+ defer server.Close()
+
+ serverURL, _ := url.Parse(server.URL)
+ for i := range allBridges {
+ allBridges[i].URL = models.WebURL(*serverURL)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ service.pollAllBridges(ctx)
+ mockORM.AssertExpectations(t)
+}
+
+func TestService_emitBridgeStatus_EmptyFields(t *testing.T) {
+ emitter := mocks.NewBeholderEmitter()
+ var capturedProtobufBytes []byte
+
+ // Capture protobuf metadata labels
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
+ capturedProtobufBytes = args.Get(1).([]byte)
+ })
+
+ config := mocks.NewTestBridgeStatusReporterConfig(true, "/status", 5*time.Minute)
+ service := NewBridgeStatusReporter(
+ config,
+ nil, // bridgeORM not needed for this test
+ nil, // jobORM not needed for this test
+ nil, // httpClient not needed for this test
+ emitter,
+ logger.TestLogger(t),
+ )
+
+ // Load empty fixture and emit
+ ctx := context.Background()
+ status := loadFixtureAsEAResponse(t, "bridge_status_empty.json")
+ service.emitBridgeStatus(ctx, "empty-bridge", status, []JobInfo{})
+
+ // Unmarshal and verify protobuf handles empty values correctly
+ require.NotEmpty(t, capturedProtobufBytes)
+ var event events.BridgeStatusEvent
+ err := proto.Unmarshal(capturedProtobufBytes, &event)
+ require.NoError(t, err)
+
+ // Verify empty/minimal values are handled correctly
+ assert.Equal(t, "empty-bridge", event.BridgeName)
+ assert.Empty(t, event.AdapterName)
+ assert.Empty(t, event.AdapterVersion)
+ assert.InDelta(t, float64(0), event.AdapterUptimeSeconds, 0.001)
+ assert.Empty(t, event.DefaultEndpoint)
+
+ // Verify empty runtime info
+ require.NotNil(t, event.Runtime)
+ assert.Empty(t, event.Runtime.NodeVersion)
+ assert.Empty(t, event.Runtime.Platform)
+ assert.Empty(t, event.Runtime.Architecture)
+ assert.Empty(t, event.Runtime.Hostname)
+
+ // Verify metrics with false enabled
+ require.NotNil(t, event.Metrics)
+ assert.False(t, event.Metrics.Enabled)
+
+ // Verify empty arrays
+ assert.Empty(t, event.Endpoints)
+ assert.Empty(t, event.Configuration)
+
+ emitter.AssertExpectations(t)
+}
+
+// Test for external job IDs and job names functionality
+func TestService_pollBridge_WithJobInfo(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient(loadFixture(t, "bridge_status_response.json"), http.StatusOK)
+ service, _, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Create test job IDs and external job UUIDs
+ testJobIDs := []int32{1, 2}
+
+ testJob1 := job.Job{
+ ID: 1,
+ ExternalJobID: uuid.New(),
+ Name: null.StringFrom("BTC/USD Price Feed"),
+ }
+ testJob2 := job.Job{
+ ID: 2,
+ ExternalJobID: uuid.New(),
+ Name: null.StringFrom("ETH/USD Price Feed"),
+ }
+
+ // Mock job ORM calls
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "test-bridge").Return(testJobIDs, nil)
+ jobORM.On("FindJob", mock.Anything, int32(1)).Return(testJob1, nil)
+ jobORM.On("FindJob", mock.Anything, int32(2)).Return(testJob2, nil)
+
+ // Capture the emitted protobuf to verify job information
+ var capturedProtobufBytes []byte
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
+ capturedProtobufBytes = args.Get(1).([]byte)
+ })
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "test-bridge", "http://example.com")
+
+ // Verify the job information (IDs and names) were included in the protobuf
+ require.NotEmpty(t, capturedProtobufBytes)
+ var event events.BridgeStatusEvent
+ err := proto.Unmarshal(capturedProtobufBytes, &event)
+ require.NoError(t, err)
+
+ require.Len(t, event.Jobs, 2)
+ expectedJobs := []*events.JobInfo{
+ {ExternalJobId: testJob1.ExternalJobID.String(), JobName: "BTC/USD Price Feed"},
+ {ExternalJobId: testJob2.ExternalJobID.String(), JobName: "ETH/USD Price Feed"},
+ }
+ assert.ElementsMatch(t, expectedJobs, event.Jobs)
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+func TestService_pollBridge_JobORMError(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient(loadFixture(t, "bridge_status_response.json"), http.StatusOK)
+ service, _, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Mock job ORM to return error
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "test-bridge").Return([]int32{}, assert.AnError)
+
+ // Should still emit telemetry with empty external job IDs
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "test-bridge", "http://example.com")
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+// Test ignoreJoblessBridges functionality
+func TestService_pollBridge_IgnoreJoblessBridges_Enabled(t *testing.T) {
+ // Use a nil httpClient since no HTTP request should be made when bridge is skipped for having no jobs
+ httpClient := &http.Client{}
+ service, _, jobORM, emitter := setupTestServiceWithIgnoreFlags(t, true, testPollingInterval, httpClient, true, true)
+
+ // Mock job ORM to return no job IDs (jobless bridge)
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "jobless-bridge").Return([]int32{}, nil)
+
+ // Should NOT emit telemetry for jobless bridge when ignoreJoblessBridges is true
+ emitter.AssertNotCalled(t, "Emit", mock.Anything, mock.Anything, mock.Anything)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "jobless-bridge", "http://example.com")
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+func TestService_pollBridge_IgnoreJoblessBridges_Disabled(t *testing.T) {
+ // Use valid HTTP client with successful response since we want the full flow when ignoreJoblessBridges is false
+ httpClient := mocks.NewMockHTTPClient(loadFixture(t, "bridge_status_response.json"), http.StatusOK)
+ service, _, jobORM, emitter := setupTestServiceWithIgnoreFlags(t, true, testPollingInterval, httpClient, true, false)
+
+ // Mock job ORM to return no job IDs (jobless bridge)
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "jobless-bridge").Return([]int32{}, nil)
+
+ // Should emit telemetry for jobless bridge when ignoreJoblessBridges is false
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "jobless-bridge", "http://example.com")
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+// Test ignoreInvalidBridges functionality - HTTP error
+func TestService_pollBridge_IgnoreInvalidBridges_HTTPError_Enabled(t *testing.T) {
+ httpClient := &http.Client{}
+ service, _, jobORM, emitter := setupTestServiceWithIgnoreFlags(t, true, testPollingInterval, httpClient, true, false)
+
+ // Create test job and external job UUID for FindJob mock
+ testJob := job.Job{ID: 1, ExternalJobID: uuid.New(), Name: null.StringFrom("BTC/USD Price Feed")}
+
+ // Mock job ORM calls
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "invalid-bridge").Return([]int32{1}, nil)
+ jobORM.On("FindJob", mock.Anything, int32(1)).Return(testJob, nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "invalid-bridge", "http://invalid.invalid:8080")
+
+ // Should NOT emit telemetry for invalid bridge when ignoreInvalidBridges is true
+ emitter.AssertNotCalled(t, "Emit", mock.Anything, mock.Anything, mock.Anything)
+ emitter.AssertNotCalled(t, "With", mock.Anything)
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+func TestService_pollBridge_IgnoreInvalidBridges_HTTPError_Disabled(t *testing.T) {
+ httpClient := &http.Client{}
+ service, _, jobORM, emitter := setupTestServiceWithIgnoreFlags(t, true, testPollingInterval, httpClient, false, false)
+
+ // Create test job and external job UUID for FindJob mock
+ testJob := job.Job{ID: 1, ExternalJobID: uuid.New(), Name: null.StringFrom("BTC/USD Price Feed")}
+
+ // Mock job ORM calls
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "invalid-bridge").Return([]int32{1}, nil)
+ jobORM.On("FindJob", mock.Anything, int32(1)).Return(testJob, nil)
+
+ // Should emit empty telemetry for invalid bridge when ignoreInvalidBridges is false
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "invalid-bridge", "http://invalid.invalid:8080") // This will fail with HTTP error
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+// Test ignoreInvalidBridges functionality - Non-200 status
+func TestService_pollBridge_IgnoreInvalidBridges_Non200Status_Enabled(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient("Not Found", http.StatusNotFound)
+ service, _, jobORM, emitter := setupTestServiceWithIgnoreFlags(t, true, testPollingInterval, httpClient, true, false)
+
+ // Create test job and external job UUID for FindJob mock
+ testJob := job.Job{ID: 1, ExternalJobID: uuid.New(), Name: null.StringFrom("BTC/USD Price Feed")}
+
+ // Mock job ORM calls
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "invalid-bridge").Return([]int32{1}, nil)
+ jobORM.On("FindJob", mock.Anything, int32(1)).Return(testJob, nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "invalid-bridge", "http://example.com")
+
+ // Should NOT emit telemetry for invalid bridge when ignoreInvalidBridges is true
+ emitter.AssertNotCalled(t, "Emit", mock.Anything, mock.Anything, mock.Anything)
+ emitter.AssertNotCalled(t, "With", mock.Anything)
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+func TestService_pollBridge_IgnoreInvalidBridges_Non200Status_Disabled(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient("Not Found", http.StatusNotFound)
+ service, _, jobORM, emitter := setupTestServiceWithIgnoreFlags(t, true, testPollingInterval, httpClient, false, false)
+
+ // Create test job and external job UUID for FindJob mock
+ testJob := job.Job{ID: 1, ExternalJobID: uuid.New(), Name: null.StringFrom("BTC/USD Price Feed")}
+
+ // Mock job ORM calls
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "invalid-bridge").Return([]int32{1}, nil)
+ jobORM.On("FindJob", mock.Anything, int32(1)).Return(testJob, nil)
+
+ // Should emit empty telemetry for invalid bridge when ignoreInvalidBridges is false
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "invalid-bridge", "http://example.com")
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+// Test ignoreInvalidBridges functionality - Invalid JSON
+func TestService_pollBridge_IgnoreInvalidBridges_InvalidJSON_Enabled(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient("invalid json", http.StatusOK)
+ service, _, jobORM, emitter := setupTestServiceWithIgnoreFlags(t, true, testPollingInterval, httpClient, true, false)
+
+ // Create test job and external job UUID for FindJob mock
+ testJob := job.Job{ID: 1, ExternalJobID: uuid.New(), Name: null.StringFrom("BTC/USD Price Feed")}
+
+ // Mock job ORM calls
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "invalid-bridge").Return([]int32{1}, nil)
+ jobORM.On("FindJob", mock.Anything, int32(1)).Return(testJob, nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "invalid-bridge", "http://example.com")
+
+ // Should NOT emit telemetry for invalid bridge when ignoreInvalidBridges is true
+ emitter.AssertNotCalled(t, "Emit", mock.Anything, mock.Anything, mock.Anything)
+ emitter.AssertNotCalled(t, "With", mock.Anything)
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+func TestService_pollBridge_IgnoreInvalidBridges_InvalidJSON_Disabled(t *testing.T) {
+ httpClient := mocks.NewMockHTTPClient("invalid json", http.StatusOK)
+ service, _, jobORM, emitter := setupTestServiceWithIgnoreFlags(t, true, testPollingInterval, httpClient, false, false)
+
+ // Create test job and external job UUID for FindJob mock
+ testJob := job.Job{ID: 1, ExternalJobID: uuid.New(), Name: null.StringFrom("BTC/USD Price Feed")}
+
+ // Mock job ORM calls
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "invalid-bridge").Return([]int32{1}, nil)
+ jobORM.On("FindJob", mock.Anything, int32(1)).Return(testJob, nil)
+
+ // Should emit empty telemetry for invalid bridge when ignoreInvalidBridges is false
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "invalid-bridge", "http://example.com")
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+// Test combined functionality
+func TestService_pollBridge_BothIgnoreFlags_Enabled(t *testing.T) {
+ httpClient := &http.Client{}
+ service, _, jobORM, emitter := setupTestServiceWithIgnoreFlags(t, true, testPollingInterval, httpClient, true, true)
+
+ // Mock job ORM to return no job IDs (jobless bridge)
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "jobless-invalid-bridge").Return([]int32{}, nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "jobless-invalid-bridge", "http://invalid.invalid:8080") // This would fail with HTTP error too
+
+ // Should NOT emit telemetry - skipped because of no jobs (ignoreJoblessBridges)
+ emitter.AssertNotCalled(t, "Emit", mock.Anything, mock.Anything, mock.Anything)
+ emitter.AssertNotCalled(t, "With", mock.Anything)
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+func TestService_pollBridge_BothIgnoreFlags_Disabled(t *testing.T) {
+ httpClient := &http.Client{}
+ service, _, jobORM, emitter := setupTestServiceWithIgnoreFlags(t, true, testPollingInterval, httpClient, false, false)
+
+ // Mock job ORM to return no job IDs (jobless bridge)
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "jobless-invalid-bridge").Return([]int32{}, nil)
+
+ // Should emit empty telemetry even for jobless invalid bridge when both flags are false
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "jobless-invalid-bridge", "http://invalid.invalid:8080") // This will fail with HTTP error
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
+
+// TestService_pollBridge_EndToEnd_RealWebServer tests the complete flow with a real HTTP server
+func TestService_pollBridge_EndToEnd_RealWebServer(t *testing.T) {
+ // Create a test HTTP server that serves fixture data
+ fixtureData := loadFixture(t, "bridge_status_response.json")
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ // Verify the request path
+ assert.Equal(t, testStatusPath, r.URL.Path)
+ assert.Equal(t, "GET", r.Method)
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte(fixtureData))
+ }))
+ defer server.Close()
+
+ // Use real HTTP client (not mock)
+ httpClient := &http.Client{}
+ service, _, jobORM, emitter := setupTestService(t, true, testPollingInterval, httpClient)
+
+ // Mock job ORM calls
+ jobORM.On("FindJobIDsWithBridge", mock.Anything, "test-bridge").Return([]int32{}, nil)
+
+ // Capture the emitted protobuf to verify end-to-end flow
+ var capturedProtobufBytes []byte
+ emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
+ capturedProtobufBytes = args.Get(1).([]byte)
+ })
+
+ ctx := context.Background()
+ service.pollBridge(ctx, "test-bridge", server.URL)
+
+ // Verify the complete end-to-end flow worked
+ require.NotEmpty(t, capturedProtobufBytes, "Should have emitted protobuf data")
+
+ var event events.BridgeStatusEvent
+ err := proto.Unmarshal(capturedProtobufBytes, &event)
+ require.NoError(t, err, "Should be able to unmarshal protobuf")
+
+ // Verify the data from fixture made it through the complete pipeline
+ expectedStatus := loadFixtureAsEAResponse(t, "bridge_status_response.json")
+
+ // Verify basic bridge info
+ assert.Equal(t, "test-bridge", event.BridgeName)
+ assert.Equal(t, expectedStatus.Adapter.Name, event.AdapterName)
+ assert.Equal(t, expectedStatus.Adapter.Version, event.AdapterVersion)
+ assert.InDelta(t, expectedStatus.Adapter.UptimeSeconds, event.AdapterUptimeSeconds, 0.001)
+ assert.Equal(t, expectedStatus.DefaultEndpoint, event.DefaultEndpoint)
+
+ // Verify endpoints - loop through fixture and compare with protobuf
+ require.Len(t, event.Endpoints, len(expectedStatus.Endpoints))
+ for i, expectedEndpoint := range expectedStatus.Endpoints {
+ actualEndpoint := event.Endpoints[i]
+ assert.Equal(t, expectedEndpoint.Name, actualEndpoint.Name)
+ assert.Equal(t, expectedEndpoint.Aliases, actualEndpoint.Aliases)
+ assert.Equal(t, expectedEndpoint.Transports, actualEndpoint.Transports)
+ }
+
+ // Verify configuration - loop through fixture and compare with protobuf
+ // Helper function to safely convert values to strings, handling nil (same as in production code)
+ safeString := func(v any) string {
+ if v == nil {
+ return ""
+ }
+ return fmt.Sprintf("%v", v)
+ }
+
+ require.Len(t, event.Configuration, len(expectedStatus.Configuration))
+ for i, expectedConfig := range expectedStatus.Configuration {
+ actualConfig := event.Configuration[i]
+ assert.Equal(t, expectedConfig.Name, actualConfig.Name)
+ assert.Equal(t, safeString(expectedConfig.Value), actualConfig.Value)
+ assert.Equal(t, expectedConfig.Type, actualConfig.Type)
+ assert.Equal(t, expectedConfig.Description, actualConfig.Description)
+ assert.Equal(t, expectedConfig.Required, actualConfig.Required)
+ assert.Equal(t, safeString(expectedConfig.Default), actualConfig.DefaultValue)
+ assert.Equal(t, expectedConfig.CustomSetting, actualConfig.CustomSetting)
+ assert.Equal(t, safeString(expectedConfig.EnvDefaultOverride), actualConfig.EnvDefaultOverride)
+ }
+
+ // Verify runtime info
+ require.NotNil(t, event.Runtime)
+ assert.Equal(t, expectedStatus.Runtime.NodeVersion, event.Runtime.NodeVersion)
+ assert.Equal(t, expectedStatus.Runtime.Platform, event.Runtime.Platform)
+ assert.Equal(t, expectedStatus.Runtime.Architecture, event.Runtime.Architecture)
+ assert.Equal(t, expectedStatus.Runtime.Hostname, event.Runtime.Hostname)
+
+ // Verify metrics info
+ require.NotNil(t, event.Metrics)
+ assert.Equal(t, expectedStatus.Metrics.Enabled, event.Metrics.Enabled)
+
+ // Verify job info is included
+ assert.Empty(t, event.Jobs)
+
+ // Verify timestamp is set
+ assert.NotEmpty(t, event.Timestamp)
+
+ jobORM.AssertExpectations(t)
+ emitter.AssertExpectations(t)
+}
diff --git a/core/services/nodestatusreporter/bridgestatus/events/bridge_status.pb.go b/core/services/nodestatusreporter/bridgestatus/events/bridge_status.pb.go
new file mode 100644
index 00000000000..c0ed96ebf06
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/events/bridge_status.pb.go
@@ -0,0 +1,582 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.36.6
+// protoc v5.29.3
+// source: bridge_status.proto
+
+package events
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+ unsafe "unsafe"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// JobInfo represents job information associated with a bridge
+type JobInfo struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ ExternalJobId string `protobuf:"bytes,1,opt,name=external_job_id,json=externalJobId,proto3" json:"external_job_id,omitempty"`
+ JobName string `protobuf:"bytes,2,opt,name=job_name,json=jobName,proto3" json:"job_name,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *JobInfo) Reset() {
+ *x = JobInfo{}
+ mi := &file_bridge_status_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *JobInfo) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*JobInfo) ProtoMessage() {}
+
+func (x *JobInfo) ProtoReflect() protoreflect.Message {
+ mi := &file_bridge_status_proto_msgTypes[0]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use JobInfo.ProtoReflect.Descriptor instead.
+func (*JobInfo) Descriptor() ([]byte, []int) {
+ return file_bridge_status_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *JobInfo) GetExternalJobId() string {
+ if x != nil {
+ return x.ExternalJobId
+ }
+ return ""
+}
+
+func (x *JobInfo) GetJobName() string {
+ if x != nil {
+ return x.JobName
+ }
+ return ""
+}
+
+// BridgeStatusEvent represents the status data from an External Adapter
+type BridgeStatusEvent struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ // Bridge and adapter identification
+ BridgeName string `protobuf:"bytes,1,opt,name=bridge_name,json=bridgeName,proto3" json:"bridge_name,omitempty"`
+ AdapterName string `protobuf:"bytes,2,opt,name=adapter_name,json=adapterName,proto3" json:"adapter_name,omitempty"`
+ AdapterVersion string `protobuf:"bytes,3,opt,name=adapter_version,json=adapterVersion,proto3" json:"adapter_version,omitempty"`
+ AdapterUptimeSeconds float64 `protobuf:"fixed64,4,opt,name=adapter_uptime_seconds,json=adapterUptimeSeconds,proto3" json:"adapter_uptime_seconds,omitempty"`
+ DefaultEndpoint string `protobuf:"bytes,5,opt,name=default_endpoint,json=defaultEndpoint,proto3" json:"default_endpoint,omitempty"`
+ // Runtime information
+ Runtime *RuntimeInfo `protobuf:"bytes,6,opt,name=runtime,proto3" json:"runtime,omitempty"`
+ // Metrics configuration
+ Metrics *MetricsInfo `protobuf:"bytes,7,opt,name=metrics,proto3" json:"metrics,omitempty"`
+ // Available endpoints as structured data
+ Endpoints []*EndpointInfo `protobuf:"bytes,8,rep,name=endpoints,proto3" json:"endpoints,omitempty"`
+ // Configuration items
+ Configuration []*ConfigurationItem `protobuf:"bytes,9,rep,name=configuration,proto3" json:"configuration,omitempty"`
+ // Jobs associated with this bridge
+ Jobs []*JobInfo `protobuf:"bytes,10,rep,name=jobs,proto3" json:"jobs,omitempty"`
+ // Event metadata
+ Timestamp string `protobuf:"bytes,11,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *BridgeStatusEvent) Reset() {
+ *x = BridgeStatusEvent{}
+ mi := &file_bridge_status_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *BridgeStatusEvent) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*BridgeStatusEvent) ProtoMessage() {}
+
+func (x *BridgeStatusEvent) ProtoReflect() protoreflect.Message {
+ mi := &file_bridge_status_proto_msgTypes[1]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use BridgeStatusEvent.ProtoReflect.Descriptor instead.
+func (*BridgeStatusEvent) Descriptor() ([]byte, []int) {
+ return file_bridge_status_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *BridgeStatusEvent) GetBridgeName() string {
+ if x != nil {
+ return x.BridgeName
+ }
+ return ""
+}
+
+func (x *BridgeStatusEvent) GetAdapterName() string {
+ if x != nil {
+ return x.AdapterName
+ }
+ return ""
+}
+
+func (x *BridgeStatusEvent) GetAdapterVersion() string {
+ if x != nil {
+ return x.AdapterVersion
+ }
+ return ""
+}
+
+func (x *BridgeStatusEvent) GetAdapterUptimeSeconds() float64 {
+ if x != nil {
+ return x.AdapterUptimeSeconds
+ }
+ return 0
+}
+
+func (x *BridgeStatusEvent) GetDefaultEndpoint() string {
+ if x != nil {
+ return x.DefaultEndpoint
+ }
+ return ""
+}
+
+func (x *BridgeStatusEvent) GetRuntime() *RuntimeInfo {
+ if x != nil {
+ return x.Runtime
+ }
+ return nil
+}
+
+func (x *BridgeStatusEvent) GetMetrics() *MetricsInfo {
+ if x != nil {
+ return x.Metrics
+ }
+ return nil
+}
+
+func (x *BridgeStatusEvent) GetEndpoints() []*EndpointInfo {
+ if x != nil {
+ return x.Endpoints
+ }
+ return nil
+}
+
+func (x *BridgeStatusEvent) GetConfiguration() []*ConfigurationItem {
+ if x != nil {
+ return x.Configuration
+ }
+ return nil
+}
+
+func (x *BridgeStatusEvent) GetJobs() []*JobInfo {
+ if x != nil {
+ return x.Jobs
+ }
+ return nil
+}
+
+func (x *BridgeStatusEvent) GetTimestamp() string {
+ if x != nil {
+ return x.Timestamp
+ }
+ return ""
+}
+
+type RuntimeInfo struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ NodeVersion string `protobuf:"bytes,1,opt,name=node_version,json=nodeVersion,proto3" json:"node_version,omitempty"`
+ Platform string `protobuf:"bytes,2,opt,name=platform,proto3" json:"platform,omitempty"`
+ Architecture string `protobuf:"bytes,3,opt,name=architecture,proto3" json:"architecture,omitempty"`
+ Hostname string `protobuf:"bytes,4,opt,name=hostname,proto3" json:"hostname,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *RuntimeInfo) Reset() {
+ *x = RuntimeInfo{}
+ mi := &file_bridge_status_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *RuntimeInfo) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*RuntimeInfo) ProtoMessage() {}
+
+func (x *RuntimeInfo) ProtoReflect() protoreflect.Message {
+ mi := &file_bridge_status_proto_msgTypes[2]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use RuntimeInfo.ProtoReflect.Descriptor instead.
+func (*RuntimeInfo) Descriptor() ([]byte, []int) {
+ return file_bridge_status_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *RuntimeInfo) GetNodeVersion() string {
+ if x != nil {
+ return x.NodeVersion
+ }
+ return ""
+}
+
+func (x *RuntimeInfo) GetPlatform() string {
+ if x != nil {
+ return x.Platform
+ }
+ return ""
+}
+
+func (x *RuntimeInfo) GetArchitecture() string {
+ if x != nil {
+ return x.Architecture
+ }
+ return ""
+}
+
+func (x *RuntimeInfo) GetHostname() string {
+ if x != nil {
+ return x.Hostname
+ }
+ return ""
+}
+
+type MetricsInfo struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *MetricsInfo) Reset() {
+ *x = MetricsInfo{}
+ mi := &file_bridge_status_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *MetricsInfo) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*MetricsInfo) ProtoMessage() {}
+
+func (x *MetricsInfo) ProtoReflect() protoreflect.Message {
+ mi := &file_bridge_status_proto_msgTypes[3]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use MetricsInfo.ProtoReflect.Descriptor instead.
+func (*MetricsInfo) Descriptor() ([]byte, []int) {
+ return file_bridge_status_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *MetricsInfo) GetEnabled() bool {
+ if x != nil {
+ return x.Enabled
+ }
+ return false
+}
+
+type EndpointInfo struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ Aliases []string `protobuf:"bytes,2,rep,name=aliases,proto3" json:"aliases,omitempty"`
+ Transports []string `protobuf:"bytes,3,rep,name=transports,proto3" json:"transports,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *EndpointInfo) Reset() {
+ *x = EndpointInfo{}
+ mi := &file_bridge_status_proto_msgTypes[4]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *EndpointInfo) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*EndpointInfo) ProtoMessage() {}
+
+func (x *EndpointInfo) ProtoReflect() protoreflect.Message {
+ mi := &file_bridge_status_proto_msgTypes[4]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use EndpointInfo.ProtoReflect.Descriptor instead.
+func (*EndpointInfo) Descriptor() ([]byte, []int) {
+ return file_bridge_status_proto_rawDescGZIP(), []int{4}
+}
+
+func (x *EndpointInfo) GetName() string {
+ if x != nil {
+ return x.Name
+ }
+ return ""
+}
+
+func (x *EndpointInfo) GetAliases() []string {
+ if x != nil {
+ return x.Aliases
+ }
+ return nil
+}
+
+func (x *EndpointInfo) GetTransports() []string {
+ if x != nil {
+ return x.Transports
+ }
+ return nil
+}
+
+type ConfigurationItem struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
+ Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"`
+ Description string `protobuf:"bytes,4,opt,name=description,proto3" json:"description,omitempty"`
+ Required bool `protobuf:"varint,5,opt,name=required,proto3" json:"required,omitempty"`
+ DefaultValue string `protobuf:"bytes,6,opt,name=default_value,json=defaultValue,proto3" json:"default_value,omitempty"`
+ CustomSetting bool `protobuf:"varint,7,opt,name=custom_setting,json=customSetting,proto3" json:"custom_setting,omitempty"`
+ EnvDefaultOverride string `protobuf:"bytes,8,opt,name=env_default_override,json=envDefaultOverride,proto3" json:"env_default_override,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *ConfigurationItem) Reset() {
+ *x = ConfigurationItem{}
+ mi := &file_bridge_status_proto_msgTypes[5]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *ConfigurationItem) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ConfigurationItem) ProtoMessage() {}
+
+func (x *ConfigurationItem) ProtoReflect() protoreflect.Message {
+ mi := &file_bridge_status_proto_msgTypes[5]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ConfigurationItem.ProtoReflect.Descriptor instead.
+func (*ConfigurationItem) Descriptor() ([]byte, []int) {
+ return file_bridge_status_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *ConfigurationItem) GetName() string {
+ if x != nil {
+ return x.Name
+ }
+ return ""
+}
+
+func (x *ConfigurationItem) GetValue() string {
+ if x != nil {
+ return x.Value
+ }
+ return ""
+}
+
+func (x *ConfigurationItem) GetType() string {
+ if x != nil {
+ return x.Type
+ }
+ return ""
+}
+
+func (x *ConfigurationItem) GetDescription() string {
+ if x != nil {
+ return x.Description
+ }
+ return ""
+}
+
+func (x *ConfigurationItem) GetRequired() bool {
+ if x != nil {
+ return x.Required
+ }
+ return false
+}
+
+func (x *ConfigurationItem) GetDefaultValue() string {
+ if x != nil {
+ return x.DefaultValue
+ }
+ return ""
+}
+
+func (x *ConfigurationItem) GetCustomSetting() bool {
+ if x != nil {
+ return x.CustomSetting
+ }
+ return false
+}
+
+func (x *ConfigurationItem) GetEnvDefaultOverride() string {
+ if x != nil {
+ return x.EnvDefaultOverride
+ }
+ return ""
+}
+
+var File_bridge_status_proto protoreflect.FileDescriptor
+
+const file_bridge_status_proto_rawDesc = "" +
+ "\n" +
+ "\x13bridge_status.proto\x12\x10bridge_status.v1\"L\n" +
+ "\aJobInfo\x12&\n" +
+ "\x0fexternal_job_id\x18\x01 \x01(\tR\rexternalJobId\x12\x19\n" +
+ "\bjob_name\x18\x02 \x01(\tR\ajobName\"\xa9\x04\n" +
+ "\x11BridgeStatusEvent\x12\x1f\n" +
+ "\vbridge_name\x18\x01 \x01(\tR\n" +
+ "bridgeName\x12!\n" +
+ "\fadapter_name\x18\x02 \x01(\tR\vadapterName\x12'\n" +
+ "\x0fadapter_version\x18\x03 \x01(\tR\x0eadapterVersion\x124\n" +
+ "\x16adapter_uptime_seconds\x18\x04 \x01(\x01R\x14adapterUptimeSeconds\x12)\n" +
+ "\x10default_endpoint\x18\x05 \x01(\tR\x0fdefaultEndpoint\x127\n" +
+ "\aruntime\x18\x06 \x01(\v2\x1d.bridge_status.v1.RuntimeInfoR\aruntime\x127\n" +
+ "\ametrics\x18\a \x01(\v2\x1d.bridge_status.v1.MetricsInfoR\ametrics\x12<\n" +
+ "\tendpoints\x18\b \x03(\v2\x1e.bridge_status.v1.EndpointInfoR\tendpoints\x12I\n" +
+ "\rconfiguration\x18\t \x03(\v2#.bridge_status.v1.ConfigurationItemR\rconfiguration\x12-\n" +
+ "\x04jobs\x18\n" +
+ " \x03(\v2\x19.bridge_status.v1.JobInfoR\x04jobs\x12\x1c\n" +
+ "\ttimestamp\x18\v \x01(\tR\ttimestamp\"\x8c\x01\n" +
+ "\vRuntimeInfo\x12!\n" +
+ "\fnode_version\x18\x01 \x01(\tR\vnodeVersion\x12\x1a\n" +
+ "\bplatform\x18\x02 \x01(\tR\bplatform\x12\"\n" +
+ "\farchitecture\x18\x03 \x01(\tR\farchitecture\x12\x1a\n" +
+ "\bhostname\x18\x04 \x01(\tR\bhostname\"'\n" +
+ "\vMetricsInfo\x12\x18\n" +
+ "\aenabled\x18\x01 \x01(\bR\aenabled\"\\\n" +
+ "\fEndpointInfo\x12\x12\n" +
+ "\x04name\x18\x01 \x01(\tR\x04name\x12\x18\n" +
+ "\aaliases\x18\x02 \x03(\tR\aaliases\x12\x1e\n" +
+ "\n" +
+ "transports\x18\x03 \x03(\tR\n" +
+ "transports\"\x8d\x02\n" +
+ "\x11ConfigurationItem\x12\x12\n" +
+ "\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n" +
+ "\x05value\x18\x02 \x01(\tR\x05value\x12\x12\n" +
+ "\x04type\x18\x03 \x01(\tR\x04type\x12 \n" +
+ "\vdescription\x18\x04 \x01(\tR\vdescription\x12\x1a\n" +
+ "\brequired\x18\x05 \x01(\bR\brequired\x12#\n" +
+ "\rdefault_value\x18\x06 \x01(\tR\fdefaultValue\x12%\n" +
+ "\x0ecustom_setting\x18\a \x01(\bR\rcustomSetting\x120\n" +
+ "\x14env_default_override\x18\b \x01(\tR\x12envDefaultOverrideB_Z]github.com/smartcontractkit/chainlink/v2/core/services/nodestatusreporter/bridgestatus/eventsb\x06proto3"
+
+var (
+ file_bridge_status_proto_rawDescOnce sync.Once
+ file_bridge_status_proto_rawDescData []byte
+)
+
+func file_bridge_status_proto_rawDescGZIP() []byte {
+ file_bridge_status_proto_rawDescOnce.Do(func() {
+ file_bridge_status_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_bridge_status_proto_rawDesc), len(file_bridge_status_proto_rawDesc)))
+ })
+ return file_bridge_status_proto_rawDescData
+}
+
+var file_bridge_status_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
+var file_bridge_status_proto_goTypes = []any{
+ (*JobInfo)(nil), // 0: bridge_status.v1.JobInfo
+ (*BridgeStatusEvent)(nil), // 1: bridge_status.v1.BridgeStatusEvent
+ (*RuntimeInfo)(nil), // 2: bridge_status.v1.RuntimeInfo
+ (*MetricsInfo)(nil), // 3: bridge_status.v1.MetricsInfo
+ (*EndpointInfo)(nil), // 4: bridge_status.v1.EndpointInfo
+ (*ConfigurationItem)(nil), // 5: bridge_status.v1.ConfigurationItem
+}
+var file_bridge_status_proto_depIdxs = []int32{
+ 2, // 0: bridge_status.v1.BridgeStatusEvent.runtime:type_name -> bridge_status.v1.RuntimeInfo
+ 3, // 1: bridge_status.v1.BridgeStatusEvent.metrics:type_name -> bridge_status.v1.MetricsInfo
+ 4, // 2: bridge_status.v1.BridgeStatusEvent.endpoints:type_name -> bridge_status.v1.EndpointInfo
+ 5, // 3: bridge_status.v1.BridgeStatusEvent.configuration:type_name -> bridge_status.v1.ConfigurationItem
+ 0, // 4: bridge_status.v1.BridgeStatusEvent.jobs:type_name -> bridge_status.v1.JobInfo
+ 5, // [5:5] is the sub-list for method output_type
+ 5, // [5:5] is the sub-list for method input_type
+ 5, // [5:5] is the sub-list for extension type_name
+ 5, // [5:5] is the sub-list for extension extendee
+ 0, // [0:5] is the sub-list for field type_name
+}
+
+func init() { file_bridge_status_proto_init() }
+func file_bridge_status_proto_init() {
+ if File_bridge_status_proto != nil {
+ return
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: unsafe.Slice(unsafe.StringData(file_bridge_status_proto_rawDesc), len(file_bridge_status_proto_rawDesc)),
+ NumEnums: 0,
+ NumMessages: 6,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_bridge_status_proto_goTypes,
+ DependencyIndexes: file_bridge_status_proto_depIdxs,
+ MessageInfos: file_bridge_status_proto_msgTypes,
+ }.Build()
+ File_bridge_status_proto = out.File
+ file_bridge_status_proto_goTypes = nil
+ file_bridge_status_proto_depIdxs = nil
+}
diff --git a/core/services/nodestatusreporter/bridgestatus/events/bridge_status.proto b/core/services/nodestatusreporter/bridgestatus/events/bridge_status.proto
new file mode 100644
index 00000000000..066c641b489
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/events/bridge_status.proto
@@ -0,0 +1,67 @@
+syntax = "proto3";
+
+option go_package = "github.com/smartcontractkit/chainlink/v2/core/services/nodestatusreporter/bridgestatus/events";
+
+package bridge_status.v1;
+
+// JobInfo represents job information associated with a bridge
+message JobInfo {
+ string external_job_id = 1;
+ string job_name = 2;
+}
+
+// BridgeStatusEvent represents the status data from an External Adapter
+message BridgeStatusEvent {
+ // Bridge and adapter identification
+ string bridge_name = 1;
+ string adapter_name = 2;
+ string adapter_version = 3;
+ double adapter_uptime_seconds = 4;
+ string default_endpoint = 5;
+
+ // Runtime information
+ RuntimeInfo runtime = 6;
+
+ // Metrics configuration
+ MetricsInfo metrics = 7;
+
+ // Available endpoints as structured data
+ repeated EndpointInfo endpoints = 8;
+
+ // Configuration items
+ repeated ConfigurationItem configuration = 9;
+
+ // Jobs associated with this bridge
+ repeated JobInfo jobs = 10;
+
+ // Event metadata
+ string timestamp = 11;
+}
+
+message RuntimeInfo {
+ string node_version = 1;
+ string platform = 2;
+ string architecture = 3;
+ string hostname = 4;
+}
+
+message MetricsInfo {
+ bool enabled = 1;
+}
+
+message EndpointInfo {
+ string name = 1;
+ repeated string aliases = 2;
+ repeated string transports = 3;
+}
+
+message ConfigurationItem {
+ string name = 1;
+ string value = 2;
+ string type = 3;
+ string description = 4;
+ bool required = 5;
+ string default_value = 6;
+ bool custom_setting = 7;
+ string env_default_override = 8;
+}
\ No newline at end of file
diff --git a/core/services/nodestatusreporter/bridgestatus/events/emit.go b/core/services/nodestatusreporter/bridgestatus/events/emit.go
new file mode 100644
index 00000000000..77cc9252b91
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/events/emit.go
@@ -0,0 +1,34 @@
+package events
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+
+ "github.com/smartcontractkit/chainlink-common/pkg/beholder"
+)
+
+// EmitBridgeStatusEvent emits a Bridge Status event through the provided custmsg.MessageEmitter
+func EmitBridgeStatusEvent(ctx context.Context, emitter beholder.Emitter, event *BridgeStatusEvent) error {
+ if event.Timestamp == "" {
+ event.Timestamp = time.Now().Format(time.RFC3339Nano)
+ }
+
+ eventBytes, err := proto.Marshal(event)
+ if err != nil {
+ return fmt.Errorf("failed to marshal BridgeStatusEvent: %w", err)
+ }
+
+ err = emitter.Emit(ctx, eventBytes,
+ "beholder_data_schema", SchemaBridgeStatus,
+ "beholder_domain", "platform",
+ "beholder_entity", fmt.Sprintf("%s.%s", ProtoPkg, BridgeStatusEventEntity),
+ )
+ if err != nil {
+ return fmt.Errorf("failed to emit BridgeStatusEvent: %w", err)
+ }
+
+ return nil
+}
diff --git a/core/services/nodestatusreporter/bridgestatus/events/generate.go b/core/services/nodestatusreporter/bridgestatus/events/generate.go
new file mode 100644
index 00000000000..e1e26457299
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/events/generate.go
@@ -0,0 +1,3 @@
+package events
+
+//go:generate protoc --go_out=. --go_opt=paths=source_relative bridge_status.proto
diff --git a/core/services/nodestatusreporter/bridgestatus/events/types.go b/core/services/nodestatusreporter/bridgestatus/events/types.go
new file mode 100644
index 00000000000..c15856283a2
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/events/types.go
@@ -0,0 +1,9 @@
+package events
+
+const (
+ ProtoPkg = "bridge_status.v1"
+ // BridgeStatusEventEntity represents a Bridge Status event
+ BridgeStatusEventEntity string = "BridgeStatusEvent"
+ // SchemaBridgeStatus represents the schema for Bridge Status events
+ SchemaBridgeStatus string = "/bridge-status-events/v1"
+)
diff --git a/core/services/nodestatusreporter/bridgestatus/fixtures/bridge_status_empty.json b/core/services/nodestatusreporter/bridgestatus/fixtures/bridge_status_empty.json
new file mode 100644
index 00000000000..76e9d6dffd7
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/fixtures/bridge_status_empty.json
@@ -0,0 +1,19 @@
+{
+ "adapter": {
+ "name": "",
+ "version": "",
+ "uptimeSeconds": 0
+ },
+ "endpoints": [],
+ "defaultEndpoint": "",
+ "configuration": [],
+ "runtime": {
+ "nodeVersion": "",
+ "platform": "",
+ "architecture": "",
+ "hostname": ""
+ },
+ "metrics": {
+ "enabled": false
+ }
+}
\ No newline at end of file
diff --git a/core/services/nodestatusreporter/bridgestatus/fixtures/bridge_status_response.json b/core/services/nodestatusreporter/bridgestatus/fixtures/bridge_status_response.json
new file mode 100644
index 00000000000..16b970fdaa5
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/fixtures/bridge_status_response.json
@@ -0,0 +1,63 @@
+{
+ "adapter": {
+ "name": "test-adapter",
+ "version": "1.0.0",
+ "uptimeSeconds": 3600
+ },
+ "endpoints": [
+ {
+ "name": "price",
+ "aliases": ["crypto", "market"],
+ "transports": ["rest", "websocket"]
+ },
+ {
+ "name": "volume",
+ "aliases": ["trading"],
+ "transports": ["rest"]
+ }
+ ],
+ "defaultEndpoint": "price",
+ "configuration": [
+ {
+ "name": "API_KEY",
+ "value": "***REDACTED***",
+ "type": "string",
+ "description": "API key for external service",
+ "required": true,
+ "default": null,
+ "customSetting": true,
+ "envDefaultOverride": null
+ },
+ {
+ "name": "TIMEOUT",
+ "value": 30000,
+ "type": "number",
+ "description": "Request timeout in milliseconds",
+ "required": false,
+ "default": 30000,
+ "customSetting": false,
+ "envDefaultOverride": 45000
+ },
+ {
+ "name": "CACHE_ENABLED",
+ "value": true,
+ "type": "boolean",
+ "description": "Enable response caching",
+ "required": false,
+ "default": true,
+ "customSetting": false,
+ "envDefaultOverride": null
+ }
+ ],
+ "runtime": {
+ "nodeVersion": "18.17.0",
+ "platform": "linux",
+ "architecture": "x64",
+ "hostname": "ea-adapter-01"
+ },
+ "metrics": {
+ "enabled": true,
+ "port": 9090,
+ "endpoint": "/metrics"
+ }
+}
\ No newline at end of file
diff --git a/core/services/nodestatusreporter/bridgestatus/mocks/beholder_emitter.go b/core/services/nodestatusreporter/bridgestatus/mocks/beholder_emitter.go
new file mode 100644
index 00000000000..0afb0d94b54
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/mocks/beholder_emitter.go
@@ -0,0 +1,22 @@
+package mocks
+
+import (
+ "context"
+
+ "github.com/stretchr/testify/mock"
+)
+
+// BeholderEmitter is a mock implementation of beholder.Emitter for testing
+type BeholderEmitter struct {
+ mock.Mock
+}
+
+func (m *BeholderEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
+ args := m.Called(ctx, body, attrKVs)
+ return args.Error(0)
+}
+
+// NewBeholderEmitter creates a new beholder emitter mock for testing
+func NewBeholderEmitter() *BeholderEmitter {
+ return &BeholderEmitter{}
+}
diff --git a/core/services/nodestatusreporter/bridgestatus/mocks/config.go b/core/services/nodestatusreporter/bridgestatus/mocks/config.go
new file mode 100644
index 00000000000..386e6451397
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/mocks/config.go
@@ -0,0 +1,54 @@
+package mocks
+
+import (
+ "time"
+)
+
+// TestBridgeStatusReporterConfig implements config.BridgeStatusReporter for testing
+type TestBridgeStatusReporterConfig struct {
+ enabled bool
+ statusPath string
+ pollingInterval time.Duration
+ ignoreInvalidBridges bool
+ ignoreJoblessBridges bool
+}
+
+func NewTestBridgeStatusReporterConfig(enabled bool, statusPath string, pollingInterval time.Duration) *TestBridgeStatusReporterConfig {
+ return &TestBridgeStatusReporterConfig{
+ enabled: enabled,
+ statusPath: statusPath,
+ pollingInterval: pollingInterval,
+ ignoreInvalidBridges: true,
+ ignoreJoblessBridges: false,
+ }
+}
+
+func NewTestBridgeStatusReporterConfigWithSkip(enabled bool, statusPath string, pollingInterval time.Duration, ignoreInvalidBridges bool, ignoreJoblessBridges bool) *TestBridgeStatusReporterConfig {
+ return &TestBridgeStatusReporterConfig{
+ enabled: enabled,
+ statusPath: statusPath,
+ pollingInterval: pollingInterval,
+ ignoreInvalidBridges: ignoreInvalidBridges,
+ ignoreJoblessBridges: ignoreJoblessBridges,
+ }
+}
+
+func (e *TestBridgeStatusReporterConfig) Enabled() bool {
+ return e.enabled
+}
+
+func (e *TestBridgeStatusReporterConfig) StatusPath() string {
+ return e.statusPath
+}
+
+func (e *TestBridgeStatusReporterConfig) PollingInterval() time.Duration {
+ return e.pollingInterval
+}
+
+func (e *TestBridgeStatusReporterConfig) IgnoreInvalidBridges() bool {
+ return e.ignoreInvalidBridges
+}
+
+func (e *TestBridgeStatusReporterConfig) IgnoreJoblessBridges() bool {
+ return e.ignoreJoblessBridges
+}
diff --git a/core/services/nodestatusreporter/bridgestatus/mocks/http_client.go b/core/services/nodestatusreporter/bridgestatus/mocks/http_client.go
new file mode 100644
index 00000000000..72b64b5844a
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/mocks/http_client.go
@@ -0,0 +1,39 @@
+package mocks
+
+import (
+ "io"
+ "net/http"
+ "strings"
+)
+
+// HTTPRoundTripper is a mock implementation of http.RoundTripper for testing
+type HTTPRoundTripper struct {
+ Response *http.Response
+ ResponseBody string
+ Error error
+}
+
+func (m *HTTPRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+ if m.Error != nil {
+ return nil, m.Error
+ }
+
+ response := *m.Response
+ response.Body = io.NopCloser(strings.NewReader(m.ResponseBody))
+ response.Request = req
+ return &response, nil
+}
+
+// NewMockHTTPClient creates a new HTTP client with mock transport for testing
+func NewMockHTTPClient(responseBody string, statusCode int) *http.Client {
+ return &http.Client{
+ Transport: &HTTPRoundTripper{
+ Response: &http.Response{
+ StatusCode: statusCode,
+ Header: make(http.Header),
+ Body: http.NoBody,
+ },
+ ResponseBody: responseBody,
+ },
+ }
+}
diff --git a/core/services/nodestatusreporter/bridgestatus/types.go b/core/services/nodestatusreporter/bridgestatus/types.go
new file mode 100644
index 00000000000..9e9d382d235
--- /dev/null
+++ b/core/services/nodestatusreporter/bridgestatus/types.go
@@ -0,0 +1,43 @@
+package bridgestatus
+
+// EAResponse represents the response schema from External Adapter status endpoint
+type EAResponse struct {
+ Adapter struct {
+ Name string `json:"name"`
+ Version string `json:"version"`
+ UptimeSeconds float64 `json:"uptimeSeconds"`
+ } `json:"adapter"`
+ Endpoints []struct {
+ Name string `json:"name"`
+ Aliases []string `json:"aliases"`
+ Transports []string `json:"transports"`
+ } `json:"endpoints"`
+ DefaultEndpoint string `json:"defaultEndpoint"`
+ Configuration []struct {
+ Name string `json:"name"`
+ Value any `json:"value"`
+ Type string `json:"type"`
+ Description string `json:"description"`
+ Required bool `json:"required"`
+ Default any `json:"default"`
+ CustomSetting bool `json:"customSetting"`
+ EnvDefaultOverride any `json:"envDefaultOverride"`
+ } `json:"configuration"`
+ Runtime struct {
+ NodeVersion string `json:"nodeVersion"`
+ Platform string `json:"platform"`
+ Architecture string `json:"architecture"`
+ Hostname string `json:"hostname"`
+ } `json:"runtime"`
+ Metrics struct {
+ Enabled bool `json:"enabled"`
+ Port *int `json:"port,omitempty"`
+ Endpoint *string `json:"endpoint,omitempty"`
+ } `json:"metrics"`
+}
+
+// JobInfo represents job information for a bridge
+type JobInfo struct {
+ ExternalJobID string
+ Name string
+}
diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml
index 3f838ad1d93..57a0881fb4d 100644
--- a/core/web/resolver/testdata/config-empty-effective.toml
+++ b/core/web/resolver/testdata/config-empty-effective.toml
@@ -346,3 +346,10 @@ URL = ''
[Billing]
URL = 'localhost:4319'
TLSEnabled = true
+
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml
index 02c804c94c3..c2aea0510fb 100644
--- a/core/web/resolver/testdata/config-full.toml
+++ b/core/web/resolver/testdata/config-full.toml
@@ -361,6 +361,13 @@ URL = 'https://workflow.fetcher.url'
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
Enabled = false
diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml
index 66487c8442b..dd8c42c8965 100644
--- a/core/web/resolver/testdata/config-multi-chain-effective.toml
+++ b/core/web/resolver/testdata/config-multi-chain-effective.toml
@@ -347,6 +347,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
AutoCreateKey = true
diff --git a/core/web/testdata/body/health.html b/core/web/testdata/body/health.html
index c3246efee4a..06d3be374e3 100644
--- a/core/web/testdata/body/health.html
+++ b/core/web/testdata/body/health.html
@@ -32,6 +32,9 @@
color: rgba(100,101,10,0);
}
+
+ BridgeStatusReporter
+
EVM
diff --git a/core/web/testdata/body/health.json b/core/web/testdata/body/health.json
index 396202c8205..4dfd12e3af7 100644
--- a/core/web/testdata/body/health.json
+++ b/core/web/testdata/body/health.json
@@ -1,5 +1,14 @@
{
"data": [
+ {
+ "type": "checks",
+ "id": "BridgeStatusReporter",
+ "attributes": {
+ "name": "BridgeStatusReporter",
+ "status": "passing",
+ "output": ""
+ }
+ },
{
"type": "checks",
"id": "EVM.0",
diff --git a/core/web/testdata/body/health.txt b/core/web/testdata/body/health.txt
index c5a5161f2bf..6d350988680 100644
--- a/core/web/testdata/body/health.txt
+++ b/core/web/testdata/body/health.txt
@@ -1,3 +1,4 @@
+ok BridgeStatusReporter
ok EVM.0
ok EVM.0.BalanceMonitor
ok EVM.0.HeadBroadcaster
diff --git a/docs/CONFIG.md b/docs/CONFIG.md
index f65e3eb2a70..452ad77a14e 100644
--- a/docs/CONFIG.md
+++ b/docs/CONFIG.md
@@ -2315,6 +2315,47 @@ TLSEnabled = true # Default
```
TLSEnabled enables TLS to be used to secure communication with the billing service. This is enabled by default.
+## BridgeStatusReporter
+```toml
+[BridgeStatusReporter]
+Enabled = false # Default
+StatusPath = "/status" # Default
+PollingInterval = "5m" # Default
+IgnoreInvalidBridges = true # Default
+IgnoreJoblessBridges = false # Default
+```
+BridgeStatusReporter holds settings for the Bridge Status Reporter service.
+
+### Enabled
+```toml
+Enabled = false # Default
+```
+Enabled enables the Bridge Status Reporter service that polls bridge status endpoints.
+
+### StatusPath
+```toml
+StatusPath = "/status" # Default
+```
+StatusPath is the path to append to bridge URLs for status polling.
+
+### PollingInterval
+```toml
+PollingInterval = "5m" # Default
+```
+PollingInterval is how often to poll bridge status endpoints for status.
+
+### IgnoreInvalidBridges
+```toml
+IgnoreInvalidBridges = true # Default
+```
+IgnoreInvalidBridges skips bridges that return HTTP errors or invalid responses.
+
+### IgnoreJoblessBridges
+```toml
+IgnoreJoblessBridges = false # Default
+```
+IgnoreJoblessBridges skips bridges that have no associated jobs.
+
## EVM
EVM defaults depend on ChainID:
diff --git a/docs/services/BRIDGE_STATUS.md b/docs/services/BRIDGE_STATUS.md
new file mode 100644
index 00000000000..6a62c3c9766
--- /dev/null
+++ b/docs/services/BRIDGE_STATUS.md
@@ -0,0 +1,107 @@
+# Bridge Status Reporter Documentation
+
+## Overview
+
+The Bridge Status Reporter is a service that continuously monitors External Adapter health by polling their status endpoints and emitting telemetry events. This provides visibility into bridge connectivity, performance, and configuration across your Chainlink network.
+
+## Configuration
+
+### Node Configuration
+
+Add the following to your node's TOML configuration:
+
+```toml
+[BridgeStatusReporter]
+Enabled = true # Enable the service
+StatusPath = "/status" # Path to append to bridge URLs
+PollingInterval = "5m" # How often to poll bridges
+IgnoreInvalidBridges = true # Skip bridges with HTTP errors
+IgnoreJoblessBridges = false # Skip bridges with no associated jobs
+```
+
+### External Adapter Requirements
+
+Your External Adapters must implement a `/status` endpoint (or the path specified in `StatusPath`) that returns bridge status information.
+
+#### Example Status Endpoint Response
+
+```json
+{
+ "bridge_name": "my-bridge",
+ "adapter_name": "crypto-price-adapter",
+ "adapter_version": "1.2.3",
+ "adapter_uptime_seconds": 86400.5,
+ "default_endpoint": "crypto",
+ "runtime": {
+ "node_version": "18.19.0",
+ "platform": "linux",
+ "architecture": "x64",
+ "hostname": "adapter-server-01"
+ },
+ "metrics": {
+ "enabled": true
+ },
+ "endpoints": [
+ {
+ "name": "crypto",
+ "aliases": ["price", "market"],
+ "transports": ["http", "https"]
+ }
+ ],
+ "configuration": [
+ {
+ "name": "API_KEY",
+ "value": "[REDACTED]",
+ "type": "string",
+ "description": "API key for data provider",
+ "required": true,
+ "default_value": "",
+ "custom_setting": false,
+ "env_default_override": "CRYPTO_API_KEY"
+ }
+ ]
+}
+```
+
+## Bridge Registration
+
+Bridges are automatically discovered from your node's bridge registry. The service will:
+
+1. **Query all registered bridges** from the database
+2. **Filter active bridges** (unless `IgnoreJoblessBridges` is false)
+3. **Poll each bridge's status endpoint** at the configured interval
+4. **Emit telemetry events** for successful responses
+5. **Log errors** for failed requests (optionally ignored with `IgnoreInvalidBridges`)
+
+## Telemetry Events
+
+The service emits `BridgeStatusEvent` protobuf messages containing:
+
+### Bridge Identification
+- `bridge_name` - Name from bridge registry
+- `adapter_name` - External adapter identifier
+- `adapter_version` - Version string
+- `default_endpoint` - Primary endpoint name
+
+### Runtime Information
+- `node_version` - Runtime version (Node.js, etc.)
+- `platform` - Operating system
+- `architecture` - CPU architecture
+- `hostname` - Server hostname
+
+### Operational Data
+- `adapter_uptime_seconds` - Adapter uptime in seconds
+- `endpoints` - Available endpoints with aliases and transports
+- `configuration` - Configuration parameters (values may be redacted)
+- `jobs` - Associated Chainlink jobs using this bridge
+- `metrics` - Metrics collection status
+
+## Troubleshooting
+
+### Common Issues
+
+**Bridge Not Appearing in Telemetry**
+- Verify bridge is registered: Check `/v2/bridges` API endpoint
+- Check bridge has associated jobs (if `IgnoreJoblessBridges = false`)
+- Ensure bridge URL is accessible from the node and returning data
+- Check bridge is correctly configured and logging errors
\ No newline at end of file
diff --git a/testdata/scripts/config/merge_raw_configs.txtar b/testdata/scripts/config/merge_raw_configs.txtar
index e09b31a5ae6..8b6b0c6ec9d 100644
--- a/testdata/scripts/config/merge_raw_configs.txtar
+++ b/testdata/scripts/config/merge_raw_configs.txtar
@@ -494,6 +494,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[Aptos]]
ChainID = '1'
Enabled = false
diff --git a/testdata/scripts/health/default.txtar b/testdata/scripts/health/default.txtar
index 95ecee1f013..521b5ca2d22 100644
--- a/testdata/scripts/health/default.txtar
+++ b/testdata/scripts/health/default.txtar
@@ -31,6 +31,7 @@ fj293fbBnlQ!f9vNs
HTTPPort = $PORT
-- out.txt --
+ok BridgeStatusReporter
ok HeadReporter
ok Heartbeat
ok JobSpawner
@@ -48,6 +49,15 @@ ok WorkflowStore
-- out.json --
{
"data": [
+ {
+ "type": "checks",
+ "id": "BridgeStatusReporter",
+ "attributes": {
+ "name": "BridgeStatusReporter",
+ "status": "passing",
+ "output": ""
+ }
+ },
{
"type": "checks",
"id": "HeadReporter",
diff --git a/testdata/scripts/health/multi-chain-loopp.txtar b/testdata/scripts/health/multi-chain-loopp.txtar
index 815d646fe0b..2bfffd9de00 100644
--- a/testdata/scripts/health/multi-chain-loopp.txtar
+++ b/testdata/scripts/health/multi-chain-loopp.txtar
@@ -72,6 +72,7 @@ Name = 'primary'
URL = 'http://stark.node'
-- out.txt --
+ok BridgeStatusReporter
ok Cosmos.Foo.RelayerService
ok Cosmos.Foo.RelayerService.PluginRelayerClient
ok Cosmos.Foo.RelayerService.PluginRelayerClient.PluginCosmos
@@ -132,6 +133,15 @@ ok WorkflowStore
-- out.json --
{
"data": [
+ {
+ "type": "checks",
+ "id": "BridgeStatusReporter",
+ "attributes": {
+ "name": "BridgeStatusReporter",
+ "status": "passing",
+ "output": ""
+ }
+ },
{
"type": "checks",
"id": "Cosmos.Foo.RelayerService",
diff --git a/testdata/scripts/health/multi-chain.txtar b/testdata/scripts/health/multi-chain.txtar
index 7a522fb6456..16a8f521fd8 100644
--- a/testdata/scripts/health/multi-chain.txtar
+++ b/testdata/scripts/health/multi-chain.txtar
@@ -54,6 +54,7 @@ Name = 'primary'
URL = 'http://solana.web'
-- out.txt --
+ok BridgeStatusReporter
ok EVM.1
ok EVM.1.BalanceMonitor
ok EVM.1.HeadBroadcaster
@@ -93,6 +94,15 @@ ok WorkflowStore
-- out.json --
{
"data": [
+ {
+ "type": "checks",
+ "id": "BridgeStatusReporter",
+ "attributes": {
+ "name": "BridgeStatusReporter",
+ "status": "passing",
+ "output": ""
+ }
+ },
{
"type": "checks",
"id": "EVM.1",
diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar
index d387f98af55..a24237bcb5f 100644
--- a/testdata/scripts/node/validate/default.txtar
+++ b/testdata/scripts/node/validate/default.txtar
@@ -359,6 +359,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
Invalid configuration: invalid secrets: 2 errors:
- Database.URL: empty: must be provided and non-empty
- Password.Keystore: empty: must be provided and non-empty
diff --git a/testdata/scripts/node/validate/defaults-override.txtar b/testdata/scripts/node/validate/defaults-override.txtar
index 7bc9319d983..439d7c38a1f 100644
--- a/testdata/scripts/node/validate/defaults-override.txtar
+++ b/testdata/scripts/node/validate/defaults-override.txtar
@@ -420,6 +420,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
AutoCreateKey = true
diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar
index 927a776ab3c..5135c9511ae 100644
--- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar
+++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar
@@ -403,6 +403,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
AutoCreateKey = true
diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar
index 0bf18caeffa..02277482f32 100644
--- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar
+++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar
@@ -403,6 +403,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
AutoCreateKey = true
diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar
index e662cd5fba7..d2039db3510 100644
--- a/testdata/scripts/node/validate/disk-based-logging.txtar
+++ b/testdata/scripts/node/validate/disk-based-logging.txtar
@@ -403,6 +403,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
AutoCreateKey = true
diff --git a/testdata/scripts/node/validate/fallback-override.txtar b/testdata/scripts/node/validate/fallback-override.txtar
index 36cbb78e5ab..dbd154a2671 100644
--- a/testdata/scripts/node/validate/fallback-override.txtar
+++ b/testdata/scripts/node/validate/fallback-override.txtar
@@ -501,6 +501,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
AutoCreateKey = true
diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar
index 36528f22cdc..fe0e8eb0a3a 100644
--- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar
+++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar
@@ -388,6 +388,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
Invalid configuration: invalid configuration: P2P.V2.Enabled: invalid value (false): P2P required for OCR or OCR2. Please enable P2P or disable OCR/OCR2.
-- err.txt --
diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar
index e461fe82cdb..af7bed25ecc 100644
--- a/testdata/scripts/node/validate/invalid.txtar
+++ b/testdata/scripts/node/validate/invalid.txtar
@@ -393,6 +393,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
AutoCreateKey = true
diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar
index af804fd2e87..c93cb639256 100644
--- a/testdata/scripts/node/validate/valid.txtar
+++ b/testdata/scripts/node/validate/valid.txtar
@@ -400,6 +400,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
[[EVM]]
ChainID = '1'
AutoCreateKey = true
diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar
index 55b54c0b285..4329dea6673 100644
--- a/testdata/scripts/node/validate/warnings.txtar
+++ b/testdata/scripts/node/validate/warnings.txtar
@@ -382,6 +382,13 @@ URL = ''
URL = 'localhost:4319'
TLSEnabled = true
+[BridgeStatusReporter]
+Enabled = false
+StatusPath = '/status'
+PollingInterval = '5m0s'
+IgnoreInvalidBridges = true
+IgnoreJoblessBridges = false
+
# Configuration warning:
Tracing.TLSCertPath: invalid value (something): must be empty when Tracing.Mode is 'unencrypted'
Valid configuration.