Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow GCS config for setting attributes on new objects #1368

Merged
merged 13 commits into from
Apr 12, 2022
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
ingesters during the rollout. [#1227](https://github.com/grafana/tempo/pull/1227) (@joe-elliott)
* [FEATURE] Added metrics-generator: an optional components to generate metrics from ingested traces [#1282](https://github.com/grafana/tempo/pull/1282) (@mapno, @kvrhdn)
* [FEATURE] Allow the compaction cycle to be configurable with a default of 30 seconds [#1335](https://github.com/grafana/tempo/pull/1335) (@willdot)
* [FEATURE] Add new config options for setting GCS metadata on new objects [#1368](https://github.com/grafana/tempo/pull/1368) (@zalegrala)
* [ENHANCEMENT] Enterprise jsonnet: add config to create tokengen job explicitly [#1256](https://github.com/grafana/tempo/pull/1256) (@kvrhdn)
* [ENHANCEMENT] Add new scaling alerts to the tempo-mixin [#1292](https://github.com/grafana/tempo/pull/1292) (@mapno)
* [ENHANCEMENT] Improve serverless handler error messages [#1305](https://github.com/grafana/tempo/pull/1305) (@joe-elliott)
Expand Down
9 changes: 5 additions & 4 deletions cmd/tempo-serverless/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/grafana/tempo/tempodb/encoding/common"
v2 "github.com/grafana/tempo/tempodb/encoding/v2"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/spf13/viper"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -164,11 +165,11 @@ func loadConfig() (*tempodb.Config, error) {
v := viper.NewWithOptions()
b, err := yaml.Marshal(defaultConfig)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to marshal default config")
}
v.SetConfigType("yaml")
if err := v.MergeConfig(bytes.NewReader(b)); err != nil {
return nil, err
if err = v.MergeConfig(bytes.NewReader(b)); err != nil {
return nil, errors.Wrap(err, "failed to merge config")
}

v.AutomaticEnv()
Expand All @@ -178,7 +179,7 @@ func loadConfig() (*tempodb.Config, error) {
cfg := &tempodb.Config{}
err = v.Unmarshal(cfg, setTagName, setDecodeHooks)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to unmarshal config")
}

return cfg, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-serverless/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestLoadConfig(t *testing.T) {
// purposefully not using testify to reduce dependencies and keep the serverless packages small
cfg, err := loadConfig()
if err != nil {
t.Error("failed to load config", err)
t.Error("failed to load config:", err)
return
}
if cfg.Backend != "gcs" {
Expand Down
17 changes: 15 additions & 2 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ storage:

# Buffer size for reads. Default is 10MB
# Example: "chunk_buffer_size: 5_000_000"
[chunk_buffer_size: <int>]
[chunk_buffer_size: <int>]

# Optional
# Api endpoint override
Expand All @@ -386,7 +386,7 @@ storage:
# Optional. Default is false.
# Example: "insecure: true"
# Set to true to enable authentication and certificate checks on gcs requests
[insecure: <bool>]
[insecure: <bool>]

# Optional. Default is 0 (disabled)
# Example: "hedge_requests_at: 500ms"
Expand All @@ -400,6 +400,19 @@ storage:
# The maximum number of requests to execute when hedging. Requires hedge_requests_at to be set.
[hedge_requests_up_to: <int>]

# Optional
# Example: "object_cache_control: no-cache"
# A string to specify the behavior with respect to caching of the objects stored in GCS.
# See the GCS documentation for more detail: https://cloud.google.com/storage/docs/metadata
[object_cache_control: <string>]

# Optional
# Example: "object_metadata: {'key': 'value'}"
# A map of key/value string pairs of user metadata to store on the GCS objects.
# See the GCS documentation for more detail: https://cloud.google.com/storage/docs/metadata
[object_metadata: <map[string]string>]


# S3 configuration. Will be used only if value of backend is "s3"
# Check the S3 doc within this folder for information on s3 specific permissions.
s3:
Expand Down
5 changes: 4 additions & 1 deletion integration/e2e/config-all-in-one-gcs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ storage:
bucket_name: tempo
endpoint: https://tempo_e2e-gcs:4443/storage/v1/
insecure: true
object_cache_control: "no-cache"
object_metadata:
testing: "yes"
pool:
max_workers: 10
queue_depth: 1000

overrides:
max_search_bytes_per_trace: 50_000
max_search_bytes_per_trace: 50_000
18 changes: 11 additions & 7 deletions tempodb/backend/gcs/config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package gcs

import "time"
import (
"time"
)

// Config holds the configuration options for the GCS backend.
type Config struct {
	BucketName         string            `yaml:"bucket_name"`
	ChunkBufferSize    int               `yaml:"chunk_buffer_size"`
	Endpoint           string            `yaml:"endpoint"`
	HedgeRequestsAt    time.Duration     `yaml:"hedge_requests_at"`
	HedgeRequestsUpTo  int               `yaml:"hedge_requests_up_to"`
	Insecure           bool              `yaml:"insecure"`
	ObjectCacheControl string            `yaml:"object_cache_control"` // Cache-Control header stored on new objects
	ObjectMetadata     map[string]string `yaml:"object_metadata"`      // user metadata stored on new objects
}
19 changes: 15 additions & 4 deletions tempodb/backend/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"cloud.google.com/go/storage"
"github.com/cristalhq/hedgedhttp"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
Expand All @@ -35,7 +35,7 @@ func NewNoConfirm(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Co
return internalNew(cfg, false)
}

// New gets the S3 backend
// New gets the GCS backend
func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor, error) {
return internalNew(cfg, true)
}
Expand Down Expand Up @@ -72,10 +72,11 @@ func internalNew(cfg *Config, confirm bool) (backend.RawReader, backend.RawWrite
// StreamWriter implements backend.Writer
func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
w := rw.writer(ctx, backend.ObjectFileName(keypath, name))

_, err := io.Copy(w, data)
if err != nil {
w.Close()
return err
return errors.Wrap(err, "failed to write")
}

return w.Close()
Expand Down Expand Up @@ -168,8 +169,18 @@ func (rw *readerWriter) Shutdown() {
}

// writer returns a storage.Writer for the named object with the configured
// chunk size and, when set, the configured object attributes (user metadata
// and Cache-Control) applied before any data is written.
func (rw *readerWriter) writer(ctx context.Context, name string) *storage.Writer {
	w := rw.bucket.Object(name).NewWriter(ctx)
	w.ChunkSize = rw.cfg.ChunkBufferSize

	if rw.cfg.ObjectMetadata != nil {
		w.Metadata = rw.cfg.ObjectMetadata
	}

	if rw.cfg.ObjectCacheControl != "" {
		w.CacheControl = rw.cfg.ObjectCacheControl
	}

	return w
}

Expand Down
97 changes: 90 additions & 7 deletions tempodb/backend/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ package gcs
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
Expand All @@ -14,6 +19,7 @@ import (
"github.com/grafana/tempo/tempodb/backend"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
raw "google.golang.org/api/storage/v1"
)

func TestHedge(t *testing.T) {
Expand Down Expand Up @@ -85,6 +91,57 @@ func TestHedge(t *testing.T) {
}
}

// TestReadError verifies that readError maps the GCS "object not exist"
// sentinel to the backend sentinel and passes every other error through
// unchanged.
func TestReadError(t *testing.T) {
	// GCS sentinel maps to the backend sentinel.
	assert.Equal(t, backend.ErrDoesNotExist, readError(storage.ErrObjectNotExist))

	// Any other error is returned as-is.
	otherErr := fmt.Errorf("wups")
	assert.Equal(t, otherErr, readError(otherErr))
}

// TestObjectConfigAttributes verifies that the ObjectCacheControl and
// ObjectMetadata config options are propagated onto newly written GCS
// objects, using a fake server that captures the uploaded attributes.
func TestObjectConfigAttributes(t *testing.T) {
	tests := []struct {
		name           string
		cacheControl   string
		metadata       map[string]string
		expectedObject raw.Object
	}{
		{
			name:           "cache control enabled",
			cacheControl:   "no-cache",
			expectedObject: raw.Object{Name: "test/object", Bucket: "blerg2", CacheControl: "no-cache"},
		},
		{
			name:           "metadata set",
			metadata:       map[string]string{"one": "1"},
			expectedObject: raw.Object{Name: "test/object", Bucket: "blerg2", Metadata: map[string]string{"one": "1"}},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// The fake server records the attributes of the uploaded object
			// into rawObject for the assertion below.
			rawObject := raw.Object{}
			server := fakeServerWithObjectAttributes(t, &rawObject)

			_, w, _, err := New(&Config{
				BucketName:         "blerg2",
				Endpoint:           server.URL,
				Insecure:           true,
				ObjectCacheControl: tc.cacheControl,
				ObjectMetadata:     tc.metadata,
			})
			require.NoError(t, err)

			ctx := context.Background()

			// Write an empty object; only the attributes sent with the
			// upload matter for this test.
			_ = w.Write(ctx, "object", []string{"test"}, bytes.NewReader([]byte{}), 0, false)
			assert.Equal(t, tc.expectedObject, rawObject)
		})
	}
}

func fakeServer(t *testing.T, returnIn time.Duration, counter *int32) *httptest.Server {
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(returnIn)
Expand All @@ -98,12 +155,38 @@ func fakeServer(t *testing.T, returnIn time.Duration, counter *int32) *httptest.
return server
}

func TestReadError(t *testing.T) {
errA := storage.ErrObjectNotExist
errB := readError(errA)
assert.Equal(t, backend.ErrDoesNotExist, errB)
func fakeServerWithObjectAttributes(t *testing.T, o *raw.Object) *httptest.Server {
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

wups := fmt.Errorf("wups")
errB = readError(wups)
assert.Equal(t, wups, errB)
// Check that we are making the call to update the attributes before attempting to decode the request body.
if strings.HasPrefix(r.RequestURI, "/upload/storage/v1/b/blerg2") {

_, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
require.NoError(t, err)

reader := multipart.NewReader(r.Body, params["boundary"])
defer r.Body.Close()

for {
part, err := reader.NextPart()
if err == io.EOF {
break
}
require.NoError(t, err)
defer part.Close()

switch part.Header.Get("Content-Type") {
case "application/json":
err = json.NewDecoder(r.Body).Decode(&o)
require.NoError(t, err)
}
}
}

_, _ = w.Write([]byte(`{}`))
}))
server.StartTLS()
t.Cleanup(server.Close)

return server
}