Skip to content

Commit

Permalink
Merge pull request #146 from JacobPlaster/add-update-order-to-v2-ws
Browse files Browse the repository at this point in the history
Add update order to v2 ws
  • Loading branch information
prdn authored Jan 15, 2019
2 parents 0168843 + 9a0870f commit 48518f6
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 32 deletions.
71 changes: 71 additions & 0 deletions examples/v2/ws-update-order/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"log"
"os"
"time"
"context"

"github.com/bitfinexcom/bitfinex-api-go/v2"
"github.com/bitfinexcom/bitfinex-api-go/v2/websocket"
)

func SubmitTestOrder(c *websocket.Client) {
log.Printf("Submitting new order")
err := c.SubmitOrder(context.Background(), &bitfinex.OrderNewRequest{
Symbol: "tBTCUSD",
CID: 123,
Amount: 0.02,
Type: "EXCHANGE LIMIT",
Price: 5000,
})
if err != nil {
log.Fatal(err)
}
}

func UpdateTestOrder(orderId int64, c *websocket.Client) {
log.Printf("Updating order")
err := c.SubmitUpdateOrder(context.Background(), &bitfinex.OrderUpdateRequest{
ID: orderId,
Amount: 0.04,
})
if err != nil {
log.Fatal(err)
}
}

func main() {
key := os.Getenv("BFX_KEY")
secret := os.Getenv("BFX_SECRET")
p := websocket.NewDefaultParameters()
p.URL = "wss://test.bitfinex.com/ws/2"
c := websocket.NewWithParams(p).Credentials(key, secret)

err := c.Connect()
if err != nil {
log.Fatalf("connecting authenticated websocket: %s", err)
}
defer c.Close()

// Begin listening to incoming messages

for obj := range c.Listen() {
switch obj.(type) {
case error:
log.Printf("channel closed: %s", obj)
break
case *websocket.AuthEvent:
// on authorize create new order
SubmitTestOrder(c)
case *bitfinex.OrderNew:
// new order received so update it
id := obj.(*bitfinex.OrderNew).ID
UpdateTestOrder(id, c)
default:
log.Printf("MSG RECV: %#v", obj)
}
}

time.Sleep(time.Second * 10)
}
18 changes: 18 additions & 0 deletions tests/integration/v2/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type listener struct {
marginUpdate chan *bitfinex.MarginInfoUpdate
funding chan *bitfinex.FundingInfo
orderNew chan *bitfinex.OrderNew
orderUpdate chan *bitfinex.OrderUpdate
errors chan error
}

Expand All @@ -51,6 +52,7 @@ func newListener() *listener {
marginBase: make(chan *bitfinex.MarginInfoBase, 10),
marginUpdate: make(chan *bitfinex.MarginInfoUpdate, 10),
orderNew: make(chan *bitfinex.OrderNew, 10),
orderUpdate: make(chan *bitfinex.OrderUpdate, 10),
funding: make(chan *bitfinex.FundingInfo, 10),
}
}
Expand Down Expand Up @@ -307,6 +309,20 @@ func (l *listener) nextOrderNew() (*bitfinex.OrderNew, error) {
}
}

func (l *listener) nextOrderUpdate() (*bitfinex.OrderUpdate, error) {
timeout := make(chan bool)
go func() {
time.Sleep(time.Second * 2)
close(timeout)
}()
select {
case ev := <-l.orderUpdate:
return ev, nil
case <-timeout:
return nil, errors.New("timed out waiting for OrderUpdate")
}
}

