From f67cb6dbc1176fbfa2dd868726172bd07ddcb291 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 31 May 2023 09:33:35 +0200 Subject: [PATCH 1/2] fix: reset read deadline after reading deal proposal message --- storagemarket/lp2pimpl/net.go | 71 +++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/storagemarket/lp2pimpl/net.go b/storagemarket/lp2pimpl/net.go index aae92782d..be8f2fbee 100644 --- a/storagemarket/lp2pimpl/net.go +++ b/storagemarket/lp2pimpl/net.go @@ -23,6 +23,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "go.uber.org/zap" ) var log = logging.Logger("boost-net") @@ -197,32 +198,40 @@ func (p *DealProvider) Stop() { func (p *DealProvider) handleNewDealStream(s network.Stream) { defer s.Close() + start := time.Now() + reqLogUuid := uuid.New() + reqLog := log.With("reqlog-uuid", reqLogUuid.String(), "client-peer", s.Conn().RemotePeer()) + reqLog.Debugw("new deal proposal request") + + defer func() { + reqLog.Debugw("handled deal proposal request", "duration", time.Since(start).String()) + }() + // Set a deadline on reading from the stream so it doesn't hang _ = s.SetReadDeadline(time.Now().Add(providerReadDeadline)) - defer s.SetReadDeadline(time.Time{}) // nolint // Read the deal proposal from the stream var proposal types.DealParams err := proposal.UnmarshalCBOR(s) + _ = s.SetReadDeadline(time.Time{}) // Clear read deadline so conn doesn't get closed if err != nil { - log.Warnw("reading storage deal proposal from stream", "err", err) + reqLog.Warnw("reading storage deal proposal from stream", "err", err) return } - log.Infow("received deal proposal", "id", proposal.DealUUID, "client-peer", s.Conn().RemotePeer()) + reqLog = reqLog.With("id", proposal.DealUUID) + reqLog.Infow("received deal proposal") // Start executing the deal. // Note: This method just waits for the deal to be accepted, it doesn't // wait for deal execution to complete. + startExec := time.Now() res, err := p.prov.ExecuteDeal(context.Background(), &proposal, s.Conn().RemotePeer()) + reqLog.Debugw("processed deal proposal accept") if err != nil { - log.Warnw("deal proposal failed", "id", proposal.DealUUID, "err", err, "reason", res.Reason) + reqLog.Warnw("deal proposal failed", "err", err, "reason", res.Reason) } - // Set a deadline on writing to the stream so it doesn't hang - _ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline)) - defer s.SetWriteDeadline(time.Time{}) // nolint - // Log the response propLog.Infow("send deal proposal response", "id", proposal.DealUUID, @@ -238,44 +247,58 @@ func (p *DealProvider) handleNewDealStream(s network.Stream) { "start epoch", proposal.ClientDealProposal.Proposal.StartEpoch, "end epoch", proposal.ClientDealProposal.Proposal.EndEpoch, "price per epoch", proposal.ClientDealProposal.Proposal.StoragePricePerEpoch, + "duration", time.Since(startExec).String(), ) _ = p.plDB.InsertLog(p.ctx, proposal, res.Accepted, res.Reason) //nolint:errcheck + // Set a deadline on writing to the stream so it doesn't hang + _ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline)) + defer s.SetWriteDeadline(time.Time{}) // nolint + // Write the response to the client err = cborutil.WriteCborRPC(s, &types.DealResponse{Accepted: res.Accepted, Message: res.Reason}) if err != nil { - log.Warnw("writing deal response", "id", proposal.DealUUID, "err", err) - return + reqLog.Warnw("writing deal response", "err", err) } } func (p *DealProvider) handleNewDealStatusStream(s network.Stream) { defer s.Close() - _ = s.SetReadDeadline(time.Now().Add(providerReadDeadline)) - defer s.SetReadDeadline(time.Time{}) // nolint + start := time.Now() + reqLogUuid := uuid.New() + reqLog := log.With("reqlog-uuid", reqLogUuid.String(), "client-peer", s.Conn().RemotePeer()) + reqLog.Debugw("new deal status request") + + defer func() { + reqLog.Debugw("handled deal status request", "duration", time.Since(start).String()) + }() + // Read the deal status request from the stream + _ = s.SetReadDeadline(time.Now().Add(providerReadDeadline)) var req types.DealStatusRequest err := req.UnmarshalCBOR(s) + _ = s.SetReadDeadline(time.Time{}) // Clear read deadline so conn doesn't get closed if err != nil { - log.Warnw("reading deal status request from stream", "err", err) + reqLog.Warnw("reading deal status request from stream", "err", err) return } - log.Debugw("received deal status request", "id", req.DealUUID, "client-peer", s.Conn().RemotePeer()) + reqLog = reqLog.With("id", req.DealUUID) + reqLog.Debugw("received deal status request") - resp := p.getDealStatus(req) + resp := p.getDealStatus(req, reqLog) + reqLog.Debugw("processed deal status request") // Set a deadline on writing to the stream so it doesn't hang _ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline)) defer s.SetWriteDeadline(time.Time{}) // nolint if err := cborutil.WriteCborRPC(s, &resp); err != nil { - log.Errorw("failed to write deal status response", "err", err) - return + reqLog.Errorw("failed to write deal status response", "err", err) } } -func (p *DealProvider) getDealStatus(req types.DealStatusRequest) types.DealStatusResponse { +func (p *DealProvider) getDealStatus(req types.DealStatusRequest, reqLog *zap.SugaredLogger) types.DealStatusResponse { errResp := func(err string) types.DealStatusResponse { return types.DealStatusResponse{DealUUID: req.DealUUID, Error: err} } @@ -286,34 +309,34 @@ func (p *DealProvider) getDealStatus(req types.DealStatusRequest) types.DealStat } if err != nil { - log.Errorw("failed to fetch deal status", "err", err) + reqLog.Errorw("failed to fetch deal status", "err", err) return errResp("failed to fetch deal status") } // verify request signature uuidBytes, err := req.DealUUID.MarshalBinary() if err != nil { - log.Errorw("failed to serialize request deal UUID", "err", err) + reqLog.Errorw("failed to serialize request deal UUID", "err", err) return errResp("failed to serialize request deal UUID") } clientAddr := pds.ClientDealProposal.Proposal.Client addr, err := p.fullNode.StateAccountKey(p.ctx, clientAddr, chaintypes.EmptyTSK) if err != nil { - log.Errorw("failed to get account key for client addr", "client", clientAddr.String(), "err", err) + reqLog.Errorw("failed to get account key for client addr", "client", clientAddr.String(), "err", err) msg := fmt.Sprintf("failed to get account key for client addr %s", clientAddr.String()) return errResp(msg) } err = sigs.Verify(&req.Signature, addr, uuidBytes) if err != nil { - log.Warnw("signature verification failed", "err", err) + reqLog.Warnw("signature verification failed", "err", err) return errResp("signature verification failed") } signedPropCid, err := pds.SignedProposalCid() if err != nil { - log.Errorw("getting signed proposal cid", "err", err) + reqLog.Errorw("getting signed proposal cid", "err", err) return errResp("getting signed proposal cid") } @@ -321,7 +344,7 @@ func (p *DealProvider) getDealStatus(req types.DealStatusRequest) types.DealStat si, err := p.spApi.SectorsStatus(p.ctx, pds.SectorID, false) if err != nil { - log.Errorw("getting sector status from sealer", "err", err) + reqLog.Errorw("getting sector status from sealer", "err", err) return errResp("getting sector status from sealer") } From e6fc5089dde7a1e7000f755e3dd07d8148695f62 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 31 May 2023 09:58:15 +0200 Subject: [PATCH 2/2] fix: increase client request deadline --- go.mod | 2 +- storagemarket/lp2pimpl/net.go | 24 +++++++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index c9c1b7166..e43191fee 100644 --- a/go.mod +++ b/go.mod @@ -320,7 +320,7 @@ require ( github.com/zondax/hid v0.9.1 // indirect github.com/zondax/ledger-go v0.12.1 // indirect go.uber.org/dig v1.15.0 // indirect - go.uber.org/zap v1.24.0 // indirect + go.uber.org/zap v1.24.0 go4.org v0.0.0-20200411211856-f5505b9728dd // indirect golang.org/x/mod v0.7.0 // indirect golang.org/x/net v0.7.0 // indirect diff --git a/storagemarket/lp2pimpl/net.go b/storagemarket/lp2pimpl/net.go index be8f2fbee..e281921b2 100644 --- a/storagemarket/lp2pimpl/net.go +++ b/storagemarket/lp2pimpl/net.go @@ -32,9 +32,19 @@ var propLog = logging.Logger("boost-prop") const DealProtocolv120ID = "/fil/storage/mk/1.2.0" const DealProtocolv121ID = "/fil/storage/mk/1.2.1" const DealStatusV12ProtocolID = "/fil/storage/status/1.2.0" + +// The time limit to read a message from the client when the client opens a stream const providerReadDeadline = 10 * time.Second + +// The time limit to write a response to the client const providerWriteDeadline = 10 * time.Second -const clientReadDeadline = 10 * time.Second + +// The time limit to wait for the provider to send a response to a client's request. +// This includes the time it takes for the provider to process the request and +// send a response. +const clientReadDeadline = 60 * time.Second + +// The time limit to write a message to the provider const clientWriteDeadline = 10 * time.Second // DealClientOption is an option for configuring the libp2p storage deal client @@ -196,14 +206,16 @@ func (p *DealProvider) Stop() { // Called when the client opens a libp2p stream with a new deal proposal func (p *DealProvider) handleNewDealStream(s network.Stream) { - defer s.Close() - start := time.Now() reqLogUuid := uuid.New() reqLog := log.With("reqlog-uuid", reqLogUuid.String(), "client-peer", s.Conn().RemotePeer()) reqLog.Debugw("new deal proposal request") defer func() { + err := s.Close() + if err != nil { + reqLog.Infow("closing stream", "err", err) + } reqLog.Debugw("handled deal proposal request", "duration", time.Since(start).String()) }() @@ -263,14 +275,16 @@ func (p *DealProvider) handleNewDealStream(s network.Stream) { } func (p *DealProvider) handleNewDealStatusStream(s network.Stream) { - defer s.Close() - start := time.Now() reqLogUuid := uuid.New() reqLog := log.With("reqlog-uuid", reqLogUuid.String(), "client-peer", s.Conn().RemotePeer()) reqLog.Debugw("new deal status request") defer func() { + err := s.Close() + if err != nil { + reqLog.Infow("closing stream", "err", err) + } reqLog.Debugw("handled deal status request", "duration", time.Since(start).String()) }()