Skip to content

Commit

Permalink
pack more information about job and exec id
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Dec 12, 2024
1 parent 8425da0 commit e15d001
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 87 deletions.
27 changes: 18 additions & 9 deletions core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,20 +530,29 @@ func (n *Engine) TriggerTask(user *model.User, payload *avsproto.UserTriggerTask
if payload.RunInline {
// Run the task inline, by pass the queue system
executor := NewExecutor(n.db, n.logger)
executor.RunTask(task, payload.TriggerMark)
} else {
jid, err := n.queue.Enqueue(ExecuteTask, payload.TaskId, data)
if err != nil {
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_StorageUnavailable), StorageQueueUnavailableError)
execution, err := executor.RunTask(task, payload.TriggerMark)
if err == nil {
return &avsproto.UserTriggerTaskResp{
Result: true,
ExecutionId: execution.Id,
}, nil
}

n.logger.Info("enqueue task into the queue system", "task_id", payload.TaskId, "jid", jid)
return &avsproto.UserTriggerTaskResp{
Result: true,
}, nil
Result: false,
}, err
}

return nil, nil
jid, err := n.queue.Enqueue(ExecuteTask, payload.TaskId, data)
if err != nil {
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_StorageUnavailable), StorageQueueUnavailableError)
}

n.logger.Info("enqueue task into the queue system", "task_id", payload.TaskId, "jid", jid)
return &avsproto.UserTriggerTaskResp{
Result: true,
JobId: fmt.Sprintf("%d", jid),
}, nil
}

// List Execution for a given task id
Expand Down
11 changes: 6 additions & 5 deletions core/taskengine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ func (x *TaskExecutor) Perform(job *apqueue.Job) error {
return fmt.Errorf("error decode job payload when executing task: %s with job id %d", task.Id, job.ID)
}

return x.RunTask(task, triggerMark)
_, err = x.RunTask(task, triggerMark)
return err
}

func (x *TaskExecutor) RunTask(task *model.Task, triggerMark *avsproto.TriggerMark) error {
func (x *TaskExecutor) RunTask(task *model.Task, triggerMark *avsproto.TriggerMark) (*avsproto.Execution, error) {
vm, err := NewVMWithData(task.Id, triggerMark, task.Nodes, task.Edges)

if err != nil {
return fmt.Errorf("vm failed to initialize: %w", err)
return nil, fmt.Errorf("vm failed to initialize: %w", err)
}

t0 := time.Now()
Expand Down Expand Up @@ -129,9 +130,9 @@ func (x *TaskExecutor) RunTask(task *model.Task, triggerMark *avsproto.TriggerMa

if runTaskErr == nil {
x.logger.Info("succesfully executing task", "task_id", task.Id, "triggermark", triggerMark)
return nil
return execution, nil
}
return fmt.Errorf("Error executing task %s %v", task.Id, runTaskErr)
return execution, fmt.Errorf("Error executing task %s %v", task.Id, runTaskErr)
}

type ContractProcessor struct {
Expand Down
2 changes: 1 addition & 1 deletion examples/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async function triggerTask(owner, token, taskId, triggerMark) {
const result = await asyncRPC(
client,
"TriggerTask",
{ task_id: taskId, triggerMark },
{ task_id: taskId, triggerMark, },
metadata
);

Expand Down
161 changes: 89 additions & 72 deletions protobuf/avs.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions protobuf/avs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ message UserTriggerTaskResp {
bool result = 1;
// if trigger inline, the execution id will be returned
string execution_id = 2;
// when running async, we get back a job id
string job_id = 3;
}

service Aggregator {
Expand Down

0 comments on commit e15d001

Please sign in to comment.