Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/chains/evm/config/mocks/chain_scoped_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/config/envvar/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type ConfigSchema struct {
TelemetryIngressBufferSize uint `env:"TELEMETRY_INGRESS_BUFFER_SIZE" default:"100"`
TelemetryIngressMaxBatchSize uint `env:"TELEMETRY_INGRESS_MAX_BATCH_SIZE" default:"50"`
TelemetryIngressSendInterval time.Duration `env:"TELEMETRY_INGRESS_SEND_INTERVAL" default:"500ms"`
TelemetryIngressSendTimeout time.Duration `env:"TELEMETRY_INGRESS_SEND_TIMEOUT" default:"10s"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New env variable shall be added to the release notes.

TelemetryIngressUseBatchSend bool `env:"TELEMETRY_INGRESS_USE_BATCH_SEND" default:"true"`
ShutdownGracePeriod time.Duration `env:"SHUTDOWN_GRACE_PERIOD" default:"5s"`

Expand Down
1 change: 1 addition & 0 deletions core/config/envvar/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func TestConfigSchema(t *testing.T) {
"TelemetryIngressLogging": "TELEMETRY_INGRESS_LOGGING",
"TelemetryIngressMaxBatchSize": "TELEMETRY_INGRESS_MAX_BATCH_SIZE",
"TelemetryIngressSendInterval": "TELEMETRY_INGRESS_SEND_INTERVAL",
"TelemetryIngressSendTimeout": "TELEMETRY_INGRESS_SEND_TIMEOUT",
"TelemetryIngressServerPubKey": "TELEMETRY_INGRESS_SERVER_PUB_KEY",
"TelemetryIngressURL": "TELEMETRY_INGRESS_URL",
"TelemetryIngressUseBatchSend": "TELEMETRY_INGRESS_USE_BATCH_SEND",
Expand Down
6 changes: 6 additions & 0 deletions core/config/general_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ type GeneralOnlyConfig interface {
TelemetryIngressBufferSize() uint
TelemetryIngressMaxBatchSize() uint
TelemetryIngressSendInterval() time.Duration
TelemetryIngressSendTimeout() time.Duration
TelemetryIngressUseBatchSend() bool
TriggerFallbackDBPollInterval() time.Duration
UnAuthenticatedRateLimit() int64
Expand Down Expand Up @@ -885,6 +886,11 @@ func (c *generalConfig) TelemetryIngressSendInterval() time.Duration {
return c.getDuration("TelemetryIngressSendInterval")
}

// TelemetryIngressSendTimeout is the max duration to wait for the request to complete when sending batch telemetry
func (c *generalConfig) TelemetryIngressSendTimeout() time.Duration {
return c.getDuration("TelemetryIngressSendTimeout")
}

// TelemetryIngressUseBatchSend toggles sending telemetry using the batch client to the ingress server
func (c *generalConfig) TelemetryIngressUseBatchSend() bool {
return c.viper.GetBool(envvar.Name("TelemetryIngressUseBatchSend"))
Expand Down
14 changes: 14 additions & 0 deletions core/config/mocks/general_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
if cfg.ExplorerURL() == nil && cfg.TelemetryIngressURL() != nil {
if cfg.TelemetryIngressUseBatchSend() {
telemetryIngressBatchClient = synchronization.NewTelemetryIngressBatchClient(cfg.TelemetryIngressURL(),
cfg.TelemetryIngressServerPubKey(), keyStore.CSA(), cfg.TelemetryIngressLogging(), globalLogger, cfg.TelemetryIngressBufferSize(), cfg.TelemetryIngressMaxBatchSize(), cfg.TelemetryIngressSendInterval())
cfg.TelemetryIngressServerPubKey(), keyStore.CSA(), cfg.TelemetryIngressLogging(), globalLogger, cfg.TelemetryIngressBufferSize(), cfg.TelemetryIngressMaxBatchSize(), cfg.TelemetryIngressSendInterval(), cfg.TelemetryIngressSendTimeout())
monitoringEndpointGen = telemetry.NewIngressAgentBatchWrapper(telemetryIngressBatchClient)

} else {
Expand Down
2 changes: 1 addition & 1 deletion core/services/synchronization/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewTestTelemetryIngressClient(t *testing.T, url *url.URL, serverPubKeyHex s

// NewTestTelemetryIngressBatchClient calls NewTelemetryIngressBatchClient and injects telemClient.
func NewTestTelemetryIngressBatchClient(t *testing.T, url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, telemClient telemPb.TelemClient, sendInterval time.Duration) TelemetryIngressBatchClient {
tc := NewTelemetryIngressBatchClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, 50, sendInterval)
tc := NewTelemetryIngressBatchClient(url, serverPubKeyHex, ks, logging, logger.TestLogger(t), 100, 50, sendInterval, time.Second)
tc.(*telemetryIngressBatchClient).telemClient = telemClient
return tc
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,20 @@ type telemetryIngressBatchClient struct {
telemBufferSize uint
telemMaxBatchSize uint
telemSendInterval time.Duration
telemSendTimeout time.Duration

workers map[string]*telemetryIngressBatchWorker
workersMutex sync.Mutex
}

// NewTelemetryIngressBatchClient returns a client backed by wsrpc that
// can send telemetry to the telemetry ingress server
func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration) TelemetryIngressBatchClient {
func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks keystore.CSA, logging bool, lggr logger.Logger, telemBufferSize uint, telemMaxBatchSize uint, telemSendInterval time.Duration, telemSendTimeout time.Duration) TelemetryIngressBatchClient {
return &telemetryIngressBatchClient{
telemBufferSize: telemBufferSize,
telemMaxBatchSize: telemMaxBatchSize,
telemSendInterval: telemSendInterval,
telemSendTimeout: telemSendTimeout,
url: url,
ks: ks,
serverPubKeyHex: serverPubKeyHex,
Expand Down Expand Up @@ -171,6 +173,7 @@ func (tc *telemetryIngressBatchClient) findOrCreateWorker(payload TelemPayload)
worker = NewTelemetryIngressBatchWorker(
tc.telemMaxBatchSize,
tc.telemSendInterval,
tc.telemSendTimeout,
tc.telemClient,
&tc.wgDone,
tc.chDone,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type telemetryIngressBatchWorker struct {
telemMaxBatchSize uint
telemSendInterval time.Duration
telemSendTimeout time.Duration
telemClient telemPb.TelemClient
wgDone *sync.WaitGroup
chDone chan struct{}
Expand All @@ -30,6 +31,7 @@ type telemetryIngressBatchWorker struct {
func NewTelemetryIngressBatchWorker(
telemMaxBatchSize uint,
telemSendInterval time.Duration,
telemSendTimeout time.Duration,
telemClient telemPb.TelemClient,
wgDone *sync.WaitGroup,
chDone chan struct{},
Expand All @@ -40,6 +42,7 @@ func NewTelemetryIngressBatchWorker(
) *telemetryIngressBatchWorker {
return &telemetryIngressBatchWorker{
telemSendInterval: telemSendInterval,
telemSendTimeout: telemSendTimeout,
telemMaxBatchSize: telemMaxBatchSize,
telemClient: telemClient,
wgDone: wgDone,
Expand Down Expand Up @@ -68,7 +71,9 @@ func (tw *telemetryIngressBatchWorker) Start() {

// Send batched telemetry to the ingress server, log any errors
telemBatchReq := tw.BuildTelemBatchReq()
_, err := tw.telemClient.TelemBatch(context.Background(), telemBatchReq)
ctx, cancel := context.WithTimeout(context.Background(), tw.telemSendTimeout)
_, err := tw.telemClient.TelemBatch(ctx, telemBatchReq)
cancel()

if err != nil {
tw.lggr.Warnf("Could not send telemetry: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestTelemetryIngressWorker_BuildTelemBatchReq(t *testing.T) {
worker := synchronization.NewTelemetryIngressBatchWorker(
uint(maxTelemBatchSize),
time.Millisecond*1,
time.Second,
new(mocks.TelemClient),
&sync.WaitGroup{},
make(chan struct{}),
Expand Down
1 change: 1 addition & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ New ENV vars:
- `TELEMETRY_INGRESS_BUFFER_SIZE` (default: 100) - the number of telemetry messages to buffer before dropping new ones
- `TELEMETRY_INGRESS_MAX_BATCH_SIZE` (default: 50) - the maximum number of messages to batch into one telemetry request
- `TELEMETRY_INGRESS_SEND_INTERVAL` (default: 500ms) - the cadence on which batched telemetry is sent to the ingress server
- `TELEMETRY_INGRESS_SEND_TIMEOUT` (default: 10s) - the max duration to wait for the request to complete when sending batch telemetry
- `TELEMETRY_INGRESS_USE_BATCH_SEND` (default: true) - toggles sending telemetry using the batch client to the ingress server

#### Bootstrap job
Expand Down