Skip to content

Commit

Permalink
[okex.com] websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
nntaoli committed Mar 11, 2019
1 parent c181c00 commit f9d0bde
Show file tree
Hide file tree
Showing 11 changed files with 895 additions and 137 deletions.
25 changes: 25 additions & 0 deletions Adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,28 @@ func AdaptTradeSide(side string) TradeSide {
return -1
}
}

func AdaptKlinePeriodForOKEx(period int) string {
switch period {
case KLINE_PERIOD_1MIN:
return "1min"
case KLINE_PERIOD_5MIN:
return "5min"
case KLINE_PERIOD_15MIN:
return "15min"
case KLINE_PERIOD_30MIN:
return "30min"
case KLINE_PERIOD_1H:
return "1hour"
case KLINE_PERIOD_4H:
return "4hour"
case KLINE_PERIOD_1DAY:
return "day"
case KLINE_PERIOD_2H:
return "2hour"
case KLINE_PERIOD_1WEEK:
return "week"
default:
return "day"
}
}
27 changes: 18 additions & 9 deletions Metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,24 @@ type Account struct {
}

type Ticker struct {
ContractType string `json:"omitempty"`
Pair CurrencyPair `json:"omitempty"`
Last float64 `json:"last"`
Buy float64 `json:"buy"`
Sell float64 `json:"sell"`
High float64 `json:"high"`
Low float64 `json:"low"`
Vol float64 `json:"vol"`
Date uint64 `json:"date"` // 单位:秒(second)
Pair CurrencyPair `json:"omitempty"`
Last float64 `json:"last,string"`
Buy float64 `json:"buy,string"`
Sell float64 `json:"sell,string"`
High float64 `json:"high,string"`
Low float64 `json:"low,string"`
Vol float64 `json:"vol,string"`
Date uint64 `json:"date"` // 单位:秒(second)
}

type FutureTicker struct {
*Ticker
ContractType string `json:"omitempty"`
ContractId int `json:"contractId"`
LimitHigh float64 `json:"limitHigh,string"`
LimitLow float64 `json:"limitLow,string"`
HoldAmount float64 `json:"hold_amount,string"`
UnitAmount float64 `json:"unitAmount,string"`
}

type DepthRecord struct {
Expand Down
41 changes: 41 additions & 0 deletions Utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package goex

import (
"bytes"
"compress/flate"
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"strconv"
)
Expand Down Expand Up @@ -61,6 +66,30 @@ func ToUint64(v interface{}) uint64 {
}
}

func ToInt64(v interface{}) int64 {
if v == nil {
return 0
}

switch v.(type) {
case float64:
return int64(v.(float64))
default:
vv := fmt.Sprint(v)

if vv == "" {
return 0
}

vvv, err := strconv.ParseInt(vv, 0, 64)
if err != nil {
return 0
}

return vvv
}
}

