Skip to content

Commit

Permalink
Add URL option for sampling strategies file
Browse files Browse the repository at this point in the history
This adds an option to support downloading sampling
strategies file from a URL in JSON format.

Fixes #2015

Signed-off-by: Deepak Sah <sah.sslpu@gmail.com>
  • Loading branch information
goku321 committed Sep 29, 2020
1 parent ecc5091 commit 932f6e0
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 18 deletions.
8 changes: 7 additions & 1 deletion plugin/sampling/strategystore/static/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
)

const (
// SamplingStrategiesFile contains the name of CLI opions for config file.
// SamplingStrategiesFile contains the name of CLI options for config file.
SamplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
// SamplingStrategiesURL contains the name of CLI option for config file URL.
SamplingStrategiesURL = "sampling.strategies-url"
)

// Options holds configuration for the static sampling strategy store.
Expand All @@ -33,6 +35,8 @@ type Options struct {
StrategiesFile string
// ReloadInterval is the time interval to check and reload sampling strategies file
ReloadInterval time.Duration
// StrategiesURL is the url for downloading sampling strategies JSON file.
StrategiesURL string
}

// AddFlags adds flags for Options
Expand All @@ -44,11 +48,13 @@ func AddFlags(flagSet *flag.FlagSet) {
// AddOTELFlags adds flags that are exposed by OTEL collector
func AddOTELFlags(flagSet *flag.FlagSet) {
flagSet.String(SamplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file")
flagSet.String(SamplingStrategiesURL, "", "The URL to download sampling strategies file in JSON format.")
}

// InitFromViper initializes Options with properties from viper
func (opts *Options) InitFromViper(v *viper.Viper) *Options {
opts.StrategiesFile = v.GetString(SamplingStrategiesFile)
opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval)
opts.StrategiesFile = v.GetString(SamplingStrategiesURL)
return opts
}
99 changes: 86 additions & 13 deletions plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"path/filepath"
"sync/atomic"
"time"
Expand Down Expand Up @@ -55,14 +56,14 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er
}
h.storedStrategies.Store(defaultStrategies())

strategies, err := loadStrategies(options.StrategiesFile)
strategies, err := loadStrategies(options)
if err != nil {
return nil, err
}
h.parseStrategies(strategies)

if options.ReloadInterval > 0 {
go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile)
go h.autoUpdateStrategies(options.ReloadInterval, options)
}
return h, nil
}
Expand All @@ -83,32 +84,73 @@ func (h *strategyStore) Close() {
h.cancelFunc()
}

func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) {
func (h *strategyStore) autoUpdateStrategies(interval time.Duration, opts Options) {
// Default reload option.
reloadFrom := "file"
path := opts.StrategiesFile
if opts.StrategiesURL != "" {
reloadFrom = "url"
path = opts.StrategiesURL
}

lastValue := ""
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lastValue = h.reloadSamplingStrategyFile(filePath, lastValue)
lastValue = h.reloadSamplingStrategy(reloadFrom, path, lastValue)
case <-h.ctx.Done():
return
}
}
}

