-
Notifications
You must be signed in to change notification settings - Fork 493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update the UDF socket server to cleanly stop. #1500
Conversation
b27780e
to
8f517a2
Compare
services/udf/service.go
Outdated
@@ -109,7 +111,7 @@ func (s *Service) Refresh(name string) error { | |||
} | |||
|
|||
func (s *Service) loadUDFInfo(name string) (udf.Info, error) { | |||
u, err := s.Create(name, s.logger, nil) | |||
u, err := s.Create(name, "", "", s.logger, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are these nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scratch that. I see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally because the UDF is not being created in the context of a task or node.
More specificially because loadUDFInfo only makes the Info
request and never sends the Init
request so the taskID and nodeID will never be sent or subsequently used.
I should probably add a comment to that fact since its not clear.
@@ -50,7 +50,7 @@ type LogService interface { | |||
type UDFService interface { | |||
List() []string | |||
Info(name string) (udf.Info, bool) | |||
Create(name string, l *log.Logger, abortCallback func()) (udf.Interface, error) | |||
Create(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the motivation for exposing the task and node IDs? Is it just adding some additional nice to have information? Or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The motivation came from developing Morgoth more where I realized that as a UDF the process gets lots of connections from Kapacitor but it doesn't know the context of the connection. This way when Morgoth get a connection and is told to initialize it knows under which context it is operating, thus providing the user of Morgoth with more context in its logs and exposed metrics.
In short, it pushes down context so that it is not lost.
return | ||
} | ||
s.stopped = true | ||
s.listener.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is ignoring the possible error here okay?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, although I don't know why :) It was being ignored previously. And is typically closed via defer, meaning its error is typically ignored.
udf/agent/server.go
Outdated
go func() { | ||
s.wg.Done() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the immediate Done()
? I haven't seen this pattern before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a bug :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops, I always for get to click the start review button. Just the couple things noted!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! 🥇
f6399e1
to
1c61319
Compare
Update the UDF socket server to cleanly stop.
Now its possible to use defer to Stop the server safely.
Also add two fields to the initialize response for the task ID and node ID of the UDF. This way the Kapacitor context can be passed down to the UDF node for correct reporting.