Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rfq): active quoting API #3128

Merged
merged 109 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
109 commits
Select commit Hold shift + click to select a range
bb18412
WIP: initial websocket wiring
dwasse Sep 13, 2024
31e52d8
WIP: add ws client and handling
dwasse Sep 13, 2024
e8ab231
Fix: receive respsects context
dwasse Sep 13, 2024
782cffd
Cleanup: split into rfq.go
dwasse Sep 13, 2024
6344a37
Fix: build
dwasse Sep 13, 2024
8aa16cb
Feat: add mocked ws client
dwasse Sep 13, 2024
4764926
Fix: build
dwasse Sep 16, 2024
afb2f19
Feat: add SubscribeActiveQuotes() to client
dwasse Sep 16, 2024
e30cd63
Feat: add PutUserQuoteRequest() to api client
dwasse Sep 16, 2024
3c10c02
Fix: build
dwasse Sep 16, 2024
bdae4ca
WIP: rfq tests with ws auth
dwasse Sep 16, 2024
138297d
WIP: test fixes
dwasse Sep 17, 2024
fc1ea97
Feat: working TestHandleActiveRFQ
dwasse Sep 17, 2024
6ae7a71
Feat: add expired request case
dwasse Sep 17, 2024
7cdcade
WIP: functionalize test relayer resps
dwasse Sep 17, 2024
01d83dc
Feat: add runMockRelayer with multiple relayers
dwasse Sep 17, 2024
ee408a9
Feat: add MultipleRelayers case
dwasse Sep 17, 2024
94a8f4d
Feat: add FallbackToPassive case
dwasse Sep 17, 2024
c39d62c
Fix: bigint ptr issue
dwasse Sep 17, 2024
6beb23a
Cleanup: bump expiration window
dwasse Sep 17, 2024
fdf9d12
WIP: logs
dwasse Sep 17, 2024
e23175f
Feat: split into separate tests
dwasse Sep 17, 2024
4b99340
Cleanup: logs
dwasse Sep 17, 2024
c557a28
Feat: add PassiveBestQuote case
dwasse Sep 17, 2024
888ce50
WIP: update db interface with new models
dwasse Sep 17, 2024
3293166
Feat: impl new db funcs
dwasse Sep 17, 2024
63f1a1e
Feat: insert models within api server
dwasse Sep 17, 2024
594d6ea
Feat: update quote request / response statuses
dwasse Sep 17, 2024
7e7c5a1
Fix: db error handling
dwasse Sep 17, 2024
7dcdf59
Fix: api tests
dwasse Sep 17, 2024
8cae8e4
Feat: add initial response validation
dwasse Sep 17, 2024
94ee250
Feat: impl pingpong
dwasse Sep 18, 2024
46d04e2
Fix: register models
dwasse Sep 18, 2024
36701ba
Feat: verify quote request in SingleRelayer case
dwasse Sep 18, 2024
2616b54
Feat: verify more db requests
dwasse Sep 18, 2024
fe7a774
Cleanup: common vars
dwasse Sep 18, 2024
60db841
Cleanup: break down handleActiveRFQ into sub funcs
dwasse Sep 18, 2024
83b7f6d
Cleanup: comments
dwasse Sep 18, 2024
32065ee
Cleanup: remove unused mock
dwasse Sep 18, 2024
c5e9a00
Fix: builds
dwasse Sep 18, 2024
e7d08e7
Feat: make relayer response data optional to signify null resp
dwasse Sep 19, 2024
8e405e5
Fix: response primary key on quote id
dwasse Sep 19, 2024
c6db31f
Fix: build
dwasse Sep 19, 2024
7812573
Feat: update swagger docs
dwasse Sep 19, 2024
a01fb9a
WIP: generic pubsub
dwasse Sep 20, 2024
c27ef32
Feat: add basic PubSubManager
dwasse Sep 20, 2024
b296da8
Feat: implement subscription / unsubscription operations
dwasse Sep 20, 2024
4384fb2
Feat: respond to subscribe operation
dwasse Sep 20, 2024
be695ea
Feat: add runWsListener helper
dwasse Sep 20, 2024
5656bae
Cleanup: reduce chan buffer
dwasse Sep 20, 2024
1c3870c
Cleanup: lints
dwasse Sep 20, 2024
2051b30
Cleanup: break down into smaller funcs
dwasse Sep 20, 2024
f0928c4
Cleanup: refactor ws client
dwasse Sep 20, 2024
4683974
Cleanup: more lints
dwasse Sep 20, 2024
33c24a3
Fix: build
dwasse Sep 20, 2024
7aee229
Cleanup: lints
dwasse Sep 20, 2024
ff0aece
Feat: mark as fulfilled when updating request status
dwasse Sep 20, 2024
91c1bf5
Cleanup: lint
dwasse Sep 20, 2024
cdee6ea
Skip broken test for now
dwasse Sep 20, 2024
8ccbb3f
Cleanup: lint
dwasse Sep 20, 2024
f2920e2
Feat: add open_quote_requests endpoint with test
dwasse Sep 20, 2024
83a3603
Feat: add new open request model
dwasse Sep 20, 2024
f112235
Update swagger
dwasse Sep 20, 2024
292cd37
go mod tidy
trajan0x Sep 23, 2024
dd961c1
fix error
trajan0x Sep 23, 2024
2368313
Fix: respecting context
dwasse Sep 24, 2024
d7948d4
Replace: Fulfilled -> Closed
dwasse Sep 24, 2024
161ea2e
Cleanup: use errors.New()
dwasse Sep 24, 2024
c240cd3
Feat: ReceiveQuoteResponse specifies request id
dwasse Sep 25, 2024
c8b5435
Cleanup: remove logs
dwasse Sep 25, 2024
7fa8003
Feat: add some tracing
dwasse Sep 25, 2024
b05e6b7
Feat: add IntegratorID
dwasse Sep 25, 2024
f2a4be9
Feat: remove repetitive fields from relayer quote response, move requ…
dwasse Sep 25, 2024
f203e7c
Cleanup: use new routes
dwasse Sep 25, 2024
0835aae
Cleanup: migrate req/res struct naming
dwasse Sep 25, 2024
2996aaa
Cleanup: update swagger
dwasse Sep 25, 2024
89c565e
Cleanup: lint
dwasse Sep 25, 2024
0a2b46a
[goreleaser]
dwasse Sep 25, 2024
8850cf0
Feat: run ws endpoint within existing server
dwasse Sep 27, 2024
2bae6b1
[goreleaser]
dwasse Sep 27, 2024
af384d4
Fix: build
dwasse Sep 27, 2024
3ae9552
[goreleaser]
dwasse Sep 27, 2024
d3f839f
Feat: add more tracing
dwasse Sep 27, 2024
925617e
[goreleaser]
dwasse Sep 27, 2024
7ff7c81
feat(rfq-relayer): relayer supports active quoting (#3198)
dwasse Sep 30, 2024
99c9d5c
Fix: build
dwasse Sep 30, 2024
6d6d172
Cleanup: lint
dwasse Sep 30, 2024
c40dada
Cleanup: lint
dwasse Oct 1, 2024
7878364
Cleanup: update swagger
dwasse Oct 1, 2024
2c46bcb
Feat: client sends pings, server sends pongs
dwasse Oct 1, 2024
1025c6c
[goreleaser]
dwasse Oct 1, 2024
65ddc92
Cleanup: remove unused func
dwasse Oct 1, 2024
16b3a5b
WIP: ws error handling
dwasse Oct 1, 2024
d71d686
[goreleaser]
dwasse Oct 1, 2024
a0591d6
Feat: ws client uses errgroup
dwasse Oct 1, 2024
3bc93ab
Cleanup: remove log
dwasse Oct 1, 2024
aa50d07
[goreleaser]
dwasse Oct 1, 2024
b4a25e1
Replace: PutUserQuoteResponse -> PutRFQResponse
dwasse Oct 1, 2024
26c6bbc
Feat: add QuoteID to PutRFQResponse
dwasse Oct 1, 2024
04ff76b
[goreleaser]
dwasse Oct 1, 2024
3324e53
Cleanup: lint
dwasse Oct 2, 2024
cb7dde0
Fix: build
dwasse Oct 2, 2024
cbc6e18
Cleanup: lint
dwasse Oct 2, 2024
5cb6050
[goreleaser]
dwasse Oct 2, 2024
e687ece
Add logs
dwasse Oct 2, 2024
c8a5868
[goreleaser]
dwasse Oct 2, 2024
8bad457
Add logs
dwasse Oct 2, 2024
7e88a97
[goreleaser]
dwasse Oct 2, 2024
526f2af
Cleanup: remove logs
dwasse Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 142 additions & 20 deletions services/rfq/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ package client

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"

"github.com/ipfs/go-log"
dwasse marked this conversation as resolved.
Show resolved Hide resolved

Comment on lines +14 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider updating the logging package

The current implementation uses github.com/ipfs/go-log for logging. It's recommended to use a more modern and widely adopted logging library or the project's standard logging approach. This would help maintain consistency across the project and potentially avoid issues with outdated dependencies.

Consider replacing github.com/ipfs/go-log with a more standard logging library such as log from the standard library, or popular third-party options like github.com/sirupsen/logrus or go.uber.org/zap.

"github.com/google/uuid"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

Expand All @@ -17,17 +20,21 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/go-resty/resty/v2"
"github.com/gorilla/websocket"
dwasse marked this conversation as resolved.
Show resolved Hide resolved
"github.com/synapsecns/sanguine/ethergo/signer/signer"
"github.com/synapsecns/sanguine/services/rfq/api/model"
"github.com/synapsecns/sanguine/services/rfq/api/rest"
)

var logger = log.Logger("rfq-client")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider updating logging package

You're using github.com/ipfs/go-log for logging. It's recommended to use a more modern logging library or the project's standard logging approach to maintain consistency and potentially avoid issues with outdated dependencies.


// AuthenticatedClient is an interface for the RFQ API.
// It provides methods for creating, retrieving and updating quotes.
type AuthenticatedClient interface {
PutQuote(ctx context.Context, q *model.PutQuoteRequest) error
PutQuote(ctx context.Context, q *model.PutRelayerQuoteRequest) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

PutQuote method calls may still use the old parameter type *model.PutQuoteRequest. Please update all instances to use *model.PutRelayerQuoteRequest to ensure type consistency.

  • services/rfq/relayer/quoter/quoter.go:817
  • services/rfq/api/client/client_test.go:20
  • services/rfq/api/client/client_test.go:111
  • services/rfq/api/client/client_test.go:148
🔗 Analysis chain

Verify the function signature change in the codebase.

The PutQuote method signature has been modified to accept a *model.PutRelayerQuoteRequest instead of a *model.PutQuoteRequest. Ensure that all calls to this method have been updated to pass the correct parameter type.

Run the following script to verify the PutQuote method usage:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify all calls to `PutQuote` pass the correct parameter type.

# Search for PutQuote calls that don't use *model.PutRelayerQuoteRequest
rg --type go 'PutQuote\(' | rg -v 'PutRelayerQuoteRequest'

# Also search for any imports of the old PutQuoteRequest type
rg --type go 'model\.PutQuoteRequest'

Length of output: 456


Script:

#!/bin/bash
# Description: Extract the argument types passed to `PutQuote` to ensure correctness.

# Find all PutQuote calls and display the line with the arguments
rg --type go 'PutQuote\(' --pretty

Length of output: 536


Script:

#!/bin/bash
# Description: Verify that 'req' is of type '*model.PutRelayerQuoteRequest' in all PutQuote calls.

# Define an array of files where PutQuote is called
files=(
  "services/rfq/relayer/quoter/quoter.go"
  "services/rfq/api/client/client_test.go"
)

# Iterate over each file and search for the declaration of 'req'
for file in "${files[@]}"; do
  echo "Checking in $file:"
  rg --type go 'var\s+req\s+\*\model\.PutRelayerQuoteRequest' "$file"
done

Length of output: 821


Script:

#!/bin/bash
# Description: Verify that 'req' is of type '*model.PutRelayerQuoteRequest' in all PutQuote calls.

# Define an array of files where PutQuote is called
files=(
  "services/rfq/relayer/quoter/quoter.go"
  "services/rfq/api/client/client_test.go"
)

# Iterate over each file and search for the declaration of 'req'
for file in "${files[@]}"; do
  echo "Checking in $file:"
  rg --type go 'var\s+req\s+\*model\.PutRelayerQuoteRequest' "$file"
done

Length of output: 569


Script:

#!/bin/bash
# Description: Find all declarations of 'req' and display their types to verify correctness.

# Define an array of files where PutQuote is called
files=(
  "services/rfq/relayer/quoter/quoter.go"
  "services/rfq/api/client/client_test.go"
)

# Iterate over each file and search for the declaration of 'req'
for file in "${files[@]}"; do
  echo "Checking declarations in $file:"
  # Search for lines where 'req' is declared and extract the type
  rg --type go 'req\s*[:=]\s*\*?model\.Put(?:Relayer)?QuoteRequest' "$file" -o
done

Length of output: 641

PutBulkQuotes(ctx context.Context, q *model.PutBulkQuotesRequest) error
PutRelayAck(ctx context.Context, req *model.PutAckRequest) (*model.PutRelayAckResponse, error)
SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error)
trajan0x marked this conversation as resolved.
Show resolved Hide resolved
UnauthenticatedClient
}

Expand All @@ -37,6 +44,7 @@ type UnauthenticatedClient interface {
GetSpecificQuote(ctx context.Context, q *model.GetQuoteSpecificRequest) ([]*model.GetQuoteResponse, error)
GetQuoteByRelayerAddress(ctx context.Context, relayerAddr string) ([]*model.GetQuoteResponse, error)
GetRFQContracts(ctx context.Context) (*model.GetContractsResponse, error)
PutUserQuoteRequest(ctx context.Context, q *model.PutUserQuoteRequest) (*model.PutUserQuoteResponse, error)
resty() *resty.Client
}

Expand All @@ -50,12 +58,14 @@ func (c unauthenticatedClient) resty() *resty.Client {

type clientImpl struct {
UnauthenticatedClient
rClient *resty.Client
rClient *resty.Client
wsURL *string
reqSigner signer.Signer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid redundancy in client fields

The clientImpl struct defines its own rClient field while also embedding UnauthenticatedClient, which may already contain an rClient field. This could lead to confusion or unintended behavior due to field shadowing.

Consider removing the rClient field from clientImpl or accessing the embedded rClient through the UnauthenticatedClient interface to maintain clarity.

}

// NewAuthenticatedClient creates a new client for the RFQ quoting API.
// TODO: @aurelius, you don't actually need to be authed for GET Requests.
func NewAuthenticatedClient(metrics metrics.Handler, rfqURL string, reqSigner signer.Signer) (AuthenticatedClient, error) {
func NewAuthenticatedClient(metrics metrics.Handler, rfqURL string, wsURL *string, reqSigner signer.Signer) (AuthenticatedClient, error) {
dwasse marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle nil wsURL parameter in NewAuthenticatedClient

The function NewAuthenticatedClient accepts a wsURL pointer. Ensure that you handle cases where wsURL might be nil to prevent potential nil pointer dereferences when accessing it later in the code.

unauthedClient, err := NewUnauthenticatedClient(metrics, rfqURL)
if err != nil {
return nil, fmt.Errorf("could not create unauthenticated client: %w", err)
Expand All @@ -65,33 +75,41 @@ func NewAuthenticatedClient(metrics metrics.Handler, rfqURL string, reqSigner si
// to a new variable for clarity.
authedClient := unauthedClient.resty().
OnBeforeRequest(func(client *resty.Client, request *resty.Request) error {
// if request.Method == "PUT" && request.URL == rfqURL+rest.QUOTE_ROUTE {
// i.e. signature (hex encoded) = keccak(bytes.concat("\x19Ethereum Signed Message:\n", len(strconv.Itoa(time.Now().Unix()), strconv.Itoa(time.Now().Unix())))
// so that full auth header string: auth = strconv.Itoa(time.Now().Unix()) + ":" + signature
// Get the current Unix timestamp as a string.
now := strconv.Itoa(int(time.Now().Unix()))

// Prepare the data to be signed.
data := "\x19Ethereum Signed Message:\n" + strconv.Itoa(len(now)) + now

sig, err := reqSigner.SignMessage(request.Context(), []byte(data), true)

authHeader, err := getAuthHeader(request.Context(), reqSigner)
if err != nil {
return fmt.Errorf("failed to sign request: %w", err)
return fmt.Errorf("failed to get auth header: %w", err)
}

res := fmt.Sprintf("%s:%s", now, hexutil.Encode(signer.Encode(sig)))
request.SetHeader("Authorization", res)

request.SetHeader(rest.AuthorizationHeader, authHeader)
Comment on lines +80 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential error when setting auth header.

In the OnBeforeRequest callback, if getAuthHeader returns an error, it propagates up and might cause the request to fail silently. Make sure to log the error or handle it appropriately to aid in debugging.

Apply this diff to add error logging:

 authHeader, err := getAuthHeader(request.Context(), reqSigner)
 if err != nil {
+    logger.Errorf("Failed to get auth header: %v", err)
     return fmt.Errorf("failed to get auth header: %w", err)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
authHeader, err := getAuthHeader(request.Context(), reqSigner)
if err != nil {
return fmt.Errorf("failed to sign request: %w", err)
return fmt.Errorf("failed to get auth header: %w", err)
}
res := fmt.Sprintf("%s:%s", now, hexutil.Encode(signer.Encode(sig)))
request.SetHeader("Authorization", res)
request.SetHeader(rest.AuthorizationHeader, authHeader)
authHeader, err := getAuthHeader(request.Context(), reqSigner)
if err != nil {
logger.Errorf("Failed to get auth header: %v", err)
return fmt.Errorf("failed to get auth header: %w", err)
}
request.SetHeader(rest.AuthorizationHeader, authHeader)
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 82-82: services/rfq/api/client/client.go#L82
Added line #L82 was not covered by tests

return nil
})

return &clientImpl{
UnauthenticatedClient: unauthedClient,
rClient: authedClient,
wsURL: wsURL,
reqSigner: reqSigner,
}, nil
}

func getAuthHeader(ctx context.Context, reqSigner signer.Signer) (string, error) {
// if request.Method == "PUT" && request.URL == rfqURL+rest.QUOTE_ROUTE {
// i.e. signature (hex encoded) = keccak(bytes.concat("\x19Ethereum Signed Message:\n", len(strconv.Itoa(time.Now().Unix()), strconv.Itoa(time.Now().Unix())))
// so that full auth header string: auth = strconv.Itoa(time.Now().Unix()) + ":" + signature
// Get the current Unix timestamp as a string.
now := strconv.Itoa(int(time.Now().Unix()))

// Prepare the data to be signed.
data := "\x19Ethereum Signed Message:\n" + strconv.Itoa(len(now)) + now

sig, err := reqSigner.SignMessage(ctx, []byte(data), true)

if err != nil {
return "", fmt.Errorf("failed to sign request: %w", err)
}

return fmt.Sprintf("%s:%s", now, hexutil.Encode(signer.Encode(sig))), nil
}

// NewUnauthenticatedClient creates a new client for the RFQ quoting API.
func NewUnauthenticatedClient(metricHandler metrics.Handler, rfqURL string) (UnauthenticatedClient, error) {
client := resty.New().
Expand All @@ -115,7 +133,7 @@ func NewUnauthenticatedClient(metricHandler metrics.Handler, rfqURL string) (Una
}

// PutQuote puts a new quote in the RFQ quoting API.
func (c *clientImpl) PutQuote(ctx context.Context, q *model.PutQuoteRequest) error {
func (c *clientImpl) PutQuote(ctx context.Context, q *model.PutRelayerQuoteRequest) error {
res, err := c.rClient.R().
SetContext(ctx).
SetBody(q).
Expand Down Expand Up @@ -159,6 +177,91 @@ func (c *clientImpl) PutRelayAck(ctx context.Context, req *model.PutAckRequest)
return ack, nil
}

func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error) {
if c.wsURL == nil {
return nil, fmt.Errorf("websocket URL is not set")
}
if len(req.ChainIDs) == 0 {
return nil, fmt.Errorf("chain IDs are required")
}

reqURL := *c.wsURL + rest.QuoteRequestsRoute

header := http.Header{}
chainIDsJSON, err := json.Marshal(req.ChainIDs)
if err != nil {
return nil, fmt.Errorf("failed to marshal chain IDs: %w", err)
}
header.Set(rest.ChainsHeader, string(chainIDsJSON))
authHeader, err := getAuthHeader(ctx, c.reqSigner)
if err != nil {
return nil, fmt.Errorf("failed to get auth header: %w", err)
}
header.Set(rest.AuthorizationHeader, authHeader)

// Use the header when dialing
conn, _, err := websocket.DefaultDialer.Dial(reqURL, header)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}

respChan = make(chan *model.ActiveRFQMessage, 1000)

go func() {
defer close(respChan)
defer conn.Close()

readChan := make(chan []byte, 1000)
go func() {
defer close(readChan)
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Warnf("websocket connection closed unexpectedly: %v", err)
}
return
}
readChan <- message
}
}()

for {
select {
case <-ctx.Done():
return
case msg, ok := <-reqChan:
if !ok {
return
}
err := conn.WriteJSON(msg)
if err != nil {
logger.Warnf("error sending message to websocket: %v", err)
return
}
case msg, ok := <-readChan:
if !ok {
return
}
var rfqMsg model.ActiveRFQMessage
err = json.Unmarshal(msg, &rfqMsg)
if err != nil {
logger.Warn("error unmarshalling message: %v", err)
continue
}

select {
case respChan <- &rfqMsg:
case <-ctx.Done():
return
}
}
}
}()

return respChan, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prevent potential goroutine leaks in SubscribeActiveQuotes

In the SubscribeActiveQuotes function, there are goroutines that may not properly exit if an error occurs or if the context is canceled, leading to resource leaks.

Consider the following improvements:

  • Use defer statements to ensure that all resources, such as the WebSocket connection and channels, are properly closed.
  • Monitor the context within goroutines and exit when the context is done.
  • Handle errors in a way that guarantees goroutines will terminate correctly.

Apply this diff to enhance resource cleanup:

 func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error) {
     // Existing code...
 
     go func() {
+        defer conn.Close()
+        defer close(respChan)
         // Existing code...
 
         go func() {
+            defer close(readChan)
             // Existing code...
             for {
                 // Existing code...
                 if err != nil {
                     // Handle error...
+                    return
                 }
                 // Existing code...
             }
         }()
 
         for {
             select {
             case <-ctx.Done():
+                return
             // Existing code...
             }
         }
     }()
 
     return respChan, nil
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error) {
if c.wsURL == nil {
return nil, fmt.Errorf("websocket URL is not set")
}
if len(req.ChainIDs) == 0 {
return nil, fmt.Errorf("chain IDs are required")
}
reqURL := *c.wsURL + rest.QuoteRequestsRoute
header := http.Header{}
chainIDsJSON, err := json.Marshal(req.ChainIDs)
if err != nil {
return nil, fmt.Errorf("failed to marshal chain IDs: %w", err)
}
header.Set(rest.ChainsHeader, string(chainIDsJSON))
authHeader, err := getAuthHeader(ctx, c.reqSigner)
if err != nil {
return nil, fmt.Errorf("failed to get auth header: %w", err)
}
header.Set(rest.AuthorizationHeader, authHeader)
// Use the header when dialing
conn, _, err := websocket.DefaultDialer.Dial(reqURL, header)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}
respChan = make(chan *model.ActiveRFQMessage, 1000)
go func() {
defer close(respChan)
defer conn.Close()
readChan := make(chan []byte, 1000)
go func() {
defer close(readChan)
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Warnf("websocket connection closed unexpectedly: %v", err)
}
return
}
readChan <- message
}
}()
for {
select {
case <-ctx.Done():
return
case msg, ok := <-reqChan:
if !ok {
return
}
err := conn.WriteJSON(msg)
if err != nil {
logger.Warnf("error sending message to websocket: %v", err)
return
}
case msg, ok := <-readChan:
if !ok {
return
}
var rfqMsg model.ActiveRFQMessage
err = json.Unmarshal(msg, &rfqMsg)
if err != nil {
logger.Warn("error unmarshalling message: %v", err)
continue
}
select {
case respChan <- &rfqMsg:
case <-ctx.Done():
return
}
}
}
}()
return respChan, nil
}
func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error) {
if c.wsURL == nil {
return nil, fmt.Errorf("websocket URL is not set")
}
if len(req.ChainIDs) == 0 {
return nil, fmt.Errorf("chain IDs are required")
}
reqURL := *c.wsURL + rest.QuoteRequestsRoute
header := http.Header{}
chainIDsJSON, err := json.Marshal(req.ChainIDs)
if err != nil {
return nil, fmt.Errorf("failed to marshal chain IDs: %w", err)
}
header.Set(rest.ChainsHeader, string(chainIDsJSON))
authHeader, err := getAuthHeader(ctx, c.reqSigner)
if err != nil {
return nil, fmt.Errorf("failed to get auth header: %w", err)
}
header.Set(rest.AuthorizationHeader, authHeader)
// Use the header when dialing
conn, _, err := websocket.DefaultDialer.Dial(reqURL, header)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}
respChan = make(chan *model.ActiveRFQMessage, 1000)
go func() {
defer conn.Close()
defer close(respChan)
readChan := make(chan []byte, 1000)
go func() {
defer close(readChan)
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Warnf("websocket connection closed unexpectedly: %v", err)
}
return
}
readChan <- message
}
}()
for {
select {
case <-ctx.Done():
return
case msg, ok := <-reqChan:
if !ok {
return
}
err := conn.WriteJSON(msg)
if err != nil {
logger.Warnf("error sending message to websocket: %v", err)
return
}
case msg, ok := <-readChan:
if !ok {
return
}
var rfqMsg model.ActiveRFQMessage
err = json.Unmarshal(msg, &rfqMsg)
if err != nil {
logger.Warn("error unmarshalling message: %v", err)
continue
}
select {
case respChan <- &rfqMsg:
case <-ctx.Done():
return
}
}
}
}()
return respChan, nil
}


// GetAllQuotes retrieves all quotes from the RFQ quoting API.
func (c *unauthenticatedClient) GetAllQuotes(ctx context.Context) ([]*model.GetQuoteResponse, error) {
var quotes []*model.GetQuoteResponse
Expand Down Expand Up @@ -242,6 +345,25 @@ func (c unauthenticatedClient) GetRFQContracts(ctx context.Context) (*model.GetC
return contracts, nil
}

func (c unauthenticatedClient) PutUserQuoteRequest(ctx context.Context, q *model.PutUserQuoteRequest) (*model.PutUserQuoteResponse, error) {
var response model.PutUserQuoteResponse
resp, err := c.rClient.R().
SetContext(ctx).
SetBody(q).
SetResult(&response).
Put(rest.PutQuoteRequestRoute)

if err != nil {
return nil, fmt.Errorf("error from server: %s: %w", getStatus(resp), err)
}

if resp.IsError() {
return nil, fmt.Errorf("error from server: %s", getStatus(resp))
}

return &response, nil
}

func getStatus(resp *resty.Response) string {
if resp == nil {
return "http status unavailable"
Expand Down
8 changes: 4 additions & 4 deletions services/rfq/api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// TODO: @aurelius tese tests make a lot less sesnes w/ a composite index

func (c *ClientSuite) TestPutAndGetQuote() {
req := model.PutQuoteRequest{
req := model.PutRelayerQuoteRequest{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
DestChainID: 42161,
Expand Down Expand Up @@ -40,7 +40,7 @@ func (c *ClientSuite) TestPutAndGetQuote() {

func (c *ClientSuite) TestPutAndGetBulkQuotes() {
req := model.PutBulkQuotesRequest{
Quotes: []model.PutQuoteRequest{
Quotes: []model.PutRelayerQuoteRequest{
{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
Expand Down Expand Up @@ -98,7 +98,7 @@ func (c *ClientSuite) TestPutAndGetBulkQuotes() {
}

func (c *ClientSuite) TestGetSpecificQuote() {
req := model.PutQuoteRequest{
req := model.PutRelayerQuoteRequest{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
DestChainID: 42161,
Expand Down Expand Up @@ -135,7 +135,7 @@ func (c *ClientSuite) TestGetSpecificQuote() {
}

func (c *ClientSuite) TestGetQuoteByRelayerAddress() {
req := model.PutQuoteRequest{
req := model.PutRelayerQuoteRequest{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
DestChainID: 42161,
Expand Down
2 changes: 1 addition & 1 deletion services/rfq/api/client/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *ClientSuite) SetupTest() {
}()
time.Sleep(2 * time.Second) // Wait for the server to start.

c.client, err = client.NewAuthenticatedClient(metrics.Get(), fmt.Sprintf("http://127.0.0.1:%d", port), localsigner.NewSigner(c.testWallet.PrivateKey()))
c.client, err = client.NewAuthenticatedClient(metrics.Get(), fmt.Sprintf("http://127.0.0.1:%d", port), nil, localsigner.NewSigner(c.testWallet.PrivateKey()))
c.Require().NoError(err)
}

Expand Down
1 change: 1 addition & 0 deletions services/rfq/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config struct {
Port string `yaml:"port"`
RelayAckTimeout time.Duration `yaml:"relay_ack_timeout"`
MaxQuoteAge time.Duration `yaml:"max_quote_age"`
WebsocketPort *string `yaml:"websocket_port"`
}
dwasse marked this conversation as resolved.
Show resolved Hide resolved

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a GetWebSocketPort method with a default value

To provide a default WebSocket port when none is specified in the configuration, consider adding a GetWebSocketPort method, similar to GetRelayAckTimeout and GetMaxQuoteAge.

Add the following code:

const defaultWebSocketPort = ":8080"

// GetWebSocketPort returns the WebSocket port, defaulting to ":8080" if not set
func (c Config) GetWebSocketPort() string {
    if c.WebSocketPort == "" {
        return defaultWebSocketPort
    }
    return c.WebSocketPort
}

This ensures your application has a default port for WebSocket connections, improving robustness.

const defaultRelayAckTimeout = 30 * time.Second
Expand Down
6 changes: 3 additions & 3 deletions services/rfq/api/model/request.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package model

// PutQuoteRequest contains the schema for a PUT /quote request.
type PutQuoteRequest struct {
// PutRelayerQuoteRequest contains the schema for a PUT /quote request.
type PutRelayerQuoteRequest struct {
OriginChainID int `json:"origin_chain_id"`
OriginTokenAddr string `json:"origin_token_addr"`
DestChainID int `json:"dest_chain_id"`
Expand All @@ -15,7 +15,7 @@ type PutQuoteRequest struct {

// PutBulkQuotesRequest contains the schema for a PUT /quote request.
type PutBulkQuotesRequest struct {
Quotes []PutQuoteRequest `json:"quotes"`
Quotes []PutRelayerQuoteRequest `json:"quotes"`
}

// PutAckRequest contains the schema for a PUT /ack request.
Expand Down
Loading
Loading