From cdb582cdf1f35db130cd0d1ab6a5b73ca8f6d38c Mon Sep 17 00:00:00 2001 From: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com> Date: Tue, 30 Aug 2022 09:23:24 -0700 Subject: [PATCH] feat(price-feeder): add ftx (#1299) ## Description Adds FTX as a provider closes: #1276 --- ### Author Checklist _All items are required. Please add a note to the item if the item is not applicable and please add links to any relevant follow up issues._ I have... - [ ] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [ ] added `!` to the type prefix if API or client breaking change - [ ] added appropriate labels to the PR - [ ] targeted the correct branch (see [PR Targeting](https://github.com/umee-network/umee/blob/main/CONTRIBUTING.md#pr-targeting)) - [ ] provided a link to the relevant issue or specification - [ ] added a changelog entry to `CHANGELOG.md` - [ ] included comments for [documenting Go code](https://blog.golang.org/godoc) - [ ] updated the relevant documentation or specification - [ ] reviewed "Files changed" and left comments if necessary - [ ] confirmed all CI checks have passed ### Reviewers Checklist _All items are required. Please add a note if the item is not applicable and please add your handle next to the items reviewed if you only reviewed selected items._ I have... - [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [ ] confirmed all author checklist items have been addressed - [ ] reviewed state machine logic - [ ] reviewed API design and naming - [ ] reviewed documentation is accurate - [ ] reviewed tests and test coverage - [ ] manually tested (if applicable) --- price-feeder/CHANGELOG.md | 1 + price-feeder/README.md | 1 + price-feeder/config/config.go | 1 + price-feeder/oracle/oracle.go | 4 +- price-feeder/oracle/provider/coinbase.go | 4 +- price-feeder/oracle/provider/ftx.go | 344 +++++++++++++++++++++++ price-feeder/oracle/provider/ftx_test.go | 105 +++++++ price-feeder/oracle/provider/mock.go | 2 +- price-feeder/oracle/provider/osmosis.go | 9 +- price-feeder/oracle/provider/provider.go | 9 + price-feeder/oracle/types/errors.go | 3 +- 11 files changed, 470 insertions(+), 13 deletions(-) create mode 100644 price-feeder/oracle/provider/ftx.go create mode 100644 price-feeder/oracle/provider/ftx_test.go diff --git a/price-feeder/CHANGELOG.md b/price-feeder/CHANGELOG.md index ebe73716ac..eb6b3f3291 100644 --- a/price-feeder/CHANGELOG.md +++ b/price-feeder/CHANGELOG.md @@ -70,6 +70,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ - [#1175](https://github.com/umee-network/umee/pull/1175) Add ProviderName type to facilitate the reading of maps. - [#1215](https://github.com/umee-network/umee/pull/1215) Moved ProviderName to Name in provider package. - [#1274](https://github.com/umee-network/umee/pull/1274) Add option to set config by env variables. +- [#1299](https://github.com/umee-network/umee/pull/1299) Add FTX as a provider. ## [v0.2.5](https://github.com/umee-network/umee/releases/tag/price-feeder%2Fv0.2.5) - 2022-07-28 diff --git a/price-feeder/README.md b/price-feeder/README.md index 2bce554e76..f695976957 100644 --- a/price-feeder/README.md +++ b/price-feeder/README.md @@ -29,6 +29,7 @@ The list of current supported providers: - [Binance](https://www.binance.com/en) - [Coinbase](https://www.coinbase.com/) +- [FTX](https://ftx.com/) - [Gate](https://www.gate.io/) - [Huobi](https://www.huobi.com/en-us/) - [Kraken](https://www.kraken.com/en-us/) diff --git a/price-feeder/config/config.go b/price-feeder/config/config.go index bce2b5b0c3..5c9d6f1f57 100644 --- a/price-feeder/config/config.go +++ b/price-feeder/config/config.go @@ -38,6 +38,7 @@ var ( provider.ProviderHuobi: {}, provider.ProviderGate: {}, provider.ProviderCoinbase: {}, + provider.ProviderFTX: {}, provider.ProviderMock: {}, } diff --git a/price-feeder/oracle/oracle.go b/price-feeder/oracle/oracle.go index 57dfc52d97..b18cff134e 100644 --- a/price-feeder/oracle/oracle.go +++ b/price-feeder/oracle/oracle.go @@ -277,7 +277,6 @@ func GetComputedPrices( providerPairs map[provider.Name][]types.CurrencyPair, deviations map[string]sdk.Dec, ) (prices map[string]sdk.Dec, err error) { - // convert any non-USD denominated candles into USD convertedCandles, err := convertCandlesToUSD( logger, @@ -467,6 +466,9 @@ func NewProvider( case provider.ProviderGate: return provider.NewGateProvider(ctx, logger, endpoint, providerPairs...) + case provider.ProviderFTX: + return provider.NewFTXProvider(ctx, logger, endpoint, providerPairs...), nil + case provider.ProviderMock: return provider.NewMockProvider(), nil } diff --git a/price-feeder/oracle/provider/coinbase.go b/price-feeder/oracle/provider/coinbase.go index 7432f0bafe..35cdc83bc6 100644 --- a/price-feeder/oracle/provider/coinbase.go +++ b/price-feeder/oracle/provider/coinbase.go @@ -24,7 +24,7 @@ const ( coinbasePingCheck = time.Second * 28 // should be < 30 coinbaseRestHost = "https://api.exchange.coinbase.com" coinbaseRestPath = "/products" - timeLayout = "2006-01-02T15:04:05.000000Z" + coinbaseTimeFmt = "2006-01-02T15:04:05.000000Z" unixMinute = 60000 ) @@ -408,7 +408,7 @@ func (p *CoinbaseProvider) messageReceived(messageType int, bz []byte) { // timeToUnix converts a Time in format "2006-01-02T15:04:05.000000Z" to unix func (tr CoinbaseTradeResponse) timeToUnix() int64 { - t, err := time.Parse(timeLayout, tr.Time) + t, err := time.Parse(coinbaseTimeFmt, tr.Time) if err != nil { return 0 } diff --git a/price-feeder/oracle/provider/ftx.go b/price-feeder/oracle/provider/ftx.go new file mode 100644 index 0000000000..2f305bbaae --- /dev/null +++ b/price-feeder/oracle/provider/ftx.go @@ -0,0 +1,344 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "sync" + "time" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/rs/zerolog" + "github.com/umee-network/umee/price-feeder/oracle/types" +) + +const ( + ftxRestURL = "https://ftx.com/api" + ftxMarketsEndpoint = "/markets" + ftxCandleEndpoint = "/candles" + ftxTimeFmt = "2006-01-02T15:04:05+00:00" + // candleWindowLength is the amount of seconds between + // each candle + candleWindowLength = 15 + cacheInterval = 500 * time.Millisecond +) + +var _ Provider = (*FTXProvider)(nil) + +type ( + // FTXProvider defines an Oracle provider implemented by the FTX public + // API. + // + // REF: https://docs.ftx.com/ + FTXProvider struct { + baseURL string + client *http.Client + + logger zerolog.Logger + mtx sync.RWMutex + + // candleCache is the cache of candle prices for assets. + candleCache map[string][]types.CandlePrice + // marketsCache is the cache of token markets for assets. + marketsCache []FTXMarkets + } + + // FTXMarketsResponse is the response object used for + // available exchange rates and tickers. + FTXMarketsResponse struct { + Success bool `json:"success"` + Markets []FTXMarkets `json:"result"` + } + FTXMarkets struct { + Base string `json:"baseCurrency"` // e.x. "BTC" + Quote string `json:"quoteCurrency"` // e.x. "USDT" + Price float64 `json:"price"` // e.x. 10579.52 + Volume float64 `json:"quoteVolume24h"` // e.x. 28914.76 + } + + // FTXCandleResponse is the response object used for + // candle information. + FTXCandleResponse struct { + Success bool `json:"success"` + Candle []FTXCandle `json:"result"` + } + FTXCandle struct { + Price float64 `json:"close"` // e.x. 11055.25 + Volume float64 `json:"volume"` // e.x. 464193.95725 + StartTime string `json:"startTime"` // e.x. "2019-06-24T17:15:00+00:00" + } +) + +// parseTime parses a string such as "2022-08-29T20:23:00+00:00" into time.Time +func (c FTXCandle) parseTime() (time.Time, error) { + t, err := time.Parse(ftxTimeFmt, c.StartTime) + if err != nil { + return time.Time{}, fmt.Errorf("unable to parse ftx timestamp %w", err) + } + return t, nil +} + +func NewFTXProvider( + ctx context.Context, + logger zerolog.Logger, + endpoint Endpoint, + pairs ...types.CurrencyPair, +) *FTXProvider { + restURL := ftxRestURL + + if endpoint.Name == ProviderFTX { + restURL = endpoint.Rest + } + + ftx := FTXProvider{ + baseURL: restURL, + client: newDefaultHTTPClient(), + logger: logger, + candleCache: nil, + marketsCache: []FTXMarkets{}, + } + + go func() { + logger.Debug().Msg("starting ftx polling...") + err := ftx.pollCache(ctx, pairs...) + if err != nil { + logger.Err(err).Msg("ftx provider unable to poll new data") + } + }() + + return &ftx +} + +// GetTickerPrices returns the tickerPrices based on the provided pairs. +func (p *FTXProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]types.TickerPrice, error) { + markets := p.getMarketsCache() + + baseDenomIdx := make(map[string]types.CurrencyPair) + for _, cp := range pairs { + baseDenomIdx[strings.ToUpper(cp.Base)] = cp + } + + tickerPrices := make(map[string]types.TickerPrice, len(pairs)) + for _, tr := range markets { + symbol := strings.ToUpper(tr.Base) + + cp, ok := baseDenomIdx[symbol] + if !ok { + // skip tokens that are not requested + continue + } + + if _, ok := tickerPrices[symbol]; ok { + return nil, fmt.Errorf("duplicate token found in FTX response: %s", symbol) + } + + priceRaw := strconv.FormatFloat(tr.Price, 'f', -1, 64) + price, err := sdk.NewDecFromStr(priceRaw) + if err != nil { + return nil, fmt.Errorf("failed to read FTX price (%f) for %s", tr.Price, symbol) + } + + volumeRaw := strconv.FormatFloat(tr.Volume, 'f', -1, 64) + volume, err := sdk.NewDecFromStr(volumeRaw) + if err != nil { + return nil, fmt.Errorf("failed to read FTX volume (%f) for %s", tr.Volume, symbol) + } + + tickerPrices[cp.String()] = types.TickerPrice{Price: price, Volume: volume} + } + + for _, cp := range pairs { + if _, ok := tickerPrices[cp.String()]; !ok { + return nil, fmt.Errorf(types.ErrMissingExchangeRate.Error(), cp.String()) + } + } + + return tickerPrices, nil +} + +// GetCandlePrices returns the cached candlePrices based on provided pairs. +func (p *FTXProvider) GetCandlePrices(pairs ...types.CurrencyPair) (map[string][]types.CandlePrice, error) { + candleCache := p.getCandleCache() + if len(candleCache) < 1 { + return nil, fmt.Errorf("candles have not been cached") + } + + candlePrices := make(map[string][]types.CandlePrice, len(pairs)) + for _, pair := range pairs { + if _, ok := candleCache[pair.String()]; !ok { + return nil, fmt.Errorf("missing candles for %s", pair.String()) + } + candlePrices[pair.String()] = candleCache[pair.String()] + } + + return candlePrices, nil +} + +// GetAvailablePairs return all available pairs symbol to susbscribe. +func (p *FTXProvider) GetAvailablePairs() (map[string]struct{}, error) { + markets := p.getMarketsCache() + availablePairs := make(map[string]struct{}, len(markets)) + for _, pair := range markets { + cp := types.CurrencyPair{ + Base: strings.ToUpper(pair.Base), + Quote: strings.ToUpper(pair.Quote), + } + availablePairs[cp.String()] = struct{}{} + } + + return availablePairs, nil +} + +// SubscribeCurrencyPairs performs a no-op since ftx does not use websockets +func (p *FTXProvider) SubscribeCurrencyPairs(pairs ...types.CurrencyPair) error { + return nil +} + +// pollCache polls the markets and candles endpoints, +// and updates the ftx cache. +func (p *FTXProvider) pollCache(ctx context.Context, pairs ...types.CurrencyPair) error { + for { + select { + case <-ctx.Done(): + return nil + + default: + p.logger.Debug().Msg("querying ftx api") + + err := p.pollMarkets() + if err != nil { + return err + } + err = p.pollCandles(pairs...) + if err != nil { + return err + } + + time.Sleep(cacheInterval) + } + } +} + +// pollMarkets retrieves the markets response from the ftx api and +// places it in p.marketsCache. +func (p *FTXProvider) pollMarkets() error { + path := fmt.Sprintf("%s%s", p.baseURL, ftxMarketsEndpoint) + + resp, err := p.client.Get(path) + if err != nil { + return err + } + err = checkHTTPStatus(resp) + if err != nil { + return err + } + defer resp.Body.Close() + + var pairsSummary FTXMarketsResponse + if err := json.NewDecoder(resp.Body).Decode(&pairsSummary); err != nil { + return err + } + + if !pairsSummary.Success { + return fmt.Errorf("ftx markets api returned with failure") + } + + p.setMarketsCache(pairsSummary.Markets) + return nil +} + +// pollMarkets retrieves the candles response from the ftx api and +// places it in p.candleCache. +func (p *FTXProvider) pollCandles(pairs ...types.CurrencyPair) error { + candles := make(map[string][]types.CandlePrice) + now := time.Now() + + for _, pair := range pairs { + if _, ok := candles[pair.Base]; !ok { + candles[pair.String()] = []types.CandlePrice{} + } + + path := fmt.Sprintf("%s%s/%s/%s%s?resolution=%d&start_time=%d&end_time=%d", + p.baseURL, + ftxMarketsEndpoint, + pair.Base, + pair.Quote, + ftxCandleEndpoint, + candleWindowLength, + now.Add(providerCandlePeriod*-1).Unix(), + now.Unix(), + ) + + resp, err := p.client.Get(path) + if err != nil { + return fmt.Errorf("failed to make FTX candle request: %w", err) + } + err = checkHTTPStatus(resp) + if err != nil { + return err + } + + defer resp.Body.Close() + + bz, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read FTX candle response body: %w", err) + } + + var candlesResp FTXCandleResponse + if err := json.Unmarshal(bz, &candlesResp); err != nil { + return fmt.Errorf("failed to unmarshal FTX response body: %w", err) + } + + candlePrices := []types.CandlePrice{} + for _, responseCandle := range candlesResp.Candle { + // the ftx api does not provide the endtime for these candles, + // so we have to calculate it + candleStart, err := responseCandle.parseTime() + if err != nil { + return err + } + candleEnd := candleStart.Add(candleWindowLength).Unix() * int64(time.Second/time.Millisecond) + + closeStr := fmt.Sprintf("%f", responseCandle.Price) + volumeStr := fmt.Sprintf("%f", responseCandle.Volume) + candlePrices = append(candlePrices, types.CandlePrice{ + Price: sdk.MustNewDecFromStr(closeStr), + Volume: sdk.MustNewDecFromStr(volumeStr), + TimeStamp: candleEnd, + }) + } + candles[pair.String()] = candlePrices + } + + p.setCandleCache(candles) + return nil +} + +func (p *FTXProvider) setCandleCache(c map[string][]types.CandlePrice) { + p.mtx.Lock() + defer p.mtx.Unlock() + p.candleCache = c +} + +func (p *FTXProvider) getCandleCache() map[string][]types.CandlePrice { + p.mtx.RLock() + defer p.mtx.RUnlock() + return p.candleCache +} + +func (p *FTXProvider) setMarketsCache(m []FTXMarkets) { + p.mtx.Lock() + defer p.mtx.Unlock() + p.marketsCache = m +} + +func (p *FTXProvider) getMarketsCache() []FTXMarkets { + p.mtx.RLock() + defer p.mtx.RUnlock() + return p.marketsCache +} diff --git a/price-feeder/oracle/provider/ftx_test.go b/price-feeder/oracle/provider/ftx_test.go new file mode 100644 index 0000000000..96ade2e1de --- /dev/null +++ b/price-feeder/oracle/provider/ftx_test.go @@ -0,0 +1,105 @@ +package provider + +import ( + "context" + "strconv" + "testing" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "github.com/umee-network/umee/price-feeder/oracle/types" +) + +func TestFTXProvider_GetTickerPrices(t *testing.T) { + p := NewFTXProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "ATOM", Quote: "USDT"}, + ) + + t.Run("valid_request_single_ticker", func(t *testing.T) { + lastPrice := "34.689998626708984000" + volume := "2396974.000000000000000000" + + lp, _ := strconv.ParseFloat(lastPrice, 32) + v, _ := strconv.ParseFloat(volume, 32) + marketCache := []FTXMarkets{ + { + Base: "ATOM", + Quote: "USDT", + Price: lp, + Volume: v, + }, + } + + p.setMarketsCache(marketCache) + + prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "ATOM", Quote: "USDT"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, sdk.MustNewDecFromStr(lastPrice), prices["ATOMUSDT"].Price) + require.Equal(t, sdk.MustNewDecFromStr(volume), prices["ATOMUSDT"].Volume) + }) + + t.Run("invalid_request_invalid_ticker", func(t *testing.T) { + prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.EqualError(t, err, "missing exchange rate for FOOBAR") + require.Nil(t, prices) + }) +} + +func TestFTXProvider_GetCandlePrices(t *testing.T) { + p := NewFTXProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "ATOM", Quote: "USDT"}, + ) + + t.Run("valid_request_single_candle", func(t *testing.T) { + price := sdk.MustNewDecFromStr("34.689998626708984000") + volume := sdk.MustNewDecFromStr("2396974.000000000000000000") + timeStamp := int64(1000000) + + candleCache := map[string][]types.CandlePrice{ + "ATOMUSDT": { + types.CandlePrice{ + TimeStamp: timeStamp, + Price: price, + Volume: volume, + }, + }, + } + + p.setCandleCache(candleCache) + + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "ATOM", Quote: "USDT"}) + require.NoError(t, err) + require.Len(t, prices, 1) + require.Equal(t, price, prices["ATOMUSDT"][0].Price) + require.Equal(t, volume, prices["ATOMUSDT"][0].Volume) + require.Equal(t, timeStamp, prices["ATOMUSDT"][0].TimeStamp) + }) + + t.Run("invalid_request_invalid_candle", func(t *testing.T) { + prices, err := p.GetCandlePrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) + require.EqualError(t, err, "missing candles for FOOBAR") + require.Nil(t, prices) + }) +} + +func TestFTXProvider_SubscribeCurrencyPairs(t *testing.T) { + p := NewFTXProvider( + context.TODO(), + zerolog.Nop(), + Endpoint{}, + types.CurrencyPair{Base: "ATOM", Quote: "USDT"}, + ) + + t.Run("invalid_subscribe_channels_empty", func(t *testing.T) { + err := p.SubscribeCurrencyPairs([]types.CurrencyPair{}...) + require.NoError(t, err) + }) +} diff --git a/price-feeder/oracle/provider/mock.go b/price-feeder/oracle/provider/mock.go index 7517b869e4..f64b0148ab 100644 --- a/price-feeder/oracle/provider/mock.go +++ b/price-feeder/oracle/provider/mock.go @@ -89,7 +89,7 @@ func (p MockProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]t for t := range tickerMap { if _, ok := tickerPrices[t]; !ok { - return nil, fmt.Errorf("missing exchange rate for %s", t) + return nil, fmt.Errorf(types.ErrMissingExchangeRate.Error(), t) } } diff --git a/price-feeder/oracle/provider/osmosis.go b/price-feeder/oracle/provider/osmosis.go index d4892617a1..9e109d0e47 100644 --- a/price-feeder/oracle/provider/osmosis.go +++ b/price-feeder/oracle/provider/osmosis.go @@ -133,7 +133,7 @@ func (p OsmosisProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[strin for _, cp := range pairs { if _, ok := tickerPrices[cp.String()]; !ok { - return nil, fmt.Errorf("missing exchange rate for %s", cp.String()) + return nil, fmt.Errorf(types.ErrMissingExchangeRate.Error(), cp.String()) } } @@ -227,10 +227,3 @@ func (p OsmosisProvider) GetAvailablePairs() (map[string]struct{}, error) { func (p OsmosisProvider) SubscribeCurrencyPairs(pairs ...types.CurrencyPair) error { return nil } - -func checkHTTPStatus(resp *http.Response) error { - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected status: %s", resp.Status) - } - return nil -} diff --git a/price-feeder/oracle/provider/provider.go b/price-feeder/oracle/provider/provider.go index 20966b0dd8..459ec68868 100644 --- a/price-feeder/oracle/provider/provider.go +++ b/price-feeder/oracle/provider/provider.go @@ -1,6 +1,7 @@ package provider import ( + "fmt" "net/http" "time" @@ -22,6 +23,7 @@ const ( ProviderOkx Name = "okx" ProviderGate Name = "gate" ProviderCoinbase Name = "coinbase" + ProviderFTX Name = "ftx" ProviderMock Name = "mock" ) @@ -97,3 +99,10 @@ func PastUnixTime(t time.Duration) int64 { func SecondsToMilli(t int64) int64 { return t * int64(time.Second/time.Millisecond) } + +func checkHTTPStatus(resp *http.Response) error { + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status: %s", resp.Status) + } + return nil +} diff --git a/price-feeder/oracle/types/errors.go b/price-feeder/oracle/types/errors.go index a5b8391594..107996aae2 100644 --- a/price-feeder/oracle/types/errors.go +++ b/price-feeder/oracle/types/errors.go @@ -8,5 +8,6 @@ const ModuleName = "price-feeder" // Price feeder errors var ( - ErrProviderConnection = sdkerrors.Register(ModuleName, 1, "provider connection") + ErrProviderConnection = sdkerrors.Register(ModuleName, 1, "provider connection") + ErrMissingExchangeRate = sdkerrors.Register(ModuleName, 2, "missing exchange rate for %s") )