diff --git a/Makefile b/Makefile index 4050d0d..35df3d6 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,10 @@ dev: run-scheduler: go run main.go scheduler --config conf.toml +.PHONY: run-notifier +run-notifier: + go run main.go notifier --config conf.toml + .PHONY: services services: docker-compose up -d \ No newline at end of file diff --git a/cmd/notifier/notifier.go b/cmd/notifier/notifier.go new file mode 100644 index 0000000..a6942ad --- /dev/null +++ b/cmd/notifier/notifier.go @@ -0,0 +1,20 @@ +package notifier + +import ( + "github.com/spf13/cobra" + taskexec "github.com/stuckinforloop/ticker/internal/task_exec" + "github.com/stuckinforloop/ticker/worker" +) + +// NotifierCmd represents the notifier command +var NotifierCmd = &cobra.Command{ + Use: "notifier", + Short: "starts notifier for ticker", + Run: func(cmd *cobra.Command, args []string) { + w := worker.New() + taskExecDAO := taskexec.NewTaskExecDAO(w.DAO) + w.Run(taskExecDAO.UpdateTaskStatusNotify) + }, +} + +func init() {} diff --git a/cmd/root.go b/cmd/root.go index 22c4d96..086c50f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -8,6 +8,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/stuckinforloop/ticker/cmd/executor" + "github.com/stuckinforloop/ticker/cmd/notifier" "github.com/stuckinforloop/ticker/cmd/scheduler" "github.com/stuckinforloop/ticker/cmd/server" ) @@ -71,5 +72,6 @@ func initConfig() { func addSubCommands() { rootCmd.AddCommand(executor.ExecutorCmd) rootCmd.AddCommand(scheduler.SchedulerCmd) + rootCmd.AddCommand(notifier.NotifierCmd) rootCmd.AddCommand(server.ServerCmd) } diff --git a/internal/queue/broker/sqs.go b/internal/queue/broker/sqs.go index 47398eb..eecd690 100644 --- a/internal/queue/broker/sqs.go +++ b/internal/queue/broker/sqs.go @@ -71,7 +71,7 @@ func (q *SQS) Dequeue( }, QueueUrl: aws.String(q.getQueueURL(queueName)), VisibilityTimeout: aws.Int64(visibilityTimeout), - // WaitTimeSeconds: aws.Int64(waitTimeout), + WaitTimeSeconds: aws.Int64(20), // For long polling }) if err != nil { diff --git a/internal/task_exec/task_exec.go b/internal/task_exec/task_exec.go index 8a3f552..3ad7710 100644 --- a/internal/task_exec/task_exec.go +++ b/internal/task_exec/task_exec.go @@ -23,7 +23,10 @@ const ( StatusCompleted Status = "completed" ) -const ExecutorQueueName = "task-executor-1.fifo" +const ( + ExecutorQueueName = "task-executor-1.fifo" + NotifierQueueName = "task-status-updates" +) type TaskExec struct { ID string `json:"id"` @@ -54,6 +57,15 @@ type ExecutorPayload struct { NotifyEvery *int `json:"notify_every"` } +type NotifierPayload struct { + TaskID string `json:"task_id"` + TaskExecID string `json:"task_exec_id"` + Status Status `json:"status"` + StartedAt *int64 `json:"started_at"` + FinishedAt *int64 `json:"finished_at"` + Response map[string]any `json:"response"` +} + func (dao *TaskExecDAO) ScheduleTasks(ctx context.Context) error { ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() @@ -86,6 +98,57 @@ func (dao *TaskExecDAO) ScheduleTasks(ctx context.Context) error { return nil } +func (dao *TaskExecDAO) UpdateTaskStatusNotify(ctx context.Context) error { + done := make(chan bool) + + go func() { + for { + select { + case <-done: + return + default: + dao.Logger.Info("Attempting dequeue") + messageId, message, err := dao.Queue.Dequeue(ctx, NotifierQueueName, 30) + if err != nil { + dao.Logger.Error("Dequeue failed", zap.Error(err)) + continue // Continue to retry + } + + if message == "" && messageId == "" { + dao.Logger.Info("Dequeue: empty results") + continue + } + + dao.Logger.Info("Dequeue success", + zap.String("message", message), + zap.String("message_id", messageId)) + + err = dao.updateTaskExecStatus(ctx, message) + if err != nil { + dao.Logger.Error("Update exec failed", zap.Error(err)) + } + // dao.notify(message) + + err = dao.Queue.Acknowledge(ctx, messageId, NotifierQueueName) + if err != nil { + dao.Logger.Error("Ack failed", zap.Error(err)) + } + + dao.Logger.Info("Ack success", + zap.String("message", message), + zap.String("message_id", messageId)) + } + } + }() + + select { + case <-ctx.Done(): + done <- true + } + + return nil +} + func (dao *TaskExecDAO) listTasks(ctx context.Context) ([]task.Task, error) { dao.Logger.Info("Attempting enqueue") @@ -252,6 +315,36 @@ func (dao *TaskExecDAO) findTaskExec( return &t, nil } +func (dao *TaskExecDAO) getTaskExecById( + ctx context.Context, id string, +) (*TaskExec, error) { + db := dao.RO() + query := ` + SELECT + id, task_id, status, run_at, started_at, + finished_at, response, created_at, updated_at + FROM task_execs + WHERE id = $1 + ` + t := TaskExec{} + response := []byte{} + if err := db.QueryRowContext(ctx, query, id).Scan( + &t.ID, &t.TaskID, &t.Status, &t.RunAt, &t.StartedAt, + &t.FinishedAt, &response, &t.CreatedAt, &t.UpdatedAt); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + + return nil, fmt.Errorf("get task_exec by id: %w", err) + } + + if err := json.Unmarshal(response, &t.Response); err != nil { + return nil, fmt.Errorf("unmarshal response: %w", err) + } + + return &t, nil +} + func (dao *TaskExecDAO) createTaskExec( ctx context.Context, t *TaskExec, ) (*TaskExec, error) { @@ -277,3 +370,69 @@ func (dao *TaskExecDAO) createTaskExec( return t, nil } + +func (dao *TaskExecDAO) updateTaskExecStatus( + ctx context.Context, message string, +) error { + payload := NotifierPayload{} + err := json.Unmarshal([]byte(message), &payload) + if err != nil { + return fmt.Errorf("notifier unmarshal: %w", err) + } + + exec, err := dao.getTaskExecById(ctx, payload.TaskExecID) + if err != nil { + return fmt.Errorf("notifier find task_exec: %w", err) + } + + exec.Status = payload.Status + if payload.StartedAt != nil { + exec.StartedAt = payload.StartedAt + } + if payload.FinishedAt != nil { + exec.FinishedAt = payload.FinishedAt + } + if payload.Response != nil { + exec.Response = payload.Response + } + + return dao.updateTaskExec(ctx, exec) +} + +func (dao *TaskExecDAO) updateTaskExec( + ctx context.Context, t *TaskExec, +) error { + t.UpdatedAt = dao.TimeNow() + + response, err := json.Marshal(t.Response) + if err != nil { + return fmt.Errorf("marshal response: %w", err) + } + + db := dao.RW() + query := ` + UPDATE task_execs + SET + status = $1, + started_at = $2, + finished_at = $3, + response = $4, + updated_at = $5 + WHERE id = $6 + ` + if _, err := db.ExecContext(ctx, query, + t.Status, + t.StartedAt, + t.FinishedAt, + response, + t.UpdatedAt, + t.ID, + ); err != nil { + return fmt.Errorf("update task_exec: %w", err) + } + + dao.Logger.Info("Task Exec update success", + zap.String("task_exec_id", t.ID)) + + return nil +}