// strongly types messages and places them into a channel
func (l *listener) run(ch <-chan interface{}) {
go func() {
Expand Down Expand Up @@ -351,6 +367,8 @@ func (l *listener) run(ch <-chan interface{}) {
l.marginUpdate <- msg.(*bitfinex.MarginInfoUpdate)
case *bitfinex.OrderNew:
l.orderNew <- msg.(*bitfinex.OrderNew)
case *bitfinex.OrderUpdate:
l.orderUpdate <- msg.(*bitfinex.OrderUpdate)
case *bitfinex.FundingInfo:
l.funding <- msg.(*bitfinex.FundingInfo)
case *bitfinex.PositionSnapshot:
Expand Down
96 changes: 96 additions & 0 deletions tests/integration/v2/mock_ws_private_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,3 +439,99 @@ func TestCancel(t *testing.T) {
}
assert(t, &bitfinex.OrderCancel{ID: 1234567, CID: 123, Symbol: "tBTCUSD", MTSCreated: 1515179518260, MTSUpdated: 1515179520203, Type: "LIMIT", Status: "CANCELED", Price: 900.0, Amount: 1, AmountOrig: 1}, oc)
}

func TestUpdateOrder(t *testing.T) {
// create transport & nonce mocks
async := newTestAsync()
nonce := &IncrementingNonceGenerator{}

// create client
ws := websocket.NewWithAsyncFactoryNonce(newTestAsyncFactory(async), nonce).Credentials("apiKeyABC", "apiSecretXYZ")

// setup listener
listener := newListener()
listener.run(ws.Listen())

// set ws options
//ws.SetReadTimeout(time.Second * 2)
ws.Connect()
defer ws.Close()

// begin test
async.Publish(`{"event":"info","version":2}`)
_, err := listener.nextInfoEvent()
if err != nil {
t.Fatal(err)
}

// initial logon info--Authentication & WalletUpdate assertions in prior tests
async.Publish(`{"event":"auth","status":"OK","chanId":0,"userId":1,"subId":"nonce1","auth_id":"valid-auth-guid","caps":{"orders":{"read":1,"write":0},"account":{"read":1,"write":0},"funding":{"read":1,"write":0},"history":{"read":1,"write":0},"wallets":{"read":1,"write":0},"withdraw":{"read":0,"write":0},"positions":{"read":1,"write":0}}}`)
async.Publish(`[0,"ps",[["tBTCUSD","ACTIVE",7,916.52002351,0,0,null,null,null,null]]]`)
async.Publish(`[0,"ws",[["exchange","BTC",30,0,null],["exchange","USD",80000,0,null],["exchange","ETH",100,0,null],["margin","BTC",10,0,null],["margin","USD",9987.16871968,0,null],["funding","BTC",10,0,null],["funding","USD",10000,0,null]]]`)
// consume & assert snapshots
listener.nextPositionSnapshot()
listener.nextWalletSnapshot()

// submit order
err = ws.SubmitOrder(context.Background(), &bitfinex.OrderNewRequest{
Symbol: "tBTCUSD",
CID: 123,
Amount: -0.456,
Type: "LIMIT",
Price: 900.0,
})
if err != nil {
t.Fatal(err)
}

// assert outgoing order request
if len(async.Sent) <= 1 {
t.Fatalf("expected >1 sent messages, got %d", len(async.Sent))
}
assert(t, &bitfinex.OrderNewRequest{Symbol: "tBTCUSD", CID: 123, Amount: -0.456, Type: "LIMIT", Price: 900.0}, async.Sent[1].(*bitfinex.OrderNewRequest))

// order pending new
async.Publish(`[0,"n",[null,"on-req",null,null,[1234567,null,123,"tBTCUSD",null,null,1,1,"LIMIT",null,null,null,null,null,null,null,900,null,null,null,null,null,null,0,null,null],null,"SUCCESS","Submitting limit buy order for 1.0 BTC."]]`)
// order working--limit order
async.Publish(`[0,"on",[1234567,0,123,"tBTCUSD",1515179518260,1515179518315,1,1,"LIMIT",null,null,null,0,"ACTIVE",null,null,900,0,null,null,null,null,null,0,0,0]]`)

// eat order ack notification
listener.nextNotification()

on, err := listener.nextOrderNew()
if err != nil {
t.Fatal(err)
}

// assert order new update
assert(t, &bitfinex.OrderNew{ID: 1234567, CID: 123, Symbol: "tBTCUSD", MTSCreated: 1515179518260, MTSUpdated: 1515179518315, Type: "LIMIT", Amount: 1, AmountOrig: 1, Status: "ACTIVE", Price: 900.0}, on)

// publish update request
req := &bitfinex.OrderUpdateRequest{
ID: on.ID,
Amount: 0.04,
Price: 1200,
}
pre := async.SentCount()
err = ws.SubmitUpdateOrder(context.Background(), req)
if err != nil {
t.Fatal(err)
}
if err := async.waitForMessage(pre); err != nil {
t.Fatal(err.Error())
}
// assert sent message
assert(t, req, async.Sent[pre].(*bitfinex.OrderUpdateRequest))

// cancel ack notify
async.Publish(`[0,"n",[1547469854094,"ou-req",null,null,[1234567,0,123,"tBTCUSD",1547469854025,1547469854042,0.04,0.04,"LIMIT",null,null,null,0,"ACTIVE",null,null,1200,0,0,0,null,null,null,0,0,null,null,null,"API>BFX",null,null,null],null,"SUCCESS","Submitting update to exchange limit buy order for 0.04 BTC."]]`)
// cancel confirm
async.Publish(`[0,"ou",[1234567,0,123,"tBTCUSD",1547469854025,1547469854121,0.04,0.04,"LIMIT",null,null,null,0,"ACTIVE",null,null,1200,0,0,0,null,null,null,0,0,null,null,null,"API>BFX",null,null,null]]`)

// assert cancel ack
ou, err := listener.nextOrderUpdate()
if err != nil {
t.Fatal(err)
}
assert(t, &bitfinex.OrderUpdate{ID:1234567, GID:0, CID:123, Symbol:"tBTCUSD", MTSCreated:1547469854025, MTSUpdated:1547469854121, Amount:0.04, AmountOrig:0.04, Type:"LIMIT", TypePrev:"", Flags:0, Status:"ACTIVE", Price:1200, PriceAvg:0, PriceTrailing:0, PriceAuxLimit:0, Notify:false, Hidden:false, PlacedID:0}, ou)
}
63 changes: 63 additions & 0 deletions v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ type OrderNewRequest struct {
Price float64 `json:"price,string"`
PriceTrailing float64 `json:"price_trailing,string,omitempty"`
PriceAuxLimit float64 `json:"price_aux_limit,string,omitempty"`
PriceOcoStop float64 `json:"price_oco_stop,string,omitempty"`
Hidden bool `json:"hidden,omitempty"`
PostOnly bool `json:"postonly,omitempty"`
Close bool `json:"close,omitempty"`
OcoOrder bool `json:"oco_order,omitempty"`
TimeInForce string `json:"tif,omitempty"`
}

// MarshalJSON converts the order object into the format required by the bitfinex
Expand All @@ -165,6 +169,8 @@ func (o *OrderNewRequest) MarshalJSON() ([]byte, error) {
Price float64 `json:"price,string"`
PriceTrailing float64 `json:"price_trailing,string,omitempty"`
PriceAuxLimit float64 `json:"price_aux_limit,string,omitempty"`
PriceOcoStop float64 `json:"price_oco_stop,string,omitempty"`
TimeInForce string `json:"tif,omitempty"`
Flags int `json:"flags,omitempty"`
}{
GID: o.GID,
Expand All @@ -175,6 +181,7 @@ func (o *OrderNewRequest) MarshalJSON() ([]byte, error) {
Price: o.Price,
PriceTrailing: o.PriceTrailing,
PriceAuxLimit: o.PriceAuxLimit,
PriceOcoStop: o.PriceOcoStop,
}

if o.Hidden {
Expand All @@ -185,10 +192,66 @@ func (o *OrderNewRequest) MarshalJSON() ([]byte, error) {
aux.Flags = aux.Flags + OrderFlagPostOnly
}

if o.OcoOrder {
aux.Flags = aux.Flags + OrderFlagOCO
}

if o.Close {
aux.Flags = aux.Flags + OrderFlagClose
}

body := []interface{}{0, "on", nil, aux}
return json.Marshal(&body)
}

type OrderUpdateRequest struct {
ID int64 `json:"id"`
GID int64 `json:"gid,omitempty"`
Price float64 `json:"price,string,omitempty"`
Amount float64 `json:"amount,string,omitempty"`
Delta float64 `json:"price,string,omitempty"`
PriceTrailing float64 `json:"price_trailing,string,omitempty"`
PriceAuxLimit float64 `json:"price_aux_limit,string,omitempty"`
Hidden bool `json:"hidden,omitempty"`
PostOnly bool `json:"postonly,omitempty"`
}

// MarshalJSON converts the order object into the format required by the bitfinex
// websocket service.
func (o *OrderUpdateRequest) MarshalJSON() ([]byte, error) {
aux := struct {
ID int64 `json:"id"`
GID int64 `json:"gid,omitempty"`
Price float64 `json:"price,string,omitempty"`
Amount float64 `json:"amount,string,omitempty"`
Delta float64 `json:"price,string,omitempty"`
PriceTrailing float64 `json:"price_trailing,string,omitempty"`
PriceAuxLimit float64 `json:"price_aux_limit,string,omitempty"`
Hidden bool `json:"hidden,omitempty"`
PostOnly bool `json:"postonly,omitempty"`
Flags int `json:"flags,omitempty"`
}{
ID: o.ID,
GID: o.GID,
Amount: o.Amount,
Price: o.Price,
PriceTrailing: o.PriceTrailing,
PriceAuxLimit: o.PriceAuxLimit,
Delta: o.Delta,
}

if o.Hidden {
aux.Flags = aux.Flags + OrderFlagHidden
}

if o.PostOnly {
aux.Flags = aux.Flags + OrderFlagPostOnly
}

body := []interface{}{0, "ou", nil, aux}
return json.Marshal(&body)
}

// OrderCancelRequest represents an order cancel request.
// An order can be cancelled using the internal ID or a
// combination of Client ID (CID) and the daten for the given
Expand Down
4 changes: 4 additions & 0 deletions v2/websocket/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (c *Client) SubmitOrder(ctx context.Context, order *bitfinex.OrderNewReques
return c.asynchronous.Send(ctx, order)
}

func (c *Client) SubmitUpdateOrder(ctx context.Context, orderUpdate *bitfinex.OrderUpdateRequest) error {
return c.asynchronous.Send(ctx, orderUpdate)
}

// SubmitCancel sends a cancel request.
func (c *Client) SubmitCancel(ctx context.Context, cancel *bitfinex.OrderCancelRequest) error {
return c.asynchronous.Send(ctx, cancel)
Expand Down
47 changes: 26 additions & 21 deletions v2/websocket/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,31 +57,36 @@ func (c *Client) handleChannel(msg []byte) error {
}

func (c *Client) handleChecksumChannel(chanId int64, checksum int) error {
sub, _ := c.subscriptions.lookupByChannelID(chanId)
sub, err := c.subscriptions.lookupByChannelID(chanId)
if err != nil {
return err
}
symbol := sub.Request.Symbol
// force to signed integer
bChecksum := uint32(checksum)
oChecksum := c.orderbooks[symbol].Checksum()
// compare bitfinex checksum with local checksum
if bChecksum == oChecksum {
log.Printf("Orderbook '%s' checksum verification successful.", symbol)
} else {
fmt.Printf("Orderbook '%s' checksum is invalid. Data Out of sync, reconnecting.", symbol)
err := c.sendUnsubscribeMessage(context.Background(), chanId)
if err != nil {
return err
}
sub, err := c.subscriptions.lookupByChannelID(chanId)
if err != nil {
return err
}
newSub := &SubscriptionRequest{
SubID: c.nonce.GetNonce(), // generate new subID
Event: sub.Request.Event,
Channel: sub.Request.Channel,
Symbol: sub.Request.Symbol,
if orderbook, ok := c.orderbooks[symbol]; ok {
oChecksum := orderbook.Checksum()
// compare bitfinex checksum with local checksum
if bChecksum == oChecksum {
log.Printf("Orderbook '%s' checksum verification successful.", symbol)
} else {
fmt.Printf("Orderbook '%s' checksum is invalid got %d bot got %d. Data Out of sync, reconnecting.",
symbol, bChecksum, oChecksum)
err := c.sendUnsubscribeMessage(context.Background(), chanId)
if err != nil {
return err
}
newSub := &SubscriptionRequest{
SubID: c.nonce.GetNonce(), // generate new subID
Event: sub.Request.Event,
Channel: sub.Request.Channel,
Symbol: sub.Request.Symbol,
}
_, err_sub := c.Subscribe(context.Background(), newSub)
if err_sub != nil {
log.Printf("could not resubscribe: %s", err_sub.Error())
}
}
c.Subscribe(context.Background(), newSub)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion v2/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func extractSymbolResolutionFromKey(subscription string) (symbol string, resolut
func (c *Client) registerPublicFactories() {
c.registerFactory(ChanTicker, newTickerFactory(c.subscriptions))
c.registerFactory(ChanTrades, newTradeFactory(c.subscriptions))
c.registerFactory(ChanBook, newBookFactory(c.subscriptions, c.orderbooks))
c.registerFactory(ChanBook, newBookFactory(c.subscriptions, c.orderbooks, c.parameters.ManageOrderbook))
c.registerFactory(ChanCandles, newCandlesFactory(c.subscriptions))
}

Expand Down
Loading

0 comments on commit 48518f6

Please sign in to comment.