-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add URL option for sampling strategies file #2519
Changes from 1 commit
270e96d
b50abef
964129a
f517c75
2593ee2
535b4b7
8e1719f
6673bdf
c95250b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -21,7 +21,8 @@ import ( | |||||||
"encoding/json" | ||||||||
"fmt" | ||||||||
"io/ioutil" | ||||||||
"path/filepath" | ||||||||
"net/http" | ||||||||
"net/url" | ||||||||
"sync/atomic" | ||||||||
"time" | ||||||||
|
||||||||
|
@@ -31,6 +32,9 @@ import ( | |||||||
"github.com/jaegertracing/jaeger/thrift-gen/sampling" | ||||||||
) | ||||||||
|
||||||||
// null represents "null" JSON value. | ||||||||
const null = "null" | ||||||||
|
||||||||
type strategyStore struct { | ||||||||
logger *zap.Logger | ||||||||
|
||||||||
|
@@ -45,6 +49,8 @@ type storedStrategies struct { | |||||||
serviceStrategies map[string]*sampling.SamplingStrategyResponse | ||||||||
} | ||||||||
|
||||||||
type strategyLoader func() ([]byte, error) | ||||||||
|
||||||||
// NewStrategyStore creates a strategy store that holds static sampling strategies. | ||||||||
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) { | ||||||||
ctx, cancelFunc := context.WithCancel(context.Background()) | ||||||||
|
@@ -55,14 +61,15 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er | |||||||
} | ||||||||
h.storedStrategies.Store(defaultStrategies()) | ||||||||
|
||||||||
strategies, err := loadStrategies(options.StrategiesFile) | ||||||||
loadFn := samplingStrategyLoader(options.StrategiesFile) | ||||||||
strategies, err := loadStrategies(loadFn) | ||||||||
if err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
h.parseStrategies(strategies) | ||||||||
|
||||||||
if options.ReloadInterval > 0 { | ||||||||
go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile) | ||||||||
go h.autoUpdateStrategies(options.ReloadInterval, loadFn) | ||||||||
} | ||||||||
return h, nil | ||||||||
} | ||||||||
|
@@ -83,35 +90,86 @@ func (h *strategyStore) Close() { | |||||||
h.cancelFunc() | ||||||||
} | ||||||||
|
||||||||
func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) { | ||||||||
lastValue := "" | ||||||||
func downloadSamplingStrategies(url string) ([]byte, error) { | ||||||||
resp, err := http.Get(url) | ||||||||
if err != nil { | ||||||||
return nil, fmt.Errorf("failed to download sampling strategies: %w", err) | ||||||||
} | ||||||||
|
||||||||
defer resp.Body.Close() | ||||||||
if resp.StatusCode != http.StatusOK { | ||||||||
if resp.StatusCode == http.StatusServiceUnavailable { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this does not need to be nested inside if != ok, you can do this check first, then if != OK. |
||||||||
return []byte("null"), nil | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
} | ||||||||
return nil, fmt.Errorf( | ||||||||
yurishkuro marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
"receiving %s while downloading strategies file", | ||||||||
resp.Status, | ||||||||
) | ||||||||
} | ||||||||
|
||||||||
buf := new(bytes.Buffer) | ||||||||
if _, err = buf.ReadFrom(resp.Body); err != nil { | ||||||||
return nil, fmt.Errorf("failed to read sampling strategies from downloaded JSON: %w", err) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
} | ||||||||
return buf.Bytes(), nil | ||||||||
} | ||||||||
|
||||||||
func isURL(str string) bool { | ||||||||
u, err := url.Parse(str) | ||||||||
return err == nil && u.Scheme != "" && u.Host != "" | ||||||||
} | ||||||||
|
||||||||
func samplingStrategyLoader(strategiesFile string) strategyLoader { | ||||||||
if strategiesFile == "" { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be an error upon instantiation of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't quite understand this one. Can you elaborate it further? 😅 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a constructor function NewStrategyStore. This struct cannot do anything useful if the strategiesFile is empty, correct? So we could just return an error from there if the param is empty. If that's not sufficient (e.g. because it still needs to return the default strategy), then we can still do that in the constructor and just not execute all the loading/reloading functions if the file name is blank. This way we don't have to keep checking if the file path is empty or not. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I get it. Thanks for the explanation. |
||||||||
return func() ([]byte, error) { | ||||||||
// Using null so that it un-marshals to nil pointer. | ||||||||
return []byte(null), nil | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this comment should be on the constant
Suggested change
|
||||||||
} | ||||||||
} | ||||||||
|
||||||||
if isURL(strategiesFile) { | ||||||||
return func() ([]byte, error) { | ||||||||
return downloadSamplingStrategies(strategiesFile) | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
return func() ([]byte, error) { | ||||||||
currBytes, err := ioutil.ReadFile(strategiesFile) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you may get go-sec linter error here, wrap the file name:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While you're at it, can you fix this line too? I.e. remove
|
||||||||
if err != nil { | ||||||||
return nil, fmt.Errorf("failed to open strategies file: %w", err) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
} | ||||||||
return currBytes, nil | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
func (h *strategyStore) autoUpdateStrategies(interval time.Duration, loader strategyLoader) { | ||||||||
lastValue := null | ||||||||
ticker := time.NewTicker(interval) | ||||||||
defer ticker.Stop() | ||||||||
for { | ||||||||
select { | ||||||||
case <-ticker.C: | ||||||||
lastValue = h.reloadSamplingStrategyFile(filePath, lastValue) | ||||||||
lastValue = h.reloadSamplingStrategy(loader, lastValue) | ||||||||
case <-h.ctx.Done(): | ||||||||
return | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
func (h *strategyStore) reloadSamplingStrategyFile(filePath string, lastValue string) string { | ||||||||
currBytes, err := ioutil.ReadFile(filepath.Clean(filePath)) | ||||||||
func (h *strategyStore) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string { | ||||||||
newValue, err := loadFn() | ||||||||
if err != nil { | ||||||||
h.logger.Error("failed to load sampling strategies", zap.String("file", filePath), zap.Error(err)) | ||||||||
h.logger.Error("failed to re-load sampling strategies", zap.Error(err)) | ||||||||
return lastValue | ||||||||
} | ||||||||
newValue := string(currBytes) | ||||||||
if lastValue == newValue { | ||||||||
if lastValue == string(newValue) { | ||||||||
return lastValue | ||||||||
} | ||||||||
if err = h.updateSamplingStrategy(currBytes); err != nil { | ||||||||
h.logger.Error("failed to update sampling strategies from file", zap.Error(err)) | ||||||||
if err := h.updateSamplingStrategy(newValue); err != nil { | ||||||||
h.logger.Error("failed to update sampling strategies", zap.Error(err)) | ||||||||
return lastValue | ||||||||
} | ||||||||
return newValue | ||||||||
return string(newValue) | ||||||||
} | ||||||||
|
||||||||
func (h *strategyStore) updateSamplingStrategy(bytes []byte) error { | ||||||||
|
@@ -125,24 +183,22 @@ func (h *strategyStore) updateSamplingStrategy(bytes []byte) error { | |||||||
} | ||||||||
|
||||||||
// TODO good candidate for a global util function | ||||||||
func loadStrategies(strategiesFile string) (*strategies, error) { | ||||||||
if strategiesFile == "" { | ||||||||
return nil, nil | ||||||||
} | ||||||||
data, err := ioutil.ReadFile(strategiesFile) /* nolint #nosec , this comes from an admin, not user */ | ||||||||
func loadStrategies(loadFn strategyLoader) (*strategies, error) { | ||||||||
strategyBytes, err := loadFn() | ||||||||
if err != nil { | ||||||||
return nil, fmt.Errorf("failed to open strategies file: %w", err) | ||||||||
return nil, err | ||||||||
} | ||||||||
var strategies strategies | ||||||||
if err := json.Unmarshal(data, &strategies); err != nil { | ||||||||
|
||||||||
var strategies *strategies | ||||||||
if err := json.Unmarshal(strategyBytes, &strategies); err != nil { | ||||||||
return nil, fmt.Errorf("failed to unmarshal strategies: %w", err) | ||||||||
} | ||||||||
return &strategies, nil | ||||||||
return strategies, nil | ||||||||
} | ||||||||
|
||||||||
func (h *strategyStore) parseStrategies(strategies *strategies) { | ||||||||
if strategies == nil { | ||||||||
h.logger.Info("No sampling strategies provided, using defaults") | ||||||||
h.logger.Info("No sampling strategies provided or URL is unavailable, using defaults") | ||||||||
return | ||||||||
} | ||||||||
newStore := defaultStrategies() | ||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ package static | |
import ( | ||
"context" | ||
"io/ioutil" | ||
"net/http" | ||
"net/http/httptest" | ||
"os" | ||
"strings" | ||
"testing" | ||
|
@@ -31,6 +33,37 @@ import ( | |
"github.com/jaegertracing/jaeger/thrift-gen/sampling" | ||
) | ||
|
||
// Returns strategies in JSON format. Used for testing | ||
// URL option for sampling strategies. | ||
func mockStrategyServer() *httptest.Server { | ||
f := func(w http.ResponseWriter, r *http.Request) { | ||
switch r.URL.Path { | ||
case "/bad-content": | ||
w.Write([]byte("bad-content")) | ||
return | ||
|
||
case "/bad-status": | ||
w.WriteHeader(404) | ||
return | ||
|
||
case "/service-unavailable": | ||
w.WriteHeader(503) | ||
return | ||
|
||
default: | ||
data, err := ioutil.ReadFile("fixtures/strategies.json") | ||
if err != nil { | ||
w.WriteHeader(500) | ||
return | ||
} | ||
w.WriteHeader(200) | ||
w.Header().Set("Content-Type", "application/json") | ||
w.Write(data) | ||
} | ||
} | ||
return httptest.NewServer(http.HandlerFunc(f)) | ||
} | ||
|
||
func TestStrategyStore(t *testing.T) { | ||
_, err := NewStrategyStore(Options{StrategiesFile: "fileNotFound.json"}, zap.NewNop()) | ||
assert.EqualError(t, err, "failed to open strategies file: open fileNotFound.json: no such file or directory") | ||
|
@@ -43,7 +76,7 @@ func TestStrategyStore(t *testing.T) { | |
logger, buf := testutils.NewLogger() | ||
store, err := NewStrategyStore(Options{}, logger) | ||
require.NoError(t, err) | ||
assert.Contains(t, buf.String(), "No sampling strategies provided, using defaults") | ||
assert.Contains(t, buf.String(), "No sampling strategies provided or URL is unavailable, using defaults") | ||
s, err := store.GetSamplingStrategy(context.Background(), "foo") | ||
require.NoError(t, err) | ||
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.001), *s) | ||
|
@@ -62,6 +95,26 @@ func TestStrategyStore(t *testing.T) { | |
s, err = store.GetSamplingStrategy(context.Background(), "default") | ||
require.NoError(t, err) | ||
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.5), *s) | ||
|
||
// Test default strategy when URL is temporarily unavailable. | ||
yurishkuro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
mockServer := mockStrategyServer() | ||
store, err = NewStrategyStore(Options{StrategiesFile: mockServer.URL+"/service-unavailable"}, logger) | ||
assert.Contains(t, buf.String(), "No sampling strategies provided or URL is unavailable, using defaults") | ||
s, err = store.GetSamplingStrategy(context.Background(), "foo") | ||
require.NoError(t, err) | ||
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.001), *s) | ||
|
||
// Test downloading strategies from a URL. | ||
store, err = NewStrategyStore(Options{StrategiesFile: mockServer.URL}, logger) | ||
require.NoError(t, err) | ||
|
||
s, err = store.GetSamplingStrategy(context.Background(), "foo") | ||
require.NoError(t, err) | ||
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) | ||
|
||
s, err = store.GetSamplingStrategy(context.Background(), "bar") | ||
require.NoError(t, err) | ||
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 5), *s) | ||
} | ||
|
||
func TestPerOperationSamplingStrategies(t *testing.T) { | ||
|
@@ -276,7 +329,7 @@ func TestAutoUpdateStrategy(t *testing.T) { | |
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) | ||
|
||
// verify that reloading in no-op | ||
value := store.reloadSamplingStrategyFile(dstFile, string(srcBytes)) | ||
value := store.reloadSamplingStrategy(samplingStrategyLoader(dstFile), string(srcBytes)) | ||
assert.Equal(t, string(srcBytes), value) | ||
|
||
// update file with new probability of 0.9 | ||
|
@@ -293,6 +346,49 @@ func TestAutoUpdateStrategy(t *testing.T) { | |
time.Sleep(1 * time.Millisecond) | ||
} | ||
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s) | ||
|
||
// Test auto update strategy with URL option. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like a pretty separate testing scenario, any reason why it cannot be moved into a new test function with appropriate name? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it makes sense to move it to a separate test function. |
||
mockServer := mockStrategyServer() | ||
ss, err = NewStrategyStore(Options{ | ||
StrategiesFile: mockServer.URL, | ||
ReloadInterval: 10 * time.Millisecond, | ||
}, zap.NewNop()) | ||
require.NoError(t, err) | ||
store = ss.(*strategyStore) | ||
defer store.Close() | ||
|
||
// copy existing fixture content to restore it later. | ||
srcBytes, err = ioutil.ReadFile(srcFile) | ||
require.NoError(t, err) | ||
originalBytes := srcBytes | ||
|
||
// confirm baseline value | ||
s, err = store.GetSamplingStrategy(context.Background(), "foo") | ||
require.NoError(t, err) | ||
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) | ||
|
||
// verify that reloading in no-op | ||
value = store.reloadSamplingStrategy(samplingStrategyLoader(mockServer.URL), string(srcBytes)) | ||
assert.Equal(t, string(srcBytes), value) | ||
|
||
// update original strategies file with new probability of 0.9 | ||
newStr = strings.Replace(string(srcBytes), "0.8", "0.9", 1) | ||
require.NoError(t, ioutil.WriteFile(srcFile, []byte(newStr), 0644)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it's a good idea for the test to override the fixture file. In L316 it is already copied to a temp file that can be overwritten. Also, if you're testing with a mock HTTP server, why use file in the first place? You can simply replace the content returned by the mock server since you can control it. |
||
defer func() { | ||
// replace original strategies file with old content. | ||
require.NoError(t, ioutil.WriteFile(srcFile, originalBytes, 0644), "failed to restore original file content") | ||
}() | ||
|
||
// wait for reload timer | ||
for i := 0; i < 1000; i++ { // wait up to 1sec | ||
s, err = store.GetSamplingStrategy(context.Background(), "foo") | ||
require.NoError(t, err) | ||
if s.ProbabilisticSampling != nil && s.ProbabilisticSampling.SamplingRate == 0.9 { | ||
break | ||
} | ||
time.Sleep(1 * time.Millisecond) | ||
} | ||
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s) | ||
} | ||
|
||
func TestAutoUpdateStrategyErrors(t *testing.T) { | ||
|
@@ -314,13 +410,25 @@ func TestAutoUpdateStrategyErrors(t *testing.T) { | |
defer store.Close() | ||
|
||
// check invalid file path or read failure | ||
assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name()+"bad-path", "blah")) | ||
assert.Len(t, logs.FilterMessage("failed to load sampling strategies").All(), 1) | ||
assert.Equal(t, "blah", store.reloadSamplingStrategy(samplingStrategyLoader(tempFile.Name()+"bad-path"), "blah")) | ||
assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 1) | ||
|
||
// check bad file content | ||
require.NoError(t, ioutil.WriteFile(tempFile.Name(), []byte("bad value"), 0644)) | ||
assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name(), "blah")) | ||
assert.Len(t, logs.FilterMessage("failed to update sampling strategies from file").All(), 1) | ||
assert.Equal(t, "blah", store.reloadSamplingStrategy(samplingStrategyLoader(tempFile.Name()), "blah")) | ||
assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 1) | ||
|
||
// check invalid url | ||
assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader("bad-url"), "duh")) | ||
assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 2) | ||
|
||
// check status code other than 200 | ||
assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader(mockStrategyServer().URL+"/bad-status"), "duh")) | ||
assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 3) | ||
|
||
// check bad content from url | ||
assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader(mockStrategyServer().URL+"/bad-content"), "duh")) | ||
assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 2) | ||
} | ||
|
||
func TestServiceNoPerOperationStrategies(t *testing.T) { | ||
|
@@ -337,3 +445,21 @@ func TestServiceNoPerOperationStrategies(t *testing.T) { | |
expected := makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 3) | ||
assert.Equal(t, *expected.RateLimitingSampling, *s.RateLimitingSampling) | ||
} | ||
|
||
func TestSamplingStrategyLoader(t *testing.T) { | ||
// invalid file path | ||
loader := samplingStrategyLoader("not-exists") | ||
_, err := loader() | ||
assert.Contains(t, err.Error(), "failed to open strategies file") | ||
|
||
// status code other than 200 | ||
mockServer := mockStrategyServer() | ||
loader = samplingStrategyLoader(mockServer.URL + "/bad-status") | ||
_, err = loader() | ||
assert.Contains(t, err.Error(), "receiving 404 Not Found while downloading strategies file") | ||
|
||
// should download content from URL | ||
loader = samplingStrategyLoader(mockServer.URL + "/bad-content") | ||
content, err := loader() | ||
assert.Equal(t, "bad-content", string(content)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was rather thinking this: