From 932f6e01e32fefc1591f2d676ff53e7e6b708860 Mon Sep 17 00:00:00 2001 From: Deepak Date: Tue, 29 Sep 2020 18:17:21 +0530 Subject: [PATCH] Add URL option for sampling strategies file This adds an option to support downloading sampling strategies file from a URL in JSON format. Fixes #2015 Signed-off-by: Deepak Sah --- .../sampling/strategystore/static/options.go | 8 +- .../strategystore/static/strategy_store.go | 99 ++++++++++++++--- .../static/strategy_store_test.go | 102 +++++++++++++++++- 3 files changed, 191 insertions(+), 18 deletions(-) diff --git a/plugin/sampling/strategystore/static/options.go b/plugin/sampling/strategystore/static/options.go index 0ef18fbf2ea3..a02040c663d4 100644 --- a/plugin/sampling/strategystore/static/options.go +++ b/plugin/sampling/strategystore/static/options.go @@ -22,9 +22,11 @@ import ( ) const ( - // SamplingStrategiesFile contains the name of CLI opions for config file. + // SamplingStrategiesFile contains the name of CLI options for config file. SamplingStrategiesFile = "sampling.strategies-file" samplingStrategiesReloadInterval = "sampling.strategies-reload-interval" + // SamplingStrategiesURL contains the name of CLI option for config file URL. + SamplingStrategiesURL = "sampling.strategies-url" ) // Options holds configuration for the static sampling strategy store. @@ -33,6 +35,8 @@ type Options struct { StrategiesFile string // ReloadInterval is the time interval to check and reload sampling strategies file ReloadInterval time.Duration + // StrategiesURL is the url for downloading sampling strategies JSON file. + StrategiesURL string } // AddFlags adds flags for Options @@ -44,11 +48,13 @@ func AddFlags(flagSet *flag.FlagSet) { // AddOTELFlags adds flags that are exposed by OTEL collector func AddOTELFlags(flagSet *flag.FlagSet) { flagSet.String(SamplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file") + flagSet.String(SamplingStrategiesURL, "", "The URL to download sampling strategies file in JSON format.") } // InitFromViper initializes Options with properties from viper func (opts *Options) InitFromViper(v *viper.Viper) *Options { opts.StrategiesFile = v.GetString(SamplingStrategiesFile) opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval) + opts.StrategiesFile = v.GetString(SamplingStrategiesURL) return opts } diff --git a/plugin/sampling/strategystore/static/strategy_store.go b/plugin/sampling/strategystore/static/strategy_store.go index 561e7cb91b42..4ae644a15dac 100644 --- a/plugin/sampling/strategystore/static/strategy_store.go +++ b/plugin/sampling/strategystore/static/strategy_store.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/http" "path/filepath" "sync/atomic" "time" @@ -55,14 +56,14 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er } h.storedStrategies.Store(defaultStrategies()) - strategies, err := loadStrategies(options.StrategiesFile) + strategies, err := loadStrategies(options) if err != nil { return nil, err } h.parseStrategies(strategies) if options.ReloadInterval > 0 { - go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile) + go h.autoUpdateStrategies(options.ReloadInterval, options) } return h, nil } @@ -83,32 +84,73 @@ func (h *strategyStore) Close() { h.cancelFunc() } -func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) { +func (h *strategyStore) autoUpdateStrategies(interval time.Duration, opts Options) { + // Default reload option. + reloadFrom := "file" + path := opts.StrategiesFile + if opts.StrategiesURL != "" { + reloadFrom = "url" + path = opts.StrategiesURL + } + lastValue := "" ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: - lastValue = h.reloadSamplingStrategyFile(filePath, lastValue) + lastValue = h.reloadSamplingStrategy(reloadFrom, path, lastValue) case <-h.ctx.Done(): return } } } -func (h *strategyStore) reloadSamplingStrategyFile(filePath string, lastValue string) string { - currBytes, err := ioutil.ReadFile(filepath.Clean(filePath)) - if err != nil { - h.logger.Error("failed to load sampling strategies", zap.String("file", filePath), zap.Error(err)) - return lastValue +func (h *strategyStore) reloadSamplingStrategy(reloadFrom string, path string, lastValue string) string { + var currBytes []byte + if reloadFrom == "url" { + resp, err := http.Get(path) + if err != nil { + h.logger.Error("failed to download sampling strategies", zap.String("url", path), zap.Error(err)) + return lastValue + } + + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + h.logger.Error( + "failed to download sampling strategies", + zap.String("url", path), + zap.String("status", resp.Status), + ) + return lastValue + } + + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(resp.Body) + if err != nil { + h.logger.Error( + "failed to load sampling strategies from downloaded json", + zap.String("url", path), + zap.Error(err), + ) + return lastValue + } + currBytes = buf.Bytes() + } else { + var err error + currBytes, err = ioutil.ReadFile(filepath.Clean(path)) + if err != nil { + h.logger.Error("failed to load sampling strategies", zap.String("file", path), zap.Error(err)) + return lastValue + } } + newValue := string(currBytes) if lastValue == newValue { return lastValue } - if err = h.updateSamplingStrategy(currBytes); err != nil { - h.logger.Error("failed to update sampling strategies from file", zap.Error(err)) + if err := h.updateSamplingStrategy(currBytes); err != nil { + h.logger.Error("failed to update sampling strategies", zap.String("from", reloadFrom), zap.Error(err)) return lastValue } return newValue @@ -125,10 +167,19 @@ func (h *strategyStore) updateSamplingStrategy(bytes []byte) error { } // TODO good candidate for a global util function -func loadStrategies(strategiesFile string) (*strategies, error) { - if strategiesFile == "" { +func loadStrategies(opts Options) (*strategies, error) { + if opts.StrategiesURL != "" { + // Download Strategies from given URL. + return downloadStrategies(opts.StrategiesURL) + } else if opts.StrategiesFile != "" { + // Load strategies from given file. + return loadStrategiesFromFile(opts.StrategiesFile) + } else { return nil, nil } +} + +func loadStrategiesFromFile(strategiesFile string) (*strategies, error) { data, err := ioutil.ReadFile(strategiesFile) /* nolint #nosec , this comes from an admin, not user */ if err != nil { return nil, fmt.Errorf("failed to open strategies file: %w", err) @@ -140,6 +191,28 @@ func loadStrategies(strategiesFile string) (*strategies, error) { return &strategies, nil } +func downloadStrategies(strategiesURL string) (*strategies, error) { + resp, err := http.Get(strategiesURL) + if err != nil { + return nil, fmt.Errorf("failed to download strategies file: %w", err) + } + + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf( + "receiving %s while downloading strategies file from %s", + resp.Status, + strategiesURL, + ) + } + + var strategies strategies + if err := json.NewDecoder(resp.Body).Decode(&strategies); err != nil { + return nil, fmt.Errorf("failed to unmarshal strategies from downloaded json: %w", err) + } + return &strategies, nil +} + func (h *strategyStore) parseStrategies(strategies *strategies) { if strategies == nil { h.logger.Info("No sampling strategies provided, using defaults") diff --git a/plugin/sampling/strategystore/static/strategy_store_test.go b/plugin/sampling/strategystore/static/strategy_store_test.go index 29a96c474c26..1752cf70cb0b 100644 --- a/plugin/sampling/strategystore/static/strategy_store_test.go +++ b/plugin/sampling/strategystore/static/strategy_store_test.go @@ -17,6 +17,8 @@ package static import ( "context" "io/ioutil" + "net/http" + "net/http/httptest" "os" "strings" "testing" @@ -31,6 +33,30 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/sampling" ) +// Returns strategies in JSON format. Used for testing +// URL option for sampling strategies. +func mockStrategyServer() *httptest.Server { + f := func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/bad-content" { + w.Write([]byte("bad-content")) + return + } + if r.URL.Path == "/bad-status" { + w.WriteHeader(404) + return + } + data, err := ioutil.ReadFile("fixtures/strategies.json") + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + w.Write(data) + } + return httptest.NewServer(http.HandlerFunc(f)) +} + func TestStrategyStore(t *testing.T) { _, err := NewStrategyStore(Options{StrategiesFile: "fileNotFound.json"}, zap.NewNop()) assert.EqualError(t, err, "failed to open strategies file: open fileNotFound.json: no such file or directory") @@ -62,6 +88,19 @@ func TestStrategyStore(t *testing.T) { s, err = store.GetSamplingStrategy(context.Background(), "default") require.NoError(t, err) assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.5), *s) + + // Test downloading strategies from a URL. + mockServer := mockStrategyServer() + store, err = NewStrategyStore(Options{StrategiesURL: mockServer.URL}, logger) + require.NoError(t, err) + + s, err = store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + s, err = store.GetSamplingStrategy(context.Background(), "bar") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 5), *s) } func TestPerOperationSamplingStrategies(t *testing.T) { @@ -276,7 +315,7 @@ func TestAutoUpdateStrategy(t *testing.T) { assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) // verify that reloading in no-op - value := store.reloadSamplingStrategyFile(dstFile, string(srcBytes)) + value := store.reloadSamplingStrategy("file", dstFile, string(srcBytes)) assert.Equal(t, string(srcBytes), value) // update file with new probability of 0.9 @@ -293,6 +332,49 @@ func TestAutoUpdateStrategy(t *testing.T) { time.Sleep(1 * time.Millisecond) } assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s) + + // Test auto update strategy with URL option. + mockServer := mockStrategyServer() + ss, err = NewStrategyStore(Options{ + StrategiesURL: mockServer.URL, + ReloadInterval: 10 * time.Millisecond, + }, zap.NewNop()) + require.NoError(t, err) + store = ss.(*strategyStore) + defer store.Close() + + // copy existing fixture content to restore it later. + srcBytes, err = ioutil.ReadFile(srcFile) + require.NoError(t, err) + originalBytes := srcBytes + + // confirm baseline value + s, err = store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + // verify that reloading in no-op + value = store.reloadSamplingStrategy("url", mockServer.URL, string(srcBytes)) + assert.Equal(t, string(srcBytes), value) + + // update original strategies file with new probability of 0.9 + newStr = strings.Replace(string(srcBytes), "0.8", "0.9", 1) + require.NoError(t, ioutil.WriteFile(srcFile, []byte(newStr), 0644)) + defer func() { + // replace original strategies file with old content. + require.NoError(t, ioutil.WriteFile(srcFile, originalBytes, 0644), "failed to restore original file content") + }() + + // wait for reload timer + for i := 0; i < 1000; i++ { // wait up to 1sec + s, err = store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + if s.ProbabilisticSampling != nil && s.ProbabilisticSampling.SamplingRate == 0.9 { + break + } + time.Sleep(1 * time.Millisecond) + } + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s) } func TestAutoUpdateStrategyErrors(t *testing.T) { @@ -314,13 +396,25 @@ func TestAutoUpdateStrategyErrors(t *testing.T) { defer store.Close() // check invalid file path or read failure - assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name()+"bad-path", "blah")) + assert.Equal(t, "blah", store.reloadSamplingStrategy("file", tempFile.Name()+"bad-path", "blah")) assert.Len(t, logs.FilterMessage("failed to load sampling strategies").All(), 1) // check bad file content require.NoError(t, ioutil.WriteFile(tempFile.Name(), []byte("bad value"), 0644)) - assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name(), "blah")) - assert.Len(t, logs.FilterMessage("failed to update sampling strategies from file").All(), 1) + assert.Equal(t, "blah", store.reloadSamplingStrategy("file", tempFile.Name(), "blah")) + assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 1) + + // check invalid url + assert.Equal(t, "duh", store.reloadSamplingStrategy("url", "bad-url", "duh")) + assert.Len(t, logs.FilterMessage("failed to download sampling strategies").All(), 1) + + // check status code other than 200 + assert.Equal(t, "duh", store.reloadSamplingStrategy("url", mockStrategyServer().URL+"/bad-status", "duh")) + assert.Len(t, logs.FilterMessage("failed to download sampling strategies").All(), 2) + + // check bad content from url + assert.Equal(t, "duh", store.reloadSamplingStrategy("url", mockStrategyServer().URL+"/bad-content", "duh")) + assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 2) } func TestServiceNoPerOperationStrategies(t *testing.T) {