Skip to content

Commit

Permalink
Add URL option for sampling strategies file (#2519)
Browse files Browse the repository at this point in the history
* Add url option for sampling strategies

This change will let user to provide a
URL to download sampling strategies. Default
strategy is the fallback option if the URL is
temporarily unavailable.

Signed-off-by: Deepak <sah.sslpu@gmail.com>

* Update NewStrategyStore constructor

This updates the logic for NewStrategyStore
constructor to skip any loading/reloading logic
to execute when StrategiesFile is empty.

Signed-off-by: Deepak <sah.sslpu@gmail.com>

* Refactor strategy store tests

This refactors the strategy store tests, moving tests
with URL to separate test functions.

Signed-off-by: Deepak <sah.sslpu@gmail.com>

* Use strings.Replace to update strategies

Signed-off-by: Deepak <sah.sslpu@gmail.com>

* Change strategiesJSON to a function

This updates the mockStrategyServer to return
a mock strategy along with a mock server. The
returned strategy can be used to update it externally.
Also, changed package-level var strategiesJSON to
a function to return strategies JSON with a given probability.

Signed-off-by: Deepak <sah.sslpu@gmail.com>

* Fix linter failures

Signed-off-by: Deepak <sah.sslpu@gmail.com>
  • Loading branch information
goku321 authored Oct 13, 2020
1 parent 28d62e4 commit 025d2f5
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cmd/query/app/static_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func loadUIConfig(uiConfig string) (map[string]interface{}, error) {
return nil, nil
}
ext := filepath.Ext(uiConfig)
bytes, err := ioutil.ReadFile(uiConfig) /* nolint #nosec , this comes from an admin, not user */
bytes, err := ioutil.ReadFile(filepath.Clean(uiConfig))
if err != nil {
return nil, fmt.Errorf("cannot read UI config file %v: %w", uiConfig, err)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/static/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

const (
// SamplingStrategiesFile contains the name of CLI opions for config file.
// SamplingStrategiesFile contains the name of CLI option for config file.
SamplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
)
Expand Down
104 changes: 81 additions & 23 deletions plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path/filepath"
"sync/atomic"
"time"
Expand All @@ -31,6 +33,10 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// null represents "null" JSON value and
// it un-marshals to nil pointer.
var nullJSON = []byte("null")

type strategyStore struct {
logger *zap.Logger

Expand All @@ -45,6 +51,8 @@ type storedStrategies struct {
serviceStrategies map[string]*sampling.SamplingStrategyResponse
}

type strategyLoader func() ([]byte, error)

// NewStrategyStore creates a strategy store that holds static sampling strategies.
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
Expand All @@ -55,14 +63,20 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er
}
h.storedStrategies.Store(defaultStrategies())

strategies, err := loadStrategies(options.StrategiesFile)
if options.StrategiesFile == "" {
h.parseStrategies(nil)
return h, nil
}

loadFn := samplingStrategyLoader(options.StrategiesFile)
strategies, err := loadStrategies(loadFn)
if err != nil {
return nil, err
}
h.parseStrategies(strategies)

if options.ReloadInterval > 0 {
go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile)
go h.autoUpdateStrategies(options.ReloadInterval, loadFn)
}
return h, nil
}
Expand All @@ -83,35 +97,81 @@ func (h *strategyStore) Close() {
h.cancelFunc()
}

func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) {
lastValue := ""
func downloadSamplingStrategies(url string) ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to download sampling strategies: %w", err)
}

defer resp.Body.Close()
buf := new(bytes.Buffer)
if _, err = buf.ReadFrom(resp.Body); err != nil {
return nil, fmt.Errorf("failed to read sampling strategies HTTP response body: %w", err)
}

if resp.StatusCode == http.StatusServiceUnavailable {
return nullJSON, nil
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf(
"receiving %s while downloading strategies file: %s",
resp.Status,
buf.String(),
)
}

return buf.Bytes(), nil
}

func isURL(str string) bool {
u, err := url.Parse(str)
return err == nil && u.Scheme != "" && u.Host != ""
}

func samplingStrategyLoader(strategiesFile string) strategyLoader {
if isURL(strategiesFile) {
return func() ([]byte, error) {
return downloadSamplingStrategies(strategiesFile)
}
}

return func() ([]byte, error) {
currBytes, err := ioutil.ReadFile(filepath.Clean(strategiesFile))
if err != nil {
return nil, fmt.Errorf("failed to read strategies file %s: %w", strategiesFile, err)
}
return currBytes, nil
}
}

func (h *strategyStore) autoUpdateStrategies(interval time.Duration, loader strategyLoader) {
lastValue := string(nullJSON)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lastValue = h.reloadSamplingStrategyFile(filePath, lastValue)
lastValue = h.reloadSamplingStrategy(loader, lastValue)
case <-h.ctx.Done():
return
}
}
}

func (h *strategyStore) reloadSamplingStrategyFile(filePath string, lastValue string) string {
currBytes, err := ioutil.ReadFile(filepath.Clean(filePath))
func (h *strategyStore) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string {
newValue, err := loadFn()
if err != nil {
h.logger.Error("failed to load sampling strategies", zap.String("file", filePath), zap.Error(err))
h.logger.Error("failed to re-load sampling strategies", zap.Error(err))
return lastValue
}
newValue := string(currBytes)
if lastValue == newValue {
if lastValue == string(newValue) {
return lastValue
}
if err = h.updateSamplingStrategy(currBytes); err != nil {
h.logger.Error("failed to update sampling strategies from file", zap.Error(err))
if err := h.updateSamplingStrategy(newValue); err != nil {
h.logger.Error("failed to update sampling strategies", zap.Error(err))
return lastValue
}
return newValue
return string(newValue)
}

func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
Expand All @@ -125,24 +185,22 @@ func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
}

// TODO good candidate for a global util function
func loadStrategies(strategiesFile string) (*strategies, error) {
if strategiesFile == "" {
return nil, nil
}
data, err := ioutil.ReadFile(strategiesFile) /* nolint #nosec , this comes from an admin, not user */
func loadStrategies(loadFn strategyLoader) (*strategies, error) {
strategyBytes, err := loadFn()
if err != nil {
return nil, fmt.Errorf("failed to open strategies file: %w", err)
return nil, err
}
var strategies strategies
if err := json.Unmarshal(data, &strategies); err != nil {

var strategies *strategies
if err := json.Unmarshal(strategyBytes, &strategies); err != nil {
return nil, fmt.Errorf("failed to unmarshal strategies: %w", err)
}
return &strategies, nil
return strategies, nil
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
if strategies == nil {
h.logger.Info("No sampling strategies provided, using defaults")
h.logger.Info("No sampling strategies provided or URL is unavailable, using defaults")
return
}
newStore := defaultStrategies()
Expand Down
Loading

0 comments on commit 025d2f5

Please sign in to comment.