Commit
rename TriggerMark -> TriggerMetadata (#68)
- rename TriggerMark -> TriggerMetadata
- rename run_inline -> is_blocking
v9n authored Dec 12, 2024
1 parent b88c795 commit a3c4ce7
Showing 15 changed files with 577 additions and 544 deletions.
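For orientation, here is a minimal caller-side sketch of the renamed API. The TriggerMetadata field names are taken from the vm_test.go change below and the RunTask signature from executor.go; the helper function, import paths, and surrounding setup are hypothetical, not part of this commit.

package taskengine

import (
	"github.com/AvaProtocol/ap-avs/model"               // import path assumed
	avsproto "github.com/AvaProtocol/ap-avs/protobuf"   // import path assumed
)

// runBlockingSketch is a hypothetical helper illustrating the renames in this
// commit: avsproto.TriggerMark -> avsproto.TriggerMetadata, and the request
// flag run_inline -> is_blocking (handled in Engine.TriggerTask below).
func runBlockingSketch(x *TaskExecutor, task *model.Task) (*avsproto.Execution, error) {
	// Field names (BlockNumber, TxHash, LogIndex) appear in vm_test.go below.
	meta := &avsproto.TriggerMetadata{
		BlockNumber: 7212417,
		TxHash:      "0x53beb2163994510e0984b436ebc828dc57e480ee671cfbe7ed52776c2a4830c8",
		LogIndex:    98,
	}
	// RunTask now takes *avsproto.TriggerMetadata instead of *avsproto.TriggerMark.
	return x.RunTask(task, meta)
}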
8 changes: 4 additions & 4 deletions core/taskengine/engine.go
@@ -376,7 +376,7 @@ func (n *Engine) AggregateChecksResult(address string, payload *avsproto.NotifyT
n.logger.Info("processed aggregator check hit", "operator", address, "task_id", payload.TaskId)
n.lock.Unlock()

- data, err := json.Marshal(payload.TriggerMarker)
+ data, err := json.Marshal(payload.TriggerMetadata)
if err != nil {
n.logger.Error("error serialize trigger to json", err)
return err
@@ -521,16 +521,16 @@ func (n *Engine) TriggerTask(user *model.User, payload *avsproto.UserTriggerTask
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

- data, err := json.Marshal(payload.TriggerMark)
+ data, err := json.Marshal(payload.TriggerMetadata)
if err != nil {
n.logger.Error("error serialize trigger to json", err)
return nil, status.Errorf(codes.InvalidArgument, codes.InvalidArgument.String())
}

- if payload.RunInline {
+ if payload.IsBlocking {
// Run the task inline, by pass the queue system
executor := NewExecutor(n.db, n.logger)
- execution, err := executor.RunTask(task, payload.TriggerMark)
+ execution, err := executor.RunTask(task, payload.TriggerMetadata)
if err == nil {
return &avsproto.UserTriggerTaskResp{
Result: true,
28 changes: 14 additions & 14 deletions core/taskengine/executor.go
@@ -59,20 +59,20 @@ func (x *TaskExecutor) Perform(job *apqueue.Job) error {
return fmt.Errorf("fail to load task: %s", job.Name)
}

- triggerMark := &avsproto.TriggerMark{}
+ triggerMetadata := &avsproto.TriggerMetadata{}
// A task executor data is the trigger mark
// ref: AggregateChecksResult
- err = json.Unmarshal(job.Data, triggerMark)
+ err = json.Unmarshal(job.Data, triggerMetadata)
if err != nil {
return fmt.Errorf("error decode job payload when executing task: %s with job id %d", task.Id, job.ID)
}

- _, err = x.RunTask(task, triggerMark)
+ _, err = x.RunTask(task, triggerMetadata)
return err
}

- func (x *TaskExecutor) RunTask(task *model.Task, triggerMark *avsproto.TriggerMark) (*avsproto.Execution, error) {
- 	vm, err := NewVMWithData(task.Id, triggerMark, task.Nodes, task.Edges)
+ func (x *TaskExecutor) RunTask(task *model.Task, triggerMetadata *avsproto.TriggerMetadata) (*avsproto.Execution, error) {
+ 	vm, err := NewVMWithData(task.Id, triggerMetadata, task.Nodes, task.Edges)

if err != nil {
return nil, fmt.Errorf("vm failed to initialize: %w", err)
@@ -98,17 +98,17 @@ func (x *TaskExecutor) RunTask(task *model.Task, triggerMark *avsproto.TriggerMa
}

execution := &avsproto.Execution{
- Id: ulid.Make().String(),
- StartAt: t0.Unix(),
- EndAt: t1.Unix(),
- Success: err == nil,
- Error: "",
- Steps: vm.ExecutionLogs,
- TriggerMark: triggerMark,
+ Id: ulid.Make().String(),
+ StartAt: t0.Unix(),
+ EndAt: t1.Unix(),
+ Success: err == nil,
+ Error: "",
+ Steps: vm.ExecutionLogs,
+ TriggerMetadata: triggerMetadata,
}

if runTaskErr != nil {
- x.logger.Error("error executing task", "error", err, "task_id", task.Id, "triggermark", triggerMark)
+ x.logger.Error("error executing task", "error", err, "task_id", task.Id, "triggermark", triggerMetadata)
execution.Error = runTaskErr.Error()
}

@@ -129,7 +129,7 @@ func (x *TaskExecutor) RunTask(task *model.Task, triggerMark *avsproto.TriggerMa
}

if runTaskErr == nil {
- x.logger.Info("succesfully executing task", "task_id", task.Id, "triggermark", triggerMark)
+ x.logger.Info("succesfully executing task", "task_id", task.Id, "triggermark", triggerMetadata)
return execution, nil
}
return execution, fmt.Errorf("Error executing task %s %v", task.Id, runTaskErr)
8 changes: 4 additions & 4 deletions core/taskengine/trigger/block.go
@@ -13,7 +13,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
)

- type TriggerMark[T any] struct {
+ type TriggerMetadata[T any] struct {
TaskID string

Marker T
@@ -25,10 +25,10 @@ type BlockTrigger struct {
schedule map[int64]map[string]bool

// channel that we will push the trigger information back
- triggerCh chan TriggerMark[int64]
+ triggerCh chan TriggerMetadata[int64]
}

- func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMark[int64]) *BlockTrigger {
+ func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMetadata[int64]) *BlockTrigger {
var err error

logger, err := sdklogging.NewZapLogger(sdklogging.Production)
@@ -113,7 +113,7 @@ func (b *BlockTrigger) Run(ctx context.Context) error {
z := new(big.Int)
if z.Mod(header.Number, big.NewInt(int64(interval))).Cmp(zero) == 0 {
for taskID, _ := range tasks {
- b.triggerCh <- TriggerMark[int64]{
+ b.triggerCh <- TriggerMetadata[int64]{
TaskID: taskID,
Marker: header.Number.Int64(),
}
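As a usage note for the generic type above, a minimal wiring sketch follows. The constructor, channel type, and Run signature come from this file, and the RpcOption fields from event_test.go below; the import path and RPC endpoints are placeholders, and registering tasks on the trigger is omitted.

package main

import (
	"context"
	"log"

	trigger "github.com/AvaProtocol/ap-avs/core/taskengine/trigger" // import path assumed
)

func main() {
	// Buffered channel of the renamed generic type; for block triggers the
	// Marker payload is the block number (int64).
	ch := make(chan trigger.TriggerMetadata[int64], 100)

	b := trigger.NewBlockTrigger(&trigger.RpcOption{
		RpcURL:   "wss://example-rpc-endpoint", // placeholder endpoint
		WsRpcURL: "wss://example-rpc-endpoint", // placeholder endpoint
	}, ch)

	go func() {
		if err := b.Run(context.Background()); err != nil {
			log.Println("block trigger stopped:", err)
		}
	}()

	// Each hit carries the task ID and the block number that matched its interval.
	for m := range ch {
		log.Printf("task %s triggered at block %d", m.TaskID, m.Marker)
	}
}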
6 changes: 3 additions & 3 deletions core/taskengine/trigger/event.go
@@ -36,10 +36,10 @@ type EventTrigger struct {
checks sync.Map

// channel that we will push the trigger information back
- triggerCh chan TriggerMark[EventMark]
+ triggerCh chan TriggerMetadata[EventMark]
}

- func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMark[EventMark]) *EventTrigger {
+ func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMetadata[EventMark]) *EventTrigger {
var err error

logger, err := sdklogging.NewZapLogger(sdklogging.Production)
@@ -135,7 +135,7 @@ func (evt *EventTrigger) Run(ctx context.Context) error {
check := value.(*Check)
if hit, err := evt.Evaluate(&event, check.Program); err == nil && hit {
evt.logger.Info("check hit, notify aggregator", "task_id", key)
- evt.triggerCh <- TriggerMark[EventMark]{
+ evt.triggerCh <- TriggerMetadata[EventMark]{
TaskID: key.(string),
Marker: EventMark{
BlockNumber: event.BlockNumber,
2 changes: 1 addition & 1 deletion core/taskengine/trigger/event_test.go
@@ -18,7 +18,7 @@ func TestChainlinkLatestAnswer(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
- }, make(chan TriggerMark[EventMark], 1000))
+ }, make(chan TriggerMetadata[EventMark], 1000))

envs := macros.GetEnvs(map[string]interface{}{
"trigger1": map[string]interface{}{
12 changes: 6 additions & 6 deletions core/taskengine/vm.go
@@ -75,7 +75,7 @@ func (v *VM) Reset() {
v.instructionCount = 0
}

- func NewVMWithData(taskID string, triggerMark *avsproto.TriggerMark, nodes []*avsproto.TaskNode, edges []*avsproto.TaskEdge) (*VM, error) {
+ func NewVMWithData(taskID string, triggerMetadata *avsproto.TriggerMetadata, nodes []*avsproto.TaskNode, edges []*avsproto.TaskEdge) (*VM, error) {
v := &VM{
Status: VMStateInitialize,
TaskEdges: edges,
@@ -92,24 +92,24 @@ func NewVMWithData(taskID string, triggerMark *avsproto.TriggerMark, nodes []*av
v.vars = macros.GetEnvs(map[string]any{})

// popular trigger data for trigger variable
- if triggerMark != nil && triggerMark.LogIndex > 0 && triggerMark.TxHash != "" {
+ if triggerMetadata != nil && triggerMetadata.LogIndex > 0 && triggerMetadata.TxHash != "" {
// if it contains event, we need to fetch and pop
- receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMark.TxHash))
+ receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMetadata.TxHash))
if err != nil {
return nil, err
}

var event *types.Log
- //event := receipt.Logs[triggerMark.LogIndex]
+ //event := receipt.Logs[triggerMetadata.LogIndex]

for _, l := range receipt.Logs {
- if uint64(l.Index) == triggerMark.LogIndex {
+ if uint64(l.Index) == triggerMetadata.LogIndex {
event = l
}
}

if event == nil {
- return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMark.TxHash, triggerMark.LogIndex)
+ return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMetadata.TxHash, triggerMetadata.LogIndex)
}

tokenMetadata, err := GetMetadataForTransfer(event)
2 changes: 1 addition & 1 deletion core/taskengine/vm_test.go
@@ -367,7 +367,7 @@ func TestEvaluateEvent(t *testing.T) {
},
}

- mark := avsproto.TriggerMark{
+ mark := avsproto.TriggerMetadata{
BlockNumber: 7212417,
TxHash: "0x53beb2163994510e0984b436ebc828dc57e480ee671cfbe7ed52776c2a4830c8",
LogIndex: 98,
2 changes: 1 addition & 1 deletion examples/example.js
@@ -187,7 +187,7 @@ async function triggerTask(owner, token, taskId, triggerMark) {
"TriggerTask",
// If want to run async, comment this line out
//{ task_id: taskId, triggerMark, },
- { task_id: taskId, triggerMark, run_inline: true },
+ { task_id: taskId, triggerMark, is_blocking: true },
metadata
);

46 changes: 23 additions & 23 deletions examples/static_codegen/avs_grpc_pb.js
@@ -27,37 +27,37 @@ function deserialize_aggregator_CreateTaskResp(buffer_arg) {
return avs_pb.CreateTaskResp.deserializeBinary(new Uint8Array(buffer_arg));
}

- function serialize_aggregator_CreateWalletReq(arg) {
- if (!(arg instanceof avs_pb.CreateWalletReq)) {
- throw new Error('Expected argument of type aggregator.CreateWalletReq');
+ function serialize_aggregator_GetKeyReq(arg) {
+ if (!(arg instanceof avs_pb.GetKeyReq)) {
+ throw new Error('Expected argument of type aggregator.GetKeyReq');
}
return Buffer.from(arg.serializeBinary());
}

- function deserialize_aggregator_CreateWalletReq(buffer_arg) {
- return avs_pb.CreateWalletReq.deserializeBinary(new Uint8Array(buffer_arg));
+ function deserialize_aggregator_GetKeyReq(buffer_arg) {
+ return avs_pb.GetKeyReq.deserializeBinary(new Uint8Array(buffer_arg));
}

- function serialize_aggregator_CreateWalletResp(arg) {
- if (!(arg instanceof avs_pb.CreateWalletResp)) {
- throw new Error('Expected argument of type aggregator.CreateWalletResp');
+ function serialize_aggregator_GetWalletReq(arg) {
+ if (!(arg instanceof avs_pb.GetWalletReq)) {
+ throw new Error('Expected argument of type aggregator.GetWalletReq');
}
return Buffer.from(arg.serializeBinary());
}

- function deserialize_aggregator_CreateWalletResp(buffer_arg) {
- return avs_pb.CreateWalletResp.deserializeBinary(new Uint8Array(buffer_arg));
+ function deserialize_aggregator_GetWalletReq(buffer_arg) {
+ return avs_pb.GetWalletReq.deserializeBinary(new Uint8Array(buffer_arg));
}

- function serialize_aggregator_GetKeyReq(arg) {
- if (!(arg instanceof avs_pb.GetKeyReq)) {
- throw new Error('Expected argument of type aggregator.GetKeyReq');
+ function serialize_aggregator_GetWalletResp(arg) {
+ if (!(arg instanceof avs_pb.GetWalletResp)) {
+ throw new Error('Expected argument of type aggregator.GetWalletResp');
}
return Buffer.from(arg.serializeBinary());
}

- function deserialize_aggregator_GetKeyReq(buffer_arg) {
- return avs_pb.GetKeyReq.deserializeBinary(new Uint8Array(buffer_arg));
+ function deserialize_aggregator_GetWalletResp(buffer_arg) {
+ return avs_pb.GetWalletResp.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_aggregator_IdReq(arg) {
@@ -240,16 +240,16 @@ getNonce: {
responseSerialize: serialize_aggregator_NonceResp,
responseDeserialize: deserialize_aggregator_NonceResp,
},
- createWallet: {
- path: '/aggregator.Aggregator/CreateWallet',
+ getWallet: {
+ path: '/aggregator.Aggregator/GetWallet',
requestStream: false,
responseStream: false,
- requestType: avs_pb.CreateWalletReq,
- responseType: avs_pb.CreateWalletResp,
- requestSerialize: serialize_aggregator_CreateWalletReq,
- requestDeserialize: deserialize_aggregator_CreateWalletReq,
- responseSerialize: serialize_aggregator_CreateWalletResp,
- responseDeserialize: deserialize_aggregator_CreateWalletResp,
+ requestType: avs_pb.GetWalletReq,
+ responseType: avs_pb.GetWalletResp,
+ requestSerialize: serialize_aggregator_GetWalletReq,
+ requestDeserialize: deserialize_aggregator_GetWalletReq,
+ responseSerialize: serialize_aggregator_GetWalletResp,
+ responseDeserialize: deserialize_aggregator_GetWalletResp,
},
listWallets: {
path: '/aggregator.Aggregator/ListWallets',
(Diffs for the remaining 6 changed files are not shown.)
