diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 29832a8e3cf2..611874dc7bbf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -333,6 +333,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Refactor & cleanup with updates to default values and documentation. {pull}41834[41834] - Update CEL mito extensions to v1.16.0. {pull}41727[41727] - Add evaluation state dump debugging option to CEL input. {pull}41335[41335] +- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869] *Auditbeat* diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 8fe28c754261..e47da893bbed 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -336,7 +336,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult s3API.pagerConstant = newS3PagerConstant(curConfig.BucketListPrefix) store := openTestStatestore() - states, err := newStates(nil, store) + states, err := newStates(nil, store, "") assert.NoError(t, err, "states creation should succeed") s3EventHandlerFactory := newS3ObjectProcessorFactory(metrics, s3API, config.FileSelectors, backupConfig{}) diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go index 6775a1be0336..bc8a21495e54 100644 --- a/x-pack/filebeat/input/awss3/s3_input.go +++ b/x-pack/filebeat/input/awss3/s3_input.go @@ -66,7 +66,7 @@ func (in *s3PollerInput) Run( var err error // Load the persistent S3 polling state. 
- in.states, err = newStates(in.log, in.store) + in.states, err = newStates(in.log, in.store, in.config.BucketListPrefix) if err != nil { return fmt.Errorf("can not start persistent store: %w", err) } diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index b0b19d828318..a1e9e02b4263 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -22,6 +22,7 @@ func TestS3Poller(t *testing.T) { logp.TestingSetup() const bucket = "bucket" + const listPrefix = "key" const numberOfWorkers = 5 const pollInterval = 2 * time.Second const testTimeout = 1 * time.Second @@ -127,7 +128,7 @@ func TestS3Poller(t *testing.T) { Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(nil, mockAPI, nil, backupConfig{}) - states, err := newStates(nil, store) + states, err := newStates(nil, store, listPrefix) require.NoError(t, err, "states creation must succeed") poller := &s3PollerInput{ log: logp.NewLogger(inputName), @@ -135,7 +136,7 @@ func TestS3Poller(t *testing.T) { NumberOfWorkers: numberOfWorkers, BucketListInterval: pollInterval, BucketARN: bucket, - BucketListPrefix: "key", + BucketListPrefix: listPrefix, RegionName: "region", }, s3: mockAPI, @@ -265,7 +266,7 @@ func TestS3Poller(t *testing.T) { Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3, nil, backupConfig{}) - states, err := newStates(nil, store) + states, err := newStates(nil, store, listPrefix) require.NoError(t, err, "states creation must succeed") poller := &s3PollerInput{ log: logp.NewLogger(inputName), @@ -273,7 +274,7 @@ func TestS3Poller(t *testing.T) { NumberOfWorkers: numberOfWorkers, BucketListInterval: pollInterval, BucketARN: bucket, - BucketListPrefix: "key", + BucketListPrefix: listPrefix, RegionName: "region", }, s3: mockS3, diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index cb40abbd41f0..8aef3de1c99e 100644 
--- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -28,23 +28,27 @@ type states struct { // storeLock must be held to access store. store *statestore.Store storeLock sync.Mutex + + // keyPrefix is the prefix that every state key added to this registry must start with. + keyPrefix string } // newStates generates a new states registry. -func newStates(log *logp.Logger, stateStore beater.StateStore) (*states, error) { +func newStates(log *logp.Logger, stateStore beater.StateStore, listPrefix string) (*states, error) { store, err := stateStore.Access() if err != nil { return nil, fmt.Errorf("can't access persistent store: %w", err) } - stateTable, err := loadS3StatesFromRegistry(log, store) + stateTable, err := loadS3StatesFromRegistry(log, store, listPrefix) if err != nil { return nil, fmt.Errorf("loading S3 input state: %w", err) } return &states{ - store: store, - states: stateTable, + store: store, + states: stateTable, + keyPrefix: listPrefix, }, nil } @@ -57,6 +61,12 @@ func (s *states) IsProcessed(state state) bool { } func (s *states) AddState(state state) error { + if !strings.HasPrefix(state.Key, s.keyPrefix) { + // Note - This failure should not happen since we create a dedicated state instance per input. + // Yet, this is here to avoid any wiring errors within the component. + return fmt.Errorf("expected prefix %s in key %s, skipping state registering", s.keyPrefix, state.Key) + } + id := state.ID() // Update in-memory copy s.statesLock.Lock() @@ -79,7 +89,9 @@ func (s *states) Close() { s.storeLock.Unlock() } -func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store) (map[string]state, error) { +// loadS3StatesFromRegistry loads a copy of the registry states. 
+// Only entries whose key starts with prefix are loaded; an empty prefix matches every entry. +func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix string) (map[string]state, error) { stateTable := map[string]state{} err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { if !strings.HasPrefix(key, awsS3ObjectStatePrefix) { @@ -103,7 +115,11 @@ func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store) (map[st return true, nil } - stateTable[st.ID()] = st + // filter based on prefix and add entry to local copy + if strings.HasPrefix(st.Key, prefix) { + stateTable[st.ID()] = st + } + return true, nil }) if err != nil { diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go index dc345d5f88e8..328b57003cd1 100644 --- a/x-pack/filebeat/input/awss3/states_test.go +++ b/x-pack/filebeat/input/awss3/states_test.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/v7/filebeat/beater" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/elastic/elastic-agent-libs/logp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -62,14 +63,14 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) { }, "not existing state": { statesEdit: func(states *states) { - states.AddState(testState2) + _ = states.AddState(testState2) }, state: testState1, expectedIsProcessed: false, }, "existing state": { statesEdit: func(states *states) { - states.AddState(testState1) + _ = states.AddState(testState1) }, state: testState1, expectedIsProcessed: true, @@ -78,7 +79,7 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) { statesEdit: func(states *states) { state := testState1 state.Stored = true - states.AddState(state) + _ = states.AddState(state) }, state: testState1, shouldReload: true, @@ -88,7 +89,7 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) { statesEdit: func(states *states) { 
state := testState1 state.Failed = true - states.AddState(state) + _ = states.AddState(state) }, state: testState1, shouldReload: true, @@ -96,7 +97,7 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) { }, "existing unprocessed state is not persisted": { statesEdit: func(states *states) { - states.AddState(testState1) + _ = states.AddState(testState1) }, state: testState1, shouldReload: true, @@ -108,13 +109,13 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) { test := test t.Run(name, func(t *testing.T) { store := openTestStatestore() - states, err := newStates(nil, store) + states, err := newStates(nil, store, "") require.NoError(t, err, "states creation must succeed") if test.statesEdit != nil { test.statesEdit(states) } if test.shouldReload { - states, err = newStates(nil, store) + states, err = newStates(nil, store, "") require.NoError(t, err, "states creation must succeed") } @@ -123,3 +124,77 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) { }) } } + +func TestStatesPrefixHandling(t *testing.T) { + logger := logp.NewLogger("state-prefix-testing") + + t.Run("if prefix was set, accept only states with prefix", func(t *testing.T) { + // given + registry := openTestStatestore() + + // when - registry with prefix + st, err := newStates(logger, registry, "staging-") + require.NoError(t, err) + + // then - fail for non prefixed + err = st.AddState(newState("bucket", "production-logA", "etag", time.Now())) + require.Error(t, err) + + // then - pass for correctly prefixed + err = st.AddState(newState("bucket", "staging-logA", "etag", time.Now())) + require.NoError(t, err) + }) + + t.Run("states store only load entries matching the given prefix", func(t *testing.T) { + // given + registry := openTestStatestore() + + sA := newState("bucket", "A", "etag", time.Unix(1733221244, 0)) + sA.Stored = true + sStagingA := newState("bucket", "staging-A", "etag", time.Unix(1733224844, 0)) + sStagingA.Stored = true + sProdB := newState("bucket", "production/B", 
"etag", time.Unix(1733228444, 0)) + sProdB.Stored = true + sSpace := newState("bucket", " B", "etag", time.Unix(1733230444, 0)) + sSpace.Stored = true + + // add various states first with no prefix + st, err := newStates(logger, registry, "") + require.NoError(t, err) + + _ = st.AddState(sA) + _ = st.AddState(sStagingA) + _ = st.AddState(sProdB) + _ = st.AddState(sSpace) + + // Reload states and validate + + // when - no prefix reload + stNoPrefix, err := newStates(logger, registry, "") + require.NoError(t, err) + + require.True(t, stNoPrefix.IsProcessed(sA)) + require.True(t, stNoPrefix.IsProcessed(sStagingA)) + require.True(t, stNoPrefix.IsProcessed(sProdB)) + require.True(t, stNoPrefix.IsProcessed(sSpace)) + + // when - with prefix `staging-` + st, err = newStates(logger, registry, "staging-") + require.NoError(t, err) + + require.False(t, st.IsProcessed(sA)) + require.True(t, st.IsProcessed(sStagingA)) + require.False(t, st.IsProcessed(sProdB)) + require.False(t, st.IsProcessed(sSpace)) + + // when - with prefix `production/` + st, err = newStates(logger, registry, "production/") + require.NoError(t, err) + + require.False(t, st.IsProcessed(sA)) + require.False(t, st.IsProcessed(sStagingA)) + require.True(t, st.IsProcessed(sProdB)) + require.False(t, st.IsProcessed(sSpace)) + }) + +}