Skip to content

Commit

Permalink
apply review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Aug 4, 2023
1 parent fc040e7 commit d60356e
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 30 deletions.
5 changes: 0 additions & 5 deletions share/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ func (ns NamespacedShares) Flatten() []Share {
return shares
}

// IsEmpty indicates if there are no shares found for given namespace
func (ns NamespacedShares) IsEmpty() bool {
return len(ns) == 0 || (len(ns) == 1 && len(ns[0].Shares) == 0)
}

// NamespacedRow represents all shares with proofs within a specific namespace of a single EDS row.
type NamespacedRow struct {
Shares []Share
Expand Down
2 changes: 1 addition & 1 deletion share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestShrexGetter(t *testing.T) {
getter := NewShrexGetter(edsClient, ndClient, peerManager)
require.NoError(t, getter.Start(ctx))

t.Run("ND_Available", func(t *testing.T) {
t.Run("ND_Available, total data size > 1mb", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
t.Cleanup(cancel)

Expand Down
19 changes: 9 additions & 10 deletions share/p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@ var meter = otel.Meter("shrex/eds")
type status string

const (
StatusBadReuest status = "bad_request"
StatusSendRespErr status = "send_resp_err"
StatusSendReqErr status = "send_req_err"
StatusReadRespErr status = "read_resp_err"
StatusInternalErr status = "internal_err"
StatusNotFound status = "not_found"
StatusNamespaceNotFound status = "namespace_not_found"
StatusTimeout status = "timeout"
StatusSuccess status = "success"
StatusRateLimited status = "rate_limited"
StatusBadRequest status = "bad_request"
StatusSendRespErr status = "send_resp_err"
StatusSendReqErr status = "send_req_err"
StatusReadRespErr status = "read_resp_err"
StatusInternalErr status = "internal_err"
StatusNotFound status = "not_found"
StatusTimeout status = "timeout"
StatusSuccess status = "success"
StatusRateLimited status = "rate_limited"
)

type Metrics struct {
Expand Down
4 changes: 2 additions & 2 deletions share/p2p/shrexnd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (c *Client) readStatus(ctx context.Context, stream network.Stream) error {
return c.convertStatusToErr(ctx, resp.Status)
}

// convertToNamespacedShares converts proto Rows to share.NamespacedShares
// readNamespacedShares converts proto Rows to share.NamespacedShares
func (c *Client) readNamespacedShares(
ctx context.Context,
stream network.Stream,
Expand Down Expand Up @@ -213,7 +213,7 @@ func (c *Client) convertStatusToErr(ctx context.Context, status pb.StatusCode) e
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
return p2p.ErrNotFound
case pb.StatusCode_INVALID:
log.Error("client-nd: invalid request")
log.Warn("client-nd: invalid request")
fallthrough
case pb.StatusCode_INTERNAL:
fallthrough
Expand Down
33 changes: 21 additions & 12 deletions share/p2p/shrexnd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (srv *Server) observeRateLimitedRequests() {

func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stream) error {
logger := log.With("source", "server", "peer", stream.Conn().RemotePeer().String())
logger.Debug("server: handling nd request")
logger.Debug("handling nd request")

srv.observeRateLimitedRequests()
req, err := srv.readRequest(logger, stream)
if err != nil {
logger.Warnw("read request", "err", err)
srv.metrics.ObserveRequests(ctx, 1, p2p.StatusBadReuest)
srv.metrics.ObserveRequests(ctx, 1, p2p.StatusBadRequest)
return err
}

Expand All @@ -117,13 +117,21 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre
ctx, cancel := context.WithTimeout(ctx, srv.params.HandleRequestTimeout)
defer cancel()

shares, status, getErr := srv.getNamespaceData(ctx, req.RootHash, req.Namespace)
shares, status, err := srv.getNamespaceData(ctx, req.RootHash, req.Namespace)
if err != nil {
// server should respond with status regardless if there was an error getting data
sendErr := srv.respondStatus(ctx, logger, stream, status)
if sendErr != nil {
logger.Errorw("sending response", "err", sendErr)
srv.metrics.ObserveRequests(ctx, 1, p2p.StatusSendRespErr)
}
logger.Errorw("handling request", "err", err)
return errors.Join(err, sendErr)
}

// server should respond with status regardless if there was an error getting data
err = srv.respondStatus(ctx, logger, stream, status)
if err != nil || getErr != nil {
err = errors.Join(getErr, err)
logger.Errorw("handling request", "err", err)
if err != nil {
logger.Errorw("sending response", "err", err)
srv.metrics.ObserveRequests(ctx, 1, p2p.StatusSendRespErr)
return err
}
Expand All @@ -143,7 +151,7 @@ func (srv *Server) readRequest(
) (*pb.GetSharesByNamespaceRequest, error) {
err := stream.SetReadDeadline(time.Now().Add(srv.params.ServerReadTimeout))
if err != nil {
logger.Debugw("server: setting read deadline", "err", err)
logger.Debugw("setting read deadline", "err", err)
}

var req pb.GetSharesByNamespaceRequest
Expand All @@ -153,10 +161,10 @@ func (srv *Server) readRequest(

}

logger.Debugw("server: new request")
logger.Debugw("new request")
err = stream.CloseRead()
if err != nil {
logger.Debugw("server: closing read side of the stream", "err", err)
logger.Debugw("closing read side of the stream", "err", err)
}

err = validateRequest(req)
Expand Down Expand Up @@ -188,12 +196,13 @@ func (srv *Server) respondStatus(
ctx context.Context,
logger *zap.SugaredLogger,
stream network.Stream,
status pb.StatusCode) error {
status pb.StatusCode,
) error {
srv.observeStatus(ctx, status)

err := stream.SetWriteDeadline(time.Now().Add(srv.params.ServerWriteTimeout))
if err != nil {
logger.Debugw("server: setting write deadline", "err", err)
logger.Debugw("setting write deadline", "err", err)
}

_, err = serde.Write(stream, &pb.GetSharesByNamespaceStatusResponse{Status: status})
Expand Down

0 comments on commit d60356e

Please sign in to comment.