diff --git a/cmd/query/app/static_handler.go b/cmd/query/app/static_handler.go index de204f166d9..5f8e9709bd6 100644 --- a/cmd/query/app/static_handler.go +++ b/cmd/query/app/static_handler.go @@ -215,7 +215,7 @@ func loadUIConfig(uiConfig string) (map[string]interface{}, error) { return nil, nil } ext := filepath.Ext(uiConfig) - bytes, err := ioutil.ReadFile(uiConfig) /* nolint #nosec , this comes from an admin, not user */ + bytes, err := ioutil.ReadFile(filepath.Clean(uiConfig)) if err != nil { return nil, fmt.Errorf("cannot read UI config file %v: %w", uiConfig, err) } diff --git a/plugin/sampling/strategystore/static/options.go b/plugin/sampling/strategystore/static/options.go index 0ef18fbf2ea..b7222b3a5c2 100644 --- a/plugin/sampling/strategystore/static/options.go +++ b/plugin/sampling/strategystore/static/options.go @@ -22,7 +22,7 @@ import ( ) const ( - // SamplingStrategiesFile contains the name of CLI opions for config file. + // SamplingStrategiesFile contains the name of CLI option for config file. SamplingStrategiesFile = "sampling.strategies-file" samplingStrategiesReloadInterval = "sampling.strategies-reload-interval" ) diff --git a/plugin/sampling/strategystore/static/strategy_store.go b/plugin/sampling/strategystore/static/strategy_store.go index 561e7cb91b4..440ace0ff64 100644 --- a/plugin/sampling/strategystore/static/strategy_store.go +++ b/plugin/sampling/strategystore/static/strategy_store.go @@ -21,6 +21,8 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/http" + "net/url" "path/filepath" "sync/atomic" "time" @@ -31,6 +33,10 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/sampling" ) +// null represents "null" JSON value and +// it un-marshals to nil pointer. +var nullJSON = []byte("null") + type strategyStore struct { logger *zap.Logger @@ -45,6 +51,8 @@ type storedStrategies struct { serviceStrategies map[string]*sampling.SamplingStrategyResponse } +type strategyLoader func() ([]byte, error) + // NewStrategyStore creates a strategy store that holds static sampling strategies. func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) { ctx, cancelFunc := context.WithCancel(context.Background()) @@ -55,14 +63,20 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er } h.storedStrategies.Store(defaultStrategies()) - strategies, err := loadStrategies(options.StrategiesFile) + if options.StrategiesFile == "" { + h.parseStrategies(nil) + return h, nil + } + + loadFn := samplingStrategyLoader(options.StrategiesFile) + strategies, err := loadStrategies(loadFn) if err != nil { return nil, err } h.parseStrategies(strategies) if options.ReloadInterval > 0 { - go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile) + go h.autoUpdateStrategies(options.ReloadInterval, loadFn) } return h, nil } @@ -83,35 +97,81 @@ func (h *strategyStore) Close() { h.cancelFunc() } -func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) { - lastValue := "" +func downloadSamplingStrategies(url string) ([]byte, error) { + resp, err := http.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to download sampling strategies: %w", err) + } + + defer resp.Body.Close() + buf := new(bytes.Buffer) + if _, err = buf.ReadFrom(resp.Body); err != nil { + return nil, fmt.Errorf("failed to read sampling strategies HTTP response body: %w", err) + } + + if resp.StatusCode == http.StatusServiceUnavailable { + return nullJSON, nil + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf( + "receiving %s while downloading strategies file: %s", + resp.Status, + buf.String(), + ) + } + + return buf.Bytes(), nil +} + +func isURL(str string) bool { + u, err := url.Parse(str) + return err == nil && u.Scheme != "" && u.Host != "" +} + +func samplingStrategyLoader(strategiesFile string) strategyLoader { + if isURL(strategiesFile) { + return func() ([]byte, error) { + return downloadSamplingStrategies(strategiesFile) + } + } + + return func() ([]byte, error) { + currBytes, err := ioutil.ReadFile(filepath.Clean(strategiesFile)) + if err != nil { + return nil, fmt.Errorf("failed to read strategies file %s: %w", strategiesFile, err) + } + return currBytes, nil + } +} + +func (h *strategyStore) autoUpdateStrategies(interval time.Duration, loader strategyLoader) { + lastValue := string(nullJSON) ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: - lastValue = h.reloadSamplingStrategyFile(filePath, lastValue) + lastValue = h.reloadSamplingStrategy(loader, lastValue) case <-h.ctx.Done(): return } } } -func (h *strategyStore) reloadSamplingStrategyFile(filePath string, lastValue string) string { - currBytes, err := ioutil.ReadFile(filepath.Clean(filePath)) +func (h *strategyStore) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string { + newValue, err := loadFn() if err != nil { - h.logger.Error("failed to load sampling strategies", zap.String("file", filePath), zap.Error(err)) + h.logger.Error("failed to re-load sampling strategies", zap.Error(err)) return lastValue } - newValue := string(currBytes) - if lastValue == newValue { + if lastValue == string(newValue) { return lastValue } - if err = h.updateSamplingStrategy(currBytes); err != nil { - h.logger.Error("failed to update sampling strategies from file", zap.Error(err)) + if err := h.updateSamplingStrategy(newValue); err != nil { + h.logger.Error("failed to update sampling strategies", zap.Error(err)) return lastValue } - return newValue + return string(newValue) } func (h *strategyStore) updateSamplingStrategy(bytes []byte) error { @@ -125,24 +185,22 @@ func (h *strategyStore) updateSamplingStrategy(bytes []byte) error { } // TODO good candidate for a global util function -func loadStrategies(strategiesFile string) (*strategies, error) { - if strategiesFile == "" { - return nil, nil - } - data, err := ioutil.ReadFile(strategiesFile) /* nolint #nosec , this comes from an admin, not user */ +func loadStrategies(loadFn strategyLoader) (*strategies, error) { + strategyBytes, err := loadFn() if err != nil { - return nil, fmt.Errorf("failed to open strategies file: %w", err) + return nil, err } - var strategies strategies - if err := json.Unmarshal(data, &strategies); err != nil { + + var strategies *strategies + if err := json.Unmarshal(strategyBytes, &strategies); err != nil { return nil, fmt.Errorf("failed to unmarshal strategies: %w", err) } - return &strategies, nil + return strategies, nil } func (h *strategyStore) parseStrategies(strategies *strategies) { if strategies == nil { - h.logger.Info("No sampling strategies provided, using defaults") + h.logger.Info("No sampling strategies provided or URL is unavailable, using defaults") return } newStore := defaultStrategies() diff --git a/plugin/sampling/strategystore/static/strategy_store_test.go b/plugin/sampling/strategystore/static/strategy_store_test.go index 29a96c474c2..5fb9b01170d 100644 --- a/plugin/sampling/strategystore/static/strategy_store_test.go +++ b/plugin/sampling/strategystore/static/strategy_store_test.go @@ -16,7 +16,10 @@ package static import ( "context" + "fmt" "io/ioutil" + "net/http" + "net/http/httptest" "os" "strings" "testing" @@ -24,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" @@ -31,9 +35,64 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/sampling" ) -func TestStrategyStore(t *testing.T) { +// strategiesJSON returns the strategy with +// a given probability. +func strategiesJSON(probability float32) string { + strategy := fmt.Sprintf(` + { + "default_strategy": { + "type": "probabilistic", + "param": 0.5 + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": %.1f + }, + { + "service": "bar", + "type": "ratelimiting", + "param": 5 + } + ] + } + `, + probability, + ) + return strategy +} + +// Returns strategies in JSON format. Used for testing +// URL option for sampling strategies. +func mockStrategyServer() (*httptest.Server, *atomic.String) { + strategy := atomic.NewString(strategiesJSON(0.8)) + f := func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/bad-content": + w.Write([]byte("bad-content")) + return + + case "/bad-status": + w.WriteHeader(404) + return + + case "/service-unavailable": + w.WriteHeader(503) + return + + default: + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(strategy.Load())) + } + } + return httptest.NewServer(http.HandlerFunc(f)), strategy +} + +func TestStrategyStoreWithFile(t *testing.T) { _, err := NewStrategyStore(Options{StrategiesFile: "fileNotFound.json"}, zap.NewNop()) - assert.EqualError(t, err, "failed to open strategies file: open fileNotFound.json: no such file or directory") + assert.Contains(t, err.Error(), "failed to read strategies file fileNotFound.json") _, err = NewStrategyStore(Options{StrategiesFile: "fixtures/bad_strategies.json"}, zap.NewNop()) assert.EqualError(t, err, @@ -43,7 +102,7 @@ func TestStrategyStore(t *testing.T) { logger, buf := testutils.NewLogger() store, err := NewStrategyStore(Options{}, logger) require.NoError(t, err) - assert.Contains(t, buf.String(), "No sampling strategies provided, using defaults") + assert.Contains(t, buf.String(), "No sampling strategies provided or URL is unavailable, using defaults") s, err := store.GetSamplingStrategy(context.Background(), "foo") require.NoError(t, err) assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.001), *s) @@ -64,6 +123,30 @@ func TestStrategyStore(t *testing.T) { assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.5), *s) } +func TestStrategyStoreWithURL(t *testing.T) { + // Test default strategy when URL is temporarily unavailable. + logger, buf := testutils.NewLogger() + mockServer, _ := mockStrategyServer() + store, err := NewStrategyStore(Options{StrategiesFile: mockServer.URL + "/service-unavailable"}, logger) + require.NoError(t, err) + assert.Contains(t, buf.String(), "No sampling strategies provided or URL is unavailable, using defaults") + s, err := store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.001), *s) + + // Test downloading strategies from a URL. + store, err = NewStrategyStore(Options{StrategiesFile: mockServer.URL}, logger) + require.NoError(t, err) + + s, err = store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + s, err = store.GetSamplingStrategy(context.Background(), "bar") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 5), *s) +} + func TestPerOperationSamplingStrategies(t *testing.T) { logger, buf := testutils.NewLogger() store, err := NewStrategyStore(Options{StrategiesFile: "fixtures/operation_strategies.json"}, logger) @@ -249,7 +332,7 @@ func TestDeepCopy(t *testing.T) { assert.EqualValues(t, cp, s) } -func TestAutoUpdateStrategy(t *testing.T) { +func TestAutoUpdateStrategyWithFile(t *testing.T) { tempFile, _ := ioutil.TempFile("", "for_go_test_*.json") require.NoError(t, tempFile.Close()) defer func() { @@ -276,7 +359,7 @@ func TestAutoUpdateStrategy(t *testing.T) { assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) // verify that reloading in no-op - value := store.reloadSamplingStrategyFile(dstFile, string(srcBytes)) + value := store.reloadSamplingStrategy(samplingStrategyLoader(dstFile), string(srcBytes)) assert.Equal(t, string(srcBytes), value) // update file with new probability of 0.9 @@ -295,6 +378,40 @@ func TestAutoUpdateStrategy(t *testing.T) { assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s) } +func TestAutoUpdateStrategyWithURL(t *testing.T) { + mockServer, mockStrategy := mockStrategyServer() + ss, err := NewStrategyStore(Options{ + StrategiesFile: mockServer.URL, + ReloadInterval: 10 * time.Millisecond, + }, zap.NewNop()) + require.NoError(t, err) + store := ss.(*strategyStore) + defer store.Close() + + // confirm baseline value + s, err := store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + // verify that reloading in no-op + value := store.reloadSamplingStrategy(samplingStrategyLoader(mockServer.URL), mockStrategy.Load()) + assert.Equal(t, mockStrategy.Load(), value) + + // update original strategies with new probability of 0.9 + mockStrategy.Store(strategiesJSON(0.9)) + + // wait for reload timer + for i := 0; i < 1000; i++ { // wait up to 1sec + s, err = store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + if s.ProbabilisticSampling != nil && s.ProbabilisticSampling.SamplingRate == 0.9 { + break + } + time.Sleep(1 * time.Millisecond) + } + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s) +} + func TestAutoUpdateStrategyErrors(t *testing.T) { tempFile, _ := ioutil.TempFile("", "for_go_test_*.json") require.NoError(t, tempFile.Close()) @@ -314,13 +431,26 @@ func TestAutoUpdateStrategyErrors(t *testing.T) { defer store.Close() // check invalid file path or read failure - assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name()+"bad-path", "blah")) - assert.Len(t, logs.FilterMessage("failed to load sampling strategies").All(), 1) + assert.Equal(t, "blah", store.reloadSamplingStrategy(samplingStrategyLoader(tempFile.Name()+"bad-path"), "blah")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 1) // check bad file content require.NoError(t, ioutil.WriteFile(tempFile.Name(), []byte("bad value"), 0644)) - assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name(), "blah")) - assert.Len(t, logs.FilterMessage("failed to update sampling strategies from file").All(), 1) + assert.Equal(t, "blah", store.reloadSamplingStrategy(samplingStrategyLoader(tempFile.Name()), "blah")) + assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 1) + + // check invalid url + assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader("bad-url"), "duh")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 2) + + // check status code other than 200 + mockServer, _ := mockStrategyServer() + assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader(mockServer.URL+"/bad-status"), "duh")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 3) + + // check bad content from url + assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader(mockServer.URL+"/bad-content"), "duh")) + assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 2) } func TestServiceNoPerOperationStrategies(t *testing.T) { @@ -337,3 +467,22 @@ func TestServiceNoPerOperationStrategies(t *testing.T) { expected := makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 3) assert.Equal(t, *expected.RateLimitingSampling, *s.RateLimitingSampling) } + +func TestSamplingStrategyLoader(t *testing.T) { + // invalid file path + loader := samplingStrategyLoader("not-exists") + _, err := loader() + assert.Contains(t, err.Error(), "failed to read strategies file not-exists") + + // status code other than 200 + mockServer, _ := mockStrategyServer() + loader = samplingStrategyLoader(mockServer.URL + "/bad-status") + _, err = loader() + assert.Contains(t, err.Error(), "receiving 404 Not Found while downloading strategies file") + + // should download content from URL + loader = samplingStrategyLoader(mockServer.URL + "/bad-content") + content, err := loader() + require.NoError(t, err) + assert.Equal(t, "bad-content", string(content)) +}