diff --git a/core/chains/evm/config/mocks/chain_scoped_config.go b/core/chains/evm/config/mocks/chain_scoped_config.go index 38e08987fd4..7d57d95407e 100644 --- a/core/chains/evm/config/mocks/chain_scoped_config.go +++ b/core/chains/evm/config/mocks/chain_scoped_config.go @@ -3984,6 +3984,20 @@ func (_m *ChainScopedConfig) TelemetryIngressSendInterval() time.Duration { return r0 } +// TelemetryIngressSendTimeout provides a mock function with given fields: +func (_m *ChainScopedConfig) TelemetryIngressSendTimeout() time.Duration { + ret := _m.Called() + + var r0 time.Duration + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + // TelemetryIngressServerPubKey provides a mock function with given fields: func (_m *ChainScopedConfig) TelemetryIngressServerPubKey() string { ret := _m.Called() diff --git a/core/config/envvar/schema.go b/core/config/envvar/schema.go index 858f50e33ac..00920b30cc7 100644 --- a/core/config/envvar/schema.go +++ b/core/config/envvar/schema.go @@ -39,6 +39,7 @@ type ConfigSchema struct { TelemetryIngressBufferSize uint `env:"TELEMETRY_INGRESS_BUFFER_SIZE" default:"100"` TelemetryIngressMaxBatchSize uint `env:"TELEMETRY_INGRESS_MAX_BATCH_SIZE" default:"50"` TelemetryIngressSendInterval time.Duration `env:"TELEMETRY_INGRESS_SEND_INTERVAL" default:"500ms"` + TelemetryIngressSendTimeout time.Duration `env:"TELEMETRY_INGRESS_SEND_TIMEOUT" default:"10s"` TelemetryIngressUseBatchSend bool `env:"TELEMETRY_INGRESS_USE_BATCH_SEND" default:"true"` ShutdownGracePeriod time.Duration `env:"SHUTDOWN_GRACE_PERIOD" default:"5s"` diff --git a/core/config/envvar/schema_test.go b/core/config/envvar/schema_test.go index 98bc6c289ff..7c6c6c17550 100644 --- a/core/config/envvar/schema_test.go +++ b/core/config/envvar/schema_test.go @@ -161,6 +161,7 @@ func TestConfigSchema(t *testing.T) { "TelemetryIngressLogging": "TELEMETRY_INGRESS_LOGGING", "TelemetryIngressMaxBatchSize": "TELEMETRY_INGRESS_MAX_BATCH_SIZE", "TelemetryIngressSendInterval": "TELEMETRY_INGRESS_SEND_INTERVAL", + "TelemetryIngressSendTimeout": "TELEMETRY_INGRESS_SEND_TIMEOUT", "TelemetryIngressServerPubKey": "TELEMETRY_INGRESS_SERVER_PUB_KEY", "TelemetryIngressURL": "TELEMETRY_INGRESS_URL", "TelemetryIngressUseBatchSend": "TELEMETRY_INGRESS_USE_BATCH_SEND", diff --git a/core/config/general_config.go b/core/config/general_config.go index 7194ddc54bc..847c92a8580 100644 --- a/core/config/general_config.go +++ b/core/config/general_config.go @@ -166,6 +166,7 @@ type GeneralOnlyConfig interface { TelemetryIngressBufferSize() uint TelemetryIngressMaxBatchSize() uint TelemetryIngressSendInterval() time.Duration + TelemetryIngressSendTimeout() time.Duration TelemetryIngressUseBatchSend() bool TriggerFallbackDBPollInterval() time.Duration UnAuthenticatedRateLimit() int64 @@ -885,6 +886,11 @@ func (c *generalConfig) TelemetryIngressSendInterval() time.Duration { return c.getDuration("TelemetryIngressSendInterval") } +// TelemetryIngressSendTimeout is the max duration to wait for the request to complete when sending batch telemetry +func (c *generalConfig) TelemetryIngressSendTimeout() time.Duration { + return c.getDuration("TelemetryIngressSendTimeout") +} + // TelemetryIngressUseBatchSend toggles sending telemetry using the batch client to the ingress server func (c *generalConfig) TelemetryIngressUseBatchSend() bool { return c.viper.GetBool(envvar.Name("TelemetryIngressUseBatchSend")) diff --git a/core/config/mocks/general_config.go b/core/config/mocks/general_config.go index cdc0f9d6a0a..1bdaf7ed776 100644 --- a/core/config/mocks/general_config.go +++ b/core/config/mocks/general_config.go @@ -3272,6 +3272,20 @@ func (_m *GeneralConfig) TelemetryIngressSendInterval() time.Duration { return r0 } +// TelemetryIngressSendTimeout provides a mock function with given fields: +func (_m *GeneralConfig) TelemetryIngressSendTimeout() time.Duration { + ret := _m.Called() + + var r0 time.Duration + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + // TelemetryIngressServerPubKey provides a mock function with given fields: func (_m *GeneralConfig) TelemetryIngressServerPubKey() string { ret := _m.Called() diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index bd13795d402..8320578cd3a 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -197,7 +197,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { if cfg.ExplorerURL() == nil && cfg.TelemetryIngressURL() != nil { if cfg.TelemetryIngressUseBatchSend() { telemetryIngressBatchClient = synchronization.NewTelemetryIngressBatchClient(cfg.TelemetryIngressURL(), - cfg.TelemetryIngressServerPubKey(), keyStore.CSA(), cfg.TelemetryIngressLogging(), globalLogger, cfg.TelemetryIngressBufferSize(), cfg.TelemetryIngressMaxBatchSize(), cfg.TelemetryIngressSendInterval()) + cfg.TelemetryIngressServerPubKey(), keyStore.CSA(), cfg.TelemetryIngressLogging(), globalLogger, cfg.TelemetryIngressBufferSize(), cfg.TelemetryIngressMaxBatchSize(), cfg.TelemetryIngressSendInterval(), cfg.TelemetryIngressSendTimeout()) monitoringEndpointGen = telemetry.NewIngressAgentBatchWrapper(telemetryIngressBatchClient) } else { diff --git a/core/services/synchronization/helpers_test.go b/core/services/synchronization/helpers_test.go index 5fcbb1ccb68..538871a0f9c 100644 --- a/core/services/synchronization/helpers_test.go +++ b/core/services/synchronization/helpers_test.go @@ -19,7 +19,7 @@ func NewTestTelemetryIngressClient(t *testing.T, url *url.URL, serverPubKeyHex s // NewTestTelemetryIngressBatchClient calls NewTelemetryIngressBatchClient and injects telemClient. func NewTestTelemetryIngressBatchClient(t *testing.T, url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, telemClient telemPb.TelemClient, sendInterval time.Duration) TelemetryIngressBatchClient { - tc := NewTelemetryIngressBatchClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, 50, sendInterval) + tc := NewTelemetryIngressBatchClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, 50, sendInterval, time.Second) tc.(*telemetryIngressBatchClient).telemClient = telemClient return tc } diff --git a/core/services/synchronization/telemetry_ingress_batch_client.go b/core/services/synchronization/telemetry_ingress_batch_client.go index d3e6e6c4bb5..37406e560e6 100644 --- a/core/services/synchronization/telemetry_ingress_batch_client.go +++ b/core/services/synchronization/telemetry_ingress_batch_client.go @@ -62,6 +62,7 @@ type telemetryIngressBatchClient struct { telemBufferSize uint telemMaxBatchSize uint telemSendInterval time.Duration + telemSendTimeout time.Duration workers map[string]*telemetryIngressBatchWorker workersMutex sync.Mutex @@ -69,11 +70,12 @@ type telemetryIngressBatchClient struct { // NewTelemetryIngressBatchClient returns a client backed by wsrpc that // can send telemetry to the telemetry ingress server -func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration) TelemetryIngressBatchClient { +func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration, telemSendTimeout time.Duration) TelemetryIngressBatchClient { return &telemetryIngressBatchClient{ telemBufferSize: telemBufferSize, telemMaxBatchSize: telemMaxBatchSize, telemSendInterval: telemSendInterval, + telemSendTimeout: telemSendTimeout, url: url, ks: ks, serverPubKeyHex: serverPubKeyHex, @@ -171,6 +173,7 @@ func (tc *telemetryIngressBatchClient) findOrCreateWorker(payload TelemPayload) worker = NewTelemetryIngressBatchWorker( tc.telemMaxBatchSize, tc.telemSendInterval, + tc.telemSendTimeout, tc.telemClient, &tc.wgDone, tc.chDone, diff --git a/core/services/synchronization/telemetry_ingress_batch_worker.go b/core/services/synchronization/telemetry_ingress_batch_worker.go index 67a5fb59554..7180be4308f 100644 --- a/core/services/synchronization/telemetry_ingress_batch_worker.go +++ b/core/services/synchronization/telemetry_ingress_batch_worker.go @@ -15,6 +15,7 @@ import ( type telemetryIngressBatchWorker struct { telemMaxBatchSize uint telemSendInterval time.Duration + telemSendTimeout time.Duration telemClient telemPb.TelemClient wgDone *sync.WaitGroup chDone chan struct{} @@ -30,6 +31,7 @@ type telemetryIngressBatchWorker struct { func NewTelemetryIngressBatchWorker( telemMaxBatchSize uint, telemSendInterval time.Duration, + telemSendTimeout time.Duration, telemClient telemPb.TelemClient, wgDone *sync.WaitGroup, chDone chan struct{}, @@ -40,6 +42,7 @@ func NewTelemetryIngressBatchWorker( ) *telemetryIngressBatchWorker { return &telemetryIngressBatchWorker{ telemSendInterval: telemSendInterval, + telemSendTimeout: telemSendTimeout, telemMaxBatchSize: telemMaxBatchSize, telemClient: telemClient, wgDone: wgDone, @@ -68,7 +71,9 @@ func (tw *telemetryIngressBatchWorker) Start() { // Send batched telemetry to the ingress server, log any errors telemBatchReq := tw.BuildTelemBatchReq() - _, err := tw.telemClient.TelemBatch(context.Background(), telemBatchReq) + ctx, cancel := context.WithTimeout(context.Background(), tw.telemSendTimeout) + _, err := tw.telemClient.TelemBatch(ctx, telemBatchReq) + cancel() if err != nil { tw.lggr.Warnf("Could not send telemetry: %v", err) diff --git a/core/services/synchronization/telemetry_ingress_batch_worker_test.go b/core/services/synchronization/telemetry_ingress_batch_worker_test.go index ebbc06f6be9..528f4f6f563 100644 --- a/core/services/synchronization/telemetry_ingress_batch_worker_test.go +++ b/core/services/synchronization/telemetry_ingress_batch_worker_test.go @@ -24,6 +24,7 @@ func TestTelemetryIngressWorker_BuildTelemBatchReq(t *testing.T) { worker := synchronization.NewTelemetryIngressBatchWorker( uint(maxTelemBatchSize), time.Millisecond*1, + time.Second, new(mocks.TelemClient), &sync.WaitGroup{}, make(chan struct{}), diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ef3b8f6a1c9..4543bb80c62 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -28,6 +28,7 @@ New ENV vars: - `TELEMETRY_INGRESS_BUFFER_SIZE` (default: 100) - the number of telemetry messages to buffer before dropping new ones - `TELEMETRY_INGRESS_MAX_BATCH_SIZE` (default: 50) - the maximum number of messages to batch into one telemetry request - `TELEMETRY_INGRESS_SEND_INTERVAL` (default: 500ms) - the cadence on which batched telemetry is sent to the ingress server +- `TELEMETRY_INGRESS_SEND_TIMEOUT` (default: 10s) - the max duration to wait for the request to complete when sending batch telemetry - `TELEMETRY_INGRESS_USE_BATCH_SEND` (default: true) - toggles sending telemetry using the batch client to the ingress server #### Bootstrap job