diff --git a/.changeset/eight-tips-bathe.md b/.changeset/eight-tips-bathe.md new file mode 100644 index 00000000000..7c540971a9a --- /dev/null +++ b/.changeset/eight-tips-bathe.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#internal Optimize beholder validator in system tests (part 2) diff --git a/.changeset/slick-drinks-like.md b/.changeset/slick-drinks-like.md new file mode 100644 index 00000000000..a96cba901e4 --- /dev/null +++ b/.changeset/slick-drinks-like.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#internal Update PoR workflow to use chainlink BalanceReader bindings diff --git a/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.mod b/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.mod index eb5a4a44fac..edb58415963 100644 --- a/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.mod +++ b/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.mod @@ -8,6 +8,7 @@ require ( github.com/ethereum/go-ethereum v1.16.2 github.com/smartcontractkit/chain-selectors v1.0.67 github.com/smartcontractkit/chainlink-common v0.9.6 + github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251003122604-772b72191274 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250918131840-564fe2776a35 github.com/smartcontractkit/cre-sdk-go v0.8.0 github.com/smartcontractkit/cre-sdk-go/capabilities/blockchain/evm v0.8.0 @@ -17,13 +18,26 @@ require ( ) require ( + github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/StackExchange/wmi v1.2.1 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bits-and-blooms/bitset v1.20.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/consensys/gnark-crypto v0.18.0 // indirect + github.com/crate-crypto/go-eth-kzg v1.3.0 // indirect + github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/deckarep/golang-set/v2 v2.6.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect + github.com/ethereum/c-kzg-4844/v2 v2.1.0 // indirect + github.com/ethereum/go-verkle v0.2.2 // indirect + github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/holiman/uint256 v1.3.2 // indirect github.com/invopop/jsonschema v0.13.0 // indirect github.com/mailru/easyjson v0.9.0 // indirect @@ -36,11 +50,16 @@ require ( github.com/prometheus/procfs v0.16.1 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect + github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/smartcontractkit/libocr v0.0.0-20250707144819-babe0ec4e358 // indirect github.com/stretchr/testify v1.11.1 // indirect + github.com/supranational/blst v0.3.14 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect golang.org/x/crypto v0.40.0 // indirect + golang.org/x/sync v0.16.0 // indirect golang.org/x/sys v0.34.0 // indirect google.golang.org/protobuf v1.36.7 // indirect ) diff --git a/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.sum b/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.sum index d72619fd38a..75fb5647595 100644 --- a/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.sum +++ b/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/go.sum @@ -1,41 +1,162 @@ +github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= +github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= +github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= +github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= +github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.20.0 h1:2F+rfL86jE2d/bmw7OhqUg2Sj/1rURkBn3MdfoPyRVU= +github.com/bits-and-blooms/bitset v1.20.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= +github.com/cespare/cp v1.1.1 h1:nCb6ZLdB7NRaqsm91JtQTAme2SKJzXVsdPIPkyJr1MU= +github.com/cespare/cp v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= +github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= +github.com/cockroachdb/pebble v1.1.5 h1:5AAWCBWbat0uE0blr8qzufZP5tBjkRyy/jWe1QWLnvw= +github.com/cockroachdb/pebble v1.1.5/go.mod h1:17wO9el1YEigxkP/YtV8NtCivQDgoCyBg5c4VR/eOWo= +github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= +github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/consensys/gnark-crypto v0.18.0 h1:vIye/FqI50VeAr0B3dx+YjeIvmc3LWz4yEfbWBpTUf0= +github.com/consensys/gnark-crypto v0.18.0/go.mod h1:L3mXGFTe1ZN+RSJ+CLjUt9x7PNdx8ubaYfDROyp2Z8c= +github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc= +github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/crate-crypto/go-eth-kzg v1.3.0 h1:05GrhASN9kDAidaFJOda6A4BEvgvuXbazXg/0E3OOdI= +github.com/crate-crypto/go-eth-kzg v1.3.0/go.mod h1:J9/u5sWfznSObptgfa92Jq8rTswn6ahQWEuiLHOjCUI= +github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a h1:W8mUrRp6NOVl3J+MYp5kPMoUZPp7aOYHtaua31lwRHg= +github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a/go.mod h1:sTwzHBvIzm2RfVCGNEBZgRyjwK40bVoun3ZnGOCafNM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= +github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/decred/dcrd/crypto/blake256 v1.1.0 h1:zPMNGQCm0g4QTY27fOCorQW7EryeQ/U0x++OzVrdms8= github.com/decred/dcrd/crypto/blake256 v1.1.0/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40= +github.com/deepmap/oapi-codegen v1.8.2 h1:SegyeYGcdi0jLLrpbCMoJxnUUn8GBXHsvr4rbzjuhfU= +github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw= github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= +github.com/emicklei/dot v1.6.2 h1:08GN+DD79cy/tzN6uLCT84+2Wk9u+wvqP+Hkx/dIR8A= +github.com/emicklei/dot v1.6.2/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= +github.com/ethereum/c-kzg-4844/v2 v2.1.0 h1:gQropX9YFBhl3g4HYhwE70zq3IHFRgbbNPw0Shwzf5w= +github.com/ethereum/c-kzg-4844/v2 v2.1.0/go.mod h1:TC48kOKjJKPbN7C++qIgt0TJzZ70QznYR7Ob+WXl57E= github.com/ethereum/go-ethereum v1.16.2 h1:VDHqj86DaQiMpnMgc7l0rwZTg0FRmlz74yupSG5SnzI= github.com/ethereum/go-ethereum v1.16.2/go.mod h1:X5CIOyo8SuK1Q5GnaEizQVLHT/DfsiGWuNeVdQcEMNA= +github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= +github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= +github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeDY= +github.com/ferranbt/fastssz v0.1.4/go.mod h1:Ea3+oeoRGGLGm5shYAeDgu6PGUlcvQhE2fILyD9+tGg= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 h1:f6D9Hr8xV8uYKlyuj8XIruxlh9WjVjdh1gIicAS7ays= +github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww= +github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= +github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= +github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E= +github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= +github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/graph-gophers/graphql-go v1.3.0 h1:Eb9x/q6MFpCLz7jBCiP/WTxjSDrYLR1QY41SORZyNJ0= +github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= +github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= +github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= +github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4= +github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc= +github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= +github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/uint256 v1.3.2 h1:a9EgMPSC1AAaj1SZL5zIQD3WbwTuHrMGOerLjGmM/TA= github.com/holiman/uint256 v1.3.2/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= +github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= +github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= +github.com/influxdata/influxdb-client-go/v2 v2.4.0 h1:HGBfZYStlx3Kqvsv1h2pJixbCl/jhnFtxpKFAv9Tu5k= +github.com/influxdata/influxdb-client-go/v2 v2.4.0/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8= +github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs= +github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= +github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 h1:vilfsDSy7TDxedi9gyBkMvAirat/oRcL0lFdJBf6tdM= +github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E= github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= +github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= +github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/leanovate/gopter v0.2.11 h1:vRjThO1EKPb/1NsDXuDrzldR28RLkBflWYcU9CvzWu4= +github.com/leanovate/gopter v0.2.11/go.mod h1:aK3tzZP/C+p1m3SPRE4SYZFGP7jjkuSI4f7Xvpt0S9c= github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4= github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= +github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= +github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= +github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A= +github.com/mitchellh/pointerstructure v1.2.0/go.mod h1:BRAsLI5zgXmw97Lf6s25bs8ohIXc3tViBH44KcwB2g4= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM= +github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0= +github.com/pion/dtls/v2 v2.2.7 h1:cSUBsETxepsCSFSxC3mc/aDo14qQLMSL+O6IjG28yV8= +github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/stun/v2 v2.0.0 h1:A5+wXKLAypxQri59+tmQKVs7+l6mMM+3d+eER9ifRU0= +github.com/pion/stun/v2 v2.0.0/go.mod h1:22qRSh08fSEttYUmJZGlriq9+03jtVmXNODgLccj8GQ= +github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c= +github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= +github.com/pion/transport/v3 v3.0.1 h1:gDTlPJwROfSfz6QfSi0ZmeCSkFcnWWiiR9ES0ouANiM= +github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= @@ -46,16 +167,26 @@ github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2 github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= +github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= +github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1:Bn1aCHHRnjv4Bl16T8rcaFjYSrGrIZvpiGO6P3Q4GpU= +github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.67 h1:gxTqP/JC40KDe3DE1SIsIKSTKTZEPyEU1YufO1admnw= github.com/smartcontractkit/chain-selectors v1.0.67/go.mod h1:xsKM0aN3YGcQKTPRPDDtPx2l4mlTN1Djmg0VVXV40b8= github.com/smartcontractkit/chainlink-common v0.9.6 h1:xE6x3kujV0NC7AMo/LdCszVTc2ZR9bQWOlZvYiYvuoY= github.com/smartcontractkit/chainlink-common v0.9.6/go.mod h1:1r3aM96KHAESfnayJ3BTHCkP1qJS1BEG1r4czeoaXlA= +github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251003122604-772b72191274 h1:7vcazdecB5LjHJzhFoj4BfpkhFqv+fVPsNeuBSQVaSA= +github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251003122604-772b72191274/go.mod h1:oyfOm4k0uqmgZIfxk1elI/59B02shbbJQiiUdPdbMgI= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250918131840-564fe2776a35 h1:hhKdzgNZT+TnohlmJODtaxlSk+jyEO79YNe8zLFtp78= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250918131840-564fe2776a35/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q= github.com/smartcontractkit/cre-sdk-go v0.8.0 h1:QHYnz6MgBGFRaTOrP9Nx4HSHUpxYWgRzXGdsAucKAiI= @@ -70,20 +201,43 @@ github.com/smartcontractkit/libocr v0.0.0-20250707144819-babe0ec4e358 h1:+NVzR5L github.com/smartcontractkit/libocr v0.0.0-20250707144819-babe0ec4e358/go.mod h1:Acy3BTBxou83ooMESLO90s8PKSu7RvLCzwSTbxxfOK0= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/supranational/blst v0.3.14 h1:xNMoHRJOTwMn63ip6qoWJ2Ymgvj7E2b9jY2FAwY+qRo= +github.com/supranational/blst v0.3.14/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w= +github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= +golang.org/x/exp v0.0.0-20250711185948-6ae5c78190dc h1:TS73t7x3KarrNd5qAipmspBDS1rkMcgVG/fS1aRb4Rc= +golang.org/x/exp v0.0.0-20250711185948-6ae5c78190dc/go.mod h1:A+z0yzpGtvnG90cToK5n2tu8UJVP2XUATh+r+sfOOOc= golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= +golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= +golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 h1:fc6jSaCT0vBduLYZHYrBBNY4dsWuvgyff9noRNDdBeE= google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= @@ -93,6 +247,10 @@ google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/main.go b/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/main.go index 1fdeabe4ee4..135c305d03e 100644 --- a/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/main.go +++ b/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/main.go @@ -8,7 +8,6 @@ import ( "fmt" "log/slog" "math/big" - "strings" "time" "gopkg.in/yaml.v3" @@ -27,31 +26,10 @@ import ( "github.com/smartcontractkit/cre-sdk-go/cre/wasm" "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" + "github.com/smartcontractkit/chainlink-evm/gethwrappers/keystone/generated/balance_reader" types "github.com/smartcontractkit/chainlink/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based/types" ) -const balanceReaderABIJson = `[ - { - "inputs":[ - { - "internalType":"address[]", - "name":"addresses", - "type":"address[]" - } - ], - "name":"getNativeBalances", - "outputs":[ - { - "internalType":"uint256[]", - "name":"", - "type":"uint256[]" - } - ], - "stateMutability":"view", - "type":"function" - } -]` - func RunProofOfReservesWorkflow(config types.WorkflowConfig, logger *slog.Logger, secretsProvider cre.SecretsProvider) (cre.Workflow[types.WorkflowConfig], error) { return cre.Workflow[types.WorkflowConfig]{ cre.Handler( @@ -90,7 +68,7 @@ func onTrigger(config types.WorkflowConfig, runtime cre.Runtime, payload *cron.P runtime.Logger().With().Info(fmt.Sprintf("[logger] Got on-chain balance with BalanceAt() for address %s: %s", addressToRead1, balanceAtResult.String())) // get balance with CallContract - readBalancesParsedABI, err := getReadBalancesContractABI(runtime, balanceReaderABIJson) + readBalancesParsedABI, err := getReadBalancesContractABI(runtime) if err != nil { runtime.Logger().Error(fmt.Sprintf("failed to get ReadBalances ABI: %v", err)) return "", fmt.Errorf("failed to get ReadBalances ABI: %w", err) @@ -179,17 +157,18 @@ func onTrigger(config types.WorkflowConfig, runtime cre.Runtime, payload *cron.P return message, nil } -func getReadBalancesContractABI(runtime cre.Runtime, balanceReaderABI string) (abi.ABI, error) { - parsedABI, err := abi.JSON(strings.NewReader(balanceReaderABI)) - if err != nil { - runtime.Logger().Error(fmt.Sprintf("failed to parse ABI: %v", err)) - return abi.ABI{}, fmt.Errorf("failed to parse ABI: %w", err) +func getReadBalancesContractABI(runtime cre.Runtime) (*abi.ABI, error) { + runtime.Logger().Info("getting Balance Reader contract ABI") + readBalancesABI, abiErr := balance_reader.BalanceReaderMetaData.GetAbi() + if abiErr != nil { + runtime.Logger().Error("failed to get Balance Reader contract ABI", "error", abiErr) + return nil, fmt.Errorf("failed to get Balance Reader contract ABI: %w", abiErr) } - runtime.Logger().With().Info(fmt.Sprintln("Parsed ABI successfully")) - return parsedABI, nil + runtime.Logger().Info("successfully got Balance Reader contract ABI") + return readBalancesABI, nil } -func readBalancesFromContract(addresses []common.Address, readBalancesABI abi.ABI, evmClient evm.Client, runtime cre.Runtime, config types.WorkflowConfig) (*evm.CallContractReply, error) { +func readBalancesFromContract(addresses []common.Address, readBalancesABI *abi.ABI, evmClient evm.Client, runtime cre.Runtime, config types.WorkflowConfig) (*evm.CallContractReply, error) { methodName := "getNativeBalances" packedData, err := readBalancesABI.Pack(methodName, addresses) if err != nil { diff --git a/go.md b/go.md index 948a9fe38e2..a09218ced71 100644 --- a/go.md +++ b/go.md @@ -354,6 +354,7 @@ flowchart LR chainlink/core/scripts/cre/environment/examples/workflows/v2/cron --> cre-sdk-go/capabilities/scheduler/cron click chainlink/core/scripts/cre/environment/examples/workflows/v2/cron href "https://github.com/smartcontractkit/chainlink" chainlink/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based --> chainlink-common + chainlink/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based --> chainlink-evm/gethwrappers chainlink/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based --> cre-sdk-go/capabilities/blockchain/evm chainlink/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based --> cre-sdk-go/capabilities/networking/http chainlink/core/scripts/cre/environment/examples/workflows/v2/proof-of-reserve/cron-based --> cre-sdk-go/capabilities/scheduler/cron diff --git a/system-tests/tests/go.mod b/system-tests/tests/go.mod index 8857fc84db1..151fc109388 100644 --- a/system-tests/tests/go.mod +++ b/system-tests/tests/go.mod @@ -31,6 +31,7 @@ replace github.com/smartcontractkit/chainlink/system-tests/tests/regression/cre/ require ( github.com/Masterminds/semver/v3 v3.4.0 + github.com/avast/retry-go/v4 v4.6.1 github.com/confluentinc/confluent-kafka-go v1.9.2 github.com/ethereum/go-ethereum v1.16.3 github.com/fbsobreira/gotron-sdk v0.0.0-20250403083053-2943ce8c759b @@ -124,7 +125,6 @@ require ( github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/atombender/go-jsonschema v0.16.1-0.20240916205339-a74cd4e2851c // indirect github.com/avast/retry-go v3.0.0+incompatible // indirect - github.com/avast/retry-go/v4 v4.6.1 // indirect github.com/awalterschulze/gographviz v2.0.3+incompatible // indirect github.com/aws/aws-sdk-go v1.55.7 // indirect github.com/aws/aws-sdk-go-v2 v1.38.1 // indirect diff --git a/system-tests/tests/regression/cre/cre_regression_suite_test.go b/system-tests/tests/regression/cre/cre_regression_suite_test.go index 1ff2b169775..0fce3048419 100644 --- a/system-tests/tests/regression/cre/cre_regression_suite_test.go +++ b/system-tests/tests/regression/cre/cre_regression_suite_test.go @@ -22,8 +22,8 @@ Inside `core/scripts/cre/environment` directory 2. Identify the appropriate topology that you want to test 3. Stop and clear any existing environment: `go run . env stop -a` 4. Run: `CTF_CONFIGS= go run . env start && ./bin/ctf obs up` to start env + observability - 5. Optionally run blockscout `./bin/ctf bs up` - 6. Execute the tests in `system-tests/tests/regression/cre`: `go test -timeout 15m -run "^Test_CRE_V2"`. + 5. Optionally run the Blockscout (chain explorer) `./bin/ctf bs up` + 6. Execute the tests in `system-tests/tests/regression/cre`: `go test -timeout 15m -run "^Test_CRE_V2"` */ func Test_CRE_V2_Consensus_Regression(t *testing.T) { // a template for Consensus negative tests names to avoid duplication diff --git a/system-tests/tests/regression/cre/v2_evm_regression_test.go b/system-tests/tests/regression/cre/v2_evm_regression_test.go index 75320f6e35e..dd62adef9eb 100644 --- a/system-tests/tests/regression/cre/v2_evm_regression_test.go +++ b/system-tests/tests/regression/cre/v2_evm_regression_test.go @@ -121,7 +121,7 @@ var evmNegativeTestsFilterLogsWithInvalidAddress = []evmNegativeTest{ } var evmNegativeTestsFilterLogsWithInvalidFromBlock = []evmNegativeTest{ - // FilterLogs - invalid TromBlock/ToBlock values + // FilterLogs - invalid FromBlock/ToBlock values // Distance between blocks should not be more than 100 {"negative number", "-1", filterLogsInvalidFromBlock, "block number -1 is not supported"}, {"zero", "0", filterLogsInvalidFromBlock, "block number 0 is not supported"}, @@ -214,7 +214,7 @@ func EVMReadFailsTest(t *testing.T, testEnv *ttypes.TestEnvironment, evmNegative t_helpers.CompileAndDeployWorkflow(t, testEnv, testLogger, workflowName, &workflowConfig, workflowFileLocation) expectedError := evmNegativeTest.expectedError - timeout := 90 * time.Second + timeout := 2 * time.Minute err := t_helpers.AssertBeholderMessage(listenerCtx, t, expectedError, testLogger, messageChan, kafkaErrChan, timeout) require.NoError(t, err, "EVM Read Fail test failed") testLogger.Info().Msg("EVM Read Fail test successfully completed") diff --git a/system-tests/tests/test-helpers/beholder_provider.go b/system-tests/tests/test-helpers/beholder_provider.go index 5bf6ba8c524..fdef2e30508 100644 --- a/system-tests/tests/test-helpers/beholder_provider.go +++ b/system-tests/tests/test-helpers/beholder_provider.go @@ -3,37 +3,46 @@ package helpers import ( "context" "fmt" + "math/rand" "os" "os/exec" + "strings" "time" + "github.com/avast/retry-go/v4" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/pkg/errors" "github.com/rs/zerolog" "google.golang.org/protobuf/proto" + commonevents "github.com/smartcontractkit/chainlink-protos/workflows/go/common" + workflowevents "github.com/smartcontractkit/chainlink-protos/workflows/go/events" "github.com/smartcontractkit/chainlink-testing-framework/framework" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/environment/config" ) const ( // Channel buffer sizes - messageChannelBufferSize = 40 - errorChannelBufferSize = 1 - channelFullRetryTimeout = 100 * time.Millisecond - - // Kafka configuration - kafkaSessionTimeoutMs = 10000 - kafkaReadTimeoutMs = 0 // Non-blocking read - - // Timing configuration - messageReadInterval = 50 * time.Millisecond - - // CloudEvents protobuf offset + defaultMessageBufferSize = 200 + defaultErrorBufferSize = 100 + + // Kafka timings + beholderStartTimeout = 2 * time.Minute // timeout for starting Beholder stack + maxConsumerConnectivityTimeout = 60 * time.Second // max timeout before Kafka consumer reconnection + kafkaSessionTimeoutMs = 20000 // keep it high enough to let Beholder messages incoming + messageReadInterval = 50 * time.Millisecond + + // CloudEvents binary format + // protobufOffset represents the number of bytes to skip in CloudEvents binary format messages + // before the protobuf payload begins. This is a CloudEvents specification detail where the + // first 6 bytes contain CloudEvents metadata in binary content mode. protobufOffset = 6 - // Expected CloudEvents header + // CloudEvents header for message type routing ceTypeHeader = "ce_type" + + // Error messages + errBeholderOrConfigNil = "beholder or config is nil" ) type Beholder struct { @@ -41,276 +50,748 @@ type Beholder struct { lggr zerolog.Logger } +// All fields are optional; sensible defaults are applied when nil or empty. +type ConsumerOptions struct { + GroupID string // The consumer group to ensure independent message consumption. Defaults to "beholder-consumer". + Topic string // If empty, uses the first topic from config. + MessageBuffer int + ErrorBuffer int + CommitSync bool // Default: true.Enables synchronous commits, safer (guaranteed commit). Async is less safe (potential reprocessing). + IsolationReadCommitted bool // Ensures only committed messages are read. Defaults to "false". +} + +// NewBeholder creates a Beholder instance, even if it's not already running. func NewBeholder(lggr zerolog.Logger, relativePathToRepoRoot, environmentDir string) (*Beholder, error) { - err := startBeholderStackIfIsNotRunning(relativePathToRepoRoot, environmentDir) - if err != nil { - return nil, errors.Wrap(err, "failed to ensure beholder stack is running") + if err := startBeholderIfNotRunning(relativePathToRepoRoot, environmentDir); err != nil { + return nil, errors.Wrap(err, "Beholder failed to start") } chipConfig, err := loadBeholderStackCache(relativePathToRepoRoot) if err != nil { return nil, errors.Wrap(err, "failed to load beholder stack cache") } + return &Beholder{cfg: chipConfig, lggr: lggr}, nil } +// startBeholderIfNotRunning starts the Beholder stack if it's not already running. +func startBeholderIfNotRunning(relativePathToRepoRoot, environmentDir string) error { + if config.ChipIngressStateFileExists(relativePathToRepoRoot) { + framework.L.Info().Msg("No need to start Beholder - it is already running") + return nil + } + + framework.L.Info().Dur("timeout", beholderStartTimeout).Msg("Beholder state file not found. Starting Beholder...") + ctx, cancel := context.WithTimeout(context.Background(), beholderStartTimeout) + defer cancel() + + cmd := exec.CommandContext(ctx, "go", "run", ".", "env", "beholder", "start") + cmd.Dir = environmentDir + cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr + + if err := cmd.Run(); err != nil { + if ctx.Err() == context.DeadlineExceeded { + return errors.Wrap(err, "timeout starting Beholder") + } + return errors.Wrap(err, "failed to start Beholder") + } + + framework.L.Info().Msg("Beholder started successfully") + return nil +} + +// loadBeholderStackCache loads and validates the Beholder configuration. func loadBeholderStackCache(relativePathToRepoRoot string) (*config.ChipIngressConfig, error) { c := &config.ChipIngressConfig{} - if loadErr := c.Load(config.MustChipIngressStateFileAbsPath(relativePathToRepoRoot)); loadErr != nil { - return nil, errors.Wrap(loadErr, "failed to load beholder stack cache") + if err := c.Load(config.MustChipIngressStateFileAbsPath(relativePathToRepoRoot)); err != nil { + return nil, errors.Wrap(err, "load cache") } + if c.ChipIngress.Output.RedPanda.KafkaExternalURL == "" { - return nil, errors.New("kafka external url is not set in the cache") + return nil, errors.New("kafka external url not set in cache") } if len(c.Kafka.Topics) == 0 { - return nil, errors.New("kafka topics are not set in the cache") + return nil, errors.New("kafka topics not set in cache") } return c, nil } -func startBeholderStackIfIsNotRunning(relativePathToRepoRoot, environmentDir string) error { - if !config.ChipIngressStateFileExists(relativePathToRepoRoot) { - framework.L.Info().Str("state file", config.MustChipIngressStateFileAbsPath(relativePathToRepoRoot)).Msg("Beholder state file was not found. Starting Beholder...") - cmd := exec.Command("go", "run", ".", "env", "beholder", "start") - cmd.Dir = environmentDir - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmdErr := cmd.Run() - if cmdErr != nil { - return errors.Wrap(cmdErr, "failed to start Beholder") - } - } - framework.L.Info().Msg("Beholder is running.") - return nil -} +/* +SubscribeToBeholderMessages starts a Kafka consumer and returns message/error channels. -func (b *Beholder) SubscribeToBeholderMessages( - ctx context.Context, - messageTypes map[string]func() proto.Message, +1. Tests Kafka broker connectivity before starting the listener (FATAL - fails fast if not accessible) +2. Validates Beholder heartbeat to ensure it's alive and healthy (FATAL - fails fast if not detected) +3. Validates topic existence and accessibility during subscription +4. Verifies topic metadata and partition availability +5. Coordinates consumer readiness to prevent race conditions with producers + +Parameters: + - ctx: Context for lifecycle management + - messageTypes: Map of CloudEvents ce_type to protobuf factory functions + +Returns: + - Message channel (closed when consumer stops) + - Error channel (buffered, reports fatal errors) +*/ +func (b *Beholder) SubscribeToBeholderMessages(ctx context.Context, messageTypes map[string]func() proto.Message, ) (<-chan proto.Message, <-chan error) { - kafkaErrChan := make(chan error, errorChannelBufferSize) - messageChan := make(chan proto.Message, messageChannelBufferSize) - readyChan := make(chan bool, 1) - - // Start listening for messages in the background - go func() { - // Recover from panics - defer func() { - if r := recover(); r != nil { - b.lggr.Error().Interface("panic", r).Msg("Panic in Kafka listener goroutine") - select { - case kafkaErrChan <- errors.Errorf("panic in listener: %v", r): - default: - } - } - }() + // If the Beholder is not initialized, return an error channel + if b == nil || b.cfg == nil { + errCh := make(chan error, 1) + errCh <- errors.New(errBeholderOrConfigNil) + close(errCh) + return nil, errCh + } - kafkaURL := b.cfg.ChipIngress.Output.RedPanda.KafkaExternalURL - topic := b.cfg.Kafka.Topics[0] - listenForKafkaMessages(ctx, b.lggr, kafkaURL, topic, messageTypes, messageChan, kafkaErrChan, readyChan) - }() + // Create options internally with unique group ID (to enable tests parallelization) + opts := &ConsumerOptions{ + GroupID: fmt.Sprintf("beholder-consumer-%d", time.Now().UnixNano()), + Topic: b.cfg.Kafka.Topics[0], + MessageBuffer: defaultMessageBufferSize, + ErrorBuffer: defaultErrorBufferSize, + CommitSync: true, // guaranteed Kafka acknowledgment + IsolationReadCommitted: false, + } + + msgCh := make(chan proto.Message, opts.MessageBuffer) + errCh := make(chan error, opts.ErrorBuffer) + readyCh := make(chan struct{}, 1) + + // Pre-flight validation: Kafka connectivity (fatal - fail early if Kafka is not accessible) + b.lggr.Debug().Msg("Performing Kafka connectivity validation...") + if err := b.validateConsumerConnectivity(ctx); err != nil { + b.lggr.Error().Err(err).Msg("Kafka connectivity validation failed") + errCh <- errors.Wrap(err, "kafka connectivity validation failed") + close(errCh) + close(msgCh) + return msgCh, errCh + } - // Wait for consumer to be ready before returning channels - // This ensures proper coordination between consumer readiness and workflow execution + // Pre-flight validation: Beholder heartbeat (fatal - fail early if Beholder is not healthy) + b.lggr.Debug().Msg("Performing Beholder heartbeat validation...") + if err := b.validateBeholderHeartbeat(ctx); err != nil { + b.lggr.Error().Err(err).Msg("Beholder heartbeat validation failed") + errCh <- errors.Wrap(err, "beholder heartbeat validation failed") + close(errCh) + close(msgCh) + return msgCh, errCh + } + + // Start consumer in background and wait for consumer readiness to coordinate with producers/workflows + go b.consume(ctx, messageTypes, opts, msgCh, errCh, readyCh) select { - case <-readyChan: - b.lggr.Info().Msg("Kafka consumer is ready and subscribed - safe to start workflow execution") - case <-time.After(15 * time.Second): // Increased timeout for CI environments + case <-readyCh: + b.lggr.Info().Msg("Kafka consumer ready and subscribed - safe to start workflow execution") + case <-time.After(maxConsumerConnectivityTimeout): // Increased timeout for CI environments select { - case kafkaErrChan <- errors.New("timeout waiting for consumer to be ready"): + case errCh <- errors.New("timeout waiting for Kafka consumer readiness"): default: } - b.lggr.Error().Msg("Timeout waiting for Kafka consumer to be ready - check broker connectivity") + b.lggr.Error().Msg("Timeout waiting for Kafka consumer readiness - check broker connectivity") case <-ctx.Done(): - b.lggr.Info().Msg("Context cancelled while waiting for consumer readiness") + b.lggr.Info().Msg("Context cancelled while waiting for Kafka consumer readiness") } - return messageChan, kafkaErrChan + return msgCh, errCh } -// Helper function to get map keys for logging -func getMapKeys(m map[string]func() proto.Message) []string { - keys := make([]string, 0, len(m)) - for k := range m { - keys = append(keys, k) +// validateKafkaConnectivity explicitly validates Kafka broker connectivity. +func (b *Beholder) validateConsumerConnectivity(ctx context.Context) error { + vctx, cancel := context.WithTimeout(ctx, maxConsumerConnectivityTimeout) + defer cancel() + + consumer, err := b.createValidationConsumer(vctx, "validation") + if err != nil { + return err } - return keys + defer func() { + if closeErr := consumer.Close(); closeErr != nil { + b.lggr.Warn().Err(closeErr).Msg("Failed to close validation consumer") + } + }() + + topic := b.cfg.Kafka.Topics[0] + if _, err := b.validateTopicMetadata(consumer, topic); err != nil { + return err + } + + b.lggr.Info(). + Str("broker", b.cfg.ChipIngress.Output.RedPanda.KafkaExternalURL). + Str("topic", topic). + Msg("Kafka connectivity validation successful") + return nil } -func listenForKafkaMessages( - ctx context.Context, - logger zerolog.Logger, - brokerAddress string, - topic string, - messageTypes map[string]func() proto.Message, // ce_type -> protobuf factory function - messageChan chan proto.Message, // channel to send deserialized messages - errChan chan<- error, - readyChan chan<- bool, -) { - logger.Info().Str("broker", brokerAddress).Str("topic", topic).Msg("Starting Kafka listener with readiness signaling") +// validateBeholderHeartbeat validates that Beholder is alive and sending heartbeat messages. +// Retries up to 3 times with a fixed 5-second delay between attempts. +func (b *Beholder) validateBeholderHeartbeat(ctx context.Context) error { + const ( + maxRetries = 3 + retryDelay = 5 * time.Second + ) + + b.lggr.Info(). + Int("max_retries", maxRetries). + Dur("retry_delay", retryDelay). + Dur("max_timeout", maxConsumerConnectivityTimeout). + Int("session_timeout_ms", kafkaSessionTimeoutMs). + Msg("Starting Beholder heartbeat validation...") + + return retry.Do( + func() error { + return b.validateBeholderHeartbeatOnce(ctx) + }, + retry.Context(ctx), + retry.Attempts(maxRetries), + retry.Delay(retryDelay), + retry.DelayType(retry.FixedDelay), + retry.LastErrorOnly(true), + retry.OnRetry(func(n uint, err error) { + b.lggr.Warn(). + Err(err). + Uint("attempt", n+1). + Uint("max_retries", maxRetries). + Dur("retry_delay", retryDelay). + Msg("Beholder heartbeat validation attempt failed, retrying...") + }), + ) +} + +// validateBeholderHeartbeatOnce performs a single heartbeat validation attempt. +func (b *Beholder) validateBeholderHeartbeatOnce(ctx context.Context) error { + hctx, cancel := context.WithTimeout(ctx, maxConsumerConnectivityTimeout) + defer cancel() - // Ensure channel is closed when function exits to prevent goroutine leaks + consumer, err := b.createValidationConsumer(hctx, "heartbeat-validation") + if err != nil { + return err + } defer func() { - close(messageChan) - logger.Info().Msg("Listener message channel closed") + if closeErr := consumer.Close(); closeErr != nil { + b.lggr.Warn().Err(closeErr).Msg("Failed to close heartbeat validation consumer") + } }() + b.lggr.Info().Msg("Created consumer for heartbeat validation") + + // Use blocking ReadMessage with timeout instead of ticker pattern + for { + select { + case <-hctx.Done(): + return errors.New("timeout waiting for Beholder heartbeat") + default: + } - kafkaConfig := &kafka.ConfigMap{ - "bootstrap.servers": brokerAddress, - "group.id": fmt.Sprintf("workshop-listener-%d", time.Now().Unix()), // Unique group per listener + msg, err := consumer.ReadMessage(messageReadInterval) + if err != nil { + // Benign timeout - no messages available yet + var kerr kafka.Error + if errors.As(err, &kerr) && kerr.Code() == kafka.ErrTimedOut { + continue + } + b.lggr.Error().Int("session_timeout_ms", kafkaSessionTimeoutMs).Err(err).Msg("Failed to read message during heartbeat validation (consider increasing the session timeout)") + return errors.Wrap(err, "failed to read message during heartbeat validation") + } + + // Check if this is a BaseMessage + ceType, ok := getHeaderValue(ceTypeHeader, msg) + if !ok || ceType != "BaseMessage" { + continue + } + + // Validate message length for CloudEvents binary format + if len(msg.Value) <= protobufOffset { + continue + } + + // Unmarshal BaseMessage + baseMsg := &commonevents.BaseMessage{} + if err := proto.Unmarshal(msg.Value[protobufOffset:], baseMsg); err != nil { + b.lggr.Debug().Err(err).Msg("Failed to unmarshal BaseMessage during heartbeat validation") + continue + } + + // Check if this is a heartbeat message + if !isHeartbeatMessage(baseMsg) { + b.lggr.Debug(). + Str("msg", baseMsg.Msg). + Msg("Received BaseMessage but not a heartbeat; continuing to listen") + continue + } + + // Found heartbeat! + b.lggr.Info(). + Str("msg", baseMsg.Msg). + Interface("labels", baseMsg.Labels). + Msg("Beholder heartbeat detected successfully") + return nil + } +} + +// createValidationConsumer creates a temporary Kafka consumer for validation purposes. +func (b *Beholder) createValidationConsumer(ctx context.Context, groupIDPrefix string) (*kafka.Consumer, error) { + if b == nil || b.cfg == nil { + return nil, errors.New(errBeholderOrConfigNil) + } + + groupID := fmt.Sprintf("%s-%d", groupIDPrefix, time.Now().UnixNano()) + cfg := &kafka.ConfigMap{ + "bootstrap.servers": b.cfg.ChipIngress.Output.RedPanda.KafkaExternalURL, + "group.id": groupID, "auto.offset.reset": "latest", "session.timeout.ms": kafkaSessionTimeoutMs, - "enable.auto.commit": true, // Commit messages after processing - "isolation.level": "read_committed", // Only read committed messages } - consumer, err := kafka.NewConsumer(kafkaConfig) + consumer, err := b.createAndSubscribeConsumer(cfg, b.cfg.Kafka.Topics[0]) if err != nil { - errChan <- errors.Wrap(err, "failed to create consumer") - return + return nil, err } - defer consumer.Close() - logger.Info().Msg("Kafka consumer created successfully") - err = consumer.Subscribe(topic, nil) + return consumer, nil +} + +// isHeartbeatMessage checks if a BaseMessage is a Beholder heartbeat. +// Heartbeat format: msg="heartbeat" and labels.system="Application" +func isHeartbeatMessage(msg *commonevents.BaseMessage) bool { + if msg == nil { + return false + } + + if msg.Msg != "heartbeat" { + return false + } + + if msg.Labels == nil { + return false + } + + systemLabel, exists := msg.Labels["system"] + if !exists { + return false + } + + // Case-insensitive comparison for robustness + return strings.EqualFold(systemLabel, "Application") +} + +// consume runs the Kafka consumer loop with offset management and automatic reconnection. +func (b *Beholder) consume( + ctx context.Context, + messageTypes map[string]func() proto.Message, + opts *ConsumerOptions, + out chan proto.Message, + errCh chan<- error, + readyCh chan<- struct{}, +) { + defer close(out) + + // Exponential backoff + backoff := 2 * time.Second + maxBackoffTimeout := 30 * time.Second + backoffFactor := 2.0 + attempt := 0 + // Reconnection loop + for { + select { + case <-ctx.Done(): + b.lggr.Info().Msg("Context cancelled; exiting Kafka consumer loop") + return + default: + // Continue to connection attempt + } + + err := b.consumeWithReconnect(ctx, messageTypes, opts, out, errCh, readyCh) + if err == nil { + // Clean exit (context cancelled) + return + } + + // Calculate backoff with jitter + attempt++ + jitter := time.Duration(rand.Float64() * float64(backoff) * 0.1) // 10% jitter + sleepDuration := backoff + jitter + if sleepDuration > maxBackoffTimeout { + sleepDuration = maxBackoffTimeout + } + + b.lggr.Warn(). + Dur("backoff", sleepDuration). + Int("attempt", attempt). + Err(err). + Msg("Reconnecting Kafka consumer with exponential backoff...") + + select { + case <-ctx.Done(): + b.lggr.Info().Msg("Context cancelled while attempting to reconnect Kafka consumer") + return + case <-time.After(sleepDuration): + b.lggr.Info().Int("attempt", attempt).Msg("Attempting to reconnect Kafka consumer...") + // Increase backoff for next iteration + backoff = time.Duration(float64(backoff) * backoffFactor) + if backoff > maxBackoffTimeout { + backoff = maxBackoffTimeout + } + } + } +} + +// consumeWithReconnect runs a single consumer session with UserLogs timeout tracking. +func (b *Beholder) consumeWithReconnect( + ctx context.Context, + messageTypes map[string]func() proto.Message, + opts *ConsumerOptions, + out chan proto.Message, + errCh chan<- error, + readyCh chan<- struct{}, +) error { + // The isolation level determines which messages the Kafka consumer is allowed to read: + // - [used by default] "read_uncommitted": The consumer can read all messages. + // - [beholder does not use Kafka transactions] "read_committed": The consumer will only read messages from transactions that have been successfully committed, ensuring no uncommitted or aborted data is delivered. + // This setting is important for applications that require strong data consistency and want to avoid processing uncommitted or potentially rolled-back messages. + isolationLevel := "read_uncommitted" + if opts.IsolationReadCommitted { // false by default + isolationLevel = "read_committed" + } + + cfg := &kafka.ConfigMap{ + "bootstrap.servers": b.cfg.ChipIngress.Output.RedPanda.KafkaExternalURL, + "group.id": opts.GroupID, + "auto.offset.reset": "latest", // Only process new messages by default + "session.timeout.ms": kafkaSessionTimeoutMs, + "enable.auto.commit": false, // Manual commit for safety. Prevents premature commit. + "enable.auto.offset.store": false, // Explicit commit control + "isolation.level": isolationLevel, + } + + consumer, err := b.createAndSubscribeConsumer(cfg, opts.Topic) if err != nil { - errChan <- errors.Wrap(err, "failed to subscribe to topic "+topic) - return + return err } + defer func() { + if closeErr := consumer.Close(); closeErr != nil { + b.lggr.Warn().Err(closeErr).Msg("Failed to close Kafka consumer") + } + }() + b.lggr.Info().Msg("Kafka consumer created successfully") - logger.Info().Str("topic", topic).Msg("Subscribed to topic (consuming from latest offset)") + // Verify and log subscription details + if err := b.logSubscriptionInfo(consumer, opts, errCh); err != nil { + return err + } - // Record start time AFTER consumer is ready to avoid race condition - startTime := time.Now() - logger.Info().Time("start_time", startTime).Msg("Consumer ready - will process messages from this point forward") + // Verify topic accessibility and log consumer ready + if err := b.validateConsumerReadiness(consumer, opts, errCh); err != nil { + return err + } - // Signal that consumer is ready - this is the key improvement for coordination + // This code signals (in a non-blocking way) that the Kafka consumer is ready to receive messages. + // It attempts to send an empty struct to the readyCh channel, but if the channel is full, it does nothing. select { - case readyChan <- true: - logger.Info().Msg("Signaled consumer readiness - workflow execution can now begin safely") + case readyCh <- struct{}{}: default: - logger.Debug().Msg("Ready channel already signaled or closed") + b.lggr.Warn().Msg("Kafka consumer readiness already signaled") } - ticker := time.NewTicker(messageReadInterval) - defer ticker.Stop() + interestedTypes := getMessageTypeKeys(messageTypes) + b.lggr.Debug().Strs("interested_types", interestedTypes).Msg("Starting message listening loop") - interestedTypes := getMapKeys(messageTypes) - logger.Debug().Strs("interested_types", interestedTypes).Msg("Starting message listening loop") + // Recreate the timer on each UserLogs message to avoid timer reset race conditions + timeoutTimer := time.NewTimer(maxConsumerConnectivityTimeout) + defer timeoutTimer.Stop() - // Start consuming messages] for { select { case <-ctx.Done(): - logger.Warn().Msg("Context cancelled, stopping Kafka listener") - return - case <-ticker.C: - msg, err := consumer.ReadMessage(kafkaReadTimeoutMs) // Non-blocking read + b.lggr.Info().Msg("Context cancelled; exiting consumer loop") + return nil + + case <-timeoutTimer.C: + // No UserLogs received within the timeout period + b.lggr.Warn(). + Dur("timeout", maxConsumerConnectivityTimeout). + Msg("No UserLogs received within timeout period, triggering Kafka consumer reconnection") + return errors.New("no UserLogs received within timeout period") + + default: + // Use blocking ReadMessage with short timeout + msg, err := consumer.ReadMessage(messageReadInterval) if err != nil { - // Check if it's just a timeout (no messages available) - var kafkaErr kafka.Error - if errors.As(err, &kafkaErr) && kafkaErr.Code() == kafka.ErrTimedOut { - // Don't log timeouts as they're expected + // Benign timeout - no messages available + var kerr kafka.Error + if errors.As(err, &kerr) && kerr.Code() == kafka.ErrTimedOut { continue } - logger.Error().Err(err).Msg("Consumer error") - errChan <- errors.Wrap(err, "failed to consume message") - return - } - - // More lenient timestamp filtering - only skip very old messages (30+ seconds) - msgTime := msg.Timestamp - const oldMessageThreshold = 30 * time.Second - if !msgTime.IsZero() && msgTime.Before(startTime.Add(-oldMessageThreshold)) { - logger.Debug(). - Time("msg_time", msgTime). - Time("start_time", startTime). - Dur("old_message_threshold", oldMessageThreshold). - Msg("Skipping old messages") - continue + logError(b.lggr, errCh, errors.Wrap(err, "failed to read message")) + return err } - logger.Debug(). + b.lggr.Debug(). Str("key", string(msg.Key)). Int("value_length", len(msg.Value)). - Str("topic", *msg.TopicPartition.Topic). Int32("partition", msg.TopicPartition.Partition). Int64("offset", int64(msg.TopicPartition.Offset)). - Time("timestamp", msgTime). - Msg("Received new message") - - ceType, err := getValueFromHeader(ceTypeHeader, msg) - if err != nil { - logger.Debug().Err(err).Msg("Failed to get ce_type, skipping") + Time("timestamp", msg.Timestamp). + Msg("Received Kafka message") + + // Extract and validate ce_type header + ceType, ok := getHeaderValue(ceTypeHeader, msg) + if !ok { + b.lggr.Debug(). + Int64("offset", int64(msg.TopicPartition.Offset)). + Msg("Skipping message without ce_type header") continue } - - logger.Debug().Str("ce_type", ceType).Msg("Message type determined") + b.lggr.Debug(). + Str("ce_type", ceType). + Int64("offset", int64(msg.TopicPartition.Offset)). + Int32("partition", msg.TopicPartition.Partition). + Msg("Message type determined") // Check if we're interested in this message type factory, interested := messageTypes[ceType] if !interested { - logger.Debug(). + b.lggr.Debug(). Str("ce_type", ceType). + Int64("offset", int64(msg.TopicPartition.Offset)). Strs("interested_types", interestedTypes). - Msg("Skipping message type (not in interested types)") + Msg("Skipping other (uninterested) message type") continue } - // CloudEvents with ce_datacontenttype: application/protobuf - // The protobuf data starts at offset 6 (after 6-byte binary header) + // Validate message length for CloudEvents binary format if len(msg.Value) <= protobufOffset { - logger.Debug(). - Int("message_length", len(msg.Value)). - Int("required_offset", protobufOffset). - Msg("Message too short for binary-wrapped protobuf") + b.lggr.Debug(). + Int("len", len(msg.Value)). + Int("offset", protobufOffset). + Msg("Message too short for protobuf payload; skipping") continue } - protobufData := msg.Value[protobufOffset:] - message := factory() // Create new instance using factory function passed in messageTypes map + // Create and unmarshal protobuf message + pm := factory() + if pm == nil { + b.lggr.Warn().Str("ce_type", ceType).Msg("Factory returned nil; skipping") + continue + } - err = proto.Unmarshal(protobufData, message) - if err != nil { - logger.Error(). - Err(err). - Int("protobuf_offset", protobufOffset). - Str("ce_type", ceType). - Msg("Failed to deserialize protobuf") + if err := proto.Unmarshal(msg.Value[protobufOffset:], pm); err != nil { + b.lggr.Error().Err(err).Str("ce_type", ceType).Msg("Failed to unmarshal protobuf; skipping") continue } - // Successfully processed the message! Send it back through channel - logger.Debug().Str("ce_type", ceType).Msg("Successfully deserialized message, sending to channel") + // Reset timeout if we received a UserLogs message + if _, isUserLogs := pm.(*workflowevents.UserLogs); isUserLogs { + // Go recommendation: don't reuse timers, create new ones to avoid race conditions + if !timeoutTimer.Stop() { + // Timer already fired, drain it to prevent blocking + <-timeoutTimer.C + } + timeoutTimer = time.NewTimer(maxConsumerConnectivityTimeout) + b.lggr.Info(). + Int64("offset", int64(msg.TopicPartition.Offset)). + Int32("partition", msg.TopicPartition.Partition). + Dur("timeout", maxConsumerConnectivityTimeout). + Msg("UserLogs received - reconnection timeout reset") + } + // Send to output channel (blocking to prevent message loss) select { - case messageChan <- message: - logger.Debug().Msg("Message sent to channel successfully") - case <-ctx.Done(): - logger.Info().Msg("Context cancelled while sending message") - return - default: - // Channel is full - try with a brief timeout instead of dropping immediately - select { - case messageChan <- message: - logger.Warn().Msg("Message sent to channel after brief delay (channel was full)") - case <-time.After(channelFullRetryTimeout): - logger.Error().Msg("Message channel full for too long, dropping message") - case <-ctx.Done(): - return + case out <- pm: + // Commit offset after successful delivery + if err := b.commitMessage(consumer, msg, opts.CommitSync); err != nil { + logError(b.lggr, errCh, err) + return err } + b.lggr.Debug(). + Str("ce_type", ceType). + Int64("offset", int64(msg.TopicPartition.Offset)). + Int32("partition", msg.TopicPartition.Partition). + Msg("Successfully processed and committed message") + + case <-ctx.Done(): + b.lggr.Info().Msg("Context cancelled while delivering message") + return nil } } } } -func getValueFromHeader(expectedHeader string, msg *kafka.Message) (string, error) { - for _, header := range msg.Headers { - if header.Key == expectedHeader { - return string(header.Value), nil +// commitMessage tells Kafka that the message processed and offset committed +// without commits, if the consumer crashes and restarts, it would re-read all messages from the beginning +// +// Two commit modes: +// 1. Synchronous - SLOWER but SAFER: StoreMessage + Commit. Guarantees the offset is persisted before continuing. +// 2. Asynchronous - FASTER but less safe: CommitMessage, don't wait for confirmation from Kafka, offset commit happens in the background +// +// Default: false.Re-reading some messages on crash/restart is acceptable +func (b *Beholder) commitMessage(consumer *kafka.Consumer, msg *kafka.Message, syncCommit bool) error { + if syncCommit { + // Synchronous: Store offset first, then commit synchronously + if _, err := consumer.StoreMessage(msg); err != nil { + return errors.Wrap(err, "store offset") + } + if _, err := consumer.Commit(); err != nil { + return errors.Wrap(err, "commit sync") + } + } else { + // Asynchronous: One-step fire-and-forget (stores + commits in one call) + if _, err := consumer.CommitMessage(msg); err != nil { + return errors.Wrap(err, "commit message async") + } + } + + return nil +} + +// getHeaderValue extracts a header value from a Kafka message. +func getHeaderValue(key string, msg *kafka.Message) (string, bool) { + for _, h := range msg.Headers { + if h.Key == key { + return string(h.Value), true + } + } + return "", false +} + +// getMessageTypeKeys returns the keys from the message types map for logging. +func getMessageTypeKeys(m map[string]func() proto.Message) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +} + +// createAndSubscribeConsumer creates a Kafka consumer and subscribes to a topic. +func (b *Beholder) createAndSubscribeConsumer(cfg *kafka.ConfigMap, topic string) (*kafka.Consumer, error) { + consumer, err := kafka.NewConsumer(cfg) + if err != nil { + b.lggr.Error().Err(err).Msg("failed to create Kafka consumer") + return nil, errors.Wrap(err, "failed to create Kafka consumer") + } + + // Use SubscribeTopics for future multi-topic support + if err := consumer.SubscribeTopics([]string{topic}, nil); err != nil { + if closeErr := consumer.Close(); closeErr != nil { + b.lggr.Warn().Err(closeErr).Msg("Failed to close consumer after subscription failure") } + b.lggr.Error().Err(err).Str("topic", topic).Msg("failed to subscribe to topic") + return nil, errors.Wrapf(err, "failed to subscribe to topic %q", topic) + } + + return consumer, nil +} + +// logSubscriptionInfo fetches and logs subscription and partition assignment details. +func (b *Beholder) logSubscriptionInfo(consumer *kafka.Consumer, opts *ConsumerOptions, errCh chan<- error) error { + // Verify subscription by fetching from consumer + subscription, subErr := consumer.Subscription() + if subErr != nil { + logError(b.lggr, errCh, errors.Wrap(subErr, "failed to get subscription info")) + return subErr + } + + // Get partition assignment (may be empty initially, will be assigned after first poll) + assignment, assignErr := consumer.Assignment() + if assignErr != nil { + b.lggr.Debug().Err(assignErr).Msg("Could not get partition assignment yet (will be assigned after first poll)") + } + + logEvent := b.lggr.Info(). + Strs("subscribed_topics", subscription). + Str("group_id", opts.GroupID) + + if len(assignment) > 0 { + partitions := getPartitionFromAssignment(assignment) + logEvent.Ints("assigned_partitions", partitions) + } + + logEvent.Msg("Kafka consumer subscribed successfully") + return nil +} + +// validateConsumerReadiness verifies topic accessibility and logs consumer ready status. +func (b *Beholder) validateConsumerReadiness(consumer *kafka.Consumer, opts *ConsumerOptions, errCh chan<- error) error { + // Get topic metadata to verify accessibility + md, err := b.validateTopicMetadata(consumer, opts.Topic) + if err != nil { + logError(b.lggr, errCh, err) + return err + } + + // Log consumer ready with partition count + b.logConsumerReady(consumer, opts, len(md.Topics[opts.Topic].Partitions)) + return nil +} + +// validateTopicMetadata fetches topic metadata and validates accessibility. +func (b *Beholder) validateTopicMetadata(consumer *kafka.Consumer, topic string) (*kafka.Metadata, error) { + md, err := consumer.GetMetadata(&topic, false, int(maxConsumerConnectivityTimeout/time.Millisecond)) + if err != nil { + return nil, errors.Wrap(err, "failed to get metadata") + } + + if md == nil { + return nil, errors.New("metadata is nil") + } + + // Safely check if topic exists in metadata + topicMd, exists := md.Topics[topic] + if !exists { + return nil, errors.Errorf("topic %q not found in metadata", topic) + } + + // Validate topic error code and partitions + if topicMd.Error.Code() != kafka.ErrNoError { + return nil, errors.Errorf("topic %q has error: %v", topic, topicMd.Error) + } + + if len(topicMd.Partitions) == 0 { + return nil, errors.Errorf("topic %q has no partitions", topic) + } + + return md, nil +} + +// logConsumerReady logs consumer ready status with subscription and partition details. +func (b *Beholder) logConsumerReady(consumer *kafka.Consumer, opts *ConsumerOptions, totalPartitions int) { + // Get updated partition assignment after metadata verification + subscription, _ := consumer.Subscription() + assignment, _ := consumer.Assignment() + + readyLog := b.lggr.Info(). + Strs("subscribed_topics", subscription). + Str("group_id", opts.GroupID). + Int("total_partitions", totalPartitions) + + if len(assignment) > 0 { + partitions := getPartitionFromAssignment(assignment) + readyLog.Ints("assigned_partitions", partitions) + } + + readyLog.Msg("Consumer ready") +} + +// getPartitionFromAssignment extracts partition numbers from TopicPartition slice. +func getPartitionFromAssignment(assignment []kafka.TopicPartition) []int { + partitions := make([]int, len(assignment)) + for i, tp := range assignment { + partitions[i] = int(tp.Partition) + } + return partitions +} + +// logError logs an error and attempts to send it to the error channel. +// If the error channel is full (i.e., the send would block), it silently skips sending +// to avoid blocking the caller. This is useful in goroutines where you want to report +// errors but not risk deadlock if the channel is not being drained. +func logError(l zerolog.Logger, errCh chan<- error, err error) { + l.Error().Err(err).Msg("Kafka consumer error") + select { + case errCh <- err: + // Error sent to channel + default: + // Channel full, skip sending to avoid blocking } - return "", fmt.Errorf("%s not found in headers", expectedHeader) } diff --git a/system-tests/tests/test-helpers/t_helpers.go b/system-tests/tests/test-helpers/t_helpers.go index 036f0f80833..e6519b8763a 100644 --- a/system-tests/tests/test-helpers/t_helpers.go +++ b/system-tests/tests/test-helpers/t_helpers.go @@ -145,10 +145,15 @@ func StartBeholder(t *testing.T, testLogger zerolog.Logger, testEnv *ttypes.Test beholderMsgChan, beholderErrChan := beholder.SubscribeToBeholderMessages(listenerCtx, messageTypes) - // No more draining - let all messages through for processing - // The consumer is ready and any messages received are legitimate - testLogger.Info().Msg("Beholder listener ready - all messages will be processed") + // Fail fast if there is an error from the heartbeat validation subscription + select { + case err := <-beholderErrChan: + require.NoError(t, err, "Beholder subscription failed during initialization") + default: + // No immediate error, proceed + } + testLogger.Info().Msg("Beholder listener ready") return listenerCtx, beholderMsgChan, beholderErrChan } @@ -175,7 +180,7 @@ func AssertBeholderMessage(ctx context.Context, t *testing.T, expectedLog string foundErrorLog <- true } case *workflowevents.UserLogs: - testLogger.Info().Msg("🎉 Received UserLogs message in test") + testLogger.Info().Msg("➡️ Beholder message received in test. Asserting...") receivedUserLogs++ for _, logLine := range typedMsg.LogLines { @@ -194,7 +199,7 @@ func AssertBeholderMessage(ctx context.Context, t *testing.T, expectedLog string testLogger.Warn(). Str("expected_log", expectedLog). Str("found_message", strings.TrimSpace(logLine.Message)). - Msg("Received UserLogs message, but it does not match expected log") + Msg("[soft assertion] Received UserLogs message, but it does not match expected log") } default: // ignore other message types