Skip to content

Commit

Permalink
Event latency detection should be using EventsTTL
Browse files Browse the repository at this point in the history
  • Loading branch information
jimbishopp committed Mar 20, 2022
1 parent 5722c84 commit 7a4d507
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 21 deletions.
2 changes: 1 addition & 1 deletion lib/backend/sqlbk/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (b *Backend) poll(fromEventID int64) (lastEventID int64, err error) {
}

timeNeeded := time.Duration(events.Remaining/limit) * b.PollStreamPeriod
if timeNeeded > b.PurgePeriod {
if timeNeeded > b.EventsTTL {
b.buf.Reset()
lastEventID, err := b.initLastEventID(b.closeCtx)
if err != nil { // err = closeCtx.Err()
Expand Down
47 changes: 28 additions & 19 deletions lib/backend/sqlbk/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

const (
// DefaultPurgePeriod is the default frequency for purging database records.
DefaultPurgePeriod = 10 * time.Second
DefaultPurgePeriod = 20 * time.Second

// DefaultDatabase is default name of the backend database.
DefaultDatabase = "teleport"
Expand Down Expand Up @@ -67,6 +67,9 @@ type Config struct {
// BufferSize is a default buffer size used to emit events.
BufferSize int `json:"buffer_size,omitempty"`

// EventsTTL is amount of time before an event is purged.
EventsTTL time.Duration `json:"events_ttl,omitempty"`

// PollStreamPeriod is the polling period for the event stream.
PollStreamPeriod time.Duration `json:"poll_stream_period,omitempty"`

Expand All @@ -91,30 +94,15 @@ type Config struct {
// CheckAndSetDefaults validates required fields and sets default
// values for fields that have not been set.
func (c *Config) CheckAndSetDefaults() error {
if c.Log == nil {
return trace.BadParameter("Log is required")
}
if c.Clock == nil {
return trace.BadParameter("Clock is required")
}
if c.Addr == "" {
return trace.BadParameter("Addr is required")
}
if c.TLS.CAFile == "" {
return trace.BadParameter("TLS.CAFile is required")
}
if c.TLS.ClientKeyFile == "" {
return trace.BadParameter("TLS.ClientKeyFile is required")
}
if c.TLS.ClientCertFile == "" {
return trace.BadParameter("TLS.ClientCertFile is required")
}
if c.Database == "" {
c.Database = DefaultDatabase
}
if c.BufferSize <= 0 {
c.BufferSize = backend.DefaultBufferCapacity
}
if c.EventsTTL == 0 {
c.EventsTTL = backend.DefaultEventsTTL
}
if c.PollStreamPeriod <= 0 {
c.PollStreamPeriod = backend.DefaultPollStreamPeriod
}
Expand All @@ -127,5 +115,26 @@ func (c *Config) CheckAndSetDefaults() error {
if c.RetryTimeout == 0 {
c.RetryTimeout = DefaultRetryTimeout
}
if c.EventsTTL < c.PollStreamPeriod {
return trace.BadParameter("PollStreamPeriod must be greater than EventsTTL to emit storage events")
}
if c.Log == nil {
return trace.BadParameter("Log is required")
}
if c.Clock == nil {
return trace.BadParameter("Clock is required")
}
if c.Addr == "" {
return trace.BadParameter("Addr is required")
}
if c.TLS.CAFile == "" {
return trace.BadParameter("TLS.CAFile is required")
}
if c.TLS.ClientKeyFile == "" {
return trace.BadParameter("TLS.ClientKeyFile is required")
}
if c.TLS.ClientCertFile == "" {
return trace.BadParameter("TLS.ClientCertFile is required")
}
return nil
}
3 changes: 2 additions & 1 deletion lib/backend/sqlbk/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ func TestDriver(t *testing.T, driver Driver) {
watcher := createWatcher()

// Update config to trigger buffer reset due to latency emitting events.
// Formula: <events remaining>/(BufferSize/2)*PollStreamPeriod > PurgePeriod
// Formula: <events remaining>/(BufferSize/2)*PollStreamPeriod > EventsTTL
bk.BufferSize = 2 // emit 1 event at a time
bk.EventsTTL = time.Second
bk.PurgePeriod = time.Second
bk.PollStreamPeriod = time.Second

Expand Down

0 comments on commit 7a4d507

Please sign in to comment.