Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slightly simplify the queue settings code to help reduce the risk of problems #12976

Merged
merged 2 commits into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions modules/queue/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,56 @@ import (
"reflect"
)

// Mappable represents an interface that can MapTo another interface
type Mappable interface {
MapTo(v interface{}) error
}

// toConfig will attempt to convert a given configuration cfg into the provided exemplar type.
//
// It will tolerate the cfg being passed as a []byte or string of a json representation of the
// exemplar or the correct type of the exemplar itself
func toConfig(exemplar, cfg interface{}) (interface{}, error) {

// First of all check if we've got the same type as the exemplar - if so it's all fine.
if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
return cfg, nil
}

// Now if not - does it provide a MapTo function we can try?
if mappable, ok := cfg.(Mappable); ok {
newVal := reflect.New(reflect.TypeOf(exemplar))
if err := mappable.MapTo(newVal.Interface()); err == nil {
return newVal.Elem().Interface(), nil
}
// MapTo has failed us ... let's try the json route ...
}

// OK we've been passed a byte array right?
configBytes, ok := cfg.([]byte)
if !ok {
configStr, ok := cfg.(string)
if !ok {
return nil, ErrInvalidConfiguration{cfg: cfg}
}
// oh ... it's a string then?
var configStr string

configStr, ok = cfg.(string)
configBytes = []byte(configStr)
}
if !ok {
// hmm ... can we marshal it to json?
var err error

configBytes, err = json.Marshal(cfg)
ok = (err == nil)
}
if !ok {
// no ... we've tried hard enough at this point - throw an error!
return nil, ErrInvalidConfiguration{cfg: cfg}
}

// OK unmarshal the byte array into a new copy of the exemplar
newVal := reflect.New(reflect.TypeOf(exemplar))
if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
// If we can't unmarshal it then return an error!
return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
}
return newVal.Elem().Interface(), nil
Expand Down
25 changes: 4 additions & 21 deletions modules/queue/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,10 @@ func validType(t string) (Type, error) {

func getQueueSettings(name string) (setting.QueueSettings, []byte) {
q := setting.GetQueueSettings(name)
opts := make(map[string]interface{})
opts["Name"] = name
opts["QueueLength"] = q.Length
opts["BatchLength"] = q.BatchLength
opts["DataDir"] = q.DataDir
opts["Addresses"] = q.Addresses
opts["Network"] = q.Network
opts["Password"] = q.Password
opts["DBIndex"] = q.DBIndex
opts["QueueName"] = q.QueueName
opts["SetName"] = q.SetName
opts["Workers"] = q.Workers
opts["MaxWorkers"] = q.MaxWorkers
opts["BlockTimeout"] = q.BlockTimeout
opts["BoostTimeout"] = q.BoostTimeout
opts["BoostWorkers"] = q.BoostWorkers
opts["ConnectionString"] = q.ConnectionString

cfg, err := json.Marshal(opts)
cfg, err := json.Marshal(q)
if err != nil {
log.Error("Unable to marshall generic options: %v Error: %v", opts, err)
log.Error("Unable to marshall generic options: %v Error: %v", q, err)
log.Error("Unable to create queue for %s", name, err)
return q, []byte{}
}
Expand Down Expand Up @@ -75,7 +58,7 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
Timeout: q.Timeout,
MaxAttempts: q.MaxAttempts,
Config: cfg,
QueueLength: q.Length,
QueueLength: q.QueueLength,
Name: name,
}, exemplar)
}
Expand Down Expand Up @@ -114,7 +97,7 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un
Timeout: q.Timeout,
MaxAttempts: q.MaxAttempts,
Config: cfg,
QueueLength: q.Length,
QueueLength: q.QueueLength,
}, exemplar)
}
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions modules/setting/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (

// QueueSettings represent the settings for a queue from the ini
type QueueSettings struct {
Name string
DataDir string
Length int
QueueLength int `ini:"LENGTH"`
BatchLength int
ConnectionString string
Type string
Expand All @@ -44,6 +45,8 @@ var Queue = QueueSettings{}
func GetQueueSettings(name string) QueueSettings {
q := QueueSettings{}
sec := Cfg.Section("queue." + name)
q.Name = name

// DataDir is not directly inheritable
q.DataDir = filepath.Join(Queue.DataDir, name)
// QueueName is not directly inheritable either
Expand All @@ -65,8 +68,9 @@ func GetQueueSettings(name string) QueueSettings {
q.DataDir = filepath.Join(AppDataPath, q.DataDir)
}
_, _ = sec.NewKey("DATADIR", q.DataDir)

// The rest are...
q.Length = sec.Key("LENGTH").MustInt(Queue.Length)
q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength)
q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength)
q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString)
q.Type = sec.Key("TYPE").MustString(Queue.Type)
Expand All @@ -91,7 +95,7 @@ func NewQueueService() {
if !filepath.IsAbs(Queue.DataDir) {
Queue.DataDir = filepath.Join(AppDataPath, Queue.DataDir)
}
Queue.Length = sec.Key("LENGTH").MustInt(20)
Queue.QueueLength = sec.Key("LENGTH").MustInt(20)
Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20)
Queue.ConnectionString = sec.Key("CONN_STR").MustString("")
Queue.Type = sec.Key("TYPE").MustString("persistable-channel")
Expand Down