Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rfq): active quoting API #3128

Merged
merged 109 commits into from
Oct 2, 2024
Merged
Changes from 1 commit
Commits
Show all changes
109 commits
Select commit Hold shift + click to select a range
bb18412
WIP: initial websocket wiring
dwasse Sep 13, 2024
31e52d8
WIP: add ws client and handling
dwasse Sep 13, 2024
e8ab231
Fix: receive respsects context
dwasse Sep 13, 2024
782cffd
Cleanup: split into rfq.go
dwasse Sep 13, 2024
6344a37
Fix: build
dwasse Sep 13, 2024
8aa16cb
Feat: add mocked ws client
dwasse Sep 13, 2024
4764926
Fix: build
dwasse Sep 16, 2024
afb2f19
Feat: add SubscribeActiveQuotes() to client
dwasse Sep 16, 2024
e30cd63
Feat: add PutUserQuoteRequest() to api client
dwasse Sep 16, 2024
3c10c02
Fix: build
dwasse Sep 16, 2024
bdae4ca
WIP: rfq tests with ws auth
dwasse Sep 16, 2024
138297d
WIP: test fixes
dwasse Sep 17, 2024
fc1ea97
Feat: working TestHandleActiveRFQ
dwasse Sep 17, 2024
6ae7a71
Feat: add expired request case
dwasse Sep 17, 2024
7cdcade
WIP: functionalize test relayer resps
dwasse Sep 17, 2024
01d83dc
Feat: add runMockRelayer with multiple relayers
dwasse Sep 17, 2024
ee408a9
Feat: add MultipleRelayers case
dwasse Sep 17, 2024
94a8f4d
Feat: add FallbackToPassive case
dwasse Sep 17, 2024
c39d62c
Fix: bigint ptr issue
dwasse Sep 17, 2024
6beb23a
Cleanup: bump expiration window
dwasse Sep 17, 2024
fdf9d12
WIP: logs
dwasse Sep 17, 2024
e23175f
Feat: split into separate tests
dwasse Sep 17, 2024
4b99340
Cleanup: logs
dwasse Sep 17, 2024
c557a28
Feat: add PassiveBestQuote case
dwasse Sep 17, 2024
888ce50
WIP: update db interface with new models
dwasse Sep 17, 2024
3293166
Feat: impl new db funcs
dwasse Sep 17, 2024
63f1a1e
Feat: insert models within api server
dwasse Sep 17, 2024
594d6ea
Feat: update quote request / response statuses
dwasse Sep 17, 2024
7e7c5a1
Fix: db error handling
dwasse Sep 17, 2024
7dcdf59
Fix: api tests
dwasse Sep 17, 2024
8cae8e4
Feat: add initial response validation
dwasse Sep 17, 2024
94ee250
Feat: impl pingpong
dwasse Sep 18, 2024
46d04e2
Fix: register models
dwasse Sep 18, 2024
36701ba
Feat: verify quote request in SingleRelayer case
dwasse Sep 18, 2024
2616b54
Feat: verify more db requests
dwasse Sep 18, 2024
fe7a774
Cleanup: common vars
dwasse Sep 18, 2024
60db841
Cleanup: break down handleActiveRFQ into sub funcs
dwasse Sep 18, 2024
83b7f6d
Cleanup: comments
dwasse Sep 18, 2024
32065ee
Cleanup: remove unused mock
dwasse Sep 18, 2024
c5e9a00
Fix: builds
dwasse Sep 18, 2024
e7d08e7
Feat: make relayer response data optional to signify null resp
dwasse Sep 19, 2024
8e405e5
Fix: response primary key on quote id
dwasse Sep 19, 2024
c6db31f
Fix: build
dwasse Sep 19, 2024
7812573
Feat: update swagger docs
dwasse Sep 19, 2024
a01fb9a
WIP: generic pubsub
dwasse Sep 20, 2024
c27ef32
Feat: add basic PubSubManager
dwasse Sep 20, 2024
b296da8
Feat: implement subscription / unsubscription operations
dwasse Sep 20, 2024
4384fb2
Feat: respond to subscribe operation
dwasse Sep 20, 2024
be695ea
Feat: add runWsListener helper
dwasse Sep 20, 2024
5656bae
Cleanup: reduce chan buffer
dwasse Sep 20, 2024
1c3870c
Cleanup: lints
dwasse Sep 20, 2024
2051b30
Cleanup: break down into smaller funcs
dwasse Sep 20, 2024
f0928c4
Cleanup: refactor ws client
dwasse Sep 20, 2024
4683974
Cleanup: more lints
dwasse Sep 20, 2024
33c24a3
Fix: build
dwasse Sep 20, 2024
7aee229
Cleanup: lints
dwasse Sep 20, 2024
ff0aece
Feat: mark as fulfilled when updating request status
dwasse Sep 20, 2024
91c1bf5
Cleanup: lint
dwasse Sep 20, 2024
cdee6ea
Skip broken test for now
dwasse Sep 20, 2024
8ccbb3f
Cleanup: lint
dwasse Sep 20, 2024
f2920e2
Feat: add open_quote_requests endpoint with test
dwasse Sep 20, 2024
83a3603
Feat: add new open request model
dwasse Sep 20, 2024
f112235
Update swagger
dwasse Sep 20, 2024
292cd37
go mod tidy
trajan0x Sep 23, 2024
dd961c1
fix error
trajan0x Sep 23, 2024
2368313
Fix: respecting context
dwasse Sep 24, 2024
d7948d4
Replace: Fulfilled -> Closed
dwasse Sep 24, 2024
161ea2e
Cleanup: use errors.New()
dwasse Sep 24, 2024
c240cd3
Feat: ReceiveQuoteResponse specifies request id
dwasse Sep 25, 2024
c8b5435
Cleanup: remove logs
dwasse Sep 25, 2024
7fa8003
Feat: add some tracing
dwasse Sep 25, 2024
b05e6b7
Feat: add IntegratorID
dwasse Sep 25, 2024
f2a4be9
Feat: remove repetitive fields from relayer quote response, move requ…
dwasse Sep 25, 2024
f203e7c
Cleanup: use new routes
dwasse Sep 25, 2024
0835aae
Cleanup: migrate req/res struct naming
dwasse Sep 25, 2024
2996aaa
Cleanup: update swagger
dwasse Sep 25, 2024
89c565e
Cleanup: lint
dwasse Sep 25, 2024
0a2b46a
[goreleaser]
dwasse Sep 25, 2024
8850cf0
Feat: run ws endpoint within existing server
dwasse Sep 27, 2024
2bae6b1
[goreleaser]
dwasse Sep 27, 2024
af384d4
Fix: build
dwasse Sep 27, 2024
3ae9552
[goreleaser]
dwasse Sep 27, 2024
d3f839f
Feat: add more tracing
dwasse Sep 27, 2024
925617e
[goreleaser]
dwasse Sep 27, 2024
7ff7c81
feat(rfq-relayer): relayer supports active quoting (#3198)
dwasse Sep 30, 2024
99c9d5c
Fix: build
dwasse Sep 30, 2024
6d6d172
Cleanup: lint
dwasse Sep 30, 2024
c40dada
Cleanup: lint
dwasse Oct 1, 2024
7878364
Cleanup: update swagger
dwasse Oct 1, 2024
2c46bcb
Feat: client sends pings, server sends pongs
dwasse Oct 1, 2024
1025c6c
[goreleaser]
dwasse Oct 1, 2024
65ddc92
Cleanup: remove unused func
dwasse Oct 1, 2024
16b3a5b
WIP: ws error handling
dwasse Oct 1, 2024
d71d686
[goreleaser]
dwasse Oct 1, 2024
a0591d6
Feat: ws client uses errgroup
dwasse Oct 1, 2024
3bc93ab
Cleanup: remove log
dwasse Oct 1, 2024
aa50d07
[goreleaser]
dwasse Oct 1, 2024
b4a25e1
Replace: PutUserQuoteResponse -> PutRFQResponse
dwasse Oct 1, 2024
26c6bbc
Feat: add QuoteID to PutRFQResponse
dwasse Oct 1, 2024
04ff76b
[goreleaser]
dwasse Oct 1, 2024
3324e53
Cleanup: lint
dwasse Oct 2, 2024
cb7dde0
Fix: build
dwasse Oct 2, 2024
cbc6e18
Cleanup: lint
dwasse Oct 2, 2024
5cb6050
[goreleaser]
dwasse Oct 2, 2024
e687ece
Add logs
dwasse Oct 2, 2024
c8a5868
[goreleaser]
dwasse Oct 2, 2024
8bad457
Add logs
dwasse Oct 2, 2024
7e88a97
[goreleaser]
dwasse Oct 2, 2024
526f2af
Cleanup: remove logs
dwasse Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Cleanup: remove unused func
dwasse committed Oct 1, 2024

Verified

This commit was signed with the committer’s verified signature.
jasontedor Jason Tedor
commit 65ddc921c508c7de5be4bf6c8d208d3a84e7850d
20 changes: 0 additions & 20 deletions services/rfq/api/rest/ws.go
Original file line number Diff line number Diff line change
@@ -51,10 +51,10 @@
case c.requestChan <- quoteRequest:
// successfully sent, register a response channel
c.responseChans.Store(quoteRequest.RequestID, make(chan *model.WsRFQResponse))
case <-c.doneChan:
return fmt.Errorf("websocket client is closed")
case <-ctx.Done():
return nil

Check warning on line 57 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L54-L57

Added lines #L54 - L57 were not covered by tests
}
return nil
}
@@ -62,8 +62,8 @@
func (c *wsClient) ReceiveQuoteResponse(ctx context.Context, requestID string) (resp *model.WsRFQResponse, err error) {
responseChan, ok := c.responseChans.Load(requestID)
if !ok {
return nil, fmt.Errorf("no response channel for request %s", requestID)
}

Check warning on line 66 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L65-L66

Added lines #L65 - L66 were not covered by tests
defer c.responseChans.Delete(requestID)

for {
@@ -71,10 +71,10 @@
case resp = <-responseChan:
// successfully received
return resp, nil
case <-c.doneChan:
return nil, fmt.Errorf("websocket client is closed")
case <-ctx.Done():
return nil, fmt.Errorf("expiration reached without response")

Check warning on line 77 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L74-L77

Added lines #L74 - L77 were not covered by tests
}
}
}
@@ -114,24 +114,24 @@
case <-ctx.Done():
err = c.conn.Close()
if err != nil {
return fmt.Errorf("error closing websocket connection: %w", err)
}

Check warning on line 118 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L117-L118

Added lines #L117 - L118 were not covered by tests
close(c.doneChan)
return nil
case req := <-c.requestChan:
err = c.sendRelayerRequest(ctx, req)
if err != nil {
logger.Error("Error sending quote request: %s", err)
}

Check warning on line 125 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L124-L125

Added lines #L124 - L125 were not covered by tests
case msg := <-messageChan:
err = c.handleRelayerMessage(ctx, msg)
if err != nil {
logger.Error("Error handling relayer message: %s", err)
return fmt.Errorf("error handling relayer message: %w", err)
}
case <-c.pingTicker.C:
// ping timed out, close the connection
cancel()

Check warning on line 134 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L129-L134

Added lines #L129 - L134 were not covered by tests
}
}
}
@@ -159,71 +159,71 @@

rawData, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("error marshaling quote request: %w", err)
}

Check warning on line 163 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L162-L163

Added lines #L162 - L163 were not covered by tests
msg := model.ActiveRFQMessage{
Op: RequestQuoteOp,
Content: json.RawMessage(rawData),
}
err = c.conn.WriteJSON(msg)
if err != nil {
return fmt.Errorf("error sending quote request: %w", err)
}

Check warning on line 171 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L170-L171

Added lines #L170 - L171 were not covered by tests

return nil
}

// handleRelayerMessage handles messages from the relayer.
// An error returned will result in the websocket connection being closed.
func (c *wsClient) handleRelayerMessage(ctx context.Context, msg []byte) (err error) {

Check failure on line 178 in services/rfq/api/rest/ws.go

GitHub Actions / Lint (services/rfq)

calculated cyclomatic complexity for function handleRelayerMessage is 11, max is 10 (cyclop)
var rfqMsg model.ActiveRFQMessage
err = json.Unmarshal(msg, &rfqMsg)
if err != nil {
return fmt.Errorf("error unmarshaling websocket message: %w", err)
}

Check warning on line 183 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L182-L183

Added lines #L182 - L183 were not covered by tests

switch rfqMsg.Op {
case PingOp:
c.lastPing = time.Now()
resp := c.handlePing(ctx)
err = c.conn.WriteJSON(resp)
if err != nil {
return fmt.Errorf("error sending ping response: %w", err)
}

Check warning on line 192 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L186-L192

Added lines #L186 - L192 were not covered by tests
case SubscribeOp:
resp := c.handleSubscribe(ctx, rfqMsg.Content)
err = c.conn.WriteJSON(resp)
if err != nil {
return fmt.Errorf("error sending subscribe response: %w", err)
}
case UnsubscribeOp:
resp := c.handleUnsubscribe(ctx, rfqMsg.Content)
err = c.conn.WriteJSON(resp)
if err != nil {
return fmt.Errorf("error sending unsubscribe response: %w", err)
}

Check warning on line 204 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L197-L204

Added lines #L197 - L204 were not covered by tests
case SendQuoteOp:
err = c.handleSendQuote(ctx, rfqMsg.Content)
logger.Errorf("error handling send quote: %v", err)
case PongOp:
c.lastPing = time.Now()
default:
logger.Errorf("received unexpected operation from relayer: %s", rfqMsg.Op)
return nil

Check warning on line 212 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L208-L212

Added lines #L208 - L212 were not covered by tests
}

return nil
}

func (c *wsClient) handlePing(ctx context.Context) (resp model.ActiveRFQMessage) {
_, span := c.handler.Tracer().Start(ctx, "handlePing", trace.WithAttributes(
attribute.String("relayer_address", c.relayerAddr),
))
defer func() {
metrics.EndSpan(span)
}()

Check warning on line 224 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L218-L224

Added lines #L218 - L224 were not covered by tests

return getSuccessResponse(PongOp)

Check warning on line 226 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L226

Added line #L226 was not covered by tests
}

func (c *wsClient) handleSubscribe(ctx context.Context, content json.RawMessage) (resp model.ActiveRFQMessage) {
@@ -237,35 +237,35 @@
var sub model.SubscriptionParams
err := json.Unmarshal(content, &sub)
if err != nil {
return getErrorResponse(SubscribeOp, fmt.Errorf("could not unmarshal subscription params: %w", err))
}

Check warning on line 241 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L240-L241

Added lines #L240 - L241 were not covered by tests
span.SetAttributes(attribute.IntSlice("chain_ids", sub.Chains))
err = c.pubsub.AddSubscription(c.relayerAddr, sub)
if err != nil {
return getErrorResponse(SubscribeOp, fmt.Errorf("error adding subscription: %w", err))
}

Check warning on line 246 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L245-L246

Added lines #L245 - L246 were not covered by tests
return getSuccessResponse(SubscribeOp)
}

func (c *wsClient) handleUnsubscribe(ctx context.Context, content json.RawMessage) (resp model.ActiveRFQMessage) {
_, span := c.handler.Tracer().Start(ctx, "handleUnsubscribe", trace.WithAttributes(
attribute.String("relayer_address", c.relayerAddr),
))
defer func() {
metrics.EndSpan(span)
}()

Check warning on line 256 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L250-L256

Added lines #L250 - L256 were not covered by tests

var sub model.SubscriptionParams
err := json.Unmarshal(content, &sub)
if err != nil {
return getErrorResponse(UnsubscribeOp, fmt.Errorf("could not unmarshal subscription params: %w", err))
}
span.SetAttributes(attribute.IntSlice("chain_ids", sub.Chains))
err = c.pubsub.RemoveSubscription(c.relayerAddr, sub)
if err != nil {
return getErrorResponse(UnsubscribeOp, fmt.Errorf("error removing subscription: %w", err))
}
return getSuccessResponse(UnsubscribeOp)

Check warning on line 268 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L258-L268

Added lines #L258 - L268 were not covered by tests
}

func (c *wsClient) handleSendQuote(ctx context.Context, content json.RawMessage) (err error) {
@@ -280,41 +280,21 @@
var resp model.WsRFQResponse
err = json.Unmarshal(content, &resp)
if err != nil {
return fmt.Errorf("error unmarshaling websocket message: %w", err)
}

Check warning on line 284 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L283-L284

Added lines #L283 - L284 were not covered by tests
span.SetAttributes(
attribute.String("request_id", resp.RequestID),
attribute.String("dest_amount", resp.DestAmount),
)
responseChan, ok := c.responseChans.Load(resp.RequestID)
if !ok {
return fmt.Errorf("no response channel for request %s", resp.RequestID)
}

Check warning on line 292 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L291-L292

Added lines #L291 - L292 were not covered by tests
responseChan <- &resp

return nil
}

func (c *wsClient) trySendPong(lastPing time.Time) (err error) {
if time.Since(lastPing) > pingPeriod && !lastPing.IsZero() {
err = c.conn.Close()
if err != nil {
return fmt.Errorf("error closing websocket connection: %w", err)
}
close(c.doneChan)
return fmt.Errorf("ping not received in time")
}
pingMsg := model.ActiveRFQMessage{
Op: PongOp,
}
err = c.conn.WriteJSON(pingMsg)
if err != nil {
return fmt.Errorf("error sending ping message: %w", err)
}

return nil
}

func getSuccessResponse(op string) model.ActiveRFQMessage {
return model.ActiveRFQMessage{
Op: op,
@@ -322,10 +302,10 @@
}
}

func getErrorResponse(op string, err error) model.ActiveRFQMessage {
return model.ActiveRFQMessage{
Op: op,
Success: false,
Content: json.RawMessage(fmt.Sprintf("{\"reason\": \"%s\"}", err.Error())),
}

Check warning on line 310 in services/rfq/api/rest/ws.go

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L305-L310

Added lines #L305 - L310 were not covered by tests
}
Loading