Add initial skeleton of filestream input (elastic#21427)
## What does this PR do?

This PR adds the skeleton of the new `filestream` input; the name is still open to change. The input was renamed from `logfile` because it will not provide the same options as the current `log` input, and since `logfile` is already used by Agent for the `log` input, it is easier to adopt a new input under a different name.

The PR seems big, but the contents of `filebeat/input/filestream/internal/input-logfile` are basically the same as `filebeat/input/v2/input-cursor`. They are kept in a separate folder because we would like to unify the two input types when the time comes. The main difference between the two inputs is that the `configure` function of `input-logfile` returns a `Prospector`, which finds inputs dynamically, whereas `input-cursor` requires a fixed list of paths without globbing.

The following files need review:

* filebeat/input/filestream/input.go
* filebeat/input/filestream/internal/input-logfile/fswatch.go
* filebeat/input/filestream/internal/input-logfile/harvester.go
* filebeat/input/filestream/internal/input-logfile/input.go
* filebeat/input/filestream/internal/input-logfile/prospector.go
* filebeat/input/filestream/prospector.go

The others are the same as `input-cursor`.

Updated tests will follow in a separate PR.

## Related issues

First step elastic#20243

(cherry picked from commit cb624cf)
kvch committed Oct 1, 2020
1 parent 385ec4e commit 0a1294d
Showing 16 changed files with 2,133 additions and 0 deletions.
68 changes: 68 additions & 0 deletions filebeat/input/filestream/input.go
@@ -0,0 +1,68 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/logp"
)

// filestream is the input for reading from files which
// are actively written by other applications.
type filestream struct{}

const pluginName = "filestream"

// Plugin creates a new filestream input plugin for creating a stateful input.
func Plugin(log *logp.Logger, store loginp.StateStore) input.Plugin {
return input.Plugin{
Name: pluginName,
Stability: feature.Experimental,
Deprecated: false,
Info: "filestream input",
Doc:        "The filestream input collects log lines from files on disk",
Manager: &loginp.InputManager{
Logger: log,
StateStore: store,
Type: pluginName,
Configure: configure,
},
}
}

func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) {
panic("TODO: implement me")
}

func (inp *filestream) Name() string { return pluginName }

func (inp *filestream) Test(src loginp.Source, ctx input.TestContext) error {
panic("TODO: implement me")
}

func (inp *filestream) Run(
ctx input.Context,
src loginp.Source,
cursor loginp.Cursor,
publisher loginp.Publisher,
) error {
panic("TODO: implement me")
}
124 changes: 124 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/clean.go
@@ -0,0 +1,124 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package input_logfile

import (
"time"

"github.com/elastic/go-concert/timed"
"github.com/elastic/go-concert/unison"

"github.com/elastic/beats/v7/libbeat/logp"
)

// cleaner removes finished entries from the registry file.
type cleaner struct {
log *logp.Logger
}

// run starts a loop that tries to clean entries from the registry.
// The cleaner locks the store, such that no new states can be created
// during the cleanup phase. Only resources that are finished and whose TTL
// (clean_timeout setting) has expired will be removed.
//
// Resources are considered "Finished" if they do not have a current owner (active input), and
// if they have no pending updates that still need to be written to the registry file after associated
// events have been ACKed by the outputs.
// The event acquisition timestamp is used as the reference for cleaning resources. If a resource
// was blocked for a long time and its lifetime has been exhausted, it is removed immediately
// once the last event has been ACKed.
func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) {
started := time.Now()
timed.Periodic(canceler, interval, func() error {
gcStore(c.log, started, store)
return nil
})
}

// gcStore looks for resources to remove and deletes them. `gcStore` receives
// the start timestamp of the cleaner as reference. If the registry contains
// entries without updates that are older than `started`, we use `started +
// ttl` to decide whether an entry is removed. This way old entries are not
// removed immediately on startup if the Beat was down for a longer period of
// time.
func gcStore(log *logp.Logger, started time.Time, store *store) {
log.Debugf("Start store cleanup")
defer log.Debugf("Done store cleanup")

states := store.ephemeralStore
states.mu.Lock()
defer states.mu.Unlock()

keys := gcFind(states.table, started, time.Now())
if len(keys) == 0 {
log.Debug("No entries to remove were found")
return
}

if err := gcClean(store, keys); err != nil {
log.Errorf("Failed to remove all entries from the registry: %+v", err)
}
}

// gcFind searches the store for resources that can be removed. A set of keys to delete is returned.
func gcFind(table map[string]*resource, started, now time.Time) map[string]struct{} {
keys := map[string]struct{}{}
for key, resource := range table {
clean := checkCleanResource(started, now, resource)
if !clean {
// do not clean the resource if it is still in use or not yet serialized to the persistent store.
continue
}
keys[key] = struct{}{}
}

return keys
}

// gcClean removes key value pairs in the removeSet from the store.
// If deletion in the persistent store fails the entry is kept in memory and
// eventually cleaned up later.
func gcClean(store *store, removeSet map[string]struct{}) error {
for key := range removeSet {
if err := store.persistentStore.Remove(key); err != nil {
return err
}
delete(store.ephemeralStore.table, key)
}
return nil
}

// checkCleanResource returns true if a key-value pair is assumed to be old:
// it is not in use and there are no pending updates that still need to be
// written to the persistent store.
func checkCleanResource(started, now time.Time, resource *resource) bool {
if !resource.Finished() {
return false
}

resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()

ttl := resource.internalState.TTL
reference := resource.internalState.Updated
if started.After(reference) {
reference = started
}

return reference.Add(ttl).Before(now) && resource.stored
}
162 changes: 162 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/clean_test.go
@@ -0,0 +1,162 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package input_logfile

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
)

func TestGCStore(t *testing.T) {
t.Run("empty store", func(t *testing.T) {
started := time.Now()

backend := createSampleStore(t, nil)
store := testOpenStore(t, "test", backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)

want := map[string]state{}
checkEqualStoreState(t, want, backend.snapshot())
})

t.Run("state is still alive", func(t *testing.T) {
started := time.Now()
const ttl = 60 * time.Second

initState := map[string]state{
"test::key": {
TTL: ttl,
Updated: started.Add(-ttl / 2),
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)

checkEqualStoreState(t, initState, backend.snapshot())
})

t.Run("old state can be removed", func(t *testing.T) {
const ttl = 60 * time.Second
started := time.Now().Add(-5 * ttl) // cleanup process is running for a while already

initState := map[string]state{
"test::key": {
TTL: ttl,
Updated: started.Add(-ttl),
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)

want := map[string]state{}
checkEqualStoreState(t, want, backend.snapshot())
})

t.Run("old state is not removed if cleanup is not active long enough", func(t *testing.T) {
const ttl = 60 * time.Minute
started := time.Now()

initState := map[string]state{
"test::key": {
TTL: ttl,
Updated: started.Add(-2 * ttl),
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)

checkEqualStoreState(t, initState, backend.snapshot())
})

t.Run("old state but resource is accessed", func(t *testing.T) {
const ttl = 60 * time.Second
started := time.Now().Add(-5 * ttl) // cleanup process is running for a while already

initState := map[string]state{
"test::key": {
TTL: ttl,
Updated: started.Add(-ttl),
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

// access resource and check it is not gc'ed
res := store.Get("test::key")
gcStore(logp.NewLogger("test"), started, store)
checkEqualStoreState(t, initState, backend.snapshot())

// release resource and check it gets gc'ed
res.Release()
want := map[string]state{}
gcStore(logp.NewLogger("test"), started, store)
checkEqualStoreState(t, want, backend.snapshot())
})

t.Run("old state but resource has pending updates", func(t *testing.T) {
const ttl = 60 * time.Second
started := time.Now().Add(-5 * ttl) // cleanup process is running for a while already

initState := map[string]state{
"test::key": {
TTL: ttl,
Updated: started.Add(-ttl),
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

// create pending update operation
res := store.Get("test::key")
op, err := createUpdateOp(store, res, "test-state-update")
require.NoError(t, err)
res.Release()

// cleanup fails
gcStore(logp.NewLogger("test"), started, store)
checkEqualStoreState(t, initState, backend.snapshot())

// cancel operation (no more pending operations) and try to gc again
op.done(1)
gcStore(logp.NewLogger("test"), started, store)
want := map[string]state{}
checkEqualStoreState(t, want, backend.snapshot())
})
}