Skip to content

Commit

Permalink
fix: migrate to coinex v2 (#2338)
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai authored Dec 12, 2024
1 parent 400576b commit 28f78c5
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 88 deletions.
35 changes: 32 additions & 3 deletions node/pkg/websocketfetcher/providers/coinex/coinex.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package coinex

import (
"context"
"encoding/json"

"bisonai.com/miko/node/pkg/websocketfetcher/common"
"bisonai.com/miko/node/pkg/wss"
"github.com/rs/zerolog/log"
"nhooyr.io/websocket"
)

type CoinexFetcher common.Fetcher
Expand All @@ -28,14 +30,18 @@ func New(ctx context.Context, opts ...common.FetcherOption) (common.FetcherInter

subscription := Subscription{
Method: "state.subscribe",
Params: params,
ID: 1,
Params: SubscribeParams{
MarketList: params,
},
ID: 1,
}

ws, err := wss.NewWebsocketHelper(ctx,
wss.WithEndpoint(URL),
wss.WithSubscriptions([]any{subscription}),
wss.WithProxyUrl(config.Proxy))
wss.WithProxyUrl(config.Proxy),
wss.WithCustomReadFunc(fetcher.customReadFunc),
)
if err != nil {
log.Error().Str("Player", "Coinex").Err(err).Msg("error in coinex.New")
return nil, err
Expand Down Expand Up @@ -69,3 +75,26 @@ func (f *CoinexFetcher) handleMessage(ctx context.Context, message map[string]an
func (f *CoinexFetcher) Run(ctx context.Context) {
f.Ws.Run(ctx, f.handleMessage)
}

func (f *CoinexFetcher) customReadFunc(ctx context.Context, conn *websocket.Conn) (map[string]interface{}, error) {
var result map[string]interface{}
_, data, err := conn.Read(ctx)
if err != nil {
log.Error().Str("Player", "coinex").Err(err).Msg("error in coinex.customReadFunc, failed to read from websocket")
return nil, err
}

decompressed, err := common.DecompressGzip(data)
if err != nil {
log.Error().Str("Player", "coinex").Err(err).Msg("error in coinex.customReadFunc, failed to decompress data")
return nil, err
}

err = json.Unmarshal(decompressed, &result)
if err != nil {
log.Error().Str("Player", "coinex").Err(err).Msg("error in coinex.customReadFunc, failed to unmarshal data")
return nil, err
}

return result, nil
}
44 changes: 27 additions & 17 deletions node/pkg/websocketfetcher/providers/coinex/type.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,37 @@
package coinex

const URL = "wss://socket.coinex.com/"
const URL = "wss://socket.coinex.com/v2/spot"

type SubscribeParams struct {
MarketList []string `json:"market_list"`
}

type Subscription struct {
Method string `json:"method"`
Params []string `json:"params"`
ID int `json:"id"`
Method string `json:"method"`
Params SubscribeParams `json:"params"`
ID int `json:"id"`
}

type State struct {
Market string `json:"market"`
Last string `json:"last"`
Open string `json:"open"`
Close string `json:"close"`
High string `json:"high"`
Low string `json:"low"`
Volume string `json:"volume"`
VolumeSell string `json:"volume_sell"`
VolumeBuy string `json:"volume_buy"`
Value string `json:"value"`
Period int `json:"period"`
}

type Ticker struct {
Open string `json:"open"`
Last string `json:"last"`
High string `json:"high"`
Low string `json:"low"`
Deal string `json:"deal"`
Volume string `json:"volume"`
SellTotal string `json:"sell_total"`
BuyTotal string `json:"buy_total"`
Period int `json:"period"`
type Data struct {
StateList []State `json:"state_list"`
}

type Response struct {
Method string `json:"method"`
Params []map[string]Ticker `json:"params"`
ID *int `json:"id"`
Method string `json:"method"`
Data Data `json:"data"`
ID *int `json:"id"`
}
54 changes: 27 additions & 27 deletions node/pkg/websocketfetcher/providers/coinex/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,34 @@ import (
func ResponseToFeedDataList(data Response, feedMap map[string][]int32) ([]*common.FeedData, error) {
feedDataList := []*common.FeedData{}

for _, item := range data.Params {
for key, value := range item {
id, exists := feedMap[key]
if !exists {
log.Warn().Str("Player", "Coinex").Str("key", key).Msg("feed not found")
continue
}
price, err := common.PriceStringToFloat64(value.Last)
if err != nil {
log.Error().Str("Player", "Coinex").Err(err).Msg("error in PriceStringToFloat64")
continue
}
volume, err := common.VolumeStringToFloat64(value.Volume)
if err != nil {
log.Error().Str("Player", "Coinex").Err(err).Msg("error in VolumeStringToFloat64")
continue
}
timestamp := time.Now()

for _, id := range id {
feedData := new(common.FeedData)
feedData.FeedID = id
feedData.Value = price
feedData.Timestamp = &timestamp
feedData.Volume = volume
feedDataList = append(feedDataList, feedData)
}
for _, item := range data.Data.StateList {

id, exists := feedMap[item.Market]
if !exists {
log.Warn().Str("Player", "Coinex").Str("key", item.Market).Msg("feed not found")
continue
}
price, err := common.PriceStringToFloat64(item.Last)
if err != nil {
log.Error().Str("Player", "Coinex").Err(err).Msg("error in PriceStringToFloat64")
continue
}
volume, err := common.VolumeStringToFloat64(item.Volume)
if err != nil {
log.Error().Str("Player", "Coinex").Err(err).Msg("error in VolumeStringToFloat64")
continue
}
timestamp := time.Now()

for _, id := range id {
feedData := new(common.FeedData)
feedData.FeedID = id
feedData.Value = price
feedData.Timestamp = &timestamp
feedData.Volume = volume
feedDataList = append(feedDataList, feedData)
}

}

return feedDataList, nil
Expand Down
75 changes: 34 additions & 41 deletions node/pkg/websocketfetcher/tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,46 +489,39 @@ func TestMessageToStruct(t *testing.T) {

t.Run("TestMessageToStructCoinex", func(t *testing.T) {
jsonStr := `{
"method": "state.update",
"params": [
{
"BTCUSDT": {
"open": "68217.41",
"last": "67649.41",
"high": "69037.30",
"low": "66678.34",
"deal": "12707367.97477132400000000000",
"volume": "187.73201806",
"sell_total": "4.47703848",
"buy_total": "2.37157686",
"period": 86400
},
"ETHUSDT": {
"open": "3781.50",
"last": "3786.55",
"high": "3844.89",
"low": "3725.78",
"deal": "5307095.72019325730000000000",
"volume": "1402.85217886",
"sell_total": "68.74665541",
"buy_total": "76.70077360",
"period": 86400
},
"MATICUSDT": {
"open": "0.6971",
"last": "0.6975",
"high": "0.709000000000",
"low": "0.6832",
"deal": "184896.63142549148200000000",
"volume": "265725.85573677",
"sell_total": "8251.71403154",
"buy_total": "8521.89898481",
"period": 86400
}
}
],
"id": null
}`
"method": "state.update",
"data": {
"state_list": [
{
"market": "LATUSDT",
"last": "0.008157",
"open": "0.008286",
"close": "0.008157",
"high": "0.008390",
"low": "0.008106",
"volume": "807714.49139758",
"volume_sell": "286170.69645599",
"volume_buy": "266161.23236408",
"value": "6689.21644207",
"period": 86400
},
{
"market": "ELONUSDT",
"last": "0.000000152823",
"open": "0.000000158650",
"close": "0.000000152823",
"high": "0.000000159474",
"low": "0.000000147026",
"volume": "88014042237.15",
"volume_sell": "11455578769.13",
"volume_buy": "17047669612.10",
"value": "13345.65122447",
"period": 86400
}
]
},
"id": null
}`
var txResult map[string]any
err := json.Unmarshal([]byte(jsonStr), &txResult)
if err != nil {
Expand All @@ -541,7 +534,7 @@ func TestMessageToStruct(t *testing.T) {
}

assert.Equal(t, "state.update", txData.Method)
assert.Equal(t, "67649.41", txData.Params[0]["BTCUSDT"].Last)
assert.Equal(t, "0.008157", txData.Data.StateList[0].Last)
})

t.Run("TestMessageToStructBitstamp", func(t *testing.T) {
Expand Down

0 comments on commit 28f78c5

Please sign in to comment.