func ValuesToJson(v url.Values) ([]byte, error) {
parammap := make(map[string]interface{})
for k, vv := range v {
Expand All @@ -72,3 +101,15 @@ func ValuesToJson(v url.Values) ([]byte, error) {
}
return json.Marshal(parammap)
}

func GzipUnCompress(data []byte) ([]byte, error) {
r, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
return ioutil.ReadAll(r)
}

func FlateUnCompress(data []byte) ([]byte, error) {
return ioutil.ReadAll(flate.NewReader(bytes.NewReader(data)))
}
2 changes: 1 addition & 1 deletion okcoin/OKCoin_CN.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var _INERNAL_KLINE_PERIOD_CONVERTER = map[int]string{
// }
//}

func New(client *http.Client, api_key, secret_key string) *OKCoinCN_API {
func NewOKCoinCn(client *http.Client, api_key, secret_key string) *OKCoinCN_API {
return &OKCoinCN_API{client, api_key, secret_key, "https://www.okex.com/api/v1/"}
}

Expand Down
2 changes: 1 addition & 1 deletion okcoin/OKCoin_CN_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"testing"
)

var okcn = New(http.DefaultClient, "", "")
var okcn = NewOKCoinCn(http.DefaultClient, "", "")

func TestOKCoinCN_API_GetTicker(t *testing.T) {
ticker, _ := okcn.GetTicker(goex.BTC_CNY)
Expand Down
187 changes: 187 additions & 0 deletions okcoin/OKEx_Future_Ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package okcoin

import (
"encoding/json"
"errors"
"fmt"
. "github.com/nntaoli-project/GoEx"
"log"
"strings"
"sync"
"time"
)

type OKExFutureWs struct {
*WsBuilder
sync.Once
wsConn *WsConn

tickerCallback func(*FutureTicker)
depthCallback func(*Depth)
tradeCallback func(*Trade, string)
}

func NewOKExFutureWs() *OKExFutureWs {
okWs := &OKExFutureWs{WsBuilder: NewWsBuilder()}
okWs.WsBuilder = okWs.WsBuilder.
WsUrl("wss://real.okex.com:10440/ws/v1").
Heartbeat([]byte("{\"event\": \"ping\"} "), 30*time.Second).
ReconnectIntervalTime(24 * time.Hour).
UnCompressFunc(FlateUnCompress).
ProtoHandleFunc(okWs.handle)
return okWs
}

func (okWs *OKExFutureWs) SetCallbacks(tickerCallback func(*FutureTicker),
depthCallback func(*Depth),
tradeCallback func(*Trade, string)) {
okWs.tickerCallback = tickerCallback
okWs.depthCallback = depthCallback
okWs.tradeCallback = tradeCallback
}

func (okWs *OKExFutureWs) SubscribeTicker(pair CurrencyPair, contract string) error {
if okWs.tickerCallback == nil {
return errors.New("please set ticker callback func")
}
return okWs.subscribe(map[string]interface{}{
"event": "addChannel",
"channel": fmt.Sprintf("ok_sub_futureusd_%s_ticker_%s", strings.ToLower(pair.CurrencyA.Symbol), contract)})
}

func (okWs *OKExFutureWs) SubscribeDepth(pair CurrencyPair, contract string, size int) error {
if okWs.depthCallback == nil {
return errors.New("please set depth callback func")
}
return okWs.subscribe(map[string]interface{}{
"event": "addChannel",
"channel": fmt.Sprintf("ok_sub_futureusd_%s_depth_%s_%d", strings.ToLower(pair.CurrencyA.Symbol), contract, size)})
}

func (okWs *OKExFutureWs) SubscribeTrade(pair CurrencyPair, contract string) error {
if okWs.tradeCallback == nil {
return errors.New("please set trade callback func")
}
return okWs.subscribe(map[string]interface{}{
"event": "addChannel",
"channel": fmt.Sprintf("ok_sub_futureusd_%s_trade_%s", strings.ToLower(pair.CurrencyA.Symbol), contract)})
}

func (okWs *OKExFutureWs) subscribe(sub map[string]interface{}) error {
okWs.connectWs()
return okWs.wsConn.Subscribe(sub)
}

func (okWs *OKExFutureWs) connectWs() {
okWs.Do(func() {
okWs.wsConn = okWs.WsBuilder.Build()
okWs.wsConn.ReceiveMessage()
})
}

func (okWs *OKExFutureWs) handle(msg []byte) error {
//log.Println(string(msg))
if string(msg) == "{\"event\":\"pong\"}" {
// log.Println(string(msg))
okWs.wsConn.UpdateActiveTime()
return nil
}

var resp []WsBaseResp
err := json.Unmarshal(msg, &resp)
if err != nil {
return err
}

if len(resp) < 0 {
return nil
}

if resp[0].Channel == "addChannel" {
log.Println("subscribe:", string(resp[0].Data))
return nil
}

pair, contract, ch := okWs.parseChannel(resp[0].Channel)

if ch == "ticker" {
var t FutureTicker
err := json.Unmarshal(resp[0].Data, &t)
if err != nil {
return err
}
t.ContractType = contract
t.Pair = pair
okWs.tickerCallback(&t)
return nil
}

if ch == "depth" {
var (
d Depth
data struct {
Asks [][]float64 `json:"asks"`
Bids [][]float64 `json:"bids"`
Timestamp int64 `json:"timestamp"`
}
)

err := json.Unmarshal(resp[0].Data, &data)
if err != nil {
return err
}

for _, a := range data.Asks {
d.AskList = append(d.AskList, DepthRecord{a[0], a[1]})
}

for _, b := range data.Bids {
d.BidList = append(d.BidList, DepthRecord{b[0], b[1]})
}

d.Pair = pair
d.ContractType = contract
d.UTime = time.Unix(data.Timestamp/1000, 0)
okWs.depthCallback(&d)

return nil
}

if ch == "trade" {
var data TradeData
err := json.Unmarshal(resp[0].Data, &data)
if err != nil {
return err
}

for _, td := range data {
side := TradeSide(SELL)
if td[4] == "bid" {
side = BUY
}
okWs.tradeCallback(&Trade{Pair: pair, Tid: ToInt64(td[0]), Price: ToFloat64(td[1]),
Amount: ToFloat64(td[2]), Type: side, Date: okWs.adaptTime(td[3])}, contract)
}

return nil
}

return errors.New("unknown channel for " + resp[0].Channel)
}

func (okWs *OKExFutureWs) parseChannel(channel string) (pair CurrencyPair, contract string, ch string) {
metas := strings.Split(channel, "_")
pair = NewCurrencyPair2(strings.ToUpper(metas[3] + "_USD"))
contract = metas[5]
ch = metas[4]
return pair, contract, ch
}

func (okWs *OKExFutureWs) adaptTime(tm string) int64 {
format := "2006-01-02 15:04:05"
day := time.Now().Format("2006-01-02")
local, _ := time.LoadLocation("Asia/Chongqing")
t, _ := time.ParseInLocation(format, day+" "+tm, local)
return t.UnixNano() / 1e6

}
25 changes: 25 additions & 0 deletions okcoin/OKEx_Future_Ws_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package okcoin

import (
"github.com/nntaoli-project/GoEx"
"testing"
"time"
)

func TestNewOKExFutureWs(t *testing.T) {
okWs := NewOKExFutureWs()
okWs.ErrorHandleFunc(func(err error) {
t.Log(err)
})
okWs.SetCallbacks(func(ticker *goex.FutureTicker) {
t.Log(ticker, ticker.Ticker)
}, func(depth *goex.Depth) {
t.Log(depth.ContractType, depth.Pair, depth.AskList, depth.BidList)
}, func(trade *goex.Trade, contract string) {
t.Log(contract, trade)
})
okWs.SubscribeTicker(goex.LTC_USD, goex.QUARTER_CONTRACT)
okWs.SubscribeDepth(goex.LTC_USD, goex.QUARTER_CONTRACT, 5)
okWs.SubscribeTrade(goex.LTC_USD, goex.QUARTER_CONTRACT)
time.Sleep(3 * time.Minute)
}
Loading

0 comments on commit f9d0bde

Please sign in to comment.