From 1cc44da29e534a87ac8c2730e10eadcbd719c72e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Fri, 14 Aug 2020 15:57:39 +0200 Subject: [PATCH 1/3] Add initial skeleton of filestream input --- filebeat/input/filestream/input.go | 68 ++ .../internal/input-logfile/clean.go | 124 ++++ .../internal/input-logfile/clean_test.go | 162 +++++ .../internal/input-logfile/cursor.go | 43 ++ .../internal/input-logfile/cursor_test.go | 124 ++++ .../filestream/internal/input-logfile/doc.go | 58 ++ .../internal/input-logfile/fswatch.go | 65 ++ .../internal/input-logfile/harvester.go | 121 ++++ .../internal/input-logfile/input.go | 106 ++++ .../internal/input-logfile/manager.go | 199 ++++++ .../internal/input-logfile/manager_test.go | 600 ++++++++++++++++++ .../internal/input-logfile/prospector.go | 35 + .../internal/input-logfile/publish.go | 153 +++++ .../internal/input-logfile/publish_test.go | 158 +++++ .../internal/input-logfile/store.go | 324 ++++++++++ .../internal/input-logfile/store_test.go | 351 ++++++++++ filebeat/input/filestream/prospector.go | 38 ++ filebeat/input/log/input.go | 2 + 18 files changed, 2731 insertions(+) create mode 100644 filebeat/input/filestream/input.go create mode 100644 filebeat/input/filestream/internal/input-logfile/clean.go create mode 100644 filebeat/input/filestream/internal/input-logfile/clean_test.go create mode 100644 filebeat/input/filestream/internal/input-logfile/cursor.go create mode 100644 filebeat/input/filestream/internal/input-logfile/cursor_test.go create mode 100644 filebeat/input/filestream/internal/input-logfile/doc.go create mode 100644 filebeat/input/filestream/internal/input-logfile/fswatch.go create mode 100644 filebeat/input/filestream/internal/input-logfile/harvester.go create mode 100644 filebeat/input/filestream/internal/input-logfile/input.go create mode 100644 filebeat/input/filestream/internal/input-logfile/manager.go create mode 100644 
filebeat/input/filestream/internal/input-logfile/manager_test.go create mode 100644 filebeat/input/filestream/internal/input-logfile/prospector.go create mode 100644 filebeat/input/filestream/internal/input-logfile/publish.go create mode 100644 filebeat/input/filestream/internal/input-logfile/publish_test.go create mode 100644 filebeat/input/filestream/internal/input-logfile/store.go create mode 100644 filebeat/input/filestream/internal/input-logfile/store_test.go create mode 100644 filebeat/input/filestream/prospector.go diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go new file mode 100644 index 00000000000..487a5f01c2a --- /dev/null +++ b/filebeat/input/filestream/input.go @@ -0,0 +1,68 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/logp" +) + +// filestream is the input for reading from files which +// are actively written by other applications. 
+type filestream struct{} + +const pluginName = "filestream" + +// Plugin creates a new filestream input plugin for creating a stateful input. +func Plugin(log *logp.Logger, store loginp.StateStore) input.Plugin { + return input.Plugin{ + Name: pluginName, + Stability: feature.Experimental, + Deprecated: false, + Info: "filestream input", + Doc: "The filestream input collects logs from the local filestream service", + Manager: &loginp.InputManager{ + Logger: log, + StateStore: store, + Type: pluginName, + Configure: configure, + }, + } +} + +func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) { + panic("TODO: implement me") +} + +func (inp *filestream) Name() string { return pluginName } + +func (inp *filestream) Test(src loginp.Source, ctx input.TestContext) error { + panic("TODO: implement me") +} + +func (inp *filestream) Run( + ctx input.Context, + src loginp.Source, + cursor loginp.Cursor, + publisher loginp.Publisher, +) error { + panic("TODO: implement me") +} diff --git a/filebeat/input/filestream/internal/input-logfile/clean.go b/filebeat/input/filestream/internal/input-logfile/clean.go new file mode 100644 index 00000000000..d738dabbc55 --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/clean.go @@ -0,0 +1,124 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + "time" + + "github.com/elastic/go-concert/timed" + "github.com/elastic/go-concert/unison" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +// cleaner removes finished entries from the registry file. +type cleaner struct { + log *logp.Logger +} + +// run starts a loop that tries to clean entries from the registry. +// The cleaner locks the store, such that no new states can be created +// during the cleanup phase. Only resources that are finished and whose TTL +// (clean_timeout setting) has expired will be removed. +// +// Resources are considered "Finished" if they do not have a current owner (active input), and +// if they have no pending updates that still need to be written to the registry file after associated +// events have been ACKed by the outputs. +// The event acquisition timestamp is used as reference to clean resources. If a resource was blocked +// for a long time, and the life time has been exhausted, then the resource will be removed immediately +// once the last event has been ACKed. +func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) { + started := time.Now() + timed.Periodic(canceler, interval, func() error { + gcStore(c.log, started, store) + return nil + }) +} + +// gcStore looks for resources to remove and deletes these. `gcStore` receives +// the start timestamp of the cleaner as reference. If we have entries without +// updates in the registry, that are older than `started`, we will use `started +// + ttl` to decide if an entry will be removed. This way old entries are not +// removed immediately on startup if the Beat is down for a longer period of +// time. 
+func gcStore(log *logp.Logger, started time.Time, store *store) { + log.Debugf("Start store cleanup") + defer log.Debugf("Done store cleanup") + + states := store.ephemeralStore + states.mu.Lock() + defer states.mu.Unlock() + + keys := gcFind(states.table, started, time.Now()) + if len(keys) == 0 { + log.Debug("No entries to remove were found") + return + } + + if err := gcClean(store, keys); err != nil { + log.Errorf("Failed to remove all entries from the registry: %+v", err) + } +} + +// gcFind searches the store for resources that can be removed. A set of keys to delete is returned. +func gcFind(table map[string]*resource, started, now time.Time) map[string]struct{} { + keys := map[string]struct{}{} + for key, resource := range table { + clean := checkCleanResource(started, now, resource) + if !clean { + // do not clean the resource if it is still live or not serialized to the persistent store yet. + continue + } + keys[key] = struct{}{} + } + + return keys +} + +// gcClean removes key value pairs in the removeSet from the store. +// If deletion in the persistent store fails the entry is kept in memory and +// eventually cleaned up later. +func gcClean(store *store, removeSet map[string]struct{}) error { + for key := range removeSet { + if err := store.persistentStore.Remove(key); err != nil { + return err + } + delete(store.ephemeralStore.table, key) + } + return nil +} + +// checkCleanResource returns true for a key-value pair that is assumed to be old, +// if it is not in use and there are no more pending updates that still need to be +// written to the persistent store. 
+func checkCleanResource(started, now time.Time, resource *resource) bool { + if !resource.Finished() { + return false + } + + resource.stateMutex.Lock() + defer resource.stateMutex.Unlock() + + ttl := resource.internalState.TTL + reference := resource.internalState.Updated + if started.After(reference) { + reference = started + } + + return reference.Add(ttl).Before(now) && resource.stored +} diff --git a/filebeat/input/filestream/internal/input-logfile/clean_test.go b/filebeat/input/filestream/internal/input-logfile/clean_test.go new file mode 100644 index 00000000000..83e5bff412f --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/clean_test.go @@ -0,0 +1,162 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package input_logfile + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestGCStore(t *testing.T) { + t.Run("empty store", func(t *testing.T) { + started := time.Now() + + backend := createSampleStore(t, nil) + store := testOpenStore(t, "test", backend) + defer store.Release() + + gcStore(logp.NewLogger("test"), started, store) + + want := map[string]state{} + checkEqualStoreState(t, want, backend.snapshot()) + }) + + t.Run("state is still alive", func(t *testing.T) { + started := time.Now() + const ttl = 60 * time.Second + + initState := map[string]state{ + "test::key": { + TTL: ttl, + Updated: started.Add(-ttl / 2), + }, + } + + backend := createSampleStore(t, initState) + store := testOpenStore(t, "test", backend) + defer store.Release() + + gcStore(logp.NewLogger("test"), started, store) + + checkEqualStoreState(t, initState, backend.snapshot()) + }) + + t.Run("old state can be removed", func(t *testing.T) { + const ttl = 60 * time.Second + started := time.Now().Add(-5 * ttl) // cleanup process is running for a while already + + initState := map[string]state{ + "test::key": { + TTL: ttl, + Updated: started.Add(-ttl), + }, + } + + backend := createSampleStore(t, initState) + store := testOpenStore(t, "test", backend) + defer store.Release() + + gcStore(logp.NewLogger("test"), started, store) + + want := map[string]state{} + checkEqualStoreState(t, want, backend.snapshot()) + }) + + t.Run("old state is not removed if cleanup is not active long enough", func(t *testing.T) { + const ttl = 60 * time.Minute + started := time.Now() + + initState := map[string]state{ + "test::key": { + TTL: ttl, + Updated: started.Add(-2 * ttl), + }, + } + + backend := createSampleStore(t, initState) + store := testOpenStore(t, "test", backend) + defer store.Release() + + gcStore(logp.NewLogger("test"), started, store) + + checkEqualStoreState(t, initState, backend.snapshot()) + }) + + t.Run("old 
state but resource is accessed", func(t *testing.T) { + const ttl = 60 * time.Second + started := time.Now().Add(-5 * ttl) // cleanup process is running for a while already + + initState := map[string]state{ + "test::key": { + TTL: ttl, + Updated: started.Add(-ttl), + }, + } + + backend := createSampleStore(t, initState) + store := testOpenStore(t, "test", backend) + defer store.Release() + + // access resource and check it is not gc'ed + res := store.Get("test::key") + gcStore(logp.NewLogger("test"), started, store) + checkEqualStoreState(t, initState, backend.snapshot()) + + // release resource and check it gets gc'ed + res.Release() + want := map[string]state{} + gcStore(logp.NewLogger("test"), started, store) + checkEqualStoreState(t, want, backend.snapshot()) + }) + + t.Run("old state but resource has pending updates", func(t *testing.T) { + const ttl = 60 * time.Second + started := time.Now().Add(-5 * ttl) // cleanup process is running for a while already + + initState := map[string]state{ + "test::key": { + TTL: ttl, + Updated: started.Add(-ttl), + }, + } + + backend := createSampleStore(t, initState) + store := testOpenStore(t, "test", backend) + defer store.Release() + + // create pending update operation + res := store.Get("test::key") + op, err := createUpdateOp(store, res, "test-state-update") + require.NoError(t, err) + res.Release() + + // cleanup fails + gcStore(logp.NewLogger("test"), started, store) + checkEqualStoreState(t, initState, backend.snapshot()) + + // cancel operation (no more pending operations) and try to gc again + op.done(1) + gcStore(logp.NewLogger("test"), started, store) + want := map[string]state{} + checkEqualStoreState(t, want, backend.snapshot()) + }) +} diff --git a/filebeat/input/filestream/internal/input-logfile/cursor.go b/filebeat/input/filestream/internal/input-logfile/cursor.go new file mode 100644 index 00000000000..37de24fe56c --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/cursor.go @@ -0,0 +1,43 
@@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +// Cursor allows the input to check if cursor status has been stored +// in the past and unpack the status into a custom structure. +type Cursor struct { + store *store + resource *resource +} + +func makeCursor(store *store, res *resource) Cursor { + return Cursor{store: store, resource: res} +} + +// IsNew returns true if no cursor information has been stored +// for the current Source. +func (c Cursor) IsNew() bool { return c.resource.IsNew() } + +// Unpack deserialized the cursor state into to. Unpack fails if no pointer is +// given, or if the structure to points to is not compatible with the document +// stored. +func (c Cursor) Unpack(to interface{}) error { + if c.IsNew() { + return nil + } + return c.resource.UnpackCursor(to) +} diff --git a/filebeat/input/filestream/internal/input-logfile/cursor_test.go b/filebeat/input/filestream/internal/input-logfile/cursor_test.go new file mode 100644 index 00000000000..db2ff0c3a30 --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/cursor_test.go @@ -0,0 +1,124 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCursor_IsNew(t *testing.T) { + t.Run("true if key is not in store", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + cursor := makeCursor(store, store.Get("test::key")) + require.True(t, cursor.IsNew()) + }) + + t.Run("true if key is in store but without cursor value", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": {Cursor: nil}, + })) + defer store.Release() + + cursor := makeCursor(store, store.Get("test::key")) + require.True(t, cursor.IsNew()) + }) + + t.Run("false if key with cursor value is in persistent store", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": {Cursor: "test"}, + })) + defer store.Release() + + cursor := makeCursor(store, store.Get("test::key")) + require.False(t, cursor.IsNew()) + }) + + t.Run("false if key with cursor value is in memory store only", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": {Cursor: nil}, + })) + defer store.Release() + + res := store.Get("test::key") + op, err := 
createUpdateOp(store, res, "test-state-update") + require.NoError(t, err) + defer op.done(1) + + cursor := makeCursor(store, res) + require.False(t, cursor.IsNew()) + }) +} + +func TestCursor_Unpack(t *testing.T) { + t.Run("nothing to unpack if key is new", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + var st string + cursor := makeCursor(store, store.Get("test::key")) + + require.NoError(t, cursor.Unpack(&st)) + require.Equal(t, "", st) + }) + + t.Run("unpack fails if types are not compatible", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": {Cursor: "test"}, + })) + defer store.Release() + + var st struct{ A uint } + cursor := makeCursor(store, store.Get("test::key")) + require.Error(t, cursor.Unpack(&st)) + }) + + t.Run("unpack from state in persistent store", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": {Cursor: "test"}, + })) + defer store.Release() + + var st string + cursor := makeCursor(store, store.Get("test::key")) + + require.NoError(t, cursor.Unpack(&st)) + require.Equal(t, "test", st) + }) + + t.Run("unpack from in memory state if updates are pending", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": {Cursor: "test"}, + })) + defer store.Release() + + res := store.Get("test::key") + op, err := createUpdateOp(store, res, "test-state-update") + require.NoError(t, err) + defer op.done(1) + + var st string + cursor := makeCursor(store, store.Get("test::key")) + + require.NoError(t, cursor.Unpack(&st)) + require.Equal(t, "test-state-update", st) + }) +} diff --git a/filebeat/input/filestream/internal/input-logfile/doc.go b/filebeat/input/filestream/internal/input-logfile/doc.go new file mode 100644 index 00000000000..cf318d4bfed --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/doc.go 
@@ -0,0 +1,58 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package cursor provides an InputManager for use with the v2 API, that is +// capable of storing an internal cursor state between restarts. +// +// The InputManager requires authors to implement a configuration function and +// the cursor.Input interface. The configuration function returns a slice of +// sources ([]Source) that it has read from the configuration object, and the +// actual Input that will be used to collect events from each configured +// source. +// When Run is called, a go-routine will be started per configured source. If two inputs have +// configured the same source, only one will be active, while the other waits +// for the resource to become free. +// The manager keeps track of the state per source. When publishing an event a +// new cursor value can be passed as well. Future instances of the input can +// read the last published cursor state. +// +// For each source an in-memory and a persistent state are tracked. Internal +// meta updates by the input manager can not be read by Inputs, and will be +// written to the persistent store immediately. Cursor state updates are read +// and updated by the input. 
Cursor updates are written to the persistent store +// only after the events have been ACKed by the output. Internally the input +// manager keeps track of already ACKed updates and pending ACKs. +// In order to guarantee progress even if the publishing is slow or blocked, all cursor +// updates are written to the in-memory state immediately. Sources without any +// pending updates are in-sync (in-memory state == persistent state). All +// updates are ordered, but we allow the in-memory state to be ahead of the +// persistent state. +// When an input is started, the cursor state is read from the in-memory state. +// This way a new input instance can continue where other inputs have been +// stopped, even if we still have in-flight events from older input instances. +// The coordination between inputs guarantees that all updates are always in +// order. +// +// When a shutdown signal is received, the publisher is directly disconnected +// from the outputs. As all coordination is directly handled by the +// InputManager, shutdown will be immediate (once the input itself has +// returned), and can not be blocked by the outputs. +// +// An input that is about to collect a source that is already collected by +// another input will wait until the other input has returned or the current +// input did receive a shutdown signal. +package input_logfile diff --git a/filebeat/input/filestream/internal/input-logfile/fswatch.go b/filebeat/input/filestream/internal/input-logfile/fswatch.go new file mode 100644 index 00000000000..685b54253a4 --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/fswatch.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + "os" + + "github.com/elastic/go-concert/unison" +) + +const ( + OpDone Operation = iota + OpCreate + OpWrite + OpDelete + OpRename +) + +// Operation describes what happened to a file. +type Operation uint8 + +// FSEvent holds information about file system changes. +type FSEvent struct { + // NewPath is the new path of the file. + NewPath string + // OldPath is the previous path to the file, if it was + // deleted or renamed. + OldPath string + // Op is the file system event: create, write, rename, remove + Op Operation + // Info describes the file in the event. + Info os.FileInfo +} + +// FSScanner retrieves a list of files from the file system. +type FSScanner interface { + // GetFiles returns the list of monitored files. + // The keys of the map are the paths to the files and + // the values are the FileInfos describing the file. + GetFiles() map[string]os.FileInfo +} + +// FSWatcher returns file events of the monitored files. +type FSWatcher interface { + // Run is the event loop which watches for changes + // in the file system and returns events based on the data. + Run(unison.Canceler) + // Event returns the next event captured by FSWatcher. 
+ Event() FSEvent +} diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go new file mode 100644 index 00000000000..f89ced83abb --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -0,0 +1,121 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + "context" + "fmt" + "runtime/debug" + "time" + + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/go-concert/ctxtool" + "github.com/elastic/go-concert/unison" +) + +// Harvester is the reader which collects the lines from +// the configured source. +type Harvester interface { + // Name returns the type of the Harvester + Name() string + // Test checks if the Harvester can be started with the given configuration. + Test(Source, input.TestContext) error + // Run is the event loop which reads from the source + // and forwards it to the publisher. + Run(input.Context, Source, Cursor, Publisher) error +} + +// HarvesterGroup is responsible for running the +// Harvesters started by the Prospector. 
+type HarvesterGroup struct { + manager *InputManager + readers map[string]context.CancelFunc + pipeline beat.PipelineConnector + harvester Harvester + cleanTimeout time.Duration + store *store + tg unison.TaskGroup +} + +// Run starts the Harvester for a Source. +func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error { + log := ctx.Logger.With("source", s.Name()) + log.Debug("Starting harvester for file") + + harvesterCtx, cancelHarvester := context.WithCancel(ctxtool.FromCanceller(ctx.Cancelation)) + ctx.Cancelation = harvesterCtx + + resource, err := hg.manager.lock(ctx, s.Name()) + if err != nil { + return err + } + + if _, ok := hg.readers[s.Name()]; ok { + log.Debug("A harvester is already running for file") + return nil + } + hg.readers[s.Name()] = cancelHarvester + + hg.store.UpdateTTL(resource, hg.cleanTimeout) + + client, err := hg.pipeline.ConnectWith(beat.ClientConfig{ + CloseRef: ctx.Cancelation, + ACKHandler: newInputACKHandler(ctx.Logger), + }) + if err != nil { + return err + } + + cursor := makeCursor(hg.store, resource) + publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor} + + go func() { + defer client.Close() + defer log.Debug("Stopped harvester for file") + defer cancelHarvester() + defer releaseResource(resource) + + defer func() { + if v := recover(); v != nil { + err := fmt.Errorf("harvester panic with: %+v\n%s", v, debug.Stack()) + ctx.Logger.Errorf("Harvester crashed with: %+v", err) + } + }() + + err := hg.harvester.Run(ctx, s, cursor, publisher) + if err != nil { + log.Errorf("Harvester stopped: %v", err) + } + }() + return nil +} + +// Cancel stops the running Harvester for a given Source. 
+func (hg *HarvesterGroup) Cancel(s Source) error { + if cancel, ok := hg.readers[s.Name()]; ok { + cancel() + return nil + } + return fmt.Errorf("no such harvester %s", s.Name()) +} + +func releaseResource(resource *resource) { + resource.lock.Unlock() + resource.Release() +} diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go new file mode 100644 index 00000000000..7084315b0c1 --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -0,0 +1,106 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package input_logfile + +import ( + "context" + "time" + + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-concert/ctxtool" + "github.com/elastic/go-concert/unison" +) + +type managedInput struct { + manager *InputManager + prospector Prospector + harvester Harvester + cleanTimeout time.Duration +} + +// Name is required to implement the v2.Input interface +func (inp *managedInput) Name() string { return inp.harvester.Name() } + +// Test runs the Test method for each configured source. +func (inp *managedInput) Test(ctx input.TestContext) error { + return inp.prospector.Test() +} + +// Run +func (inp *managedInput) Run( + ctx input.Context, + pipeline beat.PipelineConnector, +) (err error) { + // Setup cancellation using a custom cancel context. All workers will be + // stopped if one failed badly by returning an error. 
+ cancelCtx, cancel := context.WithCancel(ctxtool.FromCanceller(ctx.Cancelation)) + defer cancel() + ctx.Cancelation = cancelCtx + + store := inp.manager.store + store.Retain() + defer store.Release() + + hg := &HarvesterGroup{ + pipeline: pipeline, + readers: make(map[string]context.CancelFunc), + manager: inp.manager, + cleanTimeout: inp.cleanTimeout, + harvester: inp.harvester, + store: store, + tg: unison.TaskGroup{}, + } + + stateStore, err := inp.manager.StateStore.Access() + if err != nil { + return err + } + defer stateStore.Close() + + inp.prospector.Run(ctx, stateStore, hg) + + return nil +} + +func newInputACKHandler(log *logp.Logger) beat.ACKer { + return acker.EventPrivateReporter(func(acked int, private []interface{}) { + var n uint + var last int + for i := 0; i < len(private); i++ { + current := private[i] + if current == nil { + continue + } + + if _, ok := current.(*updateOp); !ok { + continue + } + + n++ + last = i + } + + if n == 0 { + return + } + private[last].(*updateOp).Execute(n) + }) +} diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go new file mode 100644 index 00000000000..db3c600d2bc --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -0,0 +1,199 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + "errors" + "sync" + "time" + + "github.com/urso/sderr" + + "github.com/elastic/go-concert/unison" + + input "github.com/elastic/beats/v7/filebeat/input/v2" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore" +) + +// InputManager is used to create, manage, and coordinate stateful inputs and +// their persistent state. +// The InputManager ensures that only one input can be active for a unique source. +// If two inputs have overlapping sources, both can still collect data, but +// only one input will collect from the common source. +// +// The InputManager automatically cleans up old entries without an active +// input, and without any pending update operations for the persistent store. +// +// The Type field is used to create the key name in the persistent store. Users +// are allowed to add a custome per input configuration ID using the `id` +// setting, to collect the same source multiple times, but with different +// state. The key name in the persistent store becomes -[]- +type InputManager struct { + Logger *logp.Logger + + // StateStore gives the InputManager access to the persitent key value store. + StateStore StateStore + + // Type must contain the name of the input type. It is used to create the key name + // for all sources the inputs collect from. + Type string + + // DefaultCleanTimeout configures the key/value garbage collection interval. 
+ // The InputManager will only collect keys for the configured 'Type' + DefaultCleanTimeout time.Duration + + // Configure returns an array of Sources, and a configured Input instances + // that will be used to collect events from each source. + Configure func(cfg *common.Config) (Prospector, Harvester, error) + + initOnce sync.Once + initErr error + store *store +} + +// Source describe a source the input can collect data from. +// The `Name` method must return an unique name, that will be used to identify +// the source in the persistent state store. +type Source interface { + Name() string +} + +var errNoSourceConfigured = errors.New("no source has been configured") +var errNoInputRunner = errors.New("no input runner available") + +// StateStore interface and configurations used to give the Manager access to the persistent store. +type StateStore interface { + Access() (*statestore.Store, error) + CleanupInterval() time.Duration +} + +func (cim *InputManager) init() error { + cim.initOnce.Do(func() { + if cim.DefaultCleanTimeout <= 0 { + cim.DefaultCleanTimeout = 30 * time.Minute + } + + log := cim.Logger.With("input_type", cim.Type) + var store *store + store, cim.initErr = openStore(log, cim.StateStore, cim.Type) + if cim.initErr != nil { + return + } + + cim.store = store + }) + + return cim.initErr +} + +// Init starts background processes for deleting old entries from the +// persistent store if mode is ModeRun. 
+func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error { + if mode != v2.ModeRun { + return nil + } + + if err := cim.init(); err != nil { + return err + } + + log := cim.Logger.With("input_type", cim.Type) + + store := cim.store + cleaner := &cleaner{log: log} + store.Retain() + err := group.Go(func(canceler unison.Canceler) error { + defer cim.shutdown() + defer store.Release() + interval := cim.StateStore.CleanupInterval() + if interval <= 0 { + interval = 5 * time.Minute + } + cleaner.run(canceler, store, interval) + return nil + }) + if err != nil { + store.Release() + cim.shutdown() + return sderr.Wrap(err, "Can not start registry cleanup process") + } + + return nil +} + +func (cim *InputManager) shutdown() { + cim.store.Release() +} + +// Create builds a new v2.Input using the provided Configure function. +// The Input will run a go-routine per source that has been configured. +func (cim *InputManager) Create(config *common.Config) (input.Input, error) { + if err := cim.init(); err != nil { + return nil, err + } + + settings := struct { + ID string `config:"id"` + CleanTimeout time.Duration `config:"clean_timeout"` + }{ID: "", CleanTimeout: cim.DefaultCleanTimeout} + if err := config.Unpack(&settings); err != nil { + return nil, err + } + + prospector, harvester, err := cim.Configure(config) + if err != nil { + return nil, err + } + if harvester == nil { + return nil, errNoInputRunner + } + + return &managedInput{ + manager: cim, + prospector: prospector, + harvester: harvester, + cleanTimeout: settings.CleanTimeout, + }, nil +} + +// Lock locks a key for exclusive access and returns an resource that can be used to modify +// the cursor state and unlock the key. 
+func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) { + resource := cim.store.Get(key) + err := lockResource(ctx.Logger, resource, ctx.Cancelation) + if err != nil { + resource.Release() + return nil, err + } + return resource, nil +} + +func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error { + if !resource.lock.TryLock() { + log.Infof("Resource '%v' currently in use, waiting...", resource.key) + err := resource.lock.LockContext(canceler) + if err != nil { + log.Infof("Input for resource '%v' has been stopped while waiting", resource.key) + return err + } + } + return nil +} diff --git a/filebeat/input/filestream/internal/input-logfile/manager_test.go b/filebeat/input/filestream/internal/input-logfile/manager_test.go new file mode 100644 index 00000000000..4a9342cd35f --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/manager_test.go @@ -0,0 +1,600 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package input_logfile + +import ( + "context" + "errors" + "fmt" + "runtime" + "sort" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + input "github.com/elastic/beats/v7/filebeat/input/v2" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/elastic/beats/v7/libbeat/tests/resources" + "github.com/elastic/go-concert/unison" +) + +type fakeTestInput struct { + OnTest func(Source, input.TestContext) error + OnRun func(input.Context, Source, Cursor, Publisher) error +} + +type stringSource string + +func TestManager_Init(t *testing.T) { + // Integration style tests for the InputManager and the state garbage collector + + t.Run("stopping the taskgroup kills internal go-routines", func(t *testing.T) { + numRoutines := runtime.NumGoroutine() + + var grp unison.TaskGroup + store := createSampleStore(t, nil) + manager := &InputManager{ + Logger: logp.NewLogger("test"), + StateStore: store, + Type: "test", + DefaultCleanTimeout: 10 * time.Millisecond, + } + + err := manager.Init(&grp, v2.ModeRun) + require.NoError(t, err) + + time.Sleep(200 * time.Millisecond) + grp.Stop() + + // wait for all go-routines to be gone + + for numRoutines < runtime.NumGoroutine() { + time.Sleep(1 * time.Millisecond) + } + }) + + t.Run("collect old entries after startup", func(t *testing.T) { + store := createSampleStore(t, map[string]state{ + "test::key": { + TTL: 1 * time.Millisecond, + Updated: time.Now().Add(-24 * time.Hour), + }, + }) + store.GCPeriod = 10 * time.Millisecond + + var grp unison.TaskGroup + defer grp.Stop() + manager := &InputManager{ + Logger: logp.NewLogger("test"), + StateStore: store, + Type: "test", + DefaultCleanTimeout: 10 * time.Millisecond, + } + + err := manager.Init(&grp, v2.ModeRun) + 
require.NoError(t, err) + + for len(store.snapshot()) > 0 { + time.Sleep(1 * time.Millisecond) + } + }) +} + +func TestManager_Create(t *testing.T) { + t.Run("fail if no source is configured", func(t *testing.T) { + manager := constInput(t, nil, &fakeTestInput{}) + _, err := manager.Create(common.NewConfig()) + require.Error(t, err) + }) + + t.Run("fail if config error", func(t *testing.T) { + manager := failingManager(t, errors.New("oops")) + _, err := manager.Create(common.NewConfig()) + require.Error(t, err) + }) + + t.Run("fail if no input runner is returned", func(t *testing.T) { + manager := constInput(t, sourceList("test"), nil) + _, err := manager.Create(common.NewConfig()) + require.Error(t, err) + }) + + t.Run("configure ok", func(t *testing.T) { + manager := constInput(t, sourceList("test"), &fakeTestInput{}) + _, err := manager.Create(common.NewConfig()) + require.NoError(t, err) + }) + + t.Run("configuring inputs with overlapping sources is allowed", func(t *testing.T) { + manager := simpleManagerWithConfigure(t, func(cfg *common.Config) ([]Source, Input, error) { + config := struct{ Sources []string }{} + err := cfg.Unpack(&config) + return sourceList(config.Sources...), &fakeTestInput{}, err + }) + + _, err := manager.Create(common.MustNewConfigFrom(map[string]interface{}{ + "sources": []string{"a"}, + })) + require.NoError(t, err) + + _, err = manager.Create(common.MustNewConfigFrom(map[string]interface{}{ + "sources": []string{"a"}, + })) + require.NoError(t, err) + }) +} + +func TestManager_InputsTest(t *testing.T) { + var mu sync.Mutex + var seen []string + + sources := sourceList("source1", "source2") + + t.Run("test is run for each source", func(t *testing.T) { + defer resources.NewGoroutinesChecker().Check(t) + + manager := constInput(t, sources, &fakeTestInput{ + OnTest: func(source Source, _ v2.TestContext) error { + mu.Lock() + defer mu.Unlock() + seen = append(seen, source.Name()) + return nil + }, + }) + + inp, err := 
manager.Create(common.NewConfig()) + require.NoError(t, err) + + err = inp.Test(input.TestContext{}) + require.NoError(t, err) + + sort.Strings(seen) + require.Equal(t, []string{"source1", "source2"}, seen) + }) + + t.Run("cancel gets distributed to all source tests", func(t *testing.T) { + defer resources.NewGoroutinesChecker().Check(t) + + manager := constInput(t, sources, &fakeTestInput{ + OnTest: func(_ Source, ctx v2.TestContext) error { + <-ctx.Cancelation.Done() + return nil + }, + }) + + inp, err := manager.Create(common.NewConfig()) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.TODO()) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err = inp.Test(input.TestContext{Cancelation: ctx}) + }() + + cancel() + wg.Wait() + require.NoError(t, err) + }) + + t.Run("fail if test for one source fails", func(t *testing.T) { + defer resources.NewGoroutinesChecker().Check(t) + + failing := Source(stringSource("source1")) + sources := []Source{failing, stringSource("source2")} + + manager := constInput(t, sources, &fakeTestInput{ + OnTest: func(source Source, _ v2.TestContext) error { + if source == failing { + t.Log("return error") + return errors.New("oops") + } + t.Log("return ok") + return nil + }, + }) + + inp, err := manager.Create(common.NewConfig()) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err = inp.Test(input.TestContext{}) + t.Logf("Test returned: %v", err) + }() + + wg.Wait() + require.Error(t, err) + }) + + t.Run("panic is captured", func(t *testing.T) { + defer resources.NewGoroutinesChecker().Check(t) + + manager := constInput(t, sources, &fakeTestInput{ + OnTest: func(source Source, _ v2.TestContext) error { + panic("oops") + }, + }) + + inp, err := manager.Create(common.NewConfig()) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err = inp.Test(input.TestContext{Logger: logp.NewLogger("test")}) + 
t.Logf("Test returned: %v", err) + }() + + wg.Wait() + require.Error(t, err) + }) +} + +func TestManager_InputsRun(t *testing.T) { + // Integration style tests for the InputManager and Input.Run + + t.Run("input returned with error", func(t *testing.T) { + defer resources.NewGoroutinesChecker().Check(t) + + manager := constInput(t, sourceList("test"), &fakeTestInput{ + OnRun: func(_ input.Context, _ Source, _ Cursor, _ Publisher) error { + return errors.New("oops") + }, + }) + + inp, err := manager.Create(common.NewConfig()) + require.NoError(t, err) + + cancelCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var clientCounters pubtest.ClientCounter + err = inp.Run(v2.Context{ + Logger: manager.Logger, + Cancelation: cancelCtx, + }, clientCounters.BuildConnector()) + require.Error(t, err) + require.Equal(t, 0, clientCounters.Active()) + }) + + t.Run("panic is captured", func(t *testing.T) { + defer resources.NewGoroutinesChecker().Check(t) + + manager := constInput(t, sourceList("test"), &fakeTestInput{ + OnRun: func(_ input.Context, _ Source, _ Cursor, _ Publisher) error { + panic("oops") + }, + }) + + inp, err := manager.Create(common.NewConfig()) + require.NoError(t, err) + + cancelCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var clientCounters pubtest.ClientCounter + err = inp.Run(v2.Context{ + Logger: manager.Logger, + Cancelation: cancelCtx, + }, clientCounters.BuildConnector()) + require.Error(t, err) + require.Equal(t, 0, clientCounters.Active()) + }) + + t.Run("shutdown on signal", func(t *testing.T) { + defer resources.NewGoroutinesChecker().Check(t) + + manager := constInput(t, sourceList("test"), &fakeTestInput{ + OnRun: func(ctx input.Context, _ Source, _ Cursor, _ Publisher) error { + <-ctx.Cancelation.Done() + return nil + }, + }) + + inp, err := manager.Create(common.NewConfig()) + require.NoError(t, err) + + cancelCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + 
var clientCounters pubtest.ClientCounter + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err = inp.Run(v2.Context{ + Logger: manager.Logger, + Cancelation: cancelCtx, + }, clientCounters.BuildConnector()) + }() + + cancel() + wg.Wait() + require.NoError(t, err) + require.Equal(t, 0, clientCounters.Active()) + }) + + t.Run("continue sending from last known position", func(t *testing.T) { + log := logp.NewLogger("test") + + type runConfig struct{ Max int } + + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + manager := simpleManagerWithConfigure(t, func(cfg *common.Config) ([]Source, Input, error) { + config := runConfig{} + if err := cfg.Unpack(&config); err != nil { + return nil, nil, err + } + + inp := &fakeTestInput{ + OnRun: func(_ input.Context, _ Source, cursor Cursor, pub Publisher) error { + state := struct{ N int }{} + if !cursor.IsNew() { + if err := cursor.Unpack(&state); err != nil { + return fmt.Errorf("failed to unpack cursor: %w", err) + } + } + + for i := 0; i < config.Max; i++ { + event := beat.Event{Fields: common.MapStr{"n": state.N}} + state.N++ + pub.Publish(event, state) + } + return nil + }, + } + + return sourceList("test"), inp, nil + }) + + var ids []int + pipeline := pubtest.ConstClient(&pubtest.FakeClient{ + PublishFunc: func(event beat.Event) { + id := event.Fields["n"].(int) + ids = append(ids, id) + }, + }) + + // create and run first instance + inp, err := manager.Create(common.MustNewConfigFrom(runConfig{Max: 3})) + require.NoError(t, err) + require.NoError(t, inp.Run(input.Context{ + Logger: log, + Cancelation: context.Background(), + }, pipeline)) + + // create and run second instance instance + inp, err = manager.Create(common.MustNewConfigFrom(runConfig{Max: 3})) + require.NoError(t, err) + inp.Run(input.Context{ + Logger: log, + Cancelation: context.Background(), + }, pipeline) + + // verify + assert.Equal(t, []int{0, 1, 2, 3, 4, 5}, ids) + }) + + t.Run("event ACK 
triggers execution of update operations", func(t *testing.T) { + defer resources.NewGoroutinesChecker().Check(t) + + store := createSampleStore(t, nil) + var wgSend sync.WaitGroup + wgSend.Add(1) + manager := constInput(t, sourceList("key"), &fakeTestInput{ + OnRun: func(ctx input.Context, _ Source, _ Cursor, pub Publisher) error { + defer wgSend.Done() + fields := common.MapStr{"hello": "world"} + pub.Publish(beat.Event{Fields: fields}, "test-cursor-state1") + pub.Publish(beat.Event{Fields: fields}, "test-cursor-state2") + pub.Publish(beat.Event{Fields: fields}, "test-cursor-state3") + pub.Publish(beat.Event{Fields: fields}, nil) + pub.Publish(beat.Event{Fields: fields}, "test-cursor-state4") + pub.Publish(beat.Event{Fields: fields}, "test-cursor-state5") + pub.Publish(beat.Event{Fields: fields}, "test-cursor-state6") + return nil + }, + }) + manager.StateStore = store + + inp, err := manager.Create(common.NewConfig()) + require.NoError(t, err) + + cancelCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // setup publishing pipeline and capture ACKer, so we can simulate progress in the Output + var acker beat.ACKer + var wgACKer sync.WaitGroup + wgACKer.Add(1) + pipeline := &pubtest.FakeConnector{ + ConnectFunc: func(cfg beat.ClientConfig) (beat.Client, error) { + defer wgACKer.Done() + acker = cfg.ACKHandler + return &pubtest.FakeClient{ + PublishFunc: func(event beat.Event) { + acker.AddEvent(event, true) + }, + }, nil + }, + } + + // start the input + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err = inp.Run(v2.Context{ + Logger: manager.Logger, + Cancelation: cancelCtx, + }, pipeline) + }() + // wait for test setup to shutdown + defer wg.Wait() + + // wait for setup complete and events being send (pending operations in the pipeline) + wgACKer.Wait() + wgSend.Wait() + + // 1. 
No cursor state in store yet, all operations are still pending + require.Equal(t, nil, store.snapshot()["test::key"].Cursor) + + // ACK first 2 events and check snapshot state + acker.ACKEvents(2) + require.Equal(t, "test-cursor-state2", store.snapshot()["test::key"].Cursor) + + // ACK 1 events and check snapshot state (3 events published) + acker.ACKEvents(1) + require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor) + + // ACK event without cursor update and check snapshot state not modified + acker.ACKEvents(1) + require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor) + + // ACK rest + acker.ACKEvents(3) + require.Equal(t, "test-cursor-state6", store.snapshot()["test::key"].Cursor) + }) +} + +func TestLockResource(t *testing.T) { + t.Run("can lock unused resource", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + res := store.Get("test::key") + err := lockResource(logp.NewLogger("test"), res, context.TODO()) + require.NoError(t, err) + }) + + t.Run("fail to lock resource in use when context is cancelled", func(t *testing.T) { + log := logp.NewLogger("test") + + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + resUsed := store.Get("test::key") + err := lockResource(log, resUsed, context.TODO()) + require.NoError(t, err) + + // fail to lock resource in use + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + resFail := store.Get("test::key") + err = lockResource(log, resFail, ctx) + require.Error(t, err) + resFail.Release() + + // unlock and release resource in use -> it should be marked finished now + releaseResource(resUsed) + require.True(t, resUsed.Finished()) + }) + + t.Run("succeed to lock resource after it has been released", func(t *testing.T) { + log := logp.NewLogger("test") + + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + resUsed := store.Get("test::key") + 
err := lockResource(log, resUsed, context.TODO()) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + resOther := store.Get("test::key") + err := lockResource(log, resOther, context.TODO()) + if err == nil { + releaseResource(resOther) + } + }() + + go func() { + time.Sleep(100 * time.Millisecond) + releaseResource(resUsed) + }() + + wg.Wait() // <- block forever if waiting go-routine can not acquire lock + }) +} + +func (s stringSource) Name() string { return string(s) } + +func simpleManagerWithConfigure(t *testing.T, configure func(*common.Config) ([]Source, Input, error)) *InputManager { + return &InputManager{ + Logger: logp.NewLogger("test"), + StateStore: createSampleStore(t, nil), + Type: "test", + Configure: configure, + } +} + +func constConfigureResult(t *testing.T, sources []Source, inp Input, err error) *InputManager { + return simpleManagerWithConfigure(t, func(cfg *common.Config) ([]Source, Input, error) { + return sources, inp, err + }) +} + +func failingManager(t *testing.T, err error) *InputManager { + return constConfigureResult(t, nil, nil, err) +} + +func constInput(t *testing.T, sources []Source, inp Input) *InputManager { + return constConfigureResult(t, sources, inp, nil) +} + +func (f *fakeTestInput) Name() string { return "test" } + +func (f *fakeTestInput) Test(source Source, ctx input.TestContext) error { + if f.OnTest != nil { + return f.OnTest(source, ctx) + } + return nil +} + +func (f *fakeTestInput) Run(ctx input.Context, source Source, cursor Cursor, pub Publisher) error { + if f.OnRun != nil { + return f.OnRun(ctx, source, cursor, pub) + } + return nil +} + +func sourceList(names ...string) []Source { + tmp := make([]Source, len(names)) + for i, name := range names { + tmp[i] = stringSource(name) + } + return tmp +} diff --git a/filebeat/input/filestream/internal/input-logfile/prospector.go b/filebeat/input/filestream/internal/input-logfile/prospector.go new file mode 100644 index 
00000000000..9488596eb2c --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/prospector.go @@ -0,0 +1,35 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/statestore" +) + +// Prospector is responsible for starting, stopping harvesters +// based on the retrieved information about the configured paths. +// It also updates the statestore with the meta data of the running harvesters. +type Prospector interface { + // Run starts the event loop and handles the incoming events + // either by starting/stopping a harvester, or updating the statestore. + Run(input.Context, *statestore.Store, *HarvesterGroup) + // Test checks if the Prospector is able to run the configuration + // specified by the user. + Test() error +} diff --git a/filebeat/input/filestream/internal/input-logfile/publish.go b/filebeat/input/filestream/internal/input-logfile/publish.go new file mode 100644 index 00000000000..547a82c479f --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/publish.go @@ -0,0 +1,153 @@ +// Licensed to Elasticsearch B.V. 
under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + "time" + + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" + "github.com/elastic/beats/v7/libbeat/statestore" +) + +// Publisher is used to publish an event and update the cursor in a single call to Publish. +// Inputs are allowed to pass `nil` as cursor state. In this case the state is not updated, but the +// event will still be published as is. +type Publisher interface { + Publish(event beat.Event, cursor interface{}) error +} + +// cursorPublisher implements the Publisher interface and used internally by the managedInput. +// When publishing an event with cursor state updates, the cursorPublisher +// updates the in memory state and create an updateOp that is used to schedule +// an update for the persistent store. The updateOp is run by the inputs ACK +// handler, persisting the pending update. +type cursorPublisher struct { + canceler input.Canceler + client beat.Client + cursor *Cursor +} + +// updateOp keeps track of pending updates that are not written to the persistent store yet. +// Update operations are ordered. 
The input manager guarantees that only one +// input can create update operation for a source, such that new input +// instances can add update operations to be executed after already pending +// update operations from older inputs instances that have been shutdown. +type updateOp struct { + store *store + resource *resource + + // state updates to persist + timestamp time.Time + ttl time.Duration + delta interface{} +} + +// Publish publishes an event. Publish returns false if the inputs cancellation context has been marked as done. +// If cursorUpdate is not nil, Publish updates the in memory state and create and updateOp for the pending update. +// It overwrite event.Private with the update operation, before finally sending the event. +// The ACK ordering in the publisher pipeline guarantees that update operations +// will be ACKed and executed in the correct order. +func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) error { + if cursorUpdate == nil { + return c.forward(event) + } + + op, err := createUpdateOp(c.cursor.store, c.cursor.resource, cursorUpdate) + if err != nil { + return err + } + + event.Private = op + return c.forward(event) +} + +func (c *cursorPublisher) forward(event beat.Event) error { + c.client.Publish(event) + if c.canceler == nil { + return nil + } + return c.canceler.Err() +} + +func createUpdateOp(store *store, resource *resource, updates interface{}) (*updateOp, error) { + ts := time.Now() + + resource.stateMutex.Lock() + defer resource.stateMutex.Unlock() + + cursor := resource.pendingCursor + if resource.activeCursorOperations == 0 { + var tmp interface{} + typeconv.Convert(&tmp, cursor) + resource.pendingCursor = tmp + cursor = tmp + } + if err := typeconv.Convert(&cursor, updates); err != nil { + return nil, err + } + resource.pendingCursor = cursor + + resource.Retain() + resource.activeCursorOperations++ + return &updateOp{ + resource: resource, + store: store, + timestamp: ts, + delta: updates, + }, 
nil +} + +// done releases resources held by the last N updateOps. +func (op *updateOp) done(n uint) { + op.resource.UpdatesReleaseN(n) + op.resource = nil + *op = updateOp{} +} + +// Execute updates the persistent store with the scheduled changes and releases the resource. +func (op *updateOp) Execute(n uint) { + resource := op.resource + defer op.done(n) + + resource.stateMutex.Lock() + defer resource.stateMutex.Unlock() + + resource.activeCursorOperations -= n + if resource.activeCursorOperations == 0 { + resource.cursor = resource.pendingCursor + resource.pendingCursor = nil + } else { + typeconv.Convert(&resource.cursor, op.delta) + } + + if resource.internalState.Updated.Before(op.timestamp) { + resource.internalState.Updated = op.timestamp + } + + err := op.store.persistentStore.Set(resource.key, resource.inSyncStateSnapshot()) + if err != nil { + if !statestore.IsClosed(err) { + op.store.log.Errorf("Failed to update state in the registry for '%v'", resource.key) + } + } else { + resource.internalInSync = true + resource.stored = true + } +} diff --git a/filebeat/input/filestream/internal/input-logfile/publish_test.go b/filebeat/input/filestream/internal/input-logfile/publish_test.go new file mode 100644 index 00000000000..ede25670a95 --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/publish_test.go @@ -0,0 +1,158 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" +) + +func TestPublish(t *testing.T) { + t.Run("event with cursor state creates update operation", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + cursor := makeCursor(store, store.Get("test::key")) + + var actual beat.Event + client := &pubtest.FakeClient{ + PublishFunc: func(event beat.Event) { actual = event }, + } + publisher := cursorPublisher{nil, client, &cursor} + publisher.Publish(beat.Event{}, "test") + + require.NotNil(t, actual.Private) + }) + + t.Run("event without cursor creates no update operation", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + cursor := makeCursor(store, store.Get("test::key")) + + var actual beat.Event + client := &pubtest.FakeClient{ + PublishFunc: func(event beat.Event) { actual = event }, + } + publisher := cursorPublisher{nil, client, &cursor} + publisher.Publish(beat.Event{}, nil) + require.Nil(t, actual.Private) + }) + + t.Run("publish returns error if context has been cancelled", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + cursor := makeCursor(store, store.Get("test::key")) + + publisher := 
cursorPublisher{ctx, &pubtest.FakeClient{}, &cursor} + err := publisher.Publish(beat.Event{}, nil) + require.Equal(t, context.Canceled, err) + }) +} + +func TestOp_Execute(t *testing.T) { + t.Run("applying final op marks the key as finished", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + res := store.Get("test::key") + + // create op and release resource. The 'resource' must still be active + op := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state") + res.Release() + require.False(t, res.Finished()) + + // this was the last op, the resource should become inactive + op.Execute(1) + require.True(t, res.Finished()) + + // validate state: + inSyncCursor := storeInSyncSnapshot(store)["test::key"].Cursor + inMemCursor := storeMemorySnapshot(store)["test::key"].Cursor + want := "test-updated-cursor-state" + assert.Equal(t, want, inSyncCursor) + assert.Equal(t, want, inMemCursor) + }) + + t.Run("acking multiple ops applies the latest update and marks key as finished", func(t *testing.T) { + // when acking N events, intermediate updates are dropped in favor of the latest update operation. + // This test checks that the resource is correctly marked as finished. + + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + res := store.Get("test::key") + + // create update operations and release resource. 
The 'resource' must still be active + mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-dropped") + op := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-final") + res.Release() + require.False(t, res.Finished()) + + // this was the last op, the resource should become inactive + op.Execute(2) + require.True(t, res.Finished()) + + // validate state: + inSyncCursor := storeInSyncSnapshot(store)["test::key"].Cursor + inMemCursor := storeMemorySnapshot(store)["test::key"].Cursor + want := "test-updated-cursor-state-final" + assert.Equal(t, want, inSyncCursor) + assert.Equal(t, want, inMemCursor) + }) + + t.Run("ACK only subset of pending ops will only update up to ACKed state", func(t *testing.T) { + // when acking N events, intermediate updates are dropped in favor of the latest update operation. + // This test checks that the resource is correctly marked as finished. + + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + res := store.Get("test::key") + + // create update operations and release resource. 
The 'resource' must still be active + op1 := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-intermediate") + op2 := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-final") + res.Release() + require.False(t, res.Finished()) + + defer op2.done(1) // cleanup after test + + // this was the intermediate op, the resource should still be active + op1.Execute(1) + require.False(t, res.Finished()) + + // validate state (in memory state is always up to date with the most recent update): + inSyncCursor := storeInSyncSnapshot(store)["test::key"].Cursor + inMemCursor := storeMemorySnapshot(store)["test::key"].Cursor + assert.Equal(t, "test-updated-cursor-state-intermediate", inSyncCursor) + assert.Equal(t, "test-updated-cursor-state-final", inMemCursor) + }) +} + +func mustCreateUpdateOp(t *testing.T, store *store, resource *resource, updates interface{}) *updateOp { + op, err := createUpdateOp(store, resource, updates) + if err != nil { + t.Fatalf("Failed to create update op: %v", err) + } + return op +} diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go new file mode 100644 index 00000000000..8267565f551 --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -0,0 +1,324 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + "strings" + "sync" + "time" + + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/common/cleanup" + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/go-concert" + "github.com/elastic/go-concert/unison" +) + +// store encapsulates the persistent store and the in memory state store, that +// can be ahead of the persistent store. +// The store lifetime is managed by a reference counter. Once all owners (the +// session, and the resource cleaner) have dropped ownership, backing resources +// will be released and closed. +type store struct { + log *logp.Logger + refCount concert.RefCount + persistentStore *statestore.Store + ephemeralStore *states +} + +// states stores resource states in memory. When a cursor for an input is updated, +// its state is updated first. The entry in the persistent store 'follows' the internal state. +// As long as a resource stored in states is not 'Finished', the in memory +// store is assumed to be ahead (in memory and persistent state are out of +// sync). +type states struct { + mu sync.Mutex + table map[string]*resource +} + +// resource holds the in memory state and keeps track of pending updates and inputs collecting +// events for the resource's key.
+// A resource is assumed active for as long as at least one input has (or tries +// to) acquired the lock, and as long as there are pending updateOp instances in +// the pipeline not ACKed yet. The key can not be gc'ed by the cleaner, as long as the resource is active. +// +// State changes and writes to the persistent store are protected using the +// stateMutex, to ensure full consistency between direct writes and updates +// after ACK. +type resource struct { + // pending counts the number of Inputs and outstanding registry updates. + // as long as pending is > 0 the resource is in use and must not be garbage collected. + pending atomic.Uint64 + + // lock guarantees only one input creates updates for this entry + lock unison.Mutex + + // key of the resource as used for the registry. + key string + + // stateMutex is used to lock the resource when it is updated/read from + // multiple go-routines like the ACK handler or the input publishing an + // event. + // stateMutex is used to access the fields 'stored', 'state' and 'internalInSync' + stateMutex sync.Mutex + + // stored indicates that the state is available in the registry file. It is false for new entries. + stored bool + + // internalInSync is true if all 'Internal' metadata like TTL or update timestamp are in sync. + // Normally resources are added when being created. But if operations failed we will retry inserting + // them on each update operation until we eventually succeed + internalInSync bool + + activeCursorOperations uint + internalState stateInternal + + // cursor states. The cursor holds the state as it is currently known to the + // persistent store, while pendingCursor contains the most recent update + // (in-memory state), that still needs to be synced to the persistent store. + // The pendingCursor is nil if there are no pending updates. + // When processing update operations on ACKs, the state is applied to cursor + // first, which is finally written to the persistent store.
This ensures that + // we always write the complete state of the key/value pair. + cursor interface{} + pendingCursor interface{} +} + +type ( + // state represents the full document as it is stored in the registry. + // + // The TTL and Update fields are for internal use only. + // + // The `Cursor` namespace is used to store the cursor information that are + // required to continue processing from the last known position. Cursor + // updates in the registry file are only executed after events have been + // ACKed by the outputs. Therefore the cursor MUST NOT include any + // information that are require to identify/track the source we are + // collecting from. + state struct { + TTL time.Duration + Updated time.Time + Cursor interface{} + } + + stateInternal struct { + TTL time.Duration + Updated time.Time + } +) + +// hook into store close for testing purposes +var closeStore = (*store).close + +func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) { + ok := false + + persistentStore, err := statestore.Access() + if err != nil { + return nil, err + } + defer cleanup.IfNot(&ok, func() { persistentStore.Close() }) + + states, err := readStates(log, persistentStore, prefix) + if err != nil { + return nil, err + } + + ok = true + return &store{ + log: log, + persistentStore: persistentStore, + ephemeralStore: states, + }, nil +} + +func (s *store) Retain() { s.refCount.Retain() } +func (s *store) Release() { + if s.refCount.Release() { + closeStore(s) + } +} + +func (s *store) close() { + if err := s.persistentStore.Close(); err != nil { + s.log.Errorf("Closing registry store did report an error: %+v", err) + } +} + +// Get returns the resource for the key. +// A new shared resource is generated if the key is not known. The generated +// resource is not synced to disk yet. +func (s *store) Get(key string) *resource { + return s.ephemeralStore.Find(key, true) +} + +// UpdateTTL updates the time-to-live of a resource. 
Inactive resources with expired TTL are subject to removal. +// The TTL value is part of the internal state, and will be written immediately to the persistent store. +// On update the resource its `cursor` state is used, to keep the cursor state in sync with the current known +// on disk store state. +func (s *store) UpdateTTL(resource *resource, ttl time.Duration) { + resource.stateMutex.Lock() + defer resource.stateMutex.Unlock() + if resource.stored && resource.internalState.TTL == ttl { + return + } + + resource.internalState.TTL = ttl + if resource.internalState.Updated.IsZero() { + resource.internalState.Updated = time.Now() + } + + err := s.persistentStore.Set(resource.key, state{ + TTL: resource.internalState.TTL, + Updated: resource.internalState.Updated, + Cursor: resource.cursor, + }) + if err != nil { + s.log.Errorf("Failed to update resource management fields for '%v'", resource.key) + resource.internalInSync = false + } else { + resource.stored = true + resource.internalInSync = true + } +} + +// Find returns the resource for a given key. If the key is unknown and create is set to false nil will be returned. +// The resource returned by Find is marked as active. (*resource).Release must be called to mark the resource as inactive again. +func (s *states) Find(key string, create bool) *resource { + s.mu.Lock() + defer s.mu.Unlock() + + if resource := s.table[key]; resource != nil { + resource.Retain() + return resource + } + + if !create { + return nil + } + + // resource is owned by table(session) and input that uses the resource. + resource := &resource{ + stored: false, + key: key, + lock: unison.MakeMutex(), + } + s.table[key] = resource + resource.Retain() + return resource +} + +// IsNew returns true if we have no state recorded for the current resource. 
+func (r *resource) IsNew() bool { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + return r.pendingCursor == nil && r.cursor == nil +} + +// Retain is used to indicate that 'resource' gets an additional 'owner'. +// Owners of a resource can be active inputs or pending update operations +// not yet written to disk. +func (r *resource) Retain() { r.pending.Inc() } + +// Release reduces the ownership counter of the resource. +func (r *resource) Release() { r.pending.Dec() } + +// UpdatesReleaseN is used to release ownership of N pending update operations. +func (r *resource) UpdatesReleaseN(n uint) { + r.pending.Sub(uint64(n)) +} + +// Finished returns true if the resource is not in use and if there are no pending updates +// that still need to be written to the registry. +func (r *resource) Finished() bool { return r.pending.Load() == 0 } + +// UnpackCursor deserializes the in memory state. +func (r *resource) UnpackCursor(to interface{}) error { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + if r.activeCursorOperations == 0 { + return typeconv.Convert(to, r.cursor) + } + return typeconv.Convert(to, r.pendingCursor) +} + +// inSyncStateSnapshot returns the current in-sync state based on already ACKed update operations. +func (r *resource) inSyncStateSnapshot() state { + return state{ + TTL: r.internalState.TTL, + Updated: r.internalState.Updated, + Cursor: r.cursor, + } +} + +// stateSnapshot returns the current in memory state, that already contains state updates +// not yet ACKed.
+func (r *resource) stateSnapshot() state { + cursor := r.pendingCursor + if r.activeCursorOperations == 0 { + cursor = r.cursor + } + + return state{ + TTL: r.internalState.TTL, + Updated: r.internalState.Updated, + Cursor: cursor, + } +} + +func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*states, error) { + keyPrefix := prefix + "::" + states := &states{ + table: map[string]*resource{}, + } + + err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + if !strings.HasPrefix(string(key), keyPrefix) { + return true, nil + } + + var st state + if err := dec.Decode(&st); err != nil { + log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v", + key, err) + return true, nil + } + + resource := &resource{ + key: key, + stored: true, + lock: unison.MakeMutex(), + internalInSync: true, + internalState: stateInternal{ + TTL: st.TTL, + Updated: st.Updated, + }, + cursor: st.Cursor, + } + states.table[resource.key] = resource + + return true, nil + }) + + if err != nil { + return nil, err + } + return states, nil +} diff --git a/filebeat/input/filestream/internal/input-logfile/store_test.go b/filebeat/input/filestream/internal/input-logfile/store_test.go new file mode 100644 index 00000000000..71ea41298b2 --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/store_test.go @@ -0,0 +1,351 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + "errors" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" +) + +type testStateStore struct { + Store *statestore.Store + GCPeriod time.Duration +} + +func TestStore_OpenClose(t *testing.T) { + t.Run("releasing store closes", func(t *testing.T) { + var closed bool + cleanup := closeStoreWith(func(s *store) { + closed = true + s.close() + }) + defer cleanup() + + store := testOpenStore(t, "test", nil) + store.Release() + + require.True(t, closed) + }) + + t.Run("fail if persistent store can not be accessed", func(t *testing.T) { + _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test") + require.Error(t, err) + }) + + t.Run("load from empty", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + require.Equal(t, 0, len(storeMemorySnapshot(store))) + require.Equal(t, 0, len(storeInSyncSnapshot(store))) + }) + + t.Run("already available state is loaded", func(t *testing.T) { + states := map[string]state{ + "test::key0": {Cursor: "1"}, + "test::key1": {Cursor: "2"}, + } + + store := testOpenStore(t, "test", createSampleStore(t, states)) + defer store.Release() + + checkEqualStoreState(t, states, storeMemorySnapshot(store)) + checkEqualStoreState(t, states, storeInSyncSnapshot(store)) + }) + + 
t.Run("ignore entries with wrong index on open", func(t *testing.T) { + states := map[string]state{ + "test::key0": {Cursor: "1"}, + "other::key": {Cursor: "2"}, + } + + store := testOpenStore(t, "test", createSampleStore(t, states)) + defer store.Release() + + want := map[string]state{ + "test::key0": {Cursor: "1"}, + } + checkEqualStoreState(t, want, storeMemorySnapshot(store)) + checkEqualStoreState(t, want, storeInSyncSnapshot(store)) + }) +} + +func TestStore_Get(t *testing.T) { + t.Run("find existing resource", func(t *testing.T) { + cursorState := state{Cursor: "1"} + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key0": cursorState, + })) + defer store.Release() + + res := store.Get("test::key0") + require.NotNil(t, res) + defer res.Release() + + // check in memory state matches matches original persistent state + require.Equal(t, cursorState, res.stateSnapshot()) + // check assumed in-sync state matches matches original persistent state + require.Equal(t, cursorState, res.inSyncStateSnapshot()) + }) + + t.Run("access unknown resource", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + res := store.Get("test::key") + require.NotNil(t, res) + defer res.Release() + + // new resource has empty state + require.Equal(t, state{}, res.stateSnapshot()) + }) + + t.Run("same resource is returned", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + res1 := store.Get("test::key") + require.NotNil(t, res1) + defer res1.Release() + + res2 := store.Get("test::key") + require.NotNil(t, res2) + defer res2.Release() + + assert.Equal(t, res1, res2) + }) +} + +func TestStore_UpdateTTL(t *testing.T) { + t.Run("add TTL for new entry to store", func(t *testing.T) { + // when creating a resource we set the TTL and insert a new key value pair without cursor value into the store: + store := testOpenStore(t, "test", 
createSampleStore(t, nil)) + defer store.Release() + + res := store.Get("test::key") + store.UpdateTTL(res, 60*time.Second) + + want := map[string]state{ + "test::key": { + TTL: 60 * time.Second, + Updated: res.internalState.Updated, + Cursor: nil, + }, + } + + checkEqualStoreState(t, want, storeMemorySnapshot(store)) + checkEqualStoreState(t, want, storeInSyncSnapshot(store)) + }) + + t.Run("update TTL for in-sync resource does not overwrite state", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": { + TTL: 1 * time.Second, + Cursor: "test", + }, + })) + defer store.Release() + + res := store.Get("test::key") + store.UpdateTTL(res, 60*time.Second) + want := map[string]state{ + "test::key": { + Updated: res.internalState.Updated, + TTL: 60 * time.Second, + Cursor: "test", + }, + } + + checkEqualStoreState(t, want, storeMemorySnapshot(store)) + checkEqualStoreState(t, want, storeInSyncSnapshot(store)) + }) + + t.Run("update TTL for resource with pending updates", func(t *testing.T) { + // This test updates the resource TTL while update operations are still + // pending, but not synced to the persistent store yet. + // UpdateTTL changes the state in the persistent store immediately, and must therefore + // serialize the old in-sync state with update meta-data. + + // create store + backend := createSampleStore(t, map[string]state{ + "test::key": { + TTL: 1 * time.Second, + Cursor: "test", + }, + }) + store := testOpenStore(t, "test", backend) + defer store.Release() + + // create pending update operation + res := store.Get("test::key") + op, err := createUpdateOp(store, res, "test-state-update") + require.NoError(t, err) + defer op.done(1) + + // Update key/value pair TTL. This will update the internal state in the + // persistent store only, not modifying the old cursor state yet. 
+ store.UpdateTTL(res, 60*time.Second) + + // validate + wantMemoryState := state{ + Updated: res.internalState.Updated, + TTL: 60 * time.Second, + Cursor: "test-state-update", + } + wantInSyncState := state{ + Updated: res.internalState.Updated, + TTL: 60 * time.Second, + Cursor: "test", + } + + checkEqualStoreState(t, map[string]state{"test::key": wantMemoryState}, storeMemorySnapshot(store)) + checkEqualStoreState(t, map[string]state{"test::key": wantInSyncState}, storeInSyncSnapshot(store)) + checkEqualStoreState(t, map[string]state{"test::key": wantInSyncState}, backend.snapshot()) + }) +} + +func closeStoreWith(fn func(s *store)) func() { + old := closeStore + closeStore = fn + return func() { + closeStore = old + } +} + +func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *store { + if persistentStore == nil { + persistentStore = createSampleStore(t, nil) + } + + store, err := openStore(logp.NewLogger("test"), persistentStore, prefix) + if err != nil { + t.Fatalf("failed to open the store") + } + return store +} + +func createSampleStore(t *testing.T, data map[string]state) testStateStore { + storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend()) + store, err := storeReg.Get("test") + if err != nil { + t.Fatalf("Failed to access store: %v", err) + } + + for k, v := range data { + if err := store.Set(k, v); err != nil { + t.Fatalf("Error when populating the sample store: %v", err) + } + } + + return testStateStore{ + Store: store, + } +} + +func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts } +func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod } +func (ts testStateStore) Access() (*statestore.Store, error) { + if ts.Store == nil { + return nil, errors.New("no store configured") + } + return ts.Store, nil +} + +// snapshot copies all key/value pairs from the persistent store into a table for inspection. 
+func (ts testStateStore) snapshot() map[string]state { + states := map[string]state{} + err := ts.Store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + var st state + if err := dec.Decode(&st); err != nil { + return false, err + } + states[key] = st + return true, nil + }) + + if err != nil { + panic("unexpected decode error from persistent test store") + } + return states +} + +// storeMemorySnapshot copies all key/value pairs into a table for inspection. +// The state returned reflects the in memory state, which can be ahead of the +// persistent state. +// +// Note: The state returned by storeMemorySnapshot is always ahead of the state returned by storeInSyncSnapshot. +// All key value pairs are fully in-sync, if both snapshot functions return the same state. +func storeMemorySnapshot(store *store) map[string]state { + store.ephemeralStore.mu.Lock() + defer store.ephemeralStore.mu.Unlock() + + states := map[string]state{} + for k, res := range store.ephemeralStore.table { + states[k] = res.stateSnapshot() + } + return states +} + +// storeInSyncSnapshot copies all key/value pairs into the table for inspection. +// The state returned reflects the current state that the in-memory tables assumed to be +// written to the persistent store already. + +// Note: The state returned by storeMemorySnapshot is always ahead of the state returned by storeInSyncSnapshot. +// All key value pairs are fully in-sync, if both snapshot functions return the same state. +func storeInSyncSnapshot(store *store) map[string]state { + store.ephemeralStore.mu.Lock() + defer store.ephemeralStore.mu.Unlock() + + states := map[string]state{} + for k, res := range store.ephemeralStore.table { + states[k] = res.inSyncStateSnapshot() + } + return states +} + +// checkEqualStoreState compares 2 store snapshot tables for equality. The test +// fails with Errorf if the state differ. +// +// Note: testify is too strict when comparing timestamp, better use checkEqualStoreState. 
+func checkEqualStoreState(t *testing.T, want, got map[string]state) bool { + if d := cmp.Diff(want, got); d != "" { + t.Errorf("store state mismatch (-want +got):\n%s", d) + return false + } + return true +} + +// requireEqualStoreState compares 2 store snapshot tables for equality. The test +// fails with Fatalf if the state differ. +// +// Note: testify is too strict when comparing timestamp, better use checkEqualStoreState. +func requireEqualStoreState(t *testing.T, want, got map[string]state) bool { + if d := cmp.Diff(want, got); d != "" { + t.Fatalf("store state mismatch (-want +got):\n%s", d) + return false + } + return true +} diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go new file mode 100644 index 00000000000..257574b9ca1 --- /dev/null +++ b/filebeat/input/filestream/prospector.go @@ -0,0 +1,38 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/statestore" +) + +// fileProspector implements the Prospector interface. 
+// It contains a file scanner which returns file system events. +// The FS events then trigger either new Harvester runs or updates +// the statestore. +type fileProspector struct{} + +func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.HarvesterGroup) { + panic("TODO: implement me") +} + +func (p *fileProspector) Test() error { + panic("TODO: implement me") +} diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 365da416ed3..ba5b14b1a0c 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -281,6 +281,8 @@ func (p *Input) getFiles() map[string]os.FileInfo { continue } + logp.Debug("Checking matches of %s: %v", path, matches) + OUTER: // Check any matched files to see if we need to start a harvester for _, file := range matches { From a11899f51a0be19dc7cb32e3756f04bb1b8f7c33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 30 Sep 2020 17:37:44 +0200 Subject: [PATCH 2/3] rm accidentally commited file --- filebeat/input/log/input.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index ba5b14b1a0c..365da416ed3 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -281,8 +281,6 @@ func (p *Input) getFiles() map[string]os.FileInfo { continue } - logp.Debug("Checking matches of %s: %v", path, matches) - OUTER: // Check any matched files to see if we need to start a harvester for _, file := range matches { From 1c303b8d5f5282f6c9a72991019f794fb536c673 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 30 Sep 2020 18:05:55 +0200 Subject: [PATCH 3/3] minor fixes --- .../internal/input-logfile/harvester.go | 10 +- .../internal/input-logfile/manager_test.go | 600 ------------------ 2 files changed, 7 insertions(+), 603 deletions(-) delete mode 100644 filebeat/input/filestream/internal/input-logfile/manager_test.go diff --git 
a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index f89ced83abb..d2f184cac7b 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -63,10 +63,12 @@ func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error { resource, err := hg.manager.lock(ctx, s.Name()) if err != nil { + cancelHarvester() return err } if _, ok := hg.readers[s.Name()]; ok { + cancelHarvester() log.Debug("A harvester is already running for file") return nil } @@ -79,17 +81,19 @@ func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error { ACKHandler: newInputACKHandler(ctx.Logger), }) if err != nil { + cancelHarvester() return err } cursor := makeCursor(hg.store, resource) publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor} - go func() { + go func(cancel context.CancelFunc) { defer client.Close() defer log.Debug("Stopped harvester for file") - defer cancelHarvester() + defer cancel() defer releaseResource(resource) + defer delete(hg.readers, s.Name()) defer func() { if v := recover(); v != nil { @@ -102,7 +106,7 @@ func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error { if err != nil { log.Errorf("Harvester stopped: %v", err) } - }() + }(cancelHarvester) return nil } diff --git a/filebeat/input/filestream/internal/input-logfile/manager_test.go b/filebeat/input/filestream/internal/input-logfile/manager_test.go deleted file mode 100644 index 4a9342cd35f..00000000000 --- a/filebeat/input/filestream/internal/input-logfile/manager_test.go +++ /dev/null @@ -1,600 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. 
licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package input_logfile - -import ( - "context" - "errors" - "fmt" - "runtime" - "sort" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - input "github.com/elastic/beats/v7/filebeat/input/v2" - v2 "github.com/elastic/beats/v7/filebeat/input/v2" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" - pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" - "github.com/elastic/beats/v7/libbeat/tests/resources" - "github.com/elastic/go-concert/unison" -) - -type fakeTestInput struct { - OnTest func(Source, input.TestContext) error - OnRun func(input.Context, Source, Cursor, Publisher) error -} - -type stringSource string - -func TestManager_Init(t *testing.T) { - // Integration style tests for the InputManager and the state garbage collector - - t.Run("stopping the taskgroup kills internal go-routines", func(t *testing.T) { - numRoutines := runtime.NumGoroutine() - - var grp unison.TaskGroup - store := createSampleStore(t, nil) - manager := &InputManager{ - Logger: logp.NewLogger("test"), - StateStore: store, - Type: "test", - DefaultCleanTimeout: 10 * time.Millisecond, - } - - err := manager.Init(&grp, v2.ModeRun) - require.NoError(t, err) - - time.Sleep(200 * time.Millisecond) - grp.Stop() - - // wait for all go-routines to be gone - - for 
numRoutines < runtime.NumGoroutine() { - time.Sleep(1 * time.Millisecond) - } - }) - - t.Run("collect old entries after startup", func(t *testing.T) { - store := createSampleStore(t, map[string]state{ - "test::key": { - TTL: 1 * time.Millisecond, - Updated: time.Now().Add(-24 * time.Hour), - }, - }) - store.GCPeriod = 10 * time.Millisecond - - var grp unison.TaskGroup - defer grp.Stop() - manager := &InputManager{ - Logger: logp.NewLogger("test"), - StateStore: store, - Type: "test", - DefaultCleanTimeout: 10 * time.Millisecond, - } - - err := manager.Init(&grp, v2.ModeRun) - require.NoError(t, err) - - for len(store.snapshot()) > 0 { - time.Sleep(1 * time.Millisecond) - } - }) -} - -func TestManager_Create(t *testing.T) { - t.Run("fail if no source is configured", func(t *testing.T) { - manager := constInput(t, nil, &fakeTestInput{}) - _, err := manager.Create(common.NewConfig()) - require.Error(t, err) - }) - - t.Run("fail if config error", func(t *testing.T) { - manager := failingManager(t, errors.New("oops")) - _, err := manager.Create(common.NewConfig()) - require.Error(t, err) - }) - - t.Run("fail if no input runner is returned", func(t *testing.T) { - manager := constInput(t, sourceList("test"), nil) - _, err := manager.Create(common.NewConfig()) - require.Error(t, err) - }) - - t.Run("configure ok", func(t *testing.T) { - manager := constInput(t, sourceList("test"), &fakeTestInput{}) - _, err := manager.Create(common.NewConfig()) - require.NoError(t, err) - }) - - t.Run("configuring inputs with overlapping sources is allowed", func(t *testing.T) { - manager := simpleManagerWithConfigure(t, func(cfg *common.Config) ([]Source, Input, error) { - config := struct{ Sources []string }{} - err := cfg.Unpack(&config) - return sourceList(config.Sources...), &fakeTestInput{}, err - }) - - _, err := manager.Create(common.MustNewConfigFrom(map[string]interface{}{ - "sources": []string{"a"}, - })) - require.NoError(t, err) - - _, err = 
manager.Create(common.MustNewConfigFrom(map[string]interface{}{ - "sources": []string{"a"}, - })) - require.NoError(t, err) - }) -} - -func TestManager_InputsTest(t *testing.T) { - var mu sync.Mutex - var seen []string - - sources := sourceList("source1", "source2") - - t.Run("test is run for each source", func(t *testing.T) { - defer resources.NewGoroutinesChecker().Check(t) - - manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(source Source, _ v2.TestContext) error { - mu.Lock() - defer mu.Unlock() - seen = append(seen, source.Name()) - return nil - }, - }) - - inp, err := manager.Create(common.NewConfig()) - require.NoError(t, err) - - err = inp.Test(input.TestContext{}) - require.NoError(t, err) - - sort.Strings(seen) - require.Equal(t, []string{"source1", "source2"}, seen) - }) - - t.Run("cancel gets distributed to all source tests", func(t *testing.T) { - defer resources.NewGoroutinesChecker().Check(t) - - manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(_ Source, ctx v2.TestContext) error { - <-ctx.Cancelation.Done() - return nil - }, - }) - - inp, err := manager.Create(common.NewConfig()) - require.NoError(t, err) - - ctx, cancel := context.WithCancel(context.TODO()) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err = inp.Test(input.TestContext{Cancelation: ctx}) - }() - - cancel() - wg.Wait() - require.NoError(t, err) - }) - - t.Run("fail if test for one source fails", func(t *testing.T) { - defer resources.NewGoroutinesChecker().Check(t) - - failing := Source(stringSource("source1")) - sources := []Source{failing, stringSource("source2")} - - manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(source Source, _ v2.TestContext) error { - if source == failing { - t.Log("return error") - return errors.New("oops") - } - t.Log("return ok") - return nil - }, - }) - - inp, err := manager.Create(common.NewConfig()) - require.NoError(t, err) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - 
defer wg.Done() - err = inp.Test(input.TestContext{}) - t.Logf("Test returned: %v", err) - }() - - wg.Wait() - require.Error(t, err) - }) - - t.Run("panic is captured", func(t *testing.T) { - defer resources.NewGoroutinesChecker().Check(t) - - manager := constInput(t, sources, &fakeTestInput{ - OnTest: func(source Source, _ v2.TestContext) error { - panic("oops") - }, - }) - - inp, err := manager.Create(common.NewConfig()) - require.NoError(t, err) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err = inp.Test(input.TestContext{Logger: logp.NewLogger("test")}) - t.Logf("Test returned: %v", err) - }() - - wg.Wait() - require.Error(t, err) - }) -} - -func TestManager_InputsRun(t *testing.T) { - // Integration style tests for the InputManager and Input.Run - - t.Run("input returned with error", func(t *testing.T) { - defer resources.NewGoroutinesChecker().Check(t) - - manager := constInput(t, sourceList("test"), &fakeTestInput{ - OnRun: func(_ input.Context, _ Source, _ Cursor, _ Publisher) error { - return errors.New("oops") - }, - }) - - inp, err := manager.Create(common.NewConfig()) - require.NoError(t, err) - - cancelCtx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var clientCounters pubtest.ClientCounter - err = inp.Run(v2.Context{ - Logger: manager.Logger, - Cancelation: cancelCtx, - }, clientCounters.BuildConnector()) - require.Error(t, err) - require.Equal(t, 0, clientCounters.Active()) - }) - - t.Run("panic is captured", func(t *testing.T) { - defer resources.NewGoroutinesChecker().Check(t) - - manager := constInput(t, sourceList("test"), &fakeTestInput{ - OnRun: func(_ input.Context, _ Source, _ Cursor, _ Publisher) error { - panic("oops") - }, - }) - - inp, err := manager.Create(common.NewConfig()) - require.NoError(t, err) - - cancelCtx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var clientCounters pubtest.ClientCounter - err = inp.Run(v2.Context{ - Logger: manager.Logger, - 
Cancelation: cancelCtx, - }, clientCounters.BuildConnector()) - require.Error(t, err) - require.Equal(t, 0, clientCounters.Active()) - }) - - t.Run("shutdown on signal", func(t *testing.T) { - defer resources.NewGoroutinesChecker().Check(t) - - manager := constInput(t, sourceList("test"), &fakeTestInput{ - OnRun: func(ctx input.Context, _ Source, _ Cursor, _ Publisher) error { - <-ctx.Cancelation.Done() - return nil - }, - }) - - inp, err := manager.Create(common.NewConfig()) - require.NoError(t, err) - - cancelCtx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var clientCounters pubtest.ClientCounter - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err = inp.Run(v2.Context{ - Logger: manager.Logger, - Cancelation: cancelCtx, - }, clientCounters.BuildConnector()) - }() - - cancel() - wg.Wait() - require.NoError(t, err) - require.Equal(t, 0, clientCounters.Active()) - }) - - t.Run("continue sending from last known position", func(t *testing.T) { - log := logp.NewLogger("test") - - type runConfig struct{ Max int } - - store := testOpenStore(t, "test", createSampleStore(t, nil)) - defer store.Release() - - manager := simpleManagerWithConfigure(t, func(cfg *common.Config) ([]Source, Input, error) { - config := runConfig{} - if err := cfg.Unpack(&config); err != nil { - return nil, nil, err - } - - inp := &fakeTestInput{ - OnRun: func(_ input.Context, _ Source, cursor Cursor, pub Publisher) error { - state := struct{ N int }{} - if !cursor.IsNew() { - if err := cursor.Unpack(&state); err != nil { - return fmt.Errorf("failed to unpack cursor: %w", err) - } - } - - for i := 0; i < config.Max; i++ { - event := beat.Event{Fields: common.MapStr{"n": state.N}} - state.N++ - pub.Publish(event, state) - } - return nil - }, - } - - return sourceList("test"), inp, nil - }) - - var ids []int - pipeline := pubtest.ConstClient(&pubtest.FakeClient{ - PublishFunc: func(event beat.Event) { - id := event.Fields["n"].(int) - ids = 
append(ids, id) - }, - }) - - // create and run first instance - inp, err := manager.Create(common.MustNewConfigFrom(runConfig{Max: 3})) - require.NoError(t, err) - require.NoError(t, inp.Run(input.Context{ - Logger: log, - Cancelation: context.Background(), - }, pipeline)) - - // create and run second instance instance - inp, err = manager.Create(common.MustNewConfigFrom(runConfig{Max: 3})) - require.NoError(t, err) - inp.Run(input.Context{ - Logger: log, - Cancelation: context.Background(), - }, pipeline) - - // verify - assert.Equal(t, []int{0, 1, 2, 3, 4, 5}, ids) - }) - - t.Run("event ACK triggers execution of update operations", func(t *testing.T) { - defer resources.NewGoroutinesChecker().Check(t) - - store := createSampleStore(t, nil) - var wgSend sync.WaitGroup - wgSend.Add(1) - manager := constInput(t, sourceList("key"), &fakeTestInput{ - OnRun: func(ctx input.Context, _ Source, _ Cursor, pub Publisher) error { - defer wgSend.Done() - fields := common.MapStr{"hello": "world"} - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state1") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state2") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state3") - pub.Publish(beat.Event{Fields: fields}, nil) - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state4") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state5") - pub.Publish(beat.Event{Fields: fields}, "test-cursor-state6") - return nil - }, - }) - manager.StateStore = store - - inp, err := manager.Create(common.NewConfig()) - require.NoError(t, err) - - cancelCtx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // setup publishing pipeline and capture ACKer, so we can simulate progress in the Output - var acker beat.ACKer - var wgACKer sync.WaitGroup - wgACKer.Add(1) - pipeline := &pubtest.FakeConnector{ - ConnectFunc: func(cfg beat.ClientConfig) (beat.Client, error) { - defer wgACKer.Done() - acker = cfg.ACKHandler - return &pubtest.FakeClient{ - 
PublishFunc: func(event beat.Event) { - acker.AddEvent(event, true) - }, - }, nil - }, - } - - // start the input - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err = inp.Run(v2.Context{ - Logger: manager.Logger, - Cancelation: cancelCtx, - }, pipeline) - }() - // wait for test setup to shutdown - defer wg.Wait() - - // wait for setup complete and events being send (pending operations in the pipeline) - wgACKer.Wait() - wgSend.Wait() - - // 1. No cursor state in store yet, all operations are still pending - require.Equal(t, nil, store.snapshot()["test::key"].Cursor) - - // ACK first 2 events and check snapshot state - acker.ACKEvents(2) - require.Equal(t, "test-cursor-state2", store.snapshot()["test::key"].Cursor) - - // ACK 1 events and check snapshot state (3 events published) - acker.ACKEvents(1) - require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor) - - // ACK event without cursor update and check snapshot state not modified - acker.ACKEvents(1) - require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor) - - // ACK rest - acker.ACKEvents(3) - require.Equal(t, "test-cursor-state6", store.snapshot()["test::key"].Cursor) - }) -} - -func TestLockResource(t *testing.T) { - t.Run("can lock unused resource", func(t *testing.T) { - store := testOpenStore(t, "test", createSampleStore(t, nil)) - defer store.Release() - - res := store.Get("test::key") - err := lockResource(logp.NewLogger("test"), res, context.TODO()) - require.NoError(t, err) - }) - - t.Run("fail to lock resource in use when context is cancelled", func(t *testing.T) { - log := logp.NewLogger("test") - - store := testOpenStore(t, "test", createSampleStore(t, nil)) - defer store.Release() - - resUsed := store.Get("test::key") - err := lockResource(log, resUsed, context.TODO()) - require.NoError(t, err) - - // fail to lock resource in use - ctx, cancel := context.WithCancel(context.TODO()) - cancel() - resFail := store.Get("test::key") - err = 
lockResource(log, resFail, ctx) - require.Error(t, err) - resFail.Release() - - // unlock and release resource in use -> it should be marked finished now - releaseResource(resUsed) - require.True(t, resUsed.Finished()) - }) - - t.Run("succeed to lock resource after it has been released", func(t *testing.T) { - log := logp.NewLogger("test") - - store := testOpenStore(t, "test", createSampleStore(t, nil)) - defer store.Release() - - resUsed := store.Get("test::key") - err := lockResource(log, resUsed, context.TODO()) - require.NoError(t, err) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - resOther := store.Get("test::key") - err := lockResource(log, resOther, context.TODO()) - if err == nil { - releaseResource(resOther) - } - }() - - go func() { - time.Sleep(100 * time.Millisecond) - releaseResource(resUsed) - }() - - wg.Wait() // <- block forever if waiting go-routine can not acquire lock - }) -} - -func (s stringSource) Name() string { return string(s) } - -func simpleManagerWithConfigure(t *testing.T, configure func(*common.Config) ([]Source, Input, error)) *InputManager { - return &InputManager{ - Logger: logp.NewLogger("test"), - StateStore: createSampleStore(t, nil), - Type: "test", - Configure: configure, - } -} - -func constConfigureResult(t *testing.T, sources []Source, inp Input, err error) *InputManager { - return simpleManagerWithConfigure(t, func(cfg *common.Config) ([]Source, Input, error) { - return sources, inp, err - }) -} - -func failingManager(t *testing.T, err error) *InputManager { - return constConfigureResult(t, nil, nil, err) -} - -func constInput(t *testing.T, sources []Source, inp Input) *InputManager { - return constConfigureResult(t, sources, inp, nil) -} - -func (f *fakeTestInput) Name() string { return "test" } - -func (f *fakeTestInput) Test(source Source, ctx input.TestContext) error { - if f.OnTest != nil { - return f.OnTest(source, ctx) - } - return nil -} - -func (f *fakeTestInput) Run(ctx input.Context, 
source Source, cursor Cursor, pub Publisher) error { - if f.OnRun != nil { - return f.OnRun(ctx, source, cursor, pub) - } - return nil -} - -func sourceList(names ...string) []Source { - tmp := make([]Source, len(names)) - for i, name := range names { - tmp[i] = stringSource(name) - } - return tmp -}