Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the UDF socket server to cleanly stop. #1500

Merged
merged 2 commits into from
Jul 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

- [#1400](https://github.com/influxdata/kapacitor/issues/1400): Allow for `.yml` file extensions in `define-topic-handler`
- [#1402](https://github.com/influxdata/kapacitor/pull/1402): Fix http server error logging.
- [#1500](https://github.com/influxdata/kapacitor/pull/1500): Fix bugs with stopping running UDF agent.

## v1.3.1 [2017-06-02]

Expand Down
6 changes: 3 additions & 3 deletions integrations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func compareAlertData(exp, got alert.Data) (bool, string) {
type UDFService struct {
ListFunc func() []string
InfoFunc func(name string) (udf.Info, bool)
CreateFunc func(name string, l *log.Logger, abortCallback func()) (udf.Interface, error)
CreateFunc func(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error)
}

func (u UDFService) List() []string {
Expand All @@ -148,8 +148,8 @@ func (u UDFService) Info(name string) (udf.Info, bool) {
return u.InfoFunc(name)
}

func (u UDFService) Create(name string, l *log.Logger, abortCallback func()) (udf.Interface, error) {
return u.CreateFunc(name, l, abortCallback)
func (u UDFService) Create(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error) {
return u.CreateFunc(name, taskID, nodeID, l, abortCallback)
}

type taskStore struct{}
Expand Down
10 changes: 8 additions & 2 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5930,11 +5930,11 @@ stream
return
}
uio := udf_test.NewIO()
udfService.CreateFunc = func(name string, l *log.Logger, abortCallback func()) (udf.Interface, error) {
udfService.CreateFunc = func(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error) {
if name != "customFunc" {
return nil, fmt.Errorf("unknown function %s", name)
}
return udf_test.New(uio, l), nil
return udf_test.New(taskID, nodeID, uio, l), nil
}

tmInit := func(tm *kapacitor.TaskMaster) {
Expand All @@ -5950,6 +5950,12 @@ stream
t.Error("expected init message")
}
init := i.Init
if got, exp := init.TaskID, "TestStream_CustomFunctions"; got != exp {
t.Errorf("unexpected task ID got %q exp %q", got, exp)
}
if got, exp := init.NodeID, "customFunc4"; got != exp {
t.Errorf("unexpected task ID got %q exp %q", got, exp)
}

if got, exp := len(init.Options), 2; got != exp {
t.Fatalf("unexpected number of options in init request, got %d exp %d", got, exp)
Expand Down
9 changes: 7 additions & 2 deletions services/udf/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *Service) Info(name string) (udf.Info, bool) {
}

func (s *Service) Create(
name string,
name, taskID, nodeID string,
l *log.Logger,
abortCallback func(),
) (udf.Interface, error) {
Expand All @@ -70,6 +70,7 @@ func (s *Service) Create(
if conf.Socket != "" {
// Create socket UDF
return kapacitor.NewUDFSocket(
taskID, nodeID,
kapacitor.NewSocketConn(conf.Socket),
l,
time.Duration(conf.Timeout),
Expand All @@ -87,6 +88,7 @@ func (s *Service) Create(
Env: env,
}
return kapacitor.NewUDFProcess(
taskID, nodeID,
command.ExecCommander,
cmdSpec,
l,
Expand All @@ -109,7 +111,10 @@ func (s *Service) Refresh(name string) error {
}

func (s *Service) loadUDFInfo(name string) (udf.Info, error) {
u, err := s.Create(name, s.logger, nil)
// loadUDFInfo creates a UDF connection outside the context of a task or node
// because it only makes the Info request and never makes an Init request.
// As such it does not need to provide actual task and node IDs.
u, err := s.Create(name, "", "", s.logger, nil)
if err != nil {
return udf.Info{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type LogService interface {
type UDFService interface {
List() []string
Info(name string) (udf.Info, bool)
Create(name string, l *log.Logger, abortCallback func()) (udf.Interface, error)
Create(name, taskID, nodeID string, l *log.Logger, abortCallback func()) (udf.Interface, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation for exposing the task and node IDs? Is it just adding some additional nice to have information? Or something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The motivation came from developing Morgoth more where I realized that as a UDF the process gets lots of connections from Kapacitor but it doesn't know the context of the connection. This way when Morgoth get a connection and is told to initialize it knows under which context it is operating, thus providing the user of Morgoth with more context in its logs and exposed metrics.

In short, it pushes down context so that it is not lost.

}

var ErrTaskMasterClosed = errors.New("TaskMaster is closed")
Expand Down
18 changes: 18 additions & 0 deletions udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func newUDFNode(et *ExecutingTask, n *pipeline.UDFNode, l *log.Logger) (*UDFNode
// Create the UDF
f, err := et.tm.UDFService.Create(
n.UDFName,
et.Task.ID,
n.Name(),
l,
un.abortedCallback,
)
Expand Down Expand Up @@ -143,6 +145,9 @@ func (n *UDFNode) snapshot() ([]byte, error) {
// over STDIN and STDOUT. Lines received over STDERR are logged
// via normal Kapacitor logging.
type UDFProcess struct {
taskName string
nodeName string

server *udf.Server
commander command.Commander
cmdSpec command.Spec
Expand All @@ -162,13 +167,16 @@ type UDFProcess struct {
}

func NewUDFProcess(
taskName, nodeName string,
commander command.Commander,
cmdSpec command.Spec,
l *log.Logger,
timeout time.Duration,
abortCallback func(),
) *UDFProcess {
return &UDFProcess{
taskName: taskName,
nodeName: nodeName,
commander: commander,
cmdSpec: cmdSpec,
logger: l,
Expand Down Expand Up @@ -208,6 +216,8 @@ func (p *UDFProcess) Open() error {
outBuf := bufio.NewReader(stdout)

p.server = udf.NewServer(
p.taskName,
p.nodeName,
outBuf,
stdin,
p.logger,
Expand Down Expand Up @@ -269,6 +279,9 @@ func (p *UDFProcess) Out() <-chan edge.Message { return p.server.Out()
func (p *UDFProcess) Info() (udf.Info, error) { return p.server.Info() }

type UDFSocket struct {
taskName string
nodeName string

server *udf.Server
socket Socket

Expand All @@ -285,12 +298,15 @@ type Socket interface {
}

func NewUDFSocket(
taskName, nodeName string,
socket Socket,
l *log.Logger,
timeout time.Duration,
abortCallback func(),
) *UDFSocket {
return &UDFSocket{
taskName: taskName,
nodeName: nodeName,
socket: socket,
logger: l,
timeout: timeout,
Expand All @@ -308,6 +324,8 @@ func (s *UDFSocket) Open() error {
outBuf := bufio.NewReader(out)

s.server = udf.NewServer(
s.taskName,
s.nodeName,
outBuf,
in,
s.logger,
Expand Down
Loading