Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AWS] improve S3 input states copy by only storing filtered entries #41869

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
s3API.pagerConstant = newS3PagerConstant(curConfig.BucketListPrefix)
store := openTestStatestore()

states, err := newStates(nil, store)
states, err := newStates(nil, store, "")
assert.NoError(t, err, "states creation should succeed")

s3EventHandlerFactory := newS3ObjectProcessorFactory(metrics, s3API, config.FileSelectors, backupConfig{})
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/s3_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (in *s3PollerInput) Run(
var err error

// Load the persistent S3 polling state.
in.states, err = newStates(in.log, in.store)
in.states, err = newStates(in.log, in.store, in.config.BucketListPrefix)
if err != nil {
return fmt.Errorf("can not start persistent store: %w", err)
}
Expand Down
9 changes: 5 additions & 4 deletions x-pack/filebeat/input/awss3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestS3Poller(t *testing.T) {
logp.TestingSetup()

const bucket = "bucket"
const listPrefix = "key"
const numberOfWorkers = 5
const pollInterval = 2 * time.Second
const testTimeout = 1 * time.Second
Expand Down Expand Up @@ -127,15 +128,15 @@ func TestS3Poller(t *testing.T) {
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(nil, mockAPI, nil, backupConfig{})
states, err := newStates(nil, store)
states, err := newStates(nil, store, listPrefix)
require.NoError(t, err, "states creation must succeed")
poller := &s3PollerInput{
log: logp.NewLogger(inputName),
config: config{
NumberOfWorkers: numberOfWorkers,
BucketListInterval: pollInterval,
BucketARN: bucket,
BucketListPrefix: "key",
BucketListPrefix: listPrefix,
RegionName: "region",
},
s3: mockAPI,
Expand Down Expand Up @@ -265,15 +266,15 @@ func TestS3Poller(t *testing.T) {
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3, nil, backupConfig{})
states, err := newStates(nil, store)
states, err := newStates(nil, store, listPrefix)
require.NoError(t, err, "states creation must succeed")
poller := &s3PollerInput{
log: logp.NewLogger(inputName),
config: config{
NumberOfWorkers: numberOfWorkers,
BucketListInterval: pollInterval,
BucketARN: bucket,
BucketListPrefix: "key",
BucketListPrefix: listPrefix,
RegionName: "region",
},
s3: mockS3,
Expand Down
28 changes: 22 additions & 6 deletions x-pack/filebeat/input/awss3/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,27 @@ type states struct {
// storeLock must be held to access store.
store *statestore.Store
storeLock sync.Mutex

// keyPrefix is the prefix that every state key accepted by this registry must start with.
keyPrefix string
}

// newStates generates a new states registry.
func newStates(log *logp.Logger, stateStore beater.StateStore) (*states, error) {
func newStates(log *logp.Logger, stateStore beater.StateStore, listPrefix string) (*states, error) {
store, err := stateStore.Access()
if err != nil {
return nil, fmt.Errorf("can't access persistent store: %w", err)
}

stateTable, err := loadS3StatesFromRegistry(log, store)
stateTable, err := loadS3StatesFromRegistry(log, store, listPrefix)
if err != nil {
return nil, fmt.Errorf("loading S3 input state: %w", err)
}

return &states{
store: store,
states: stateTable,
store: store,
states: stateTable,
keyPrefix: listPrefix,
}, nil
}

Expand All @@ -57,6 +61,12 @@ func (s *states) IsProcessed(state state) bool {
}

func (s *states) AddState(state state) error {
if !strings.HasPrefix(state.Key, s.keyPrefix) {
// Note: this failure should not happen, since a dedicated states instance is created per input.
// The check is kept as a safeguard against wiring errors within the component.
return fmt.Errorf("expected prefix %s in key %s, skipping state registering", s.keyPrefix, state.Key)
}

id := state.ID()
// Update in-memory copy
s.statesLock.Lock()
Expand All @@ -79,7 +89,9 @@ func (s *states) Close() {
s.storeLock.Unlock()
}

func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store) (map[string]state, error) {
// loadS3StatesFromRegistry loads a copy of the registry states.
// Only entries whose key starts with prefix are loaded; an empty prefix matches every entry.
func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix string) (map[string]state, error) {
stateTable := map[string]state{}
err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) {
if !strings.HasPrefix(key, awsS3ObjectStatePrefix) {
Expand All @@ -103,7 +115,11 @@ func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store) (map[st
return true, nil
}

stateTable[st.ID()] = st
// filter based on prefix and add entry to local copy
if strings.HasPrefix(st.Key, prefix) {
stateTable[st.ID()] = st
}

return true, nil
})
if err != nil {
Expand Down
89 changes: 82 additions & 7 deletions x-pack/filebeat/input/awss3/states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/v7/filebeat/beater"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -62,14 +63,14 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
},
"not existing state": {
statesEdit: func(states *states) {
states.AddState(testState2)
_ = states.AddState(testState2)
},
state: testState1,
expectedIsProcessed: false,
},
"existing state": {
statesEdit: func(states *states) {
states.AddState(testState1)
_ = states.AddState(testState1)
},
state: testState1,
expectedIsProcessed: true,
Expand All @@ -78,7 +79,7 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
statesEdit: func(states *states) {
state := testState1
state.Stored = true
states.AddState(state)
_ = states.AddState(state)
},
state: testState1,
shouldReload: true,
Expand All @@ -88,15 +89,15 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
statesEdit: func(states *states) {
state := testState1
state.Failed = true
states.AddState(state)
_ = states.AddState(state)
},
state: testState1,
shouldReload: true,
expectedIsProcessed: true,
},
"existing unprocessed state is not persisted": {
statesEdit: func(states *states) {
states.AddState(testState1)
_ = states.AddState(testState1)
},
state: testState1,
shouldReload: true,
Expand All @@ -108,13 +109,13 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
test := test
t.Run(name, func(t *testing.T) {
store := openTestStatestore()
states, err := newStates(nil, store)
states, err := newStates(nil, store, "")
require.NoError(t, err, "states creation must succeed")
if test.statesEdit != nil {
test.statesEdit(states)
}
if test.shouldReload {
states, err = newStates(nil, store)
states, err = newStates(nil, store, "")
require.NoError(t, err, "states creation must succeed")
}

Expand All @@ -123,3 +124,77 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
})
}
}

// TestStatesPrefixHandling verifies how the states registry applies its key
// prefix: AddState must reject keys that do not start with the prefix, and a
// registry created with a prefix must only report as processed the persisted
// states whose key starts with that prefix.
func TestStatesPrefixHandling(t *testing.T) {
	logger := logp.NewLogger("state-prefix-testing")

	t.Run("if prefix was set, accept only states with prefix", func(t *testing.T) {
		// given
		registry := openTestStatestore()

		// when - registry with prefix
		st, err := newStates(logger, registry, "staging-")
		require.NoError(t, err)

		// then - fail for non prefixed
		err = st.AddState(newState("bucket", "production-logA", "etag", time.Now()))
		require.Error(t, err)

		// then - pass for correctly prefixed
		err = st.AddState(newState("bucket", "staging-logA", "etag", time.Now()))
		require.NoError(t, err)
	})

	t.Run("states store only load entries matching the given prefix", func(t *testing.T) {
		// given
		registry := openTestStatestore()

		// Every state is flagged as Stored; presumably this is what makes it
		// survive a registry reload — confirm against AddState/IsProcessed.
		sA := newState("bucket", "A", "etag", time.Unix(1733221244, 0))
		sA.Stored = true
		sStagingA := newState("bucket", "staging-A", "etag", time.Unix(1733224844, 0))
		sStagingA.Stored = true
		sProdB := newState("bucket", "production/B", "etag", time.Unix(1733228444, 0))
		sProdB.Stored = true
		sSpace := newState("bucket", " B", "etag", time.Unix(1733230444, 0))
		sSpace.Stored = true

		// add various states first with no prefix
		st, err := newStates(logger, registry, "")
		require.NoError(t, err)

		// Seeding must succeed: a silently dropped state would surface later
		// as a misleading IsProcessed failure instead of pointing at the cause.
		require.NoError(t, st.AddState(sA))
		require.NoError(t, st.AddState(sStagingA))
		require.NoError(t, st.AddState(sProdB))
		require.NoError(t, st.AddState(sSpace))

		// Reload states and validate

		// when - no prefix reload: the empty prefix matches every key
		stNoPrefix, err := newStates(logger, registry, "")
		require.NoError(t, err)

		require.True(t, stNoPrefix.IsProcessed(sA))
		require.True(t, stNoPrefix.IsProcessed(sStagingA))
		require.True(t, stNoPrefix.IsProcessed(sProdB))
		require.True(t, stNoPrefix.IsProcessed(sSpace))

		// when - with prefix `staging-`
		st, err = newStates(logger, registry, "staging-")
		require.NoError(t, err)

		require.False(t, st.IsProcessed(sA))
		require.True(t, st.IsProcessed(sStagingA))
		require.False(t, st.IsProcessed(sProdB))
		require.False(t, st.IsProcessed(sSpace))

		// when - with prefix `production/`
		st, err = newStates(logger, registry, "production/")
		require.NoError(t, err)

		require.False(t, st.IsProcessed(sA))
		require.False(t, st.IsProcessed(sStagingA))
		require.True(t, st.IsProcessed(sProdB))
		require.False(t, st.IsProcessed(sSpace))
	})
}
Loading