Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libbeat/processors/cache: don't write cache states that have not been altered #36696

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add device handling to Okta API package for entity analytics. {pull}35980[35980]
- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]
- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619]
- Add file-backed cache for cache enrichment processor. {pull}36686[36686]
- Add file-backed cache for cache enrichment processor. {pull}36686[36686] {pull}36696[36696]

==== Deprecated

Expand Down
6 changes: 6 additions & 0 deletions libbeat/processors/cache/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (c *fileStore) readState() {
}
if e.Expires.Before(time.Now()) {
// Don't retain expired elements.
c.dirty = true // The cache now does not reflect the file.
continue
}
c.cache[e.Key] = &e
Expand Down Expand Up @@ -239,6 +240,9 @@ func (c *fileStore) periodicWriteOut(ctx context.Context, every time.Duration) {
// writeState writes the current cache state to the backing file.
// If final is true and the cache is empty, the file will be deleted.
func (c *fileStore) writeState(final bool) {
if !c.dirty {
return
}
if len(c.cache) == 0 && final {
err := os.Remove(c.path)
if err != nil {
Expand Down Expand Up @@ -291,4 +295,6 @@ func (c *fileStore) writeState(final bool) {
return
}
}
// Only mark as not dirty if we succeeded in the write.
c.dirty = false
}
7 changes: 7 additions & 0 deletions libbeat/processors/cache/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ var fileStoreTests = []struct {
id: "test",
cache: map[string]*CacheEntry{},
refs: 2,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -221,6 +222,7 @@ var fileStoreTests = []struct {
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -247,6 +249,7 @@ var fileStoreTests = []struct {
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -267,6 +270,7 @@ var fileStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -291,6 +295,7 @@ var fileStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -315,6 +320,7 @@ var fileStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 1,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -333,6 +339,7 @@ var fileStoreTests = []struct {
cache: nil, // assistively nil-ed.
expiries: nil, // assistively nil-ed.
refs: 0,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand Down
5 changes: 5 additions & 0 deletions libbeat/processors/cache/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type memStore struct {
expiries expiryHeap
ttl time.Duration // ttl is the time entries are valid for in the cache.
refs int // refs is the number of processors referring to this store.
// dirty marks the cache as changed from the
// state in a backing file if it exists.
dirty bool

// id is the index into global cache store for the cache.
id string
Expand Down Expand Up @@ -176,6 +179,7 @@ func (c *memStore) Put(key string, val any) error {
}
c.cache[key] = e
heap.Push(&c.expiries, e)
c.dirty = true
return nil
}

Expand Down Expand Up @@ -212,6 +216,7 @@ func (c *memStore) Delete(key string) error {
}
heap.Remove(&c.expiries, v.index)
delete(c.cache, key)
c.dirty = true
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions libbeat/processors/cache/mem_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ var memStoreTests = []struct {
id: "test",
cache: map[string]*CacheEntry{},
refs: 2,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -209,6 +210,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -235,6 +237,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -255,6 +258,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -279,6 +283,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -303,6 +308,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 1,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -321,6 +327,7 @@ var memStoreTests = []struct {
cache: nil, // assistively nil-ed.
expiries: nil, // assistively nil-ed.
refs: 0,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand Down