Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ dev:
run-scheduler:
go run main.go scheduler --config conf.toml

.PHONY: run-notifier
run-notifier:
go run main.go notifier --config conf.toml

.PHONY: services
services:
docker-compose up -d
20 changes: 20 additions & 0 deletions cmd/notifier/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package notifier

import (
"github.com/spf13/cobra"
taskexec "github.com/stuckinforloop/ticker/internal/task_exec"
"github.com/stuckinforloop/ticker/worker"
)

// NotifierCmd represents the notifier command
var NotifierCmd = &cobra.Command{
Use: "notifier",
Short: "starts notifier for ticker",
Run: func(cmd *cobra.Command, args []string) {
w := worker.New()
taskExecDAO := taskexec.NewTaskExecDAO(w.DAO)
w.Run(taskExecDAO.UpdateTaskStatusNotify)
},
}

func init() {}
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stuckinforloop/ticker/cmd/executor"
"github.com/stuckinforloop/ticker/cmd/notifier"
"github.com/stuckinforloop/ticker/cmd/scheduler"
"github.com/stuckinforloop/ticker/cmd/server"
)
Expand Down Expand Up @@ -71,5 +72,6 @@ func initConfig() {
func addSubCommands() {
rootCmd.AddCommand(executor.ExecutorCmd)
rootCmd.AddCommand(scheduler.SchedulerCmd)
rootCmd.AddCommand(notifier.NotifierCmd)
rootCmd.AddCommand(server.ServerCmd)
}
2 changes: 1 addition & 1 deletion internal/queue/broker/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (q *SQS) Dequeue(
},
QueueUrl: aws.String(q.getQueueURL(queueName)),
VisibilityTimeout: aws.Int64(visibilityTimeout),
// WaitTimeSeconds: aws.Int64(waitTimeout),
WaitTimeSeconds: aws.Int64(20), // For long polling
})

if err != nil {
Expand Down
161 changes: 160 additions & 1 deletion internal/task_exec/task_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ const (
StatusCompleted Status = "completed"
)

const ExecutorQueueName = "task-executor-1.fifo"
const (
ExecutorQueueName = "task-executor-1.fifo"
NotifierQueueName = "task-status-updates"
)

type TaskExec struct {
ID string `json:"id"`
Expand Down Expand Up @@ -54,6 +57,15 @@ type ExecutorPayload struct {
NotifyEvery *int `json:"notify_every"`
}

type NotifierPayload struct {
TaskID string `json:"task_id"`
TaskExecID string `json:"task_exec_id"`
Status Status `json:"status"`
StartedAt *int64 `json:"started_at"`
FinishedAt *int64 `json:"finished_at"`
Response map[string]any `json:"response"`
}

func (dao *TaskExecDAO) ScheduleTasks(ctx context.Context) error {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -86,6 +98,57 @@ func (dao *TaskExecDAO) ScheduleTasks(ctx context.Context) error {
return nil
}

func (dao *TaskExecDAO) UpdateTaskStatusNotify(ctx context.Context) error {
done := make(chan bool)

go func() {
for {
select {
case <-done:
return
default:
dao.Logger.Info("Attempting dequeue")
messageId, message, err := dao.Queue.Dequeue(ctx, NotifierQueueName, 30)
if err != nil {
dao.Logger.Error("Dequeue failed", zap.Error(err))
continue // Continue to retry
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to put a limit on this in future

}

if message == "" && messageId == "" {
dao.Logger.Info("Dequeue: empty results")
continue
}

dao.Logger.Info("Dequeue success",
zap.String("message", message),
zap.String("message_id", messageId))

err = dao.updateTaskExecStatus(ctx, message)
if err != nil {
dao.Logger.Error("Update exec failed", zap.Error(err))
}
// dao.notify(message)

err = dao.Queue.Acknowledge(ctx, messageId, NotifierQueueName)
if err != nil {
dao.Logger.Error("Ack failed", zap.Error(err))
}

dao.Logger.Info("Ack success",
zap.String("message", message),
zap.String("message_id", messageId))
}
}
}()

select {
case <-ctx.Done():
done <- true
}

return nil
}

func (dao *TaskExecDAO) listTasks(ctx context.Context) ([]task.Task, error) {
dao.Logger.Info("Attempting enqueue")

Expand Down Expand Up @@ -252,6 +315,36 @@ func (dao *TaskExecDAO) findTaskExec(
return &t, nil
}

func (dao *TaskExecDAO) getTaskExecById(
ctx context.Context, id string,
) (*TaskExec, error) {
db := dao.RO()
query := `
SELECT
id, task_id, status, run_at, started_at,
finished_at, response, created_at, updated_at
FROM task_execs
WHERE id = $1
`
t := TaskExec{}
response := []byte{}
if err := db.QueryRowContext(ctx, query, id).Scan(
&t.ID, &t.TaskID, &t.Status, &t.RunAt, &t.StartedAt,
&t.FinishedAt, &response, &t.CreatedAt, &t.UpdatedAt); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}

return nil, fmt.Errorf("get task_exec by id: %w", err)
}

if err := json.Unmarshal(response, &t.Response); err != nil {
return nil, fmt.Errorf("unmarshal response: %w", err)
}

return &t, nil
}

func (dao *TaskExecDAO) createTaskExec(
ctx context.Context, t *TaskExec,
) (*TaskExec, error) {
Expand All @@ -277,3 +370,69 @@ func (dao *TaskExecDAO) createTaskExec(

return t, nil
}

func (dao *TaskExecDAO) updateTaskExecStatus(
ctx context.Context, message string,
) error {
payload := NotifierPayload{}
err := json.Unmarshal([]byte(message), &payload)
if err != nil {
return fmt.Errorf("notifier unmarshal: %w", err)
}

exec, err := dao.getTaskExecById(ctx, payload.TaskExecID)
if err != nil {
return fmt.Errorf("notifier find task_exec: %w", err)
}

exec.Status = payload.Status
if payload.StartedAt != nil {
exec.StartedAt = payload.StartedAt
}
if payload.FinishedAt != nil {
exec.FinishedAt = payload.FinishedAt
}
if payload.Response != nil {
exec.Response = payload.Response
}

return dao.updateTaskExec(ctx, exec)
}

func (dao *TaskExecDAO) updateTaskExec(
ctx context.Context, t *TaskExec,
) error {
t.UpdatedAt = dao.TimeNow()

response, err := json.Marshal(t.Response)
if err != nil {
return fmt.Errorf("marshal response: %w", err)
}

db := dao.RW()
query := `
UPDATE task_execs
SET
status = $1,
started_at = $2,
finished_at = $3,
response = $4,
updated_at = $5
WHERE id = $6
`
if _, err := db.ExecContext(ctx, query,
t.Status,
t.StartedAt,
t.FinishedAt,
response,
t.UpdatedAt,
t.ID,
); err != nil {
return fmt.Errorf("update task_exec: %w", err)
}

dao.Logger.Info("Task Exec update success",
zap.String("task_exec_id", t.ID))

return nil
}