Skip to content

Commit

Permalink
(#284) Fix "test" command for long-running jobs
Browse files Browse the repository at this point in the history
We now use a separte Unix socket to stream the job's output
as it runs.

Unfortunately, we must drop support for this command via TCP.
  • Loading branch information
dshearer committed May 26, 2020
1 parent 39f8fd7 commit f702562
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 45 deletions.
2 changes: 1 addition & 1 deletion ipc/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type TestCmd struct {
}

type TestCmdResp struct {
Result string `json:"result"`
UnixSocketPath string `json:"unixSocketPath"`
nonErrorCmdResp
}

Expand Down
20 changes: 18 additions & 2 deletions jobber/cmd_test_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"flag"
"fmt"
"io"
"net"
"os"
"os/user"

Expand Down Expand Up @@ -53,7 +55,21 @@ func doTestCmd(args []string) int {
return 1
}

// handle response
fmt.Printf("%v\n", resp.Result)
// read output from other socket
conn, err := net.Dial("unix", resp.UnixSocketPath)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
return 1
}
defer conn.Close()
var buf [1024]byte
for {
n, err := conn.Read(buf[:])
os.Stdout.Write(buf[:n])
if err == io.EOF {
break
}
}

return 0
}
12 changes: 5 additions & 7 deletions jobberrunner/cmd_test_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ func (self *JobManager) doTestCmd(cmd ipc.TestCmd) ipc.ICmdResp {
return ipc.NewErrorCmdResp(&common.Error{What: "No such job."})
}

// run the job in this thread
runRec := RunJob(nil, job, self.Shell, true)

// make response
if runRec.Err != nil {
return ipc.NewErrorCmdResp(runRec.Err)
common.Logger.Printf("Trying job %v\n", job.Name)
sockPath, err := self.testJobServer.Launch(job)
if err != nil {
return ipc.NewErrorCmdResp(err)
}

return ipc.TestCmdResp{Result: runRec.Describe()}
return ipc.TestCmdResp{UnixSocketPath: *sockPath}
}
25 changes: 14 additions & 11 deletions jobberrunner/ipc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

type IpcService struct {
cmdChan chan<- CmdContainer
serverType IpcServerType
cmdChan chan<- CmdContainer
}

