Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --cleanup-workspace-dir in start-worker subcommand #53

Merged
merged 4 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ worker:
# Default timeout of the task handler.
# This value will be used when TaskSpec.timeoutSeconds is not set or 0.
defaultTimeout: 30m0s
# Cleanup workspace dir or not when each task handler execution finished.
cleanupWorkspaceDir: false
# Task Handler Command
# A Worker spawns a process with the command for each received tasks
commands:
Expand Down Expand Up @@ -633,6 +635,8 @@ pftaskqueue get-worker [queue] --state=[all,running,succeeded,failed,lost,tosalv
```
┌ {workspace direcoty} # pftaskqueue passes the dir name to stdin of task handler process
│ # also exported as PFTQ_TASK_HANDLER_WORKSPACE_DIR
│ # Note: this directory will be deleted after task handler finished
│ # when taskHandler.CleanupWorkspaceDir is true in worker configuration
│ # pftaskqueue prepares whole the contents
├── input
Expand Down
3 changes: 3 additions & 0 deletions cmd/start_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func init() {
flag.Duration("default-command-timeout", cmdOpts.Worker.TaskHandler.DefaultCommandTimeout, "default timeout for executing command for tasks. the value will be used when the taskspec has no timeout spec")
viperBindPFlag("Worker.TaskHandler.DefaultTimeout", cmdOpts.Worker.TaskHandler.DefaultCommandTimeout.String(), flag.Lookup("default-command-timeout"))

flag.Bool("cleanup-workspace-dir", cmdOpts.Worker.TaskHandler.CleanupWorkspaceDir, "cleanup workspace dir or not when each command execution finished")
viperBindPFlag("Worker.TaskHandler.CleanupWorkspaceDir", strconv.FormatBool(cmdOpts.Worker.TaskHandler.CleanupWorkspaceDir), flag.Lookup("cleanup-workspace-dir"))

flag.Bool("exit-on-suspend", cmdOpts.Worker.ExitOnSuspend, "if set, worker exits when queue is suspended")
viperBindPFlag("Worker.ExitOnSuspend", strconv.FormatBool(cmdOpts.Worker.ExitOnSuspend), flag.Lookup("exit-on-suspend"))

Expand Down
1 change: 1 addition & 0 deletions pkg/apis/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type WorkerSpec struct {
type TaskHandlerSpec struct {
DefaultCommandTimeout time.Duration `json:"defaultTimeout" yaml:"defaultTimeout" mapstructure:"defaultTimeout" default:"30m" validate:"required"`
Commands []string `json:"commands" yaml:"commands" default:"[\"cat\"]" validate:"required"`
CleanupWorkspaceDir bool `json:"cleanupWorkspace" yaml:"cleanupWorkspace" default:"false"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, we should add this option in WoekerSpec.

}

type HeartBeatSpec struct {
Expand Down
16 changes: 12 additions & 4 deletions pkg/backend/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"io/ioutil"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -740,11 +741,18 @@ var _ = Describe("Backend", func() {
Expect(tasks[0].UID).NotTo(Equal(tasks[1].UID))

pending := mustPendingQueueLength(queue.UID.String(), 2)
Expect(pending[0]).To(Equal(tasks[1].UID))
Expect(pending[1]).To(Equal(tasks[0].UID))
taskUIDs := mustTasksSetSize(queue.UID.String(), 2)
Expect(taskUIDs[0]).To(Equal(tasks[1].UID))
Expect(taskUIDs[1]).To(Equal(tasks[0].UID))

sort.Strings(pending)
sort.Strings(taskUIDs)
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].UID < tasks[j].UID
})

Expect(pending[0]).To(Equal(tasks[0].UID))
Expect(pending[1]).To(Equal(tasks[1].UID))
Expect(taskUIDs[0]).To(Equal(tasks[0].UID))
Expect(taskUIDs[1]).To(Equal(tasks[1].UID))
assertKeyContents(backend.taskKey(queue.UID.String(), tasks[0].UID), tasks[0])
assertKeyContents(backend.taskKey(queue.UID.String(), tasks[1].UID), tasks[1])
})
Expand Down
20 changes: 18 additions & 2 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,22 @@ func (w *Worker) runCommand(logger zerolog.Logger, t *task.Task) (task.TaskResul
}

workspacePath, envvars, err := w.prepareTaskHandlerDirAndEnvvars(t)
if w.config.TaskHandler.CleanupWorkspaceDir {
defer func() {
if workspacePath == "" {
workspacePath = w.workspacePath(t)
}
if _, err := os.Stat(workspacePath); errors.Is(err, os.ErrNotExist) {
logger.Info().Str("workspaceDir", workspacePath).Msg("Skip cleaning up because workspace dir does not exist")
return
}
if err := os.RemoveAll(workspacePath); err != nil {
logger.Error().Err(err).Str("workspaceDir", workspacePath).Msg("Failed to cleanup workspace dir")
return
}
logger.Info().Str("workspaceDir", workspacePath).Msg("Cleaned up workspace dir")
}()
}
if err != nil {
msg := "Can't prepare workspace dir for task handler process"
result := task.TaskResult{
Expand Down Expand Up @@ -395,11 +411,11 @@ func (w *Worker) runCommand(logger zerolog.Logger, t *task.Task) (task.TaskResul
return result, postHooks
}

func (w *Worker) thisWorkerWorkDir() string {
func (w *Worker) WorkerWorkDir() string {
return filepath.Join(w.config.WorkDir, w.uid.String())
}
func (w *Worker) workspacePath(t *task.Task) string {
return filepath.Join(w.thisWorkerWorkDir(), t.Status.CurrentRecord.ProcessUID)
return filepath.Join(w.WorkerWorkDir(), t.Status.CurrentRecord.ProcessUID)
}

// {task workspace path}/
Expand Down
5 changes: 5 additions & 0 deletions pkg/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var _ = Describe("Worker", func() {
Concurrency: 4,
TaskHandler: apiworker.TaskHandlerSpec{
DefaultCommandTimeout: 1 * time.Minute,
CleanupWorkspaceDir: true,
Commands: []string{
"bash",
"-c",
Expand Down Expand Up @@ -189,6 +190,10 @@ var _ = Describe("Worker", func() {
suspend.State = taskqueue.TaskQueueStateSuspend
testutil.MustQueueWithState(bcknd, suspend)
Eventually(workerDone, 30*time.Second).Should(Receive(BeNil()))

fileList, err := ioutil.ReadDir(worker.WorkerWorkDir())
Expect(err).NotTo(HaveOccurred())
Expect(len(fileList)).To(BeZero())
})
})
})