From d60356ec78f227bc43fe731f7c9a809edba9671d Mon Sep 17 00:00:00 2001 From: Vlad Date: Mon, 31 Jul 2023 14:45:17 +0300 Subject: [PATCH] apply review suggestions --- share/getter.go | 5 ----- share/getters/shrex_test.go | 2 +- share/p2p/metrics.go | 19 +++++++++---------- share/p2p/shrexnd/client.go | 4 ++-- share/p2p/shrexnd/server.go | 33 +++++++++++++++++++++------------ 5 files changed, 33 insertions(+), 30 deletions(-) diff --git a/share/getter.go b/share/getter.go index 06dbca778f..7caa954a24 100644 --- a/share/getter.go +++ b/share/getter.go @@ -47,11 +47,6 @@ func (ns NamespacedShares) Flatten() []Share { return shares } -// IsEmpty indicates if there are no shares found for given namespace -func (ns NamespacedShares) IsEmpty() bool { - return len(ns) == 0 || (len(ns) == 1 && len(ns[0].Shares) == 0) -} - // NamespacedRow represents all shares with proofs within a specific namespace of a single EDS row. type NamespacedRow struct { Shares []Share diff --git a/share/getters/shrex_test.go b/share/getters/shrex_test.go index b7de5b8c8e..704859124b 100644 --- a/share/getters/shrex_test.go +++ b/share/getters/shrex_test.go @@ -61,7 +61,7 @@ func TestShrexGetter(t *testing.T) { getter := NewShrexGetter(edsClient, ndClient, peerManager) require.NoError(t, getter.Start(ctx)) - t.Run("ND_Available", func(t *testing.T) { + t.Run("ND_Available, total data size > 1mb", func(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Second*10) t.Cleanup(cancel) diff --git a/share/p2p/metrics.go b/share/p2p/metrics.go index 33653046b7..fee3b12413 100644 --- a/share/p2p/metrics.go +++ b/share/p2p/metrics.go @@ -14,16 +14,15 @@ var meter = otel.Meter("shrex/eds") type status string const ( - StatusBadReuest status = "bad_request" - StatusSendRespErr status = "send_resp_err" - StatusSendReqErr status = "send_req_err" - StatusReadRespErr status = "read_resp_err" - StatusInternalErr status = "internal_err" - StatusNotFound status = "not_found" - StatusNamespaceNotFound status = "namespace_not_found" - StatusTimeout status = "timeout" - StatusSuccess status = "success" - StatusRateLimited status = "rate_limited" + StatusBadRequest status = "bad_request" + StatusSendRespErr status = "send_resp_err" + StatusSendReqErr status = "send_req_err" + StatusReadRespErr status = "read_resp_err" + StatusInternalErr status = "internal_err" + StatusNotFound status = "not_found" + StatusTimeout status = "timeout" + StatusSuccess status = "success" + StatusRateLimited status = "rate_limited" ) type Metrics struct { diff --git a/share/p2p/shrexnd/client.go b/share/p2p/shrexnd/client.go index 70bb374fa0..86c5150095 100644 --- a/share/p2p/shrexnd/client.go +++ b/share/p2p/shrexnd/client.go @@ -133,7 +133,7 @@ func (c *Client) readStatus(ctx context.Context, stream network.Stream) error { return c.convertStatusToErr(ctx, resp.Status) } -// convertToNamespacedShares converts proto Rows to share.NamespacedShares +// readNamespacedShares converts proto Rows to share.NamespacedShares func (c *Client) readNamespacedShares( ctx context.Context, stream network.Stream, @@ -213,7 +213,7 @@ func (c *Client) convertStatusToErr(ctx context.Context, status pb.StatusCode) e c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound) return p2p.ErrNotFound case pb.StatusCode_INVALID: - log.Error("client-nd: invalid request") + log.Warn("client-nd: invalid request") fallthrough case pb.StatusCode_INTERNAL: fallthrough diff --git a/share/p2p/shrexnd/server.go b/share/p2p/shrexnd/server.go index 2de71f000b..670ecdcf99 100644 --- a/share/p2p/shrexnd/server.go +++ b/share/p2p/shrexnd/server.go @@ -101,13 +101,13 @@ func (srv *Server) observeRateLimitedRequests() { func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stream) error { logger := log.With("source", "server", "peer", stream.Conn().RemotePeer().String()) - logger.Debug("server: handling nd request") + logger.Debug("handling nd request") srv.observeRateLimitedRequests() req, err := srv.readRequest(logger, stream) if err != nil { logger.Warnw("read request", "err", err) - srv.metrics.ObserveRequests(ctx, 1, p2p.StatusBadReuest) + srv.metrics.ObserveRequests(ctx, 1, p2p.StatusBadRequest) return err } @@ -117,13 +117,21 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre ctx, cancel := context.WithTimeout(ctx, srv.params.HandleRequestTimeout) defer cancel() - shares, status, getErr := srv.getNamespaceData(ctx, req.RootHash, req.Namespace) + shares, status, err := srv.getNamespaceData(ctx, req.RootHash, req.Namespace) + if err != nil { + // server should respond with status regardless if there was an error getting data + sendErr := srv.respondStatus(ctx, logger, stream, status) + if sendErr != nil { + logger.Errorw("sending response", "err", sendErr) + srv.metrics.ObserveRequests(ctx, 1, p2p.StatusSendRespErr) + } + logger.Errorw("handling request", "err", err) + return errors.Join(err, sendErr) + } - // server should respond with status regardless if there was an error getting data err = srv.respondStatus(ctx, logger, stream, status) - if err != nil || getErr != nil { - err = errors.Join(getErr, err) - logger.Errorw("handling request", "err", err) + if err != nil { + logger.Errorw("sending response", "err", err) srv.metrics.ObserveRequests(ctx, 1, p2p.StatusSendRespErr) return err } @@ -143,7 +151,7 @@ func (srv *Server) readRequest( ) (*pb.GetSharesByNamespaceRequest, error) { err := stream.SetReadDeadline(time.Now().Add(srv.params.ServerReadTimeout)) if err != nil { - logger.Debugw("server: setting read deadline", "err", err) + logger.Debugw("setting read deadline", "err", err) } var req pb.GetSharesByNamespaceRequest @@ -153,10 +161,10 @@ func (srv *Server) readRequest( } - logger.Debugw("server: new request") + logger.Debugw("new request") err = stream.CloseRead() if err != nil { - logger.Debugw("server: closing read side of the stream", "err", err) + logger.Debugw("closing read side of the stream", "err", err) } err = validateRequest(req) @@ -188,12 +196,13 @@ func (srv *Server) respondStatus( ctx context.Context, logger *zap.SugaredLogger, stream network.Stream, - status pb.StatusCode) error { + status pb.StatusCode, +) error { srv.observeStatus(ctx, status) err := stream.SetWriteDeadline(time.Now().Add(srv.params.ServerWriteTimeout)) if err != nil { - logger.Debugw("server: setting write deadline", "err", err) + logger.Debugw("setting write deadline", "err", err) } _, err = serde.Write(stream, &pb.GetSharesByNamespaceStatusResponse{Status: status})