Skip to content

Commit

Permalink
x-pack/filebeat/input/entityanalytics/provider/{jamf,activedirectory}…
Browse files Browse the repository at this point in the history
…: don't publish unmodified updates
  • Loading branch information
efd6 committed Oct 9, 2024
1 parent 4be27b6 commit 73f01b5
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add backup and delete for AWS S3 polling mode feature back. {pull}41071[41071]
- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015]
- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142]
- Don't send redundant documents for non-modified entities in the Jamf and Active Directory entityanalytics input. {pull}41179[41179]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (p *adInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client
start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
p.publishUser(u, state, inputCtx.ID, client, tracker, false)
}

end := time.Now()
Expand Down Expand Up @@ -298,7 +298,7 @@ func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store
if len(updatedUsers) != 0 {
tracker = kvstore.NewTxTracker(ctx)
for _, u := range updatedUsers {
p.publishUser(u, state, inputCtx.ID, client, tracker)
p.publishUser(u, state, inputCtx.ID, client, tracker, true)
}
tracker.Wait()
}
Expand Down Expand Up @@ -390,7 +390,7 @@ func (p *adInput) publishMarker(ts, eventTime time.Time, inputID string, start b
}

// publishUser will publish a user document using the given beat.Client.
func (p *adInput) publishUser(u *User, state *stateStore, inputID string, client beat.Client, tracker *kvstore.TxTracker) {
func (p *adInput) publishUser(u *User, state *stateStore, inputID string, client beat.Client, tracker *kvstore.TxTracker, update bool) {
userDoc := mapstr.M{}

_, _ = userDoc.Put("activedirectory", u.Entry)
Expand All @@ -401,6 +401,12 @@ func (p *adInput) publishUser(u *User, state *stateStore, inputID string, client
case Deleted:
_, _ = userDoc.Put("event.action", "user-deleted")
case Discovered:
if update {
// If this in an update, any computer that is in the discovered
// state will already have been published, so we don't need to
// send the data again.
return
}
_, _ = userDoc.Put("event.action", "user-discovered")
case Modified:
_, _ = userDoc.Put("event.action", "user-modified")
Expand Down
12 changes: 9 additions & 3 deletions x-pack/filebeat/input/entityanalytics/provider/jamf/jamf.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (p *jamfInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, clien
start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)
for _, c := range state.computers {
p.publishComputer(c, inputCtx.ID, client, tracker)
p.publishComputer(c, inputCtx.ID, client, tracker, false)
}

end := time.Now()
Expand Down Expand Up @@ -374,7 +374,7 @@ func (p *jamfInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto
if len(updatedDevices) != 0 {
tracker = kvstore.NewTxTracker(ctx)
for _, d := range updatedDevices {
p.publishComputer(d, inputCtx.ID, client, tracker)
p.publishComputer(d, inputCtx.ID, client, tracker, true)
}
tracker.Wait()
}
Expand Down Expand Up @@ -481,7 +481,7 @@ func (p *jamfInput) publishMarker(ts, eventTime time.Time, inputID string, start
}

// publishComputer will publish a computer document using the given beat.Client.
func (p *jamfInput) publishComputer(c *Computer, inputID string, client beat.Client, tracker *kvstore.TxTracker) {
func (p *jamfInput) publishComputer(c *Computer, inputID string, client beat.Client, tracker *kvstore.TxTracker, update bool) {
devDoc := mapstr.M{}

id := "unknown"
Expand All @@ -496,6 +496,12 @@ func (p *jamfInput) publishComputer(c *Computer, inputID string, client beat.Cli
case Deleted:
_, _ = devDoc.Put("event.action", "device-deleted")
case Discovered:
if update {
// If this in an update, any computer that is in the discovered
// state will already have been published, so we don't need to
// send the data again.
return
}
_, _ = devDoc.Put("event.action", "device-discovered")
case Modified:
_, _ = devDoc.Put("event.action", "device-modified")
Expand Down

0 comments on commit 73f01b5

Please sign in to comment.