func (self *IpcService) Reload(
Expand All @@ -21,7 +22,7 @@ func (self *IpcService) Reload(

// send command
respChan := make(chan ipc.ICmdResp, 1)
self.cmdChan <- CmdContainer{cmd, respChan}
self.cmdChan <- CmdContainer{Cmd: cmd, RespChan: respChan, ServerType: self.serverType}

// get response
resp := <-respChan
Expand All @@ -42,7 +43,7 @@ func (self *IpcService) ListJobs(

// send command
respChan := make(chan ipc.ICmdResp, 1)
self.cmdChan <- CmdContainer{cmd, respChan}
self.cmdChan <- CmdContainer{Cmd: cmd, RespChan: respChan, ServerType: self.serverType}

// get response
resp := <-respChan
Expand All @@ -63,7 +64,7 @@ func (self *IpcService) Log(

// send command
respChan := make(chan ipc.ICmdResp, 1)
self.cmdChan <- CmdContainer{cmd, respChan}
self.cmdChan <- CmdContainer{Cmd: cmd, RespChan: respChan, ServerType: self.serverType}

// get response
resp := <-respChan
Expand All @@ -84,7 +85,7 @@ func (self *IpcService) Test(

// send command
respChan := make(chan ipc.ICmdResp, 1)
self.cmdChan <- CmdContainer{cmd, respChan}
self.cmdChan <- CmdContainer{Cmd: cmd, RespChan: respChan, ServerType: self.serverType}

// get response
resp := <-respChan
Expand All @@ -105,7 +106,7 @@ func (self *IpcService) Cat(

// send command
respChan := make(chan ipc.ICmdResp, 1)
self.cmdChan <- CmdContainer{cmd, respChan}
self.cmdChan <- CmdContainer{Cmd: cmd, RespChan: respChan, ServerType: self.serverType}

// get response
resp := <-respChan
Expand All @@ -126,7 +127,7 @@ func (self *IpcService) Pause(

// send command
respChan := make(chan ipc.ICmdResp, 1)
self.cmdChan <- CmdContainer{cmd, respChan}
self.cmdChan <- CmdContainer{Cmd: cmd, RespChan: respChan, ServerType: self.serverType}

// get response
resp := <-respChan
Expand All @@ -147,7 +148,7 @@ func (self *IpcService) Resume(

// send command
respChan := make(chan ipc.ICmdResp, 1)
self.cmdChan <- CmdContainer{cmd, respChan}
self.cmdChan <- CmdContainer{Cmd: cmd, RespChan: respChan, ServerType: self.serverType}

// get response
resp := <-respChan
Expand All @@ -168,7 +169,7 @@ func (self *IpcService) Init(

// send command
respChan := make(chan ipc.ICmdResp, 1)
self.cmdChan <- CmdContainer{cmd, respChan}
self.cmdChan <- CmdContainer{Cmd: cmd, RespChan: respChan, ServerType: self.serverType}

// get response
resp := <-respChan
Expand All @@ -189,7 +190,7 @@ func (self *IpcService) SetJob(

// send command
respChan := make(chan ipc.ICmdResp, 1)
self.cmdChan <- CmdContainer{cmd, respChan}
self.cmdChan <- CmdContainer{Cmd: cmd, RespChan: respChan, ServerType: self.serverType}

// get response
resp := <-respChan
Expand All @@ -210,7 +211,7 @@ func (self *IpcService) DeleteJob(

// send command
respChan := make(chan ipc.ICmdResp, 1)
self.cmdChan <- CmdContainer{cmd, respChan}
self.cmdChan <- CmdContainer{Cmd: cmd, RespChan: respChan, ServerType: self.serverType}

// get response
resp := <-respChan
Expand Down Expand Up @@ -253,6 +254,7 @@ type udsIpcServer struct {
func NewUdsIpcServer(sockPath string, cmdChan chan<- CmdContainer) IpcServer {
server := &udsIpcServer{sockPath: sockPath}
server.service.cmdChan = cmdChan
server.service.serverType = IpcServerTypeUds
return server
}

Expand Down Expand Up @@ -293,6 +295,7 @@ type inetIcpServer struct {
func NewInetIpcServer(port uint, cmdChan chan<- CmdContainer) IpcServer {
server := &inetIcpServer{port: port}
server.service.cmdChan = cmdChan
server.service.serverType = IpcServerTypeInet
return server
}

Expand Down
56 changes: 33 additions & 23 deletions jobberrunner/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,25 @@ import (

"github.com/dshearer/jobber/common"
"github.com/dshearer/jobber/ipc"
"github.com/dshearer/jobber/jobberrunner/testjob"
"github.com/dshearer/jobber/jobfile"
)

type IpcServerType int

const (
IpcServerTypeUds = 0
IpcServerTypeInet = iota
)

type CmdContainer struct {
Cmd ipc.ICmd
RespChan chan<- ipc.ICmdResp
Cmd ipc.ICmd
RespChan chan<- ipc.ICmdResp
ServerType IpcServerType
}

type JobManager struct {
user *user.User
jobfilePath string
launched bool
jfile *jobfile.JobFile
Expand All @@ -25,6 +35,7 @@ type JobManager struct {
mainThreadCtxCancel context.CancelFunc
mainThreadDoneChan chan interface{}
jobRunner JobRunnerThread
testJobServer *testjob.TestJobServer
Shell string
}

Expand All @@ -35,7 +46,7 @@ func NewJobManager(jobfilePath string) *JobManager {
panic(fmt.Sprintf("Failed to get current user: %v", err))
}

jm := JobManager{Shell: "/bin/sh"}
jm := JobManager{Shell: "/bin/sh", user: usr}
jm.jobfilePath = jobfilePath
tmp, err := jobfile.NewEmptyRawJobFile().Activate(usr)
if err != nil {
Expand All @@ -44,6 +55,11 @@ func NewJobManager(jobfilePath string) *JobManager {
}
jm.jfile = tmp
common.LogToStdoutStderr()

jm.mainThreadCtx, jm.mainThreadCtxCancel = context.WithCancel(context.Background())

jm.testJobServer = testjob.NewTestJobServer(jm.mainThreadCtx, jm.Shell, usr)

return &jm
}

Expand Down Expand Up @@ -74,20 +90,15 @@ func (self *JobManager) replaceCurrJobfile(jfile *jobfile.JobFileRaw) {
WARNING: Don't activate new jobfile before stopping job threads. Cf. issue 288.
*/

// get current user
usr, err := user.Current()
if err != nil {
panic(fmt.Sprintf("Failed to get current user: %v", err))
}

// stop job-runner thread and wait for current runs to end
self.jobRunner.Cancel()
for rec := range self.jobRunner.RunRecChan() {
self.handleRunRec(rec)
}

// activate jobfile
self.jfile, err = jfile.Activate(usr)
var err error
self.jfile, err = jfile.Activate(self.user)
if err != nil {
common.ErrLogger.Printf("Error loading jobfile. Reloading previous one. %v\n", err)
self.jobRunner.Start(self.jfile.Jobs, self.Shell)
Expand Down Expand Up @@ -163,14 +174,8 @@ func (self *JobManager) loadJobfile() error {
2. Return the error
*/

// get current user
usr, err := user.Current()
if err != nil {
panic(fmt.Sprintf("Failed to get current user: %v", err))
}

// open jobfile
jfile, err := self.openJobfile(self.jobfilePath, usr)
jfile, err := self.openJobfile(self.jobfilePath, self.user)

if err == nil || os.IsNotExist(err) {
if jfile == nil {
Expand Down Expand Up @@ -221,9 +226,8 @@ func (self *JobManager) handleRunRec(rec *jobfile.RunRec) {
}

func (self *JobManager) runMainThread() {
ctx, cancel :=
self.mainThreadCtx, self.mainThreadCtxCancel =
context.WithCancel(context.Background())
self.mainThreadCtxCancel = cancel

self.CmdChan = make(chan CmdContainer)
self.mainThreadDoneChan = make(chan interface{})
Expand All @@ -244,7 +248,7 @@ func (self *JobManager) runMainThread() {
Loop:
for {
select {
case <-ctx.Done():
case <-self.mainThreadCtx.Done():
break Loop

case rec, ok := <-self.jobRunner.RunRecChan():
Expand All @@ -257,7 +261,7 @@ func (self *JobManager) runMainThread() {
case cmd, ok := <-self.CmdChan:
if ok {
var shouldExit bool
cmd.RespChan <- self.doCmd(cmd.Cmd, &shouldExit)
cmd.RespChan <- self.doCmd(&cmd, &shouldExit)
if shouldExit {
self.mainThreadCtxCancel()
break Loop
Expand All @@ -278,16 +282,19 @@ func (self *JobManager) runMainThread() {
for rec := range self.jobRunner.RunRecChan() {
self.handleRunRec(rec)
}

// wait for "try" command threads
self.testJobServer.Wait()
}()
}

func (self *JobManager) doCmd(
tmpCmd ipc.ICmd,
tmpCmd *CmdContainer,
shouldExit *bool) ipc.ICmdResp { // runs in main thread

*shouldExit = false

switch cmd := tmpCmd.(type) {
switch cmd := tmpCmd.Cmd.(type) {
case ipc.ReloadCmd:
return self.doReloadCmd(cmd)

Expand All @@ -298,6 +305,9 @@ func (self *JobManager) doCmd(
return self.doLogCmd(cmd)

case ipc.TestCmd:
if tmpCmd.ServerType == IpcServerTypeInet {
return ipc.NewErrorCmdResp(fmt.Errorf("\"test\" is no longer supported over TCP"))
}
return self.doTestCmd(cmd)

case ipc.CatCmd:
Expand Down
4 changes: 3 additions & 1 deletion jobberrunner/sources.mk
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ RUNNER_SOURCES := \
jobberrunner/job_runner_thread.go \
jobberrunner/main.go \
jobberrunner/queue.go \
jobberrunner/sources.mk
jobberrunner/sources.mk \
jobberrunner/testjob/test_job_server.go \
jobberrunner/testjob/test_job_thread.go \

RUNNER_TEST_SOURCES := \
jobberrunner/cmd_init_test.go \
Expand Down
Loading

0 comments on commit f702562

Please sign in to comment.