From fffb68cac58227e5c58c1d2aa23d279e8d6c543c Mon Sep 17 00:00:00 2001 From: alok Date: Sat, 26 Jun 2021 14:16:47 +0530 Subject: [PATCH 01/16] chore: add readDeadline --- pkg/api/pss.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/api/pss.go b/pkg/api/pss.go index 923a07f9d65..79e22c79231 100644 --- a/pkg/api/pss.go +++ b/pkg/api/pss.go @@ -25,6 +25,7 @@ import ( var ( writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close + readDeadline = 4 * time.Second // read deadline. should be smaller than the shutdown timeout on api close targetMaxLength = 2 // max target length in bytes, in order to prevent grieving by excess computation ) From a6666f383826de13f645d55e5e5ca54fbfd2f595 Mon Sep 17 00:00:00 2001 From: alok Date: Mon, 28 Jun 2021 11:12:31 +0530 Subject: [PATCH 02/16] feat: check upload stream handler --- pkg/api/chunk.go | 59 ++++++++++-- pkg/api/chunk_stream.go | 192 ++++++++++++++++++++++++++++++++++++++++ pkg/api/router.go | 5 ++ 3 files changed, 247 insertions(+), 9 deletions(-) create mode 100644 pkg/api/chunk_stream.go diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index 2a1719f8940..3819b5210f3 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -6,6 +6,7 @@ package api import ( "bytes" + "context" "errors" "fmt" "io" @@ -29,27 +30,67 @@ type chunkAddressResponse struct { Reference swarm.Address `json:"reference"` } -func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { - var ( - tag *tags.Tag - ctx = r.Context() - err error - ) +func (s *server) processUploadRequest( + r *http.Request, +) (ctx context.Context, tag *tags.Tag, putter storage.Putter, err error) { if h := r.Header.Get(SwarmTagHeader); h != "" { tag, err = s.getTag(h) if err != nil { s.logger.Debugf("chunk upload: get tag: %v", err) s.logger.Error("chunk upload: get tag") - jsonhttp.BadRequest(w, "cannot get tag") - return - + return nil, nil, nil, errors.New("cannot get tag") } // add the tag to the context if it exists ctx = sctx.SetTag(r.Context(), tag) // increment the StateSplit here since we dont have a splitter for the file upload + err = tag.Inc(tags.StateSplit) + if err != nil { + s.logger.Debugf("chunk upload: increment tag: %v", err) + s.logger.Error("chunk upload: increment tag") + return nil, nil, nil, errors.New("cannot increment tag") + } + } else { + ctx = r.Context() + } + + batch, err := requestPostageBatchId(r) + switch { + case errors.Is(err, errSwarmPostageBatchIDHeaderNotFound) && s.post.DefaultIssuer() != nil: + batch = s.post.DefaultIssuer().ID() + case err != nil: + s.logger.Debugf("chunk upload: postage batch id: %v", err) + s.logger.Error("chunk upload: postage batch id") + return nil, nil, nil, errors.New("invalid postage batch id") + } + + putter, err = newStamperPutter(s.storer, s.post, s.signer, batch) + if err != nil { + s.logger.Debugf("chunk upload: putter:%v", err) + s.logger.Error("chunk upload: putter") + switch { + case errors.Is(err, postage.ErrNotFound): + return nil, nil, nil, errors.New("batch not found") + case errors.Is(err, postage.ErrNotUsable): + return nil, nil, nil, errors.New("batch not usable") + } + return nil, nil, nil, errors.New("") + } + + return ctx, tag, putter, nil +} + +func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { + fmt.Println("handling") + ctx, tag, putter, err := s.processUploadRequest(r) + if err != nil { + jsonhttp.BadRequest(w, err.Error()) + return + } + + if tag != nil { err = tag.Inc(tags.StateSplit) if err != nil { s.logger.Debugf("chunk upload: increment tag: %v", err) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go new file mode 100644 index 00000000000..bff90c49167 --- /dev/null +++ b/pkg/api/chunk_stream.go @@ -0,0 +1,192 @@ +package api + +import ( + "context" + "errors" + "net/http" + "strings" + "time" + + "github.com/ethersphere/bee/pkg/cac" + "github.com/ethersphere/bee/pkg/jsonhttp" + "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/swarm" + "github.com/ethersphere/bee/pkg/tags" + "github.com/gorilla/websocket" +) + +func (s *server) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Request) { + + ctx, tag, putter, err := s.processUploadRequest(r) + if err != nil { + jsonhttp.BadRequest(w, err.Error()) + return + } + + upgrader := websocket.Upgrader{ + ReadBufferSize: swarm.ChunkSize, + WriteBufferSize: swarm.ChunkSize, + CheckOrigin: s.checkOrigin, + } + + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + s.logger.Debugf("chunk stream handler failed upgrading: %v", err) + s.logger.Error("chunk stream handler: upgrading") + jsonhttp.BadRequest(w, "not a websocket connection") + return + } + + s.wsWg.Add(1) + go s.handleUploadStream( + ctx, + c, + tag, + putter, + requestModePut(r), + strings.ToLower(r.Header.Get(SwarmPinHeader)) == "true", + ) +} + +func (s *server) handleUploadStream( + ctx context.Context, + conn *websocket.Conn, + tag *tags.Tag, + putter storage.Putter, + mode storage.ModePut, + pin bool, +) { + defer s.wsWg.Done() + + var ( + gone = make(chan struct{}) + ticker = time.NewTicker(time.Second * 4) + err error + ) + defer func() { + ticker.Stop() + _ = conn.Close() + }() + + conn.SetCloseHandler(func(code int, text string) error { + s.logger.Debugf("chunk stream handler: client gone. code %d message %s", code, text) + close(gone) + return nil + }) + + // default handlers for ping/pong + conn.SetPingHandler(nil) + conn.SetPongHandler(nil) + + sendMsg := func(msgType int, buf []byte) error { + err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + if err != nil { + return err + } + err = conn.WriteMessage(msgType, buf) + if err != nil { + return err + } + return nil + } + + sendErrorAndClose := func(errmsg string) error { + return sendMsg(websocket.CloseMessage, []byte(errmsg)) + } + + for { + select { + case <-s.quit: + // shutdown + err = sendErrorAndClose("node shutting down") + if err != nil { + s.logger.Debugf("failed sending close message: %v", err) + } + return + case <-gone: + // client gone + return + case <-ticker.C: + err = sendMsg(websocket.PingMessage, nil) + if err != nil { + s.logger.Debugf("failed sending ping message: %v", err) + return + } + default: + // if there is no indication to stop, go ahead and read the next message + } + + err = conn.SetReadDeadline(time.Now().Add(readDeadline)) + if err != nil { + s.logger.Debugf("chunk stream set read deadline: %v", err) + return + } + + mt, msg, err := conn.ReadMessage() + if err != nil { + s.logger.Debugf("chunk stream handler read message error: %v", err) + return + } + + if mt != websocket.BinaryMessage { + s.logger.Debug("unexpected message received from client", mt) + sendErrorAndClose("invalid message") + return + } + + if len(msg) < swarm.SpanSize { + s.logger.Debug("chunk upload: not enough data") + return + } + + chunk, err := cac.NewWithDataSpan(msg) + if err != nil { + s.logger.Debugf("chunk upload: create chunk error: %v", err) + return + } + + seen, err := putter.Put(ctx, mode, chunk) + if err != nil { + s.logger.Debugf("chunk upload: chunk write error: %v, addr %s", err, chunk.Address()) + switch { + case errors.Is(err, postage.ErrBucketFull): + sendErrorAndClose("batch is overissued") + default: + sendErrorAndClose("chunk write error") + } + return + } else if len(seen) > 0 && seen[0] && tag != nil { + err := tag.Inc(tags.StateSeen) + if err != nil { + s.logger.Debugf("chunk upload: increment tag", err) + sendErrorAndClose("failed incrementing tag") + return + } + } + + if tag != nil { + // indicate that the chunk is stored + err = tag.Inc(tags.StateStored) + if err != nil { + s.logger.Debugf("chunk upload: increment tag", err) + sendErrorAndClose("failed incrementing tag") + return + } + } + + if pin { + if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil { + s.logger.Debugf("chunk upload: creation of pin for %q failed: %v", chunk.Address(), err) + sendErrorAndClose("failed creating pin") + return + } + } + + err = sendMsg(websocket.TextMessage, []byte("success")) + if err != nil { + s.logger.Debugf("failed sending success msg: %v", err) + return + } + } +} diff --git a/pkg/api/router.go b/pkg/api/router.go index 43de89080ac..5b656afc9f4 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -63,6 +63,11 @@ func (s *server) setupRouting() { ), }) + handle("/stream/chunks", web.ChainHandlers( + s.gatewayModeForbidEndpointHandler, + web.FinalHandlerFunc(s.chunkUploadStreamHandler), + )) + handle("/chunks/{addr}", jsonhttp.MethodHandler{ "GET": http.HandlerFunc(s.chunkGetHandler), }) From e24526d860b68d6b90bb007687b3db01a6a824ad Mon Sep 17 00:00:00 2001 From: alok Date: Mon, 28 Jun 2021 14:44:16 +0530 Subject: [PATCH 03/16] feat: chunk upload stream handler and tests --- pkg/api/api_test.go | 3 +- pkg/api/chunk_stream.go | 33 ++++++--- pkg/api/chunk_stream_test.go | 127 +++++++++++++++++++++++++++++++++++ pkg/api/export_test.go | 2 + 4 files changed, 153 insertions(+), 12 deletions(-) create mode 100644 pkg/api/chunk_stream_test.go diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 506a762e884..d2619a99e36 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -71,6 +71,7 @@ type testServerOptions struct { PostageContract postagecontract.Interface Post postage.Service Steward steward.Reuploader + WsHeaders http.Header } func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) { @@ -115,7 +116,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. if o.WsPath != "" { u := url.URL{Scheme: "ws", Host: ts.Listener.Addr().String(), Path: o.WsPath} - conn, _, err = websocket.DefaultDialer.Dial(u.String(), nil) + conn, _, err = websocket.DefaultDialer.Dial(u.String(), o.WsHeaders) if err != nil { t.Fatalf("dial: %v. url %v", err, u.String()) } diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index bff90c49167..b2e2512d3e7 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -16,6 +16,10 @@ import ( "github.com/gorilla/websocket" ) +const uploadPingTimout = time.Second * 4 + +var successWsMsg = []byte("successful") + func (s *server) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Request) { ctx, tag, putter, err := s.processUploadRequest(r) @@ -77,7 +81,10 @@ func (s *server) handleUploadStream( // default handlers for ping/pong conn.SetPingHandler(nil) - conn.SetPongHandler(nil) + conn.SetPongHandler(func(string) error { + conn.SetReadDeadline(time.Now().Add(readDeadline)) + return nil + }) sendMsg := func(msgType int, buf []byte) error { err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)) @@ -91,15 +98,19 @@ func (s *server) handleUploadStream( return nil } - sendErrorAndClose := func(errmsg string) error { - return sendMsg(websocket.CloseMessage, []byte(errmsg)) + sendErrorClose := func(code int, errmsg string) error { + return conn.WriteControl( + websocket.CloseMessage, + websocket.FormatCloseMessage(code, errmsg), + time.Now().Add(writeDeadline), + ) } for { select { case <-s.quit: // shutdown - err = sendErrorAndClose("node shutting down") + err := sendErrorClose(websocket.CloseGoingAway, "node shutting down") if err != nil { s.logger.Debugf("failed sending close message: %v", err) } @@ -131,7 +142,7 @@ func (s *server) handleUploadStream( if mt != websocket.BinaryMessage { s.logger.Debug("unexpected message received from client", mt) - sendErrorAndClose("invalid message") + sendErrorClose(websocket.CloseUnsupportedData, "invalid message") return } @@ -151,16 +162,16 @@ func (s *server) handleUploadStream( s.logger.Debugf("chunk upload: chunk write error: %v, addr %s", err, chunk.Address()) switch { case errors.Is(err, postage.ErrBucketFull): - sendErrorAndClose("batch is overissued") + sendErrorClose(websocket.CloseInternalServerErr, "batch is overissued") default: - sendErrorAndClose("chunk write error") + sendErrorClose(websocket.CloseInternalServerErr, "chunk write error") } return } else if len(seen) > 0 && seen[0] && tag != nil { err := tag.Inc(tags.StateSeen) if err != nil { s.logger.Debugf("chunk upload: increment tag", err) - sendErrorAndClose("failed incrementing tag") + sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag") return } } @@ -170,7 +181,7 @@ func (s *server) handleUploadStream( err = tag.Inc(tags.StateStored) if err != nil { s.logger.Debugf("chunk upload: increment tag", err) - sendErrorAndClose("failed incrementing tag") + sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag") return } } @@ -178,12 +189,12 @@ func (s *server) handleUploadStream( if pin { if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil { s.logger.Debugf("chunk upload: creation of pin for %q failed: %v", chunk.Address(), err) - sendErrorAndClose("failed creating pin") + sendErrorClose(websocket.CloseInternalServerErr, "failed creating pin") return } } - err = sendMsg(websocket.TextMessage, []byte("success")) + err = sendMsg(websocket.TextMessage, successWsMsg) if err != nil { s.logger.Debugf("failed sending success msg: %v", err) return diff --git a/pkg/api/chunk_stream_test.go b/pkg/api/chunk_stream_test.go new file mode 100644 index 00000000000..4490522d2bb --- /dev/null +++ b/pkg/api/chunk_stream_test.go @@ -0,0 +1,127 @@ +package api_test + +import ( + "bytes" + "context" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/ethersphere/bee/pkg/api" + "github.com/ethersphere/bee/pkg/logging" + pinning "github.com/ethersphere/bee/pkg/pinning/mock" + mockpost "github.com/ethersphere/bee/pkg/postage/mock" + statestore "github.com/ethersphere/bee/pkg/statestore/mock" + "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/storage/mock" + testingc "github.com/ethersphere/bee/pkg/storage/testing" + "github.com/ethersphere/bee/pkg/swarm" + "github.com/ethersphere/bee/pkg/tags" + "github.com/gorilla/websocket" +) + +func TestChunkUploadStream(t *testing.T) { + + wsHeaders := http.Header{} + wsHeaders.Set("Content-Type", "application/octet-stream") + wsHeaders.Set("Swarm-Postage-Batch-Id", batchOkStr) + + var ( + statestoreMock = statestore.NewStateStore() + logger = logging.New(ioutil.Discard, 0) + tag = tags.NewTags(statestoreMock, logger) + storerMock = mock.NewStorer() + pinningMock = pinning.NewServiceMock() + _, wsConn, _ = newTestServer(t, testServerOptions{ + Storer: storerMock, + Pinning: pinningMock, + Tags: tag, + Post: mockpost.New(mockpost.WithAcceptAll()), + WsPath: "/stream/chunks", + WsHeaders: wsHeaders, + }) + ) + + t.Run("upload and verify", func(t *testing.T) { + chsToGet := []swarm.Chunk{} + for i := 0; i < 5; i++ { + ch := testingc.GenerateTestRandomChunk() + + err := wsConn.SetWriteDeadline(time.Now().Add(time.Second)) + if err != nil { + t.Fatal(err) + } + + err = wsConn.WriteMessage(websocket.BinaryMessage, ch.Data()) + if err != nil { + t.Fatal(err) + } + + err = wsConn.SetReadDeadline(time.Now().Add(time.Second)) + if err != nil { + t.Fatal(err) + } + + mt, msg, err := wsConn.ReadMessage() + if err != nil { + t.Fatal(err) + } + + if mt != websocket.TextMessage || !bytes.Equal(msg, api.SuccessWsMsg) { + t.Fatal("invalid response", mt, string(msg)) + } + + chsToGet = append(chsToGet, ch) + } + + for _, c := range chsToGet { + ch, err := storerMock.Get(context.Background(), storage.ModeGetRequest, c.Address()) + if err != nil { + t.Fatal("failed to get chunk after upload", err) + } + if !ch.Equal(c) { + t.Fatal("invalid chunk read") + } + } + }) + + t.Run("close on incorrect msg", func(t *testing.T) { + serverClosed := make(chan struct{}) + var errResponse string + wsConn.SetCloseHandler(func(code int, msg string) error { + errResponse = msg + close(serverClosed) + return nil + }) + + err := wsConn.SetWriteDeadline(time.Now().Add(time.Second)) + if err != nil { + t.Fatal(err) + } + + err = wsConn.WriteMessage(websocket.TextMessage, []byte("incorrect msg")) + if err != nil { + t.Fatal(err) + } + + err = wsConn.SetReadDeadline(time.Now().Add(time.Second)) + if err != nil { + t.Fatal(err) + } + + _, _, err = wsConn.ReadMessage() + if err == nil { + t.Fatal("expected failure on read") + } + + select { + case <-time.After(time.Second * 5): + t.Fatal("waited 5 secs for server to close on error") + case <-serverClosed: + if errResponse != "invalid message" { + t.Fatalf("incorrect response on error, exp: (invalid message) got (%s)", errResponse) + } + } + }) +} diff --git a/pkg/api/export_test.go b/pkg/api/export_test.go index 8e6331d7a11..e6115e62c1d 100644 --- a/pkg/api/export_test.go +++ b/pkg/api/export_test.go @@ -42,6 +42,8 @@ var ( FeedMetadataEntryOwner = feedMetadataEntryOwner FeedMetadataEntryTopic = feedMetadataEntryTopic FeedMetadataEntryType = feedMetadataEntryType + + SuccessWsMsg = successWsMsg ) func (s *Server) ResolveNameOrAddress(str string) (swarm.Address, error) { From 25669c115beed862309b07805d7f7dcddcd52138 Mon Sep 17 00:00:00 2001 From: alok Date: Mon, 28 Jun 2021 14:56:33 +0530 Subject: [PATCH 04/16] fix: check close error instead of close handler --- pkg/api/chunk_stream_test.go | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/pkg/api/chunk_stream_test.go b/pkg/api/chunk_stream_test.go index 4490522d2bb..602ea51f880 100644 --- a/pkg/api/chunk_stream_test.go +++ b/pkg/api/chunk_stream_test.go @@ -87,14 +87,6 @@ func TestChunkUploadStream(t *testing.T) { }) t.Run("close on incorrect msg", func(t *testing.T) { - serverClosed := make(chan struct{}) - var errResponse string - wsConn.SetCloseHandler(func(code int, msg string) error { - errResponse = msg - close(serverClosed) - return nil - }) - err := wsConn.SetWriteDeadline(time.Now().Add(time.Second)) if err != nil { t.Fatal(err) @@ -114,13 +106,11 @@ func TestChunkUploadStream(t *testing.T) { if err == nil { t.Fatal("expected failure on read") } - - select { - case <-time.After(time.Second * 5): - t.Fatal("waited 5 secs for server to close on error") - case <-serverClosed: - if errResponse != "invalid message" { - t.Fatalf("incorrect response on error, exp: (invalid message) got (%s)", errResponse) + if cerr, ok := err.(*websocket.CloseError); !ok { + t.Fatal("invalid error on read") + } else { + if cerr.Text != "invalid message" { + t.Fatalf("incorrect response on error, exp: (invalid message) got (%s)", cerr.Text) } } }) From 72ac978dbe22bdf078f65e42ee035554cb3620ae Mon Sep 17 00:00:00 2001 From: alok Date: Wed, 21 Jul 2021 18:03:22 +0530 Subject: [PATCH 05/16] fix: rebase --- pkg/api/chunk.go | 28 +--------------------------- 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index 3819b5210f3..0f84c3e52ae 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -57,10 +57,7 @@ func (s *server) processUploadRequest( } batch, err := requestPostageBatchId(r) - switch { - case errors.Is(err, errSwarmPostageBatchIDHeaderNotFound) && s.post.DefaultIssuer() != nil: - batch = s.post.DefaultIssuer().ID() - case err != nil: + if err != nil { s.logger.Debugf("chunk upload: postage batch id: %v", err) s.logger.Error("chunk upload: postage batch id") return nil, nil, nil, errors.New("invalid postage batch id") @@ -126,29 +123,6 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { return } - batch, err := requestPostageBatchId(r) - if err != nil { - s.logger.Debugf("chunk upload: postage batch id: %v", err) - s.logger.Error("chunk upload: postage batch id") - jsonhttp.BadRequest(w, "invalid postage batch id") - return - } - - putter, err := newStamperPutter(s.storer, s.post, s.signer, batch) - if err != nil { - s.logger.Debugf("chunk upload: putter:%v", err) - s.logger.Error("chunk upload: putter") - switch { - case errors.Is(err, postage.ErrNotFound): - jsonhttp.BadRequest(w, "batch not found") - case errors.Is(err, postage.ErrNotUsable): - jsonhttp.BadRequest(w, "batch not usable yet") - default: - jsonhttp.BadRequest(w, nil) - } - return - } - seen, err := putter.Put(ctx, requestModePut(r), chunk) if err != nil { s.logger.Debugf("chunk upload: chunk write error: %v, addr %s", err, chunk.Address()) From 166086dbad1614be079e1bbeab914ec146485e17 Mon Sep 17 00:00:00 2001 From: alok Date: Wed, 21 Jul 2021 18:15:29 +0530 Subject: [PATCH 06/16] fix: tests, lint and deepsource --- pkg/api/chunk.go | 9 --------- pkg/api/chunk_stream.go | 17 ++++++++--------- pkg/api/chunk_stream_test.go | 6 ++---- 3 files changed, 10 insertions(+), 22 deletions(-) diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index 0f84c3e52ae..e7c54466ee3 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -44,14 +44,6 @@ func (s *server) processUploadRequest( // add the tag to the context if it exists ctx = sctx.SetTag(r.Context(), tag) - - // increment the StateSplit here since we dont have a splitter for the file upload - err = tag.Inc(tags.StateSplit) - if err != nil { - s.logger.Debugf("chunk upload: increment tag: %v", err) - s.logger.Error("chunk upload: increment tag") - return nil, nil, nil, errors.New("cannot increment tag") - } } else { ctx = r.Context() } @@ -80,7 +72,6 @@ func (s *server) processUploadRequest( } func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { - fmt.Println("handling") ctx, tag, putter, err := s.processUploadRequest(r) if err != nil { jsonhttp.BadRequest(w, err.Error()) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index b2e2512d3e7..f95947d338c 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -65,7 +65,7 @@ func (s *server) handleUploadStream( var ( gone = make(chan struct{}) - ticker = time.NewTicker(time.Second * 4) + ticker = time.NewTicker(uploadPingTimout) err error ) defer func() { @@ -82,8 +82,7 @@ func (s *server) handleUploadStream( // default handlers for ping/pong conn.SetPingHandler(nil) conn.SetPongHandler(func(string) error { - conn.SetReadDeadline(time.Now().Add(readDeadline)) - return nil + return conn.SetReadDeadline(time.Now().Add(readDeadline)) }) sendMsg := func(msgType int, buf []byte) error { @@ -98,22 +97,22 @@ func (s *server) handleUploadStream( return nil } - sendErrorClose := func(code int, errmsg string) error { - return conn.WriteControl( + sendErrorClose := func(code int, errmsg string) { + err := conn.WriteControl( websocket.CloseMessage, websocket.FormatCloseMessage(code, errmsg), time.Now().Add(writeDeadline), ) + if err != nil { + s.logger.Errorf("failed sending close msg with err, reason: %s", err.Error()) + } } for { select { case <-s.quit: // shutdown - err := sendErrorClose(websocket.CloseGoingAway, "node shutting down") - if err != nil { - s.logger.Debugf("failed sending close message: %v", err) - } + sendErrorClose(websocket.CloseGoingAway, "node shutting down") return case <-gone: // client gone diff --git a/pkg/api/chunk_stream_test.go b/pkg/api/chunk_stream_test.go index 602ea51f880..b16ecee5a5c 100644 --- a/pkg/api/chunk_stream_test.go +++ b/pkg/api/chunk_stream_test.go @@ -108,10 +108,8 @@ func TestChunkUploadStream(t *testing.T) { } if cerr, ok := err.(*websocket.CloseError); !ok { t.Fatal("invalid error on read") - } else { - if cerr.Text != "invalid message" { - t.Fatalf("incorrect response on error, exp: (invalid message) got (%s)", cerr.Text) - } + } else if cerr.Text != "invalid message" { + t.Fatalf("incorrect response on error, exp: (invalid message) got (%s)", cerr.Text) } }) } From 8ac00bcc2b7bf8e7ff3a493f5dc59c484d78a02a Mon Sep 17 00:00:00 2001 From: alok Date: Wed, 21 Jul 2021 18:17:58 +0530 Subject: [PATCH 07/16] fix: review comments --- pkg/api/router.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/api/router.go b/pkg/api/router.go index 5b656afc9f4..154ba693f64 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -63,8 +63,8 @@ func (s *server) setupRouting() { ), }) - handle("/stream/chunks", web.ChainHandlers( - s.gatewayModeForbidEndpointHandler, + handle("/chunks/stream", web.ChainHandlers( + s.newTracingHandler("chunks-stream-upload"), web.FinalHandlerFunc(s.chunkUploadStreamHandler), )) From 916782c669e83fda856239c7375a124158778369 Mon Sep 17 00:00:00 2001 From: alok Date: Wed, 21 Jul 2021 19:17:38 +0530 Subject: [PATCH 08/16] fix: test --- pkg/api/chunk_stream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/chunk_stream_test.go b/pkg/api/chunk_stream_test.go index b16ecee5a5c..5ff8ca6516a 100644 --- a/pkg/api/chunk_stream_test.go +++ b/pkg/api/chunk_stream_test.go @@ -38,7 +38,7 @@ func TestChunkUploadStream(t *testing.T) { Pinning: pinningMock, Tags: tag, Post: mockpost.New(mockpost.WithAcceptAll()), - WsPath: "/stream/chunks", + WsPath: "/chunks/stream", WsHeaders: wsHeaders, }) ) From 8583ee0a13d62003d2e58ec94779e798aef77582 Mon Sep 17 00:00:00 2001 From: alok Date: Thu, 22 Jul 2021 12:42:07 +0530 Subject: [PATCH 09/16] fix: remove pingpong handling, not required --- pkg/api/chunk_stream.go | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index f95947d338c..59edaad1941 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -64,12 +64,10 @@ func (s *server) handleUploadStream( defer s.wsWg.Done() var ( - gone = make(chan struct{}) - ticker = time.NewTicker(uploadPingTimout) - err error + gone = make(chan struct{}) + err error ) defer func() { - ticker.Stop() _ = conn.Close() }() @@ -79,12 +77,6 @@ func (s *server) handleUploadStream( return nil }) - // default handlers for ping/pong - conn.SetPingHandler(nil) - conn.SetPongHandler(func(string) error { - return conn.SetReadDeadline(time.Now().Add(readDeadline)) - }) - sendMsg := func(msgType int, buf []byte) error { err := conn.SetWriteDeadline(time.Now().Add(writeDeadline)) if err != nil { @@ -117,12 +109,6 @@ func (s *server) handleUploadStream( case <-gone: // client gone return - case <-ticker.C: - err = sendMsg(websocket.PingMessage, nil) - if err != nil { - s.logger.Debugf("failed sending ping message: %v", err) - return - } default: // if there is no indication to stop, go ahead and read the next message } From c3faa21e730ac4508bf228f2f2183d8e30dbcd28 Mon Sep 17 00:00:00 2001 From: alok Date: Thu, 22 Jul 2021 12:42:45 +0530 Subject: [PATCH 10/16] fix: remove unused const --- pkg/api/chunk_stream.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index 59edaad1941..51064c0ee71 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -16,8 +16,6 @@ import ( "github.com/gorilla/websocket" ) -const uploadPingTimout = time.Second * 4 - var successWsMsg = []byte("successful") func (s *server) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Request) { From b21ff6027fd795532edbee94420917b37fddfcc9 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Wed, 28 Jul 2021 18:06:44 +0530 Subject: [PATCH 11/16] fix: review comments and openapi changes --- openapi/Swarm.yaml | 16 +++++++++++++++ pkg/api/chunk_stream.go | 40 +++++++++++++++++++++++------------- pkg/api/chunk_stream_test.go | 4 ++++ 3 files changed, 46 insertions(+), 14 deletions(-) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 9f7c8f75b9f..457bce081c5 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -164,6 +164,22 @@ paths: default: description: Default response + "/chunks/stream": + get: + summary: "Upload stream of chunks" + tags: + - Chunk + parameters: + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmTagParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" + responses: + "200": + description: "Returns a Websocket connection on which stream of chunks can be uploaded. Each chunk sent is acknowledged using a text response `success`. Chunks should be packaged as binary messages for uploading." + "400": + $ref: "SwarmCommon.yaml#/components/responses/400" + default: + description: Default response "/bzz": post: summary: "Upload file or a collection of files" diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index 51064c0ee71..973f24aaf0d 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -1,3 +1,7 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package api import ( @@ -94,7 +98,7 @@ func (s *server) handleUploadStream( time.Now().Add(writeDeadline), ) if err != nil { - s.logger.Errorf("failed sending close msg with err, reason: %s", err.Error()) + s.logger.Errorf("chunk stream handler: failed sending close msg") } } @@ -113,36 +117,42 @@ func (s *server) handleUploadStream( err = conn.SetReadDeadline(time.Now().Add(readDeadline)) if err != nil { - s.logger.Debugf("chunk stream set read deadline: %v", err) + s.logger.Debugf("chunk stream handler: set read deadline: %v", err) + s.logger.Error("chunk stream handler: set read deadline") return } mt, msg, err := conn.ReadMessage() if err != nil { - s.logger.Debugf("chunk stream handler read message error: %v", err) + s.logger.Debugf("chunk stream handler: read message error: %v", err) + s.logger.Error("chunk stream handler: read message error") return } if mt != websocket.BinaryMessage { - s.logger.Debug("unexpected message received from client", mt) + s.logger.Debug("chunk stream handler: unexpected message received from client", mt) + s.logger.Error("chunk stream handler: unexpected message received from client") sendErrorClose(websocket.CloseUnsupportedData, "invalid message") return } if len(msg) < swarm.SpanSize { - s.logger.Debug("chunk upload: not enough data") + s.logger.Debug("chunk stream handler: not enough data") + s.logger.Error("chunk stream handler: not enough data") return } chunk, err := cac.NewWithDataSpan(msg) if err != nil { - s.logger.Debugf("chunk upload: create chunk error: %v", err) + s.logger.Debugf("chunk stream handler: create chunk error: %v", err) + s.logger.Error("chunk stream handler: failed creating chunk") return } seen, err := putter.Put(ctx, mode, chunk) if err != nil { - s.logger.Debugf("chunk upload: chunk write error: %v, addr %s", err, chunk.Address()) + s.logger.Debugf("chunk stream handler: chunk write error: %v, addr %s", err, chunk.Address()) + s.logger.Error("chunk stream handler: chunk write error") switch { case errors.Is(err, postage.ErrBucketFull): sendErrorClose(websocket.CloseInternalServerErr, "batch is overissued") @@ -153,17 +163,17 @@ func (s *server) handleUploadStream( } else if len(seen) > 0 && seen[0] && tag != nil { err := tag.Inc(tags.StateSeen) if err != nil { - s.logger.Debugf("chunk upload: increment tag", err) + s.logger.Debugf("chunk stream handler: increment tag", err) + s.logger.Error("chunk stream handler: increment tag") sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag") return } - } - - if tag != nil { + } else if tag != nil { // indicate that the chunk is stored err = tag.Inc(tags.StateStored) if err != nil { - s.logger.Debugf("chunk upload: increment tag", err) + s.logger.Debugf("chunk stream handler: increment tag", err) + s.logger.Error("chunk stream handler: increment tag") sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag") return } @@ -171,7 +181,8 @@ func (s *server) handleUploadStream( if pin { if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil { - s.logger.Debugf("chunk upload: creation of pin for %q failed: %v", chunk.Address(), err) + s.logger.Debugf("chunk stream handler: creation of pin for %q failed: %v", chunk.Address(), err) + s.logger.Error("chunk stream handler: creation of pin failed") sendErrorClose(websocket.CloseInternalServerErr, "failed creating pin") return } @@ -179,7 +190,8 @@ func (s *server) handleUploadStream( err = sendMsg(websocket.TextMessage, successWsMsg) if err != nil { - s.logger.Debugf("failed sending success msg: %v", err) + s.logger.Debugf("chunk stream handler: failed sending success msg: %v", err) + s.logger.Error("chunk stream handler: failed sending confirmation") return } } diff --git a/pkg/api/chunk_stream_test.go b/pkg/api/chunk_stream_test.go index 5ff8ca6516a..f1d84bdbc43 100644 --- a/pkg/api/chunk_stream_test.go +++ b/pkg/api/chunk_stream_test.go @@ -1,3 +1,7 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package api_test import ( From 208b528d1ca1057697378e83c2d208e4e8b4afce Mon Sep 17 00:00:00 2001 From: alok Date: Mon, 2 Aug 2021 16:30:05 +0530 Subject: [PATCH 12/16] fix: review comments --- openapi/Swarm.yaml | 2 +- pkg/api/chunk_stream.go | 4 ++-- pkg/api/chunk_stream_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 457bce081c5..083f27e9c37 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -175,7 +175,7 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" responses: "200": - description: "Returns a Websocket connection on which stream of chunks can be uploaded. Each chunk sent is acknowledged using a text response `success`. Chunks should be packaged as binary messages for uploading." + description: "Returns a Websocket connection on which stream of chunks can be uploaded. Each chunk sent is acknowledged using a binary response `0` which serves as confirmation of upload of single chunk. Chunks should be packaged as binary messages for uploading." "400": $ref: "SwarmCommon.yaml#/components/responses/400" default: diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index 973f24aaf0d..90136a33bc0 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -20,7 +20,7 @@ import ( "github.com/gorilla/websocket" ) -var successWsMsg = []byte("successful") +var successWsMsg = []byte{} func (s *server) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Request) { @@ -188,7 +188,7 @@ func (s *server) handleUploadStream( } } - err = sendMsg(websocket.TextMessage, successWsMsg) + err = sendMsg(websocket.BinaryMessage, successWsMsg) if err != nil { s.logger.Debugf("chunk stream handler: failed sending success msg: %v", err) s.logger.Error("chunk stream handler: failed sending confirmation") diff --git a/pkg/api/chunk_stream_test.go b/pkg/api/chunk_stream_test.go index f1d84bdbc43..41ffb566a58 100644 --- a/pkg/api/chunk_stream_test.go +++ b/pkg/api/chunk_stream_test.go @@ -72,7 +72,7 @@ func TestChunkUploadStream(t *testing.T) { t.Fatal(err) } - if mt != websocket.TextMessage || !bytes.Equal(msg, api.SuccessWsMsg) { + if mt != websocket.BinaryMessage || !bytes.Equal(msg, api.SuccessWsMsg) { t.Fatal("invalid response", mt, string(msg)) } From 22fdbf569d4aa8e3881bdfe61d38c8d9cdf5e26f Mon Sep 17 00:00:00 2001 From: alok Date: Mon, 2 Aug 2021 18:42:12 +0530 Subject: [PATCH 13/16] fix: review comment --- pkg/api/chunk.go | 11 ++++++++--- pkg/api/chunk_stream.go | 7 +++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index e7c54466ee3..9d60f4c7d94 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -133,9 +133,7 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.BadRequest(w, "increment tag") return } - } - - if tag != nil { + } else if tag != nil { // indicate that the chunk is stored err = tag.Inc(tags.StateStored) if err != nil { @@ -151,6 +149,13 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil { s.logger.Debugf("chunk upload: creation of pin for %q failed: %v", chunk.Address(), err) s.logger.Error("chunk upload: creation of pin failed") + // since we already increment the pin counter because of the ModePut, we need + // to delete the pin here to prevent the pin counter from never going to 0 + err = s.pinning.DeletePin(ctx, chunk.Address()) + if err != nil { + s.logger.Debugf("chunk upload: deletion of pin for %q failed: %v", chunk.Address(), err) + s.logger.Error("chunk upload: deletion of pin failed") + } jsonhttp.InternalServerError(w, nil) return } diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index 90136a33bc0..a263fce0dd9 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -183,6 +183,13 @@ func (s *server) handleUploadStream( if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil { s.logger.Debugf("chunk stream handler: creation of pin for %q failed: %v", chunk.Address(), err) s.logger.Error("chunk stream handler: creation of pin failed") + // since we already increment the pin counter because of the ModePut, we need + // to delete the pin here to prevent the pin counter from never going to 0 + err = s.pinning.DeletePin(ctx, chunk.Address()) + if err != nil { + s.logger.Debugf("chunk stream handler: deletion of pin for %q failed: %v", chunk.Address(), err) + s.logger.Error("chunk stream handler: deletion of pin failed") + } sendErrorClose(websocket.CloseInternalServerErr, "failed creating pin") return } From d8e97610df58f56da2ee078a1a74a6672e3a5388 Mon Sep 17 00:00:00 2001 From: alok Date: Mon, 2 Aug 2021 20:40:22 +0530 Subject: [PATCH 14/16] feat: fix tag logic and add test --- pkg/api/chunk.go | 8 ++--- pkg/api/chunk_stream.go | 16 +++++++-- pkg/api/tag_test.go | 72 +++++++++++++++++++++++++++++++++++------ 3 files changed, 81 insertions(+), 15 deletions(-) diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index 9d60f4c7d94..38ef6e6c0d7 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -133,7 +133,9 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.BadRequest(w, "increment tag") return } - } else if tag != nil { + } + + if tag != nil { // indicate that the chunk is stored err = tag.Inc(tags.StateStored) if err != nil { @@ -149,9 +151,7 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil { s.logger.Debugf("chunk upload: creation of pin for %q failed: %v", chunk.Address(), err) s.logger.Error("chunk upload: creation of pin failed") - // since we already increment the pin counter because of the ModePut, we need - // to delete the pin here to prevent the pin counter from never going to 0 - err = s.pinning.DeletePin(ctx, chunk.Address()) + err = s.storer.Set(ctx, storage.ModeSetUnpin, chunk.Address()) if err != nil { s.logger.Debugf("chunk upload: deletion of pin for %q failed: %v", chunk.Address(), err) s.logger.Error("chunk upload: deletion of pin failed") diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index a263fce0dd9..9448a3fa949 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -136,6 +136,16 @@ func (s *server) handleUploadStream( return } + if tag != nil { + err = tag.Inc(tags.StateSplit) + if err != nil { + s.logger.Debug("chunk stream handler: failed incrementing tag", err) + s.logger.Error("chunk stream handler: failed incrementing tag") + sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag") + return + } + } + if len(msg) < swarm.SpanSize { s.logger.Debug("chunk stream handler: not enough data") s.logger.Error("chunk stream handler: not enough data") @@ -168,7 +178,9 @@ func (s *server) handleUploadStream( sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag") return } - } else if tag != nil { + } + + if tag != nil { // indicate that the chunk is stored err = tag.Inc(tags.StateStored) if err != nil { @@ -185,7 +197,7 @@ func (s *server) handleUploadStream( s.logger.Error("chunk stream handler: creation of pin failed") // since we already increment the pin counter because of the ModePut, we need // to delete the pin here to prevent the pin counter from never going to 0 - err = s.pinning.DeletePin(ctx, chunk.Address()) + err = s.storer.Set(ctx, storage.ModeSetUnpin, chunk.Address()) if err != nil { s.logger.Debugf("chunk stream handler: deletion of pin for %q failed: %v", chunk.Address(), err) s.logger.Error("chunk stream handler: deletion of pin failed") diff --git a/pkg/api/tag_test.go b/pkg/api/tag_test.go index 3dfa7bb5247..0eedd67b7c6 100644 --- a/pkg/api/tag_test.go +++ b/pkg/api/tag_test.go @@ -9,9 +9,11 @@ import ( "fmt" "io/ioutil" "net/http" + "net/url" "sort" "strconv" "testing" + "time" "github.com/ethersphere/bee/pkg/logging" mockpost "github.com/ethersphere/bee/pkg/postage/mock" @@ -25,6 +27,7 @@ import ( "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm/test" "github.com/ethersphere/bee/pkg/tags" + "github.com/gorilla/websocket" "gitlab.com/nolash/go-mockbytes" ) @@ -35,16 +38,17 @@ type fileUploadResponse struct { func tagsWithIdResource(id uint32) string { return fmt.Sprintf("/tags/%d", id) } func TestTags(t *testing.T) { + var ( - bzzResource = "/bzz" - bytesResource = "/bytes" - chunksResource = "/chunks" - tagsResource = "/tags" - chunk = testingc.GenerateTestRandomChunk() - mockStatestore = statestore.NewStateStore() - logger = logging.New(ioutil.Discard, 0) - tag = tags.NewTags(mockStatestore, logger) - client, _, _ = newTestServer(t, testServerOptions{ + bzzResource = "/bzz" + bytesResource = "/bytes" + chunksResource = "/chunks" + tagsResource = "/tags" + chunk = testingc.GenerateTestRandomChunk() + mockStatestore = statestore.NewStateStore() + logger = logging.New(ioutil.Discard, 0) + tag = tags.NewTags(mockStatestore, logger) + client, _, listenAddr = newTestServer(t, testServerOptions{ Storer: mock.NewStorer(), Tags: tag, Logger: logger, @@ -123,6 +127,56 @@ func TestTags(t *testing.T) { tagValueTest(t, tr.Uid, 1, 1, 1, 0, 0, 0, swarm.ZeroAddress, client) }) + t.Run("create tag upload chunk stream", func(t *testing.T) { + // create a tag using the API + tr := api.TagResponse{} + jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated, + jsonhttptest.WithJSONRequestBody(api.TagRequest{}), + jsonhttptest.WithUnmarshalJSONResponse(&tr), + ) + + wsHeaders := http.Header{} + wsHeaders.Set("Content-Type", "application/octet-stream") + wsHeaders.Set(api.SwarmPostageBatchIdHeader, batchOkStr) + wsHeaders.Set(api.SwarmTagHeader, strconv.FormatUint(uint64(tr.Uid), 10)) + + u := url.URL{Scheme: "ws", Host: listenAddr, Path: "/chunks/stream"} + wsConn, _, err := websocket.DefaultDialer.Dial(u.String(), wsHeaders) + if err != nil { + t.Fatalf("dial: %v. url %v", err, u.String()) + } + + for i := 0; i < 5; i++ { + ch := testingc.GenerateTestRandomChunk() + + err := wsConn.SetWriteDeadline(time.Now().Add(time.Second)) + if err != nil { + t.Fatal(err) + } + + err = wsConn.WriteMessage(websocket.BinaryMessage, ch.Data()) + if err != nil { + t.Fatal(err) + } + + err = wsConn.SetReadDeadline(time.Now().Add(time.Second)) + if err != nil { + t.Fatal(err) + } + + mt, msg, err := wsConn.ReadMessage() + if err != nil { + t.Fatal(err) + } + + if mt != websocket.BinaryMessage || !bytes.Equal(msg, api.SuccessWsMsg) { + t.Fatal("invalid response", mt, string(msg)) + } + } + + tagValueTest(t, tr.Uid, 5, 5, 0, 0, 0, 0, swarm.ZeroAddress, client) + }) + t.Run("list tags", func(t *testing.T) { // list all current tags var resp api.ListTagsResponse From 2de3be2731a55052fea40b9917d4a2973736a370 Mon Sep 17 00:00:00 2001 From: alok Date: Tue, 3 Aug 2021 10:53:15 +0530 Subject: [PATCH 15/16] fix: minor review comment --- pkg/api/chunk.go | 2 +- pkg/api/chunk_stream.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index 38ef6e6c0d7..0290ebc6f69 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -153,7 +153,7 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { s.logger.Error("chunk upload: creation of pin failed") err = s.storer.Set(ctx, storage.ModeSetUnpin, chunk.Address()) if err != nil { - s.logger.Debugf("chunk upload: deletion of pin for %q failed: %v", chunk.Address(), err) + s.logger.Debugf("chunk upload: deletion of pin for %s failed: %v", chunk.Address(), err) s.logger.Error("chunk upload: deletion of pin failed") } jsonhttp.InternalServerError(w, nil) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index 9448a3fa949..92d6a9ea34f 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -199,7 +199,7 @@ func (s *server) handleUploadStream( // to delete the pin here to prevent the pin counter from never going to 0 err = s.storer.Set(ctx, storage.ModeSetUnpin, chunk.Address()) if err != nil { - s.logger.Debugf("chunk stream handler: deletion of pin for %q failed: %v", chunk.Address(), err) + s.logger.Debugf("chunk stream handler: deletion of pin for %s failed: %v", chunk.Address(), err) s.logger.Error("chunk stream handler: deletion of pin failed") } sendErrorClose(websocket.CloseInternalServerErr, "failed creating pin") From 4e61198901058c00850d3c93f153fce6e66e975c Mon Sep 17 00:00:00 2001 From: alok Date: Tue, 3 Aug 2021 13:28:17 +0530 Subject: [PATCH 16/16] fix: review comments --- pkg/api/chunk.go | 4 ++-- pkg/api/pss.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index 0290ebc6f69..127348717cf 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -57,7 +57,7 @@ func (s *server) processUploadRequest( putter, err = newStamperPutter(s.storer, s.post, s.signer, batch) if err != nil { - s.logger.Debugf("chunk upload: putter:%v", err) + s.logger.Debugf("chunk upload: putter: %v", err) s.logger.Error("chunk upload: putter") switch { case errors.Is(err, postage.ErrNotFound): @@ -65,7 +65,7 @@ func (s *server) processUploadRequest( case errors.Is(err, postage.ErrNotUsable): return nil, nil, nil, errors.New("batch not usable") } - return nil, nil, nil, errors.New("") + return nil, nil, nil, err } return ctx, tag, putter, nil diff --git a/pkg/api/pss.go b/pkg/api/pss.go index 79e22c79231..f3b599588fc 100644 --- a/pkg/api/pss.go +++ b/pkg/api/pss.go @@ -23,7 +23,7 @@ import ( "github.com/gorilla/websocket" ) -var ( +const ( writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close readDeadline = 4 * time.Second // read deadline. should be smaller than the shutdown timeout on api close targetMaxLength = 2 // max target length in bytes, in order to prevent grieving by excess computation