From 106631b10645660b0a481a6fd48963ff651ffee7 Mon Sep 17 00:00:00 2001 From: Max Date: Sun, 2 Apr 2023 15:07:29 +0800 Subject: [PATCH] [add] start tasks and schedules --- cmd/start.go | 32 +++++++++++++++++++++----------- schedule/schedule.go | 31 ++++++++++++++++++++++++++++++- schedule/schedule_test.go | 9 +++++++++ task/task.go | 31 ++++++++++++++++++++++++++++++- task/task_test.go | 9 +++++++++ 5 files changed, 99 insertions(+), 13 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index 701164428e..bd25a2f60d 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -22,10 +22,12 @@ import ( "github.com/yaoapp/kun/log" "github.com/yaoapp/yao/config" "github.com/yaoapp/yao/engine" + ischedule "github.com/yaoapp/yao/schedule" "github.com/yaoapp/yao/service" "github.com/yaoapp/yao/setup" "github.com/yaoapp/yao/share" "github.com/yaoapp/yao/studio" + itask "github.com/yaoapp/yao/task" ) var startDebug = false @@ -134,6 +136,15 @@ var startCmd = &cobra.Command{ } + // Start Tasks + itask.Start() + defer itask.Stop() + + // Start Schedules + ischedule.Start() + defer ischedule.Stop() + + // Start HTTP Server srv, err := service.Start(config.Conf) defer func() { service.Stop(srv) @@ -145,25 +156,24 @@ var startCmd = &cobra.Command{ os.Exit(1) } - // print the messages under the production mode - if mode == "production" { - printApis(true) - printTasks(true) - printSchedules(true) - printConnectors(true) - printStores(true) - } - - // start watching + // Start watching watchDone := make(chan uint8, 1) if mode == "development" && !startDisableWatching { - // Watching fmt.Println(color.WhiteString("\n---------------------------------")) fmt.Println(color.WhiteString(L("Watching"))) fmt.Println(color.WhiteString("---------------------------------")) go service.Watch(srv, watchDone) } + // Print the messages under the production mode + if mode == "production" { + printApis(true) + printTasks(true) + printSchedules(true) + printConnectors(true) + printStores(true) + } + for { select { case v := <-srv.Event(): diff --git a/schedule/schedule.go b/schedule/schedule.go index 5ef28cab45..eebeb8acb9 100644 --- a/schedule/schedule.go +++ b/schedule/schedule.go @@ -1,20 +1,49 @@ package schedule import ( + "fmt" + "strings" + "github.com/yaoapp/gou/application" "github.com/yaoapp/gou/schedule" + "github.com/yaoapp/kun/log" "github.com/yaoapp/yao/config" "github.com/yaoapp/yao/share" ) // Load load schedule func Load(cfg config.Config) error { + messages := []string{} exts := []string{"*.sch.yao", "*.sch.json", "*.sch.jsonc"} - return application.App.Walk("schedules", func(root, file string, isdir bool) error { + err := application.App.Walk("schedules", func(root, file string, isdir bool) error { if isdir { return nil } _, err := schedule.Load(file, share.ID(root, file)) + if err != nil { + messages = append(messages, err.Error()) + } return err }, exts...) + + if len(messages) > 0 { + return fmt.Errorf(strings.Join(messages, ";\n")) + } + return err +} + +// Start schedules +func Start() { + for name, sch := range schedule.Schedules { + sch.Start() + log.Info("[Schedule] %s start", name) + } +} + +// Stop schedules +func Stop() { + for name, sch := range schedule.Schedules { + sch.Stop() + log.Info("[Schedule] %s stop", name) + } } diff --git a/schedule/schedule_test.go b/schedule/schedule_test.go index af0fbd0da4..9ef0aaf00d 100644 --- a/schedule/schedule_test.go +++ b/schedule/schedule_test.go @@ -21,6 +21,15 @@ func TestLoad(t *testing.T) { check(t) } +func TestStartStop(t *testing.T) { + test.Prepare(t, config.Conf) + defer test.Clean() + + Load(config.Conf) + Start() + defer Stop() +} + func check(t *testing.T) { ids := map[string]bool{} for id := range schedule.Schedules { diff --git a/task/task.go b/task/task.go index 53c12fae77..ef6fe9fc09 100644 --- a/task/task.go +++ b/task/task.go @@ -1,20 +1,49 @@ package task import ( + "fmt" + "strings" + "github.com/yaoapp/gou/application" "github.com/yaoapp/gou/task" + "github.com/yaoapp/kun/log" "github.com/yaoapp/yao/config" "github.com/yaoapp/yao/share" ) // Load load task func Load(cfg config.Config) error { + messages := []string{} exts := []string{"*.yao", "*.json", "*.jsonc"} - return application.App.Walk("tasks", func(root, file string, isdir bool) error { + err := application.App.Walk("tasks", func(root, file string, isdir bool) error { if isdir { return nil } _, err := task.Load(file, share.ID(root, file)) + if err != nil { + messages = append(messages, err.Error()) + } return err }, exts...) + + if len(messages) > 0 { + return fmt.Errorf(strings.Join(messages, ";\n")) + } + return err +} + +// Start tasks +func Start() { + for name, t := range task.Tasks { + go t.Start() + log.Info("[Task] %s start", name) + } +} + +// Stop tasks +func Stop() { + for name, t := range task.Tasks { + t.Stop() + log.Info("[Task] %s stop", name) + } } diff --git a/task/task_test.go b/task/task_test.go index 1cb0cc1680..c2d5dee079 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -17,6 +17,15 @@ func TestLoad(t *testing.T) { check(t) } +func TestStartStop(t *testing.T) { + test.Prepare(t, config.Conf) + defer test.Clean() + + Load(config.Conf) + Start() + defer Stop() +} + func check(t *testing.T) { ids := map[string]bool{} for id := range task.Tasks {