Skip to content

Commit

Permalink
Local file storage extension (#3087)
Browse files Browse the repository at this point in the history
Follows #2883 

Will resolve #2287

### Summary

This PR includes:
- A full implementation of a `file_storage` extension, which can read and write data to the local file system. Any component in the collector may make use of this extension.
- Updates to `stanza/internal` to allow stanza-based receivers to use the extension for checkpoints.
- A new testbed scenario that has the filelogreceiver using the extension

Configuration of the extension is simple. 
```yaml
  file_storage:
  file_storage/all_settings:
    directory: /var/lib/otelcol/mydir
    timeout: 2s
```

The extension is made available to components via the `host` parameter in their `Start` method:
```go
func (r *receiver) Start(ctx context.Context, host component.Host) error {
	for _, ext := range host.GetExtensions() {
		if se, ok := ext.(storage.Extension); ok {
			client, err := se.GetClient(ctx, component.KindReceiver, r.NamedEntity)
			if err != nil {
				return err
			}
			r.storageClient = client
			return nil
		}
	}
	r.storageClient = storage.NewNopClient()
        ...
}
```
  • Loading branch information
djaglowski authored Apr 16, 2021
1 parent 4388f10 commit c95f0c2
Show file tree
Hide file tree
Showing 38 changed files with 1,396 additions and 94 deletions.
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/hostobserver"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
Expand Down Expand Up @@ -91,6 +92,7 @@ func components() (component.Factories, error) {
hostobserver.NewFactory(),
httpforwarder.NewFactory(),
k8sobserver.NewFactory(),
filestorage.NewFactory(),
}

for _, ext := range factories.Extensions {
Expand Down
3 changes: 0 additions & 3 deletions extension/storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,3 @@ Set(string, []byte) error
Delete(string) error
```
Note: All methods should return error only if a problem occurred. (For example, if a file is no longer accessible, or if a remote service is unavailable.)

# TODO Sample code
- Document component expectations
1 change: 1 addition & 0 deletions extension/storage/filestorage/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
34 changes: 34 additions & 0 deletions extension/storage/filestorage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# File Storage

The File Storage extension can persist state to the local file system.

The extension requires read and write access to a directory. A default directory can be used, but it must already exist in order for the extension to operate.

`directory` is the relative or absolute path to the dedicated data storage directory.

`timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances.


```
extensions:
  file_storage:
  file_storage/all_settings:
    directory: /var/lib/otelcol/mydir
    timeout: 1s
service:
  extensions: [file_storage, file_storage/all_settings]
  pipelines:
    traces:
      receivers: [nop]
      processors: [nop]
      exporters: [nop]
# Data pipeline is required to load the config.
receivers:
  nop:
processors:
  nop:
exporters:
  nop:
```
99 changes: 99 additions & 0 deletions extension/storage/filestorage/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filestorage

import (
"context"
"errors"
"time"

"go.etcd.io/bbolt"
)

// defaultBucket is the name of the bbolt bucket that newClient creates and
// that Get, Set and Delete operate on. It is a variable rather than a
// constant because a []byte cannot be a Go constant; the package tests also
// reassign it temporarily to force newClient to fail.
var defaultBucket = []byte(`default`)

// fileStorageClient stores key/value data in a single bbolt database file.
// Instances are created with newClient.
type fileStorageClient struct {
// db is the open bbolt database that backs this client.
db *bbolt.DB
}

// newClient opens the bbolt database at filePath (bbolt creates the file if
// it does not exist) and makes sure the default bucket is present.
//
// timeout is passed to bbolt as the maximum time to wait for the database
// file lock. The database is opened with NoSync enabled.
//
// On any failure the returned client is nil. If the database was opened but
// the bucket could not be created, the database is closed again before
// returning so that the file handle and file lock are not leaked.
func newClient(filePath string, timeout time.Duration) (*fileStorageClient, error) {
	options := &bbolt.Options{
		Timeout: timeout,
		NoSync:  true,
	}
	db, err := bbolt.Open(filePath, 0600, options)
	if err != nil {
		return nil, err
	}

	initBucket := func(tx *bbolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists(defaultBucket)
		return err
	}
	if err := db.Update(initBucket); err != nil {
		// The caller never receives a client it could close, so release the
		// database here. The close error is intentionally dropped: the
		// initialization error is the one worth reporting.
		_ = db.Close()
		return nil, err
	}

	return &fileStorageClient{db}, nil
}

// Get retrieves the data stored under key.
//
// It returns (nil, nil) when the key is not present, and an error when the
// default bucket is missing from the database. The returned slice is a copy
// owned by the caller and may be retained or modified freely.
func (c *fileStorageClient) Get(_ context.Context, key string) ([]byte, error) {
	var result []byte
	get := func(tx *bbolt.Tx) error {
		bucket := tx.Bucket(defaultBucket)
		if bucket == nil {
			return errors.New("storage not initialized")
		}
		if value := bucket.Get([]byte(key)); value != nil {
			// bbolt only guarantees the returned slice for the life of the
			// transaction, so copy it before the transaction ends.
			result = make([]byte, len(value))
			copy(result, value)
		}
		return nil
	}

	// A read needs only a read-only transaction; View avoids taking the
	// database's single writer lock.
	if err := c.db.View(get); err != nil {
		return nil, err
	}
	return result, nil
}

// Set stores value under key in the default bucket, inside a single
// read-write transaction. The value can later be read back with Get using
// the same key. An error is returned if the default bucket is missing or if
// the write fails.
func (c *fileStorageClient) Set(_ context.Context, key string, value []byte) error {
	return c.db.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket(defaultBucket)
		if b == nil {
			return errors.New("storage not initialized")
		}
		return b.Put([]byte(key), value)
	})
}

// Delete removes the entry stored under key from the default bucket, inside
// a single read-write transaction. An error is returned if the default
// bucket is missing or if the removal fails.
func (c *fileStorageClient) Delete(_ context.Context, key string) error {
	return c.db.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket(defaultBucket)
		if b == nil {
			return errors.New("storage not initialized")
		}
		return b.Delete([]byte(key))
	})
}

// close closes the underlying bbolt database and returns the error, if any,
// reported by that call.
func (c *fileStorageClient) close() error {
return c.db.Close()
}
194 changes: 194 additions & 0 deletions extension/storage/filestorage/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filestorage

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)

// TestClientOperations walks a single key through its full life cycle:
// absent, set, read back, deleted, absent again.
func TestClientOperations(t *testing.T) {
	tempDir := newTempDir(t)
	dbFile := filepath.Join(tempDir, "my_db")

	client, err := newClient(dbFile, time.Second)
	require.NoError(t, err)
	// Registered after newTempDir's cleanup, so it runs first: the database
	// is closed before its directory is removed.
	t.Cleanup(func() { require.NoError(t, client.close()) })

	ctx := context.Background()
	testKey := "testKey"
	testValue := []byte("testValue")

	// Make sure nothing is there
	value, err := client.Get(ctx, testKey)
	require.NoError(t, err)
	require.Nil(t, value)

	// Set it
	err = client.Set(ctx, testKey, testValue)
	require.NoError(t, err)

	// Get it back out, make sure it's right
	value, err = client.Get(ctx, testKey)
	require.NoError(t, err)
	require.Equal(t, testValue, value)

	// Delete it
	err = client.Delete(ctx, testKey)
	require.NoError(t, err)

	// Make sure it's gone
	value, err = client.Get(ctx, testKey)
	require.NoError(t, err)
	require.Nil(t, value)
}

// TestNewClientTransactionErrors verifies that Get, Set and Delete each
// report "storage not initialized" when the default bucket has been removed
// from the database behind the client's back.
func TestNewClientTransactionErrors(t *testing.T) {
	timeout := 100 * time.Millisecond

	testKey := "testKey"
	testValue := []byte("testValue")

	testCases := []struct {
		name     string
		setup    func(*bbolt.Tx) error
		validate func(*testing.T, *fileStorageClient)
	}{
		{
			name: "get",
			setup: func(tx *bbolt.Tx) error {
				return tx.DeleteBucket(defaultBucket)
			},
			validate: func(t *testing.T, c *fileStorageClient) {
				value, err := c.Get(context.Background(), testKey)
				require.Error(t, err)
				require.Equal(t, "storage not initialized", err.Error())
				require.Nil(t, value)
			},
		},
		{
			name: "set",
			setup: func(tx *bbolt.Tx) error {
				return tx.DeleteBucket(defaultBucket)
			},
			validate: func(t *testing.T, c *fileStorageClient) {
				err := c.Set(context.Background(), testKey, testValue)
				require.Error(t, err)
				require.Equal(t, "storage not initialized", err.Error())
			},
		},
		{
			name: "delete",
			setup: func(tx *bbolt.Tx) error {
				return tx.DeleteBucket(defaultBucket)
			},
			validate: func(t *testing.T, c *fileStorageClient) {
				err := c.Delete(context.Background(), testKey)
				require.Error(t, err)
				require.Equal(t, "storage not initialized", err.Error())
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			tempDir := newTempDir(t)
			dbFile := filepath.Join(tempDir, "my_db")

			client, err := newClient(dbFile, timeout)
			require.NoError(t, err)
			// Close the database before the temp dir cleanup removes it.
			t.Cleanup(func() { require.NoError(t, client.close()) })

			// Create a problem. If the setup itself fails, the validation
			// below would be testing the wrong state, so stop here.
			require.NoError(t, client.db.Update(tc.setup))

			// Validate expected behavior
			tc.validate(t, client)
		})
	}
}

// TestNewClientErrorsOnInvalidBucket verifies that newClient returns an
// error and a nil client when the default bucket cannot be created, which is
// forced here by temporarily setting the bucket name to nil.
func TestNewClientErrorsOnInvalidBucket(t *testing.T) {
	original := defaultBucket
	defaultBucket = nil
	// Restore through Cleanup rather than a trailing assignment: a failing
	// require stops the test immediately, and a skipped restore would leave
	// every later test in the package with a nil bucket name.
	t.Cleanup(func() { defaultBucket = original })

	tempDir := newTempDir(t)
	dbFile := filepath.Join(tempDir, "my_db")

	client, err := newClient(dbFile, time.Second)
	require.Error(t, err)
	require.Nil(t, client)
}

// BenchmarkClientGet measures Get for a key that is not present in a freshly
// created database.
func BenchmarkClientGet(b *testing.B) {
	tempDir := newTempDir(b)
	dbFile := filepath.Join(tempDir, "my_db")

	client, err := newClient(dbFile, time.Second)
	require.NoError(b, err)
	// Close the database before the temp dir cleanup removes it.
	b.Cleanup(func() { require.NoError(b, client.close()) })

	ctx := context.Background()
	testKey := "testKey"

	// Exclude database creation from the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		if _, err := client.Get(ctx, testKey); err != nil {
			b.Fatal(err)
		}
	}
}

// BenchmarkClientSet measures repeatedly writing the same key/value pair.
func BenchmarkClientSet(b *testing.B) {
	tempDir := newTempDir(b)
	dbFile := filepath.Join(tempDir, "my_db")

	client, err := newClient(dbFile, time.Second)
	require.NoError(b, err)
	// Close the database before the temp dir cleanup removes it.
	b.Cleanup(func() { require.NoError(b, client.close()) })

	ctx := context.Background()
	testKey := "testKey"
	testValue := []byte("testValue")

	// Exclude database creation from the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		if err := client.Set(ctx, testKey, testValue); err != nil {
			b.Fatal(err)
		}
	}
}

// BenchmarkClientDelete measures Delete for a key that is not present in a
// freshly created database.
func BenchmarkClientDelete(b *testing.B) {
	tempDir := newTempDir(b)
	dbFile := filepath.Join(tempDir, "my_db")

	client, err := newClient(dbFile, time.Second)
	require.NoError(b, err)
	// Close the database before the temp dir cleanup removes it.
	b.Cleanup(func() { require.NoError(b, client.close()) })

	ctx := context.Background()
	testKey := "testKey"

	// Exclude database creation from the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		if err := client.Delete(ctx, testKey); err != nil {
			b.Fatal(err)
		}
	}
}

// newTempDir creates a fresh temporary directory for a test or benchmark and
// returns its path. The directory and its contents are removed when tb
// finishes; creation failure fails tb immediately.
func newTempDir(tb testing.TB) string {
	dir, err := ioutil.TempDir("", "")
	require.NoError(tb, err)

	removeDir := func() {
		os.RemoveAll(dir)
	}
	tb.Cleanup(removeDir)

	return dir
}
29 changes: 29 additions & 0 deletions extension/storage/filestorage/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filestorage

import (
"time"

"go.opentelemetry.io/collector/config"
)

// Config defines configuration for the file storage extension.
type Config struct {
config.ExtensionSettings `mapstructure:",squash"`

// Directory is the relative or absolute path to the dedicated data
// storage directory.
Directory string `mapstructure:"directory,omitempty"`
// Timeout is the maximum time to wait for a file lock.
Timeout time.Duration `mapstructure:"timeout,omitempty"`
}
Loading

0 comments on commit c95f0c2

Please sign in to comment.