Skip to content

Commit

Permalink
More logs
Browse files Browse the repository at this point in the history
  • Loading branch information
dwasse committed Dec 12, 2024
1 parent 3ba6dda commit e160c45
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
2 changes: 2 additions & 0 deletions services/rfq/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn,
case <-ctx.Done():
return nil
case msg, ok := <-reqChan:
fmt.Printf("recved message from reqChan: %+v\n", msg)
if !ok {
return fmt.Errorf("error reading from reqChan: %w", ctx.Err())
}
Expand All @@ -283,6 +284,7 @@ func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn,
return fmt.Errorf("error sending message to websocket: %w", err)
}
case msg, ok := <-readChan:
fmt.Printf("recved message from readChan: %+v\n", msg)
if !ok {
return nil
}
Expand Down
13 changes: 13 additions & 0 deletions services/rfq/api/rest/rfq.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ func (r *QuoterAPIServer) handleActiveRFQ(ctx context.Context, request *model.Pu
relayerReq := model.NewWsRFQRequest(request.Data, requestID)
fmt.Printf("parsed websocket request: %+v\n", relayerReq.Data)
r.wsClients.Range(func(relayerAddr string, client WsClient) bool {
fmt.Printf("sending quote request to client with address %v: %v\n", relayerAddr, client)
sendCtx, sendSpan := r.handler.Tracer().Start(ctx, "sendQuoteRequest", trace.WithAttributes(
attribute.String("relayer_address", relayerAddr),
attribute.String("request_id", requestID),
))
defer metrics.EndSpan(sendSpan)

subscribed := r.pubSubManager.IsSubscribed(relayerAddr, request.Data.OriginChainID, request.Data.DestChainID)
fmt.Printf("relayer %v subscribed: %v\n", relayerAddr, subscribed)
span.SetAttributes(attribute.Bool("subscribed", subscribed))
if subscribed {
fmt.Printf("sending request to relayer %v: %+v\n", relayerAddr, relayerReq)
err := client.SendQuoteRequest(sendCtx, relayerReq)
if err != nil {
logger.Errorf("Error sending quote request to %s: %v", relayerAddr, err)
Expand All @@ -57,10 +60,12 @@ func (r *QuoterAPIServer) handleActiveRFQ(ctx context.Context, request *model.Pu
// collect the responses and determine the best quote
responses := r.collectRelayerResponses(ctx, request, requestID)
for r, resp := range responses {
fmt.Printf("considering response: %+v\n", resp)
relayerAddr := r
quote = getBestQuote(quote, getRelayerQuoteData(request, resp))
quote.RelayerAddress = &relayerAddr
}
fmt.Printf("best quote: %+v\n", quote)
err = r.recordActiveQuote(ctx, quote, requestID)
if err != nil {
logger.Errorf("Error recording active quote: %v", err)
Expand All @@ -70,6 +75,7 @@ func (r *QuoterAPIServer) handleActiveRFQ(ctx context.Context, request *model.Pu
}

func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *model.PutRFQRequest, requestID string) (responses map[string]*model.WsRFQResponse) {
fmt.Printf("collectRelayerResponses with request data: %+v\n", request.Data)
ctx, span := r.handler.Tracer().Start(ctx, "collectRelayerResponses", trace.WithAttributes(
attribute.String("user_address", request.UserAddress),
attribute.String("request_id", requestID),
Expand All @@ -87,6 +93,7 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *
respMux := sync.Mutex{}
responses = map[string]*model.WsRFQResponse{}
r.wsClients.Range(func(relayerAddr string, client WsClient) bool {
fmt.Printf("processing ws client with addr: %v: client: %v\n", relayerAddr, client)
wg.Add(1)
go func(client WsClient) {
var respStatus db.ActiveQuoteResponseStatus
Expand All @@ -102,6 +109,7 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *

defer wg.Done()
resp, err := client.ReceiveQuoteResponse(collectionCtx, requestID)
fmt.Printf("recved quote resp: %+v\n", resp)
if err != nil {
logger.Errorf("Error receiving quote response: %v", err)
return
Expand All @@ -121,6 +129,7 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *
}

// record the response
fmt.Printf("recording resp: %+v\n", resp)
err = r.db.InsertActiveQuoteResponse(collectionCtx, resp, relayerAddr, respStatus)
if err != nil {
logger.Errorf("Error inserting active quote response: %v", err)
Expand All @@ -132,6 +141,7 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *
// wait for all responses to be received, or expiration
select {
case <-expireCtx.Done():
fmt.Println("request expired")
// request expired before all responses were received
case <-func() chan struct{} {
ch := make(chan struct{})
Expand All @@ -144,6 +154,9 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *
// all responses received
}

fmt.Printf("responses received: %+v\n", responses)
fmt.Printf("num responses: %v\n", len(responses))

return responses
}

Expand Down
6 changes: 3 additions & 3 deletions services/rfq/relayer/quoter/quoter.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,6 @@ func (m *Manager) SubmitAllQuotes(ctx context.Context) (err error) {
// This function is blocking and will run until the context is canceled.
func (m *Manager) SubscribeActiveRFQ(ctx context.Context) (err error) {
ctx, span := m.metricsHandler.Tracer().Start(ctx, "SubscribeActiveRFQ")
defer func() {
metrics.EndSpanWithErr(span, err)
}()

chainIDs := []int{}
for chainID := range m.config.Chains {
Expand All @@ -313,9 +310,12 @@ func (m *Manager) SubscribeActiveRFQ(ctx context.Context) (err error) {
reqChan := make(chan *model.ActiveRFQMessage)
respChan, err := m.rfqClient.SubscribeActiveQuotes(ctx, &req, reqChan)
if err != nil {
metrics.EndSpanWithErr(span, err)
return fmt.Errorf("error subscribing to active quotes: %w", err)
}
span.AddEvent("subscribed to active quotes")
metrics.EndSpan(span)

for {
select {
case <-ctx.Done():
Expand Down

0 comments on commit e160c45

Please sign in to comment.