-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[file-sd-part-1] Added file sd to query #546
Conversation
df34473
to
6c9aca9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice one, thanks! So EXCITED
cmd/thanos/query.go
Outdated
@@ -64,6 +66,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string | |||
stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable)."). | |||
PlaceHolder("<store>").Strings() | |||
|
|||
filesToWatch := cmd.Flag("filesd", "Path to file that contain addresses of store API servers (repeatable)."). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note something about globbing option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
store-sd-files
cmd/thanos/query.go
Outdated
@@ -241,6 +258,15 @@ func runQuery( | |||
|
|||
specs = append(specs, &gossipSpec{id: id, addr: ps.StoreAPIAddr, peer: peer}) | |||
} | |||
|
|||
addrFromFileSD.mtx.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not doing just addrFromFileSD.Lock()
cmd/thanos/query.go
Outdated
@@ -264,6 +290,48 @@ func runQuery( | |||
stores.Close() | |||
}) | |||
} | |||
// Run File Service Discovery and update the store set when the files are modified |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missed trailing period
cmd/thanos/query.go
Outdated
@@ -264,6 +290,48 @@ func runQuery( | |||
stores.Close() | |||
}) | |||
} | |||
// Run File Service Discovery and update the store set when the files are modified | |||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if fileSD != nil {
cmd/thanos/query.go
Outdated
{ | ||
if fileSD != nil { | ||
var fileSDUpdates chan *discovery.Discoverable | ||
ctx, cancel := context.WithCancel(context.Background()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
never share contexts between two run groups
cmd/thanos/query.go
Outdated
continue | ||
} | ||
// TODO(ivan): resolve dns here maybe? | ||
addrFromFileSD.update(update.Source, update.Services) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`s/addrFromFileSD/cache
cmd/thanos/query.go
Outdated
} | ||
}, func(error) { | ||
cancel() | ||
stores.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it was closed before?
cmd/thanos/query.go
Outdated
continue | ||
} | ||
// TODO(ivan): resolve dns here maybe? | ||
addrFromFileSD.update(update.Source, update.Services) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validate for dups?
6c9aca9
to
1ff8e29
Compare
1ff8e29
to
2105322
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good 👍 like the reuse of Prom code :)
@@ -271,6 +302,46 @@ func runQuery( | |||
stores.Close() | |||
}) | |||
} | |||
// Run File Service Discovery and update the store set when the files are modified. | |||
if fileSD != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor nit - did we want to put this in a block so we can keep ctx, cancel
as we have done in other blocks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is in block, thanks for if
(: right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah its fine, i didnt think about the indenting
docs/components/query.md
Outdated
@@ -129,6 +129,10 @@ Flags: | |||
info endpoint (repeated). | |||
--store=<store> ... Addresses of statically configured store API | |||
servers (repeatable). | |||
--store-sd-file=<path> ... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - would change this to match prom file_sd_config
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config so maybe store.file-sd-config
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could agree
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some suggestions
cmd/thanos/query.go
Outdated
@@ -87,6 +95,15 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string | |||
lookupStores[s] = struct{}{} | |||
} | |||
|
|||
var filesd *file.Discovery |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/filesd/fileSD/
maybe?
cmd/thanos/query.go
Outdated
@@ -248,6 +271,14 @@ func runQuery( | |||
|
|||
specs = append(specs, &gossipSpec{id: id, addr: ps.StoreAPIAddr, peer: peer}) | |||
} | |||
|
|||
// Add store specs from file sd. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/sd/SD
cmd/thanos/query.go
Outdated
specs = append(specs, query.NewGRPCStoreSpec(addr)) | ||
} | ||
|
||
specs = removeDuplicates(specs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove or we want to fail maybe?
@@ -271,6 +302,46 @@ func runQuery( | |||
stores.Close() | |||
}) | |||
} | |||
// Run File Service Discovery and update the store set when the files are modified. | |||
if fileSD != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is in block, thanks for if
(: right?
cmd/thanos/query.go
Outdated
if update == nil { | ||
continue | ||
} | ||
// TODO(ivan): resolve dns here maybe? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this comment is not needed here? Because further PRs are rdy?
} | ||
}, func(error) { | ||
cancelUpdate() | ||
close(fileSDUpdates) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm.. Can we ensure we close this ONLY in one place? I feel like both fileSD
and this guy closes this, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File SD doesn't close it. It is responsibility of the one who passes the channel to file SD.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so why this comment exists?
// Handle the case that a discoverer exits and closes the channel
// before the context is done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, good question. I think this is not needed then. I will remove it. I double checked again but file sd doesn't close this channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to clarify, the check when receiving from the channel is not needed.
cmd/thanos/query.go
Outdated
@@ -339,6 +410,17 @@ func runQuery( | |||
level.Info(logger).Log("msg", "starting query node") | |||
return nil | |||
} | |||
func removeDuplicates(specs []query.StoreSpec) []query.StoreSpec { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add newline
log line is must have - if not error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would really error out
cmd/thanos/query.go
Outdated
defer f.Unlock() | ||
for _, tg := range tgs { | ||
// Some Discoverers send nil target group so need to check for it to avoid panics. | ||
if tg != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
filter out wrong cases to avoid indents and have cleaner code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean something like this:
if tg == nil {
continue
}
f.tgs[tg.Source] = tg
docs/components/query.md
Outdated
@@ -129,6 +129,10 @@ Flags: | |||
info endpoint (repeated). | |||
--store=<store> ... Addresses of statically configured store API | |||
servers (repeatable). | |||
--store-sd-file=<path> ... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could agree
pkg/discovery/cache.go
Outdated
} | ||
|
||
// NewCache returns a new empty Cache. | ||
func NewCache() *Cache { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - a style preference I have would be to make it pkg/discovery/cache/cache.go
which will make it more readable in store.go
as it allows cache.New()
.
if len(*fileSDFiles) > 0 { | ||
conf := &file.SDConfig{ | ||
Files: *fileSDFiles, | ||
RefreshInterval: *fileSDInterval, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only thing i can think of here is what if someone does < 1s
? or not a valid duration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, yeah, it is possible to shoot yourself in the foot with this. However, I would argue that if you are using this flag at all you should know what you are doing? What do you think? We can default to 5s if interval < 5s?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let leave it for the moment, we have a sensible default 👍
cmd/thanos/query.go
Outdated
PlaceHolder("<path>").Strings() | ||
|
||
fileSDInterval := modelDuration(cmd.Flag("store.file-sd-config.interval", "Refresh interval to re-read file SD files. (used as a fallback)"). | ||
Default("5s")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think this should be "5m" 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work on this whole chain of PRs 👍
if len(*fileSDFiles) > 0 { | ||
conf := &file.SDConfig{ | ||
Files: *fileSDFiles, | ||
RefreshInterval: *fileSDInterval, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let leave it for the moment, we have a sensible default 👍
Addressed PR comments More pr comments addressed Added logs and metrics when removing dups Removed redundant check Added flag for file sd refresh interval Changed default fileSD interval to 5m make docs [file-sd-part-2] Query e2e test uses different service discovery options (#554) * Query e2e tests different sd options
be86f5c
to
97f393c
Compare
ref: #492
Changes
Added file sd to query. You can specify files to watch via a command line flag. Then the querier will load stores' addresses from the files whenever the files are updated.
We are reusing Prometheus' file sd.
Verification
Tested in a following PR that adds the different SD options to the tests - (#554)