func (h *strategyStore) reloadSamplingStrategyFile(filePath string, lastValue string) string {
currBytes, err := ioutil.ReadFile(filepath.Clean(filePath))
if err != nil {
h.logger.Error("failed to load sampling strategies", zap.String("file", filePath), zap.Error(err))
return lastValue
func (h *strategyStore) reloadSamplingStrategy(reloadFrom string, path string, lastValue string) string {
var currBytes []byte
if reloadFrom == "url" {
resp, err := http.Get(path)
if err != nil {
h.logger.Error("failed to download sampling strategies", zap.String("url", path), zap.Error(err))
return lastValue
}

defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
h.logger.Error(
"failed to download sampling strategies",
zap.String("url", path),
zap.String("status", resp.Status),
)
return lastValue
}

buf := new(bytes.Buffer)
_, err = buf.ReadFrom(resp.Body)
if err != nil {
h.logger.Error(
"failed to load sampling strategies from downloaded json",
zap.String("url", path),
zap.Error(err),
)
return lastValue
}
currBytes = buf.Bytes()
} else {
var err error
currBytes, err = ioutil.ReadFile(filepath.Clean(path))
if err != nil {
h.logger.Error("failed to load sampling strategies", zap.String("file", path), zap.Error(err))
return lastValue
}
}

newValue := string(currBytes)
if lastValue == newValue {
return lastValue
}
if err = h.updateSamplingStrategy(currBytes); err != nil {
h.logger.Error("failed to update sampling strategies from file", zap.Error(err))
if err := h.updateSamplingStrategy(currBytes); err != nil {
h.logger.Error("failed to update sampling strategies", zap.String("from", reloadFrom), zap.Error(err))
return lastValue
}
return newValue
Expand All @@ -125,10 +167,19 @@ func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
}

// TODO good candidate for a global util function
func loadStrategies(strategiesFile string) (*strategies, error) {
if strategiesFile == "" {
func loadStrategies(opts Options) (*strategies, error) {
if opts.StrategiesURL != "" {
// Download Strategies from given URL.
return downloadStrategies(opts.StrategiesURL)
} else if opts.StrategiesFile != "" {
// Load strategies from given file.
return loadStrategiesFromFile(opts.StrategiesFile)
} else {
return nil, nil
}
}

func loadStrategiesFromFile(strategiesFile string) (*strategies, error) {
data, err := ioutil.ReadFile(strategiesFile) /* nolint #nosec , this comes from an admin, not user */
if err != nil {
return nil, fmt.Errorf("failed to open strategies file: %w", err)
Expand All @@ -140,6 +191,28 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
return &strategies, nil
}

func downloadStrategies(strategiesURL string) (*strategies, error) {
resp, err := http.Get(strategiesURL)
if err != nil {
return nil, fmt.Errorf("failed to download strategies file: %w", err)
}

defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf(
"receiving %s while downloading strategies file from %s",
resp.Status,
strategiesURL,
)
}

var strategies strategies
if err := json.NewDecoder(resp.Body).Decode(&strategies); err != nil {
return nil, fmt.Errorf("failed to unmarshal strategies from downloaded json: %w", err)
}
return &strategies, nil
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
if strategies == nil {
h.logger.Info("No sampling strategies provided, using defaults")
Expand Down
102 changes: 98 additions & 4 deletions plugin/sampling/strategystore/static/strategy_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package static
import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
Expand All @@ -31,6 +33,30 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// Returns strategies in JSON format. Used for testing
// URL option for sampling strategies.
func mockStrategyServer() *httptest.Server {
f := func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/bad-content" {
w.Write([]byte("bad-content"))
return
}
if r.URL.Path == "/bad-status" {
w.WriteHeader(404)
return
}
data, err := ioutil.ReadFile("fixtures/strategies.json")
if err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
w.Write(data)
}
return httptest.NewServer(http.HandlerFunc(f))
}

func TestStrategyStore(t *testing.T) {
_, err := NewStrategyStore(Options{StrategiesFile: "fileNotFound.json"}, zap.NewNop())
assert.EqualError(t, err, "failed to open strategies file: open fileNotFound.json: no such file or directory")
Expand Down Expand Up @@ -62,6 +88,19 @@ func TestStrategyStore(t *testing.T) {
s, err = store.GetSamplingStrategy(context.Background(), "default")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.5), *s)

// Test downloading strategies from a URL.
mockServer := mockStrategyServer()
store, err = NewStrategyStore(Options{StrategiesURL: mockServer.URL}, logger)
require.NoError(t, err)

s, err = store.GetSamplingStrategy(context.Background(), "foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)

s, err = store.GetSamplingStrategy(context.Background(), "bar")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 5), *s)
}

func TestPerOperationSamplingStrategies(t *testing.T) {
Expand Down Expand Up @@ -276,7 +315,7 @@ func TestAutoUpdateStrategy(t *testing.T) {
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)

// verify that reloading in no-op
value := store.reloadSamplingStrategyFile(dstFile, string(srcBytes))
value := store.reloadSamplingStrategy("file", dstFile, string(srcBytes))
assert.Equal(t, string(srcBytes), value)

// update file with new probability of 0.9
Expand All @@ -293,6 +332,49 @@ func TestAutoUpdateStrategy(t *testing.T) {
time.Sleep(1 * time.Millisecond)
}
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s)

// Test auto update strategy with URL option.
mockServer := mockStrategyServer()
ss, err = NewStrategyStore(Options{
StrategiesURL: mockServer.URL,
ReloadInterval: 10 * time.Millisecond,
}, zap.NewNop())
require.NoError(t, err)
store = ss.(*strategyStore)
defer store.Close()

// copy existing fixture content to restore it later.
srcBytes, err = ioutil.ReadFile(srcFile)
require.NoError(t, err)
originalBytes := srcBytes

// confirm baseline value
s, err = store.GetSamplingStrategy(context.Background(), "foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)

// verify that reloading in no-op
value = store.reloadSamplingStrategy("url", mockServer.URL, string(srcBytes))
assert.Equal(t, string(srcBytes), value)

// update original strategies file with new probability of 0.9
newStr = strings.Replace(string(srcBytes), "0.8", "0.9", 1)
require.NoError(t, ioutil.WriteFile(srcFile, []byte(newStr), 0644))
defer func() {
// replace original strategies file with old content.
require.NoError(t, ioutil.WriteFile(srcFile, originalBytes, 0644), "failed to restore original file content")
}()

// wait for reload timer
for i := 0; i < 1000; i++ { // wait up to 1sec
s, err = store.GetSamplingStrategy(context.Background(), "foo")
require.NoError(t, err)
if s.ProbabilisticSampling != nil && s.ProbabilisticSampling.SamplingRate == 0.9 {
break
}
time.Sleep(1 * time.Millisecond)
}
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s)
}

func TestAutoUpdateStrategyErrors(t *testing.T) {
Expand All @@ -314,13 +396,25 @@ func TestAutoUpdateStrategyErrors(t *testing.T) {
defer store.Close()

// check invalid file path or read failure
assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name()+"bad-path", "blah"))
assert.Equal(t, "blah", store.reloadSamplingStrategy("file", tempFile.Name()+"bad-path", "blah"))
assert.Len(t, logs.FilterMessage("failed to load sampling strategies").All(), 1)

// check bad file content
require.NoError(t, ioutil.WriteFile(tempFile.Name(), []byte("bad value"), 0644))
assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name(), "blah"))
assert.Len(t, logs.FilterMessage("failed to update sampling strategies from file").All(), 1)
assert.Equal(t, "blah", store.reloadSamplingStrategy("file", tempFile.Name(), "blah"))
assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 1)

// check invalid url
assert.Equal(t, "duh", store.reloadSamplingStrategy("url", "bad-url", "duh"))
assert.Len(t, logs.FilterMessage("failed to download sampling strategies").All(), 1)

// check status code other than 200
assert.Equal(t, "duh", store.reloadSamplingStrategy("url", mockStrategyServer().URL+"/bad-status", "duh"))
assert.Len(t, logs.FilterMessage("failed to download sampling strategies").All(), 2)

// check bad content from url
assert.Equal(t, "duh", store.reloadSamplingStrategy("url", mockStrategyServer().URL+"/bad-content", "duh"))
assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 2)
}

func TestServiceNoPerOperationStrategies(t *testing.T) {
Expand Down

0 comments on commit 932f6e0

Please sign in to comment.