From efff2b178afd09a6063c42fa74d461558fd02189 Mon Sep 17 00:00:00 2001 From: Alex | Skip Date: Wed, 23 Oct 2024 10:18:15 -0400 Subject: [PATCH] fix: uniswap unable to handle more than 10 tickers (#797) --- pkg/slices/slices.go | 20 ++++++ pkg/slices/slices_test.go | 62 +++++++++++++++++++ providers/apis/defi/ethmulticlient/client.go | 12 ++-- providers/apis/defi/raydium/client.go | 8 +-- providers/apis/defi/uniswapv3/fetcher.go | 31 ++++++---- providers/apis/defi/uniswapv3/fetcher_test.go | 3 +- providers/apis/defi/uniswapv3/utils.go | 2 +- providers/providertest/provider.go | 2 +- providers/providertest/util.go | 3 + 9 files changed, 117 insertions(+), 26 deletions(-) create mode 100644 pkg/slices/slices.go create mode 100644 pkg/slices/slices_test.go diff --git a/pkg/slices/slices.go b/pkg/slices/slices.go new file mode 100644 index 000000000..60c4cdca2 --- /dev/null +++ b/pkg/slices/slices.go @@ -0,0 +1,20 @@ +package slices + +// Chunk chunks a slice into batches of chunkSize. +// example: {1,2,3,4,5}, chunkSize = 2 -> {1,2}, {3,4}, {5} +func Chunk[T any](input []T, chunkSize int) [][]T { + if len(input) <= chunkSize { + return [][]T{input} + } + var chunks [][]T + for i := 0; i < len(input); i += chunkSize { + end := i + chunkSize + + if end > len(input) { + end = len(input) + } + + chunks = append(chunks, input[i:end]) + } + return chunks +} diff --git a/pkg/slices/slices_test.go b/pkg/slices/slices_test.go new file mode 100644 index 000000000..0ef40a528 --- /dev/null +++ b/pkg/slices/slices_test.go @@ -0,0 +1,62 @@ +package slices_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/skip-mev/connect/v2/pkg/slices" +) + +func TestChunkSlice(t *testing.T) { + testCases := []struct { + name string + input []string + chunkSize int + expected [][]string + }{ + { + name: "Empty slice", + input: []string{}, + chunkSize: 3, + expected: [][]string{{}}, + }, + { + name: "Slice smaller than chunk size", + input: []string{"a", "b"}, + chunkSize: 3, + expected: [][]string{{"a", "b"}}, + }, + { + name: "Slice equal to chunk size", + input: []string{"a", "b", "c"}, + chunkSize: 3, + expected: [][]string{{"a", "b", "c"}}, + }, + { + name: "Slice larger than chunk size", + input: []string{"a", "b", "c", "d", "e"}, + chunkSize: 2, + expected: [][]string{{"a", "b"}, {"c", "d"}, {"e"}}, + }, + { + name: "Chunk size of 1", + input: []string{"a", "b", "c"}, + chunkSize: 1, + expected: [][]string{{"a"}, {"b"}, {"c"}}, + }, + { + name: "Large slice with uneven chunks", + input: []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}, + chunkSize: 3, + expected: [][]string{{"1", "2", "3"}, {"4", "5", "6"}, {"7", "8", "9"}, {"10"}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := slices.Chunk(tc.input, tc.chunkSize) + require.Equal(t, tc.expected, result) + }) + } +} diff --git a/providers/apis/defi/ethmulticlient/client.go b/providers/apis/defi/ethmulticlient/client.go index e7f3bd77f..94c28fc3b 100644 --- a/providers/apis/defi/ethmulticlient/client.go +++ b/providers/apis/defi/ethmulticlient/client.go @@ -6,10 +6,10 @@ import ( "net/http" "time" + "github.com/ethereum/go-ethereum/rpc" + "github.com/skip-mev/connect/v2/oracle/config" "github.com/skip-mev/connect/v2/providers/base/api/metrics" - - "github.com/ethereum/go-ethereum/rpc" ) // EVMClient is an interface that abstracts the evm client. @@ -92,17 +92,17 @@ func NewGoEthereumClientImpl( // the corresponding BatchElem. // // Note that batch calls may not be executed atomically on the server side. -func (c *GoEthereumClientImpl) BatchCallContext(ctx context.Context, calls []rpc.BatchElem) (err error) { +func (c *GoEthereumClientImpl) BatchCallContext(ctx context.Context, calls []rpc.BatchElem) error { start := time.Now() defer func() { c.apiMetrics.ObserveProviderResponseLatency(c.api.Name, c.redactedURL, time.Since(start)) }() - if err = c.client.BatchCallContext(ctx, calls); err != nil { + if err := c.client.BatchCallContext(ctx, calls); err != nil { c.apiMetrics.AddRPCStatusCode(c.api.Name, c.redactedURL, metrics.RPCCodeError) - return + return err } c.apiMetrics.AddRPCStatusCode(c.api.Name, c.redactedURL, metrics.RPCCodeOK) - return + return nil } diff --git a/providers/apis/defi/raydium/client.go b/providers/apis/defi/raydium/client.go index d636b4e04..900ac04e6 100644 --- a/providers/apis/defi/raydium/client.go +++ b/providers/apis/defi/raydium/client.go @@ -76,20 +76,20 @@ func (c *JSONRPCClient) GetMultipleAccountsWithOpts( ctx context.Context, accounts []solana.PublicKey, opts *rpc.GetMultipleAccountsOpts, -) (out *rpc.GetMultipleAccountsResult, err error) { +) (*rpc.GetMultipleAccountsResult, error) { start := time.Now() defer func() { c.apiMetrics.ObserveProviderResponseLatency(c.api.Name, c.redactedURL, time.Since(start)) }() - out, err = c.client.GetMultipleAccountsWithOpts(ctx, accounts, opts) + out, err := c.client.GetMultipleAccountsWithOpts(ctx, accounts, opts) if err != nil { c.apiMetrics.AddRPCStatusCode(c.api.Name, c.redactedURL, metrics.RPCCodeError) - return + return nil, err } c.apiMetrics.AddRPCStatusCode(c.api.Name, c.redactedURL, metrics.RPCCodeOK) - return + return out, nil } // solanaClientFromEndpoint creates a new SolanaJSONRPCClient from an endpoint. diff --git a/providers/apis/defi/uniswapv3/fetcher.go b/providers/apis/defi/uniswapv3/fetcher.go index 8124fe56c..8b53769a5 100644 --- a/providers/apis/defi/uniswapv3/fetcher.go +++ b/providers/apis/defi/uniswapv3/fetcher.go @@ -7,15 +7,15 @@ import ( "math/big" "time" - "go.uber.org/zap" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" + "go.uber.org/zap" "github.com/skip-mev/connect/v2/oracle/config" "github.com/skip-mev/connect/v2/oracle/types" + "github.com/skip-mev/connect/v2/pkg/slices" "github.com/skip-mev/connect/v2/providers/apis/defi/ethmulticlient" uniswappool "github.com/skip-mev/connect/v2/providers/apis/defi/uniswapv3/pool" "github.com/skip-mev/connect/v2/providers/base/api/metrics" @@ -158,6 +158,7 @@ func (u *PriceFetcher) Fetch( // Create a batch element for each ticker and pool. batchElems := make([]rpc.BatchElem, len(tickers)) pools := make([]PoolConfig, len(tickers)) + for i, ticker := range tickers { pool, err := u.GetPool(ticker) if err != nil { @@ -192,17 +193,23 @@ func (u *PriceFetcher) Fetch( pools[i] = pool } - // Batch call to the EVM. - if err := u.client.BatchCallContext(ctx, batchElems); err != nil { - u.logger.Debug( - "failed to batch call to ethereum network for all tickers", - zap.Error(err), - ) + // process 10 tickers at a time + const batchSize = 10 + batchChunks := slices.Chunk(batchElems, batchSize) - return types.NewPriceResponseWithErr( - tickers, - providertypes.NewErrorWithCode(err, providertypes.ErrorAPIGeneral), - ) + for _, chunk := range batchChunks { + // Batch call to the EVM. + if err := u.client.BatchCallContext(ctx, chunk); err != nil { + u.logger.Debug( + "failed to batch call to ethereum network for all tickers", + zap.Error(err), + ) + + return types.NewPriceResponseWithErr( + tickers, + providertypes.NewErrorWithCode(err, providertypes.ErrorAPIGeneral), + ) + } } // Parse the result from the batch call for each ticker. diff --git a/providers/apis/defi/uniswapv3/fetcher_test.go b/providers/apis/defi/uniswapv3/fetcher_test.go index 9640312a2..4e2ba968e 100644 --- a/providers/apis/defi/uniswapv3/fetcher_test.go +++ b/providers/apis/defi/uniswapv3/fetcher_test.go @@ -6,10 +6,9 @@ import ( "math/big" "testing" - "go.uber.org/zap" - "github.com/ethereum/go-ethereum/rpc" "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/skip-mev/connect/v2/oracle/config" "github.com/skip-mev/connect/v2/oracle/types" diff --git a/providers/apis/defi/uniswapv3/utils.go b/providers/apis/defi/uniswapv3/utils.go index 720234973..da986dffb 100644 --- a/providers/apis/defi/uniswapv3/utils.go +++ b/providers/apis/defi/uniswapv3/utils.go @@ -80,7 +80,7 @@ func (pc *PoolConfig) ValidateBasic() error { } // MustToJSON converts the pool configuration to JSON. -func (pc PoolConfig) MustToJSON() string { +func (pc *PoolConfig) MustToJSON() string { b, err := json.Marshal(pc) if err != nil { panic(err) diff --git a/providers/providertest/provider.go b/providers/providertest/provider.go index 01dc35166..57bc4dba5 100644 --- a/providers/providertest/provider.go +++ b/providers/providertest/provider.go @@ -163,7 +163,7 @@ func (o *TestingOracle) RunMarketMap(ctx context.Context, mm mmtypes.MarketMap, if len(prices) != expectedNumPrices { return nil, fmt.Errorf("expected %d prices, got %d", expectedNumPrices, len(prices)) } - o.Logger.Info("provider prices", zap.Any("prices", prices)) + priceResults = append(priceResults, PriceResult{ Prices: prices, Time: time.Now(), diff --git a/providers/providertest/util.go b/providers/providertest/util.go index 2cc09af33..672224d7e 100644 --- a/providers/providertest/util.go +++ b/providers/providertest/util.go @@ -15,6 +15,9 @@ func FilterMarketMapToProviders(mm mmtypes.MarketMap) map[string]mmtypes.MarketM for _, market := range mm.Markets { // check each provider config for _, pc := range market.ProviderConfigs { + // remove normalizations to isolate markets + pc.NormalizeByPair = nil + // create a market from the given provider config isolatedMarket := mmtypes.Market{ Ticker: market.Ticker,