diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ea5429fa324..104bedfd2c3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -165,6 +165,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192] - Log bad handshake details when websocket connection fails {pull}41300[41300] - Don't send redundant documents for non-modified entities in the Jamf and Active Directory entityanalytics input. {pull}41179[41179] +- Don't send redundant documents for non-modified entities in the Active Directory entityanalytics input. {pull}41179[41179] *Heartbeat* diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf.go index fce94c7fb4c..8534187f6ac 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf.go +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf.go @@ -119,7 +119,6 @@ func (p *jamfInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.C return err } - var last time.Time for { select { case <-inputCtx.Cancelation.Done(): @@ -127,7 +126,8 @@ func (p *jamfInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.C return inputCtx.Cancelation.Err() } return nil - case start := <-syncTimer.C: + case <-syncTimer.C: + start := time.Now() if err := p.runFullSync(inputCtx, store, client); err != nil { p.logger.Errorw("Error running full sync", "error", err) p.metrics.syncError.Inc() @@ -146,9 +146,9 @@ func (p *jamfInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.C } updateTimer.Reset(p.cfg.UpdateInterval) p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval)) - last = start - case start := <-updateTimer.C: - if err := p.runIncrementalUpdate(inputCtx, store, last, client); err != nil { + case <-updateTimer.C: + start := time.Now() + if err := p.runIncrementalUpdate(inputCtx, store, client); err != nil { p.logger.Errorw("Error running incremental update", "error", err) p.metrics.updateError.Inc() } @@ -156,7 +156,6 @@ func (p *jamfInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.C p.metrics.updateProcessingTime.Update(time.Since(start).Nanoseconds()) updateTimer.Reset(p.cfg.UpdateInterval) p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval)) - last = start } } } @@ -351,7 +350,7 @@ func (p *jamfInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, clien // runIncrementalUpdate will run an incremental update. The process is similar // to full synchronization, except only users which have changed (newly // discovered, modified, or deleted) will be published. -func (p *jamfInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, last time.Time, client beat.Client) error { +func (p *jamfInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error { p.logger.Debugf("Running incremental update...") state, err := newStateStore(store) @@ -375,9 +374,7 @@ func (p *jamfInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto if len(updatedDevices) != 0 { tracker = kvstore.NewTxTracker(ctx) for _, d := range updatedDevices { - if d.Modified.After(last) { - p.publishComputer(d, inputCtx.ID, client, tracker) - } + p.publishComputer(d, inputCtx.ID, client, tracker) } tracker.Wait() } diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf_test.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf_test.go index 176a4a1023a..9ca7128d11c 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf_test.go @@ -47,7 +47,6 @@ func TestJamfDoFetch(t *testing.T) { wantComputers = append(wantComputers, &Computer{ Computer: c, State: Discovered, - Modified: time.Now(), }) } @@ -90,11 +89,8 @@ func TestJamfDoFetch(t *testing.T) { t.Fatalf("unexpected error from doFetch: %v", err) } - validTime := cmp.Comparer(func(a, b time.Time) bool { - return !a.IsZero() && !b.IsZero() - }) - if wantComputers != nil && !cmp.Equal(wantComputers, got, validTime) { - t.Errorf("unexpected result\n--- want\n+++ got\n%s", cmp.Diff(wantComputers, got, validTime)) + if wantComputers != nil && !cmp.Equal(wantComputers, got) { + t.Errorf("unexpected result\n--- want\n+++ got\n%s", cmp.Diff(wantComputers, got)) } }) } diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/statestore.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/statestore.go index e5ae0de2d03..ffe2d0714cb 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/jamf/statestore.go +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/statestore.go @@ -35,8 +35,7 @@ const ( type Computer struct { jamf.Computer `json:"properties"` - State State `json:"state"` - Modified time.Time `json:"modified"` + State State `json:"state"` } // stateStore wraps a kvstore.Transaction and provides convenience methods for @@ -108,7 +107,7 @@ func (s *stateStore) storeComputer(c jamf.Computer) (_ *Computer, changed bool) if !ok { // Whether this is managed or not, it is discovered. The next sync // will change its state to Deleted if it is unmanaged. - curr := &Computer{Computer: c, State: Discovered, Modified: time.Now()} + curr := &Computer{Computer: c, State: Discovered} s.computers[*c.Udid] = curr return curr, true } @@ -121,7 +120,6 @@ func (s *stateStore) storeComputer(c jamf.Computer) (_ *Computer, changed bool) } if changed { stored.State = Modified - stored.Modified = time.Now() } return stored, changed }