Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Passing result to subsequent steps, implement JavaScript runner #70

Merged
merged 21 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ func NewAggregator(c *config.Config) (*Aggregator, error) {
avsWriter, err := chainio.BuildAvsWriterFromConfig(c)
if err != nil {
c.Logger.Errorf("Cannot create avsWriter", "err", err)
return nil, err
// TODO: Upgrade EigenSDK to use the new Slash Manager
// EigenLayer has update the contract and we cannot fetch the slasher anymore, we should upgrade the EigenSDK, right now we don't use it so it's ok to ignore this error
//return nil, err
}

go func() {
Expand All @@ -122,8 +124,9 @@ func NewAggregator(c *config.Config) (*Aggregator, error) {
clients, err := clients.BuildAll(chainioConfig, c.EcdsaPrivateKey, c.Logger)
if err != nil {
c.Logger.Errorf("Cannot create sdk clients", "err", err)
panic(err)
//return nil, err
// TODO: Upgrade EigenSDK to use the new Slash Manager
// EigenLayer has update the contract and we cannot fetch the slasher anymore, we should upgrade the EigenSDK, right now we don't use it so it's ok to ignore this error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, this was the culprit of the slashing error, makes sense. 👌

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’d add a // TODO: to the beginning, so we can search all TODOs in codebase later.

//panic(err)
}
c.Logger.Info("create avsrrader and client", "avsReader", avsReader, "clients", clients)
}()
Expand Down
10 changes: 9 additions & 1 deletion core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ func (n *Engine) StreamCheckToOperator(payload *avsproto.SyncMessagesReq, srv av
continue
}

if !n.CanStreamCheck(address) {
continue
}

for _, task := range n.tasks {
if _, ok := n.trackSyncedTasks[address].TaskID[task.Id]; ok {
continue
Expand Down Expand Up @@ -563,7 +567,6 @@ func (n *Engine) TriggerTask(user *model.User, payload *avsproto.UserTriggerTask
if payload.IsBlocking {
// Run the task inline, by pass the queue system
executor := NewExecutor(n.db, n.logger)
fmt.Println("metadata", payload.TriggerMetadata)
execution, err := executor.RunTask(task, payload.TriggerMetadata)
if err == nil {
return &avsproto.UserTriggerTaskResp{
Expand Down Expand Up @@ -798,3 +801,8 @@ func (n *Engine) NewSeqID() (string, error) {
}
return strconv.FormatInt(int64(num), 10), nil
}

func (n *Engine) CanStreamCheck(address string) bool {
// Only enable for our own operator first, once it's stable we will roll out to all
return address == "0x997e5d40a32c44a3d93e59fc55c4fd20b7d2d49d" || address == "0xc6b87cc9e85b07365b6abefff061f237f7cf7dc3"
}
4 changes: 4 additions & 0 deletions core/taskengine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ func (x *TaskExecutor) Perform(job *apqueue.Job) error {

func (x *TaskExecutor) RunTask(task *model.Task, triggerMetadata *avsproto.TriggerMetadata) (*avsproto.Execution, error) {
vm, err := NewVMWithData(task.Id, triggerMetadata, task.Nodes, task.Edges)
if err != nil {
return nil, err
}

vm.WithLogger(x.logger)
initialTaskStatus := task.Status

if err != nil {
Expand Down
96 changes: 96 additions & 0 deletions core/taskengine/macros/exp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/common"
ethmath "github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/go-resty/resty/v2"

"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
Expand All @@ -19,6 +20,9 @@ var (
rpcConn *ethclient.Client
)

type Builtin struct {
}

func SetRpc(rpcURL string) {
if conn, err := ethclient.Dial(rpcURL); err == nil {
rpcConn = conn
Expand Down Expand Up @@ -125,8 +129,39 @@ func ToBigInt(val string) *big.Int {
return b
}

func (bi *Builtin) ToBigInt(val string) *big.Int {
return ToBigInt(val)
}

func (bi *Builtin) ChainlinkLatestRoundData(tokenPair string) *big.Int {
return chainlinkLatestRoundData(tokenPair)
}
func (bi *Builtin) ChainlinkLatestAnswer(tokenPair string) *big.Int {
return chainlinkLatestAnswer(tokenPair)
}

func (bi *Builtin) BigCmp(a *big.Int, b *big.Int) (r int) {
return BigCmp(a, b)
}

func (bi *Builtin) BigGt(a *big.Int, b *big.Int) bool {
return BigGt(a, b)
}

func (bi *Builtin) BigLt(a *big.Int, b *big.Int) bool {
return BigLt(a, b)
}

func (bi *Builtin) ParseUnit(val string, decimal uint) *big.Int {
return ParseUnit(val, decimal)
}

var (
exprEnv = map[string]any{
// bind and simular JS fetch api
"fetch": Fetch,

// macro to do IO from JS
"readContractData": readContractData,

"priceChainlink": chainlinkLatestAnswer,
Expand All @@ -141,6 +176,67 @@ var (
}
)

// FetchResponse mimics the JS fetch Response object
type FetchResponse struct {
Status int
StatusText string
Body string
Headers map[string][]string
}

// FetchOptions allows specifying method, headers, and body
type FetchOptions struct {
Method string
Headers map[string]string
Body interface{}
}

// Fetch mimics the JS fetch function using Resty
func Fetch(url string) *FetchResponse {
options := FetchOptions{}

client := resty.New()
// Create request
request := client.R()

// Set headers
if options.Headers != nil {
request.SetHeaders(options.Headers)
}

// Set body
if options.Body != nil {
request.SetBody(options.Body)
}

// Send request based on method
var resp *resty.Response
var err error
switch options.Method {
case "POST":
resp, err = request.Post(url)
case "PUT":
resp, err = request.Put(url)
case "DELETE":
resp, err = request.Delete(url)
default:
resp, err = request.Get(url) // Default to GET
}

// Handle errors
if err != nil {
return nil
}

// Build FetchResponse
return &FetchResponse{
Status: resp.StatusCode(),
StatusText: resp.Status(),
Body: string(resp.Body()),
Headers: resp.Header(),
}
}

func GetEnvs(extra map[string]any) map[string]interface{} {
envs := map[string]any{}

Expand Down
34 changes: 10 additions & 24 deletions core/taskengine/trigger/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (

"github.com/AvaProtocol/ap-avs/core/taskengine/macros"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"github.com/dop251/goja"
"github.com/ginkgoch/godash/v2"

"github.com/ethereum/go-ethereum"
Expand All @@ -26,7 +25,7 @@ type EventMark struct {
}

type Check struct {
Program *vm.Program
Program string
TaskMetadata *avsproto.SyncMessagesResp_TaskMetadata
}

Expand Down Expand Up @@ -71,24 +70,7 @@ func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMetadata[EventMark]) *E

// TODO: track remainExecution and expriedAt before merge
func (t *EventTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) error {
// Dummy value to get type
envs := macros.GetEnvs(map[string]interface{}{
"trigger1": map[string]interface{}{
"data": map[string]interface{}{
"address": "dummy",
"topics": godash.Map([]common.Hash{}, func(topic common.Hash) string {
return "0x"
}),
"data": "0x",
"tx_hash": "dummy",
},
},
})
program, err := expr.Compile(check.GetTrigger().GetEvent().GetExpression(), expr.Env(envs), expr.AsBool())
if err != nil {
return err
}

program := check.GetTrigger().GetEvent().GetExpression()
t.checks.Store(check.TaskId, &Check{
Program: program,
TaskMetadata: check,
Expand Down Expand Up @@ -167,7 +149,8 @@ func (evt *EventTrigger) Run(ctx context.Context) error {
return err
}

func (evt *EventTrigger) Evaluate(event *types.Log, program *vm.Program) (bool, error) {
func (evt *EventTrigger) Evaluate(event *types.Log, program string) (bool, error) {
jsvm := goja.New()
envs := macros.GetEnvs(map[string]interface{}{
"trigger1": map[string]interface{}{
"data": map[string]interface{}{
Expand All @@ -180,12 +163,15 @@ func (evt *EventTrigger) Evaluate(event *types.Log, program *vm.Program) (bool,
},
},
})
for k, v := range envs {
jsvm.Set(k, v)
}

result, err := expr.Run(program, envs)
result, err := jsvm.RunString(program)

if err != nil {
return false, err
}

return result.(bool), err
return result.Export().(bool), err
}
67 changes: 43 additions & 24 deletions core/taskengine/trigger/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (

"github.com/AvaProtocol/ap-avs/core/taskengine/macros"
"github.com/AvaProtocol/ap-avs/core/testutil"
"github.com/expr-lang/expr"
)

func TestChainlinkLatestAnswer(t *testing.T) {
func TestTriggerExpression(t *testing.T) {
event, err := testutil.GetEventForTx("0x8f7c1f698f03d6d32c996b679ea1ebad45bbcdd9aa95d250dda74763cc0f508d", 82)

if err != nil {
Expand All @@ -20,37 +19,57 @@ func TestChainlinkLatestAnswer(t *testing.T) {
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))

envs := macros.GetEnvs(map[string]interface{}{
"trigger1": map[string]interface{}{
"data": map[string]interface{}{
"topics": []string{
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"0xabcdef",
"0xc114fb059434563dc65ac8d57e7976e3eac534f4",
},
},
},
})

program, err := expr.Compile(`
trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && trigger1.data.topics[2] == "0xc114fb059434563dc65ac8d57e7976e3eac534f4"
`, expr.Env(envs), expr.AsBool())

if err != nil {
panic(err)
}
program := `trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && trigger1.data.topics[2] == "0xc114fb059434563dc65ac8d57e7976e3eac534f4"`

result, err := eventTrigger.Evaluate(event, program)
if !result {
t.Errorf("expect expression to be match, but got false: error: %v", err)
}

program, err = expr.Compile(`
(trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && trigger1.data.topics[2] == "abc")
`)
program = `trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && trigger1.data.topics[2] == "abc"`

result, err = eventTrigger.Evaluate(event, program)
if result {
t.Errorf("expect expression to be not match, but got match: error: %v", err)
}

event, err = testutil.GetEventForTx("0x8f7c1f698f03d6d32c996b679ea1ebad45bbcdd9aa95d250dda74763cc0f508d", 81)
program = `trigger1.data.address == "0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789" && trigger1.data.topics[0] == "0xbb47ee3e183a558b1a2ff0874b079f3fc5478b7454eacf2bfc5af2ff5878f972"`
result, err = eventTrigger.Evaluate(event, program)
if result {
t.Errorf("expect expression to be not match, but got match: error: %v", err)
}
}

func TestTriggerWithContractReadBindingInExpression(t *testing.T) {
// This event is transfering usdc
event, err := testutil.GetEventForTx("0x4bb728dfbe58d7c641c02a214cac6156a0d6a0fe648cb27a7de229a3160e91b1", 145)

macros.SetRpc(testutil.GetTestRPCURL())
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))

// USDC pair from chainlink, usually USDC price is ~99cent but never approach $1
// for an unknow reason the decimal is 8 instead of 6
program := `trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && bigGt(chainlinkPrice("0xA2F78ab2355fe2f984D808B5CeE7FD0A93D5270E"), toBigInt("1000000000"))`

result, err := eventTrigger.Evaluate(event, program)
if err != nil {
t.Errorf("expected no error when evaluate program but got error: %s", err)
}
if result {
t.Errorf("expect expression to be false, but got true: error: %v", err)
}

program = `trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && bigGt(chainlinkPrice("0xA2F78ab2355fe2f984D808B5CeE7FD0A93D5270E"), toBigInt("95000000"))`

result, err = eventTrigger.Evaluate(event, program)
if err != nil {
t.Errorf("expected no error when evaluate program but got error: %s", err)
}
if !result {
t.Errorf("expect expression to be false, but got true: error: %v", err)
}
}
Loading
Loading