Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: chunk stream upload endpoint #2230

Merged
merged 16 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,22 @@ paths:
default:
description: Default response

"/chunks/stream":
get:
summary: "Upload stream of chunks"
tags:
- Chunk
parameters:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmTagParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
responses:
"200":
description: "Returns a Websocket connection on which stream of chunks can be uploaded. Each chunk sent is acknowledged using a text response `success`. Chunks should be packaged as binary messages for uploading."
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
default:
description: Default response
"/bzz":
post:
summary: "Upload file or a collection of files"
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type testServerOptions struct {
PostageContract postagecontract.Interface
Post postage.Service
Steward steward.Reuploader
WsHeaders http.Header
}

func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) {
Expand Down Expand Up @@ -115,7 +116,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.

if o.WsPath != "" {
u := url.URL{Scheme: "ws", Host: ts.Listener.Addr().String(), Path: o.WsPath}
conn, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
conn, _, err = websocket.DefaultDialer.Dial(u.String(), o.WsHeaders)
if err != nil {
t.Fatalf("dial: %v. url %v", err, u.String())
}
Expand Down
72 changes: 39 additions & 33 deletions pkg/api/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand All @@ -29,27 +30,55 @@ type chunkAddressResponse struct {
Reference swarm.Address `json:"reference"`
}

func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
var (
tag *tags.Tag
ctx = r.Context()
err error
)
func (s *server) processUploadRequest(
r *http.Request,
) (ctx context.Context, tag *tags.Tag, putter storage.Putter, err error) {

if h := r.Header.Get(SwarmTagHeader); h != "" {
tag, err = s.getTag(h)
if err != nil {
s.logger.Debugf("chunk upload: get tag: %v", err)
s.logger.Error("chunk upload: get tag")
jsonhttp.BadRequest(w, "cannot get tag")
return

return nil, nil, nil, errors.New("cannot get tag")
}

// add the tag to the context if it exists
ctx = sctx.SetTag(r.Context(), tag)
} else {
ctx = r.Context()
}

batch, err := requestPostageBatchId(r)
if err != nil {
s.logger.Debugf("chunk upload: postage batch id: %v", err)
s.logger.Error("chunk upload: postage batch id")
return nil, nil, nil, errors.New("invalid postage batch id")
}

// increment the StateSplit here since we dont have a splitter for the file upload
putter, err = newStamperPutter(s.storer, s.post, s.signer, batch)
if err != nil {
s.logger.Debugf("chunk upload: putter:%v", err)
s.logger.Error("chunk upload: putter")
switch {
case errors.Is(err, postage.ErrNotFound):
return nil, nil, nil, errors.New("batch not found")
case errors.Is(err, postage.ErrNotUsable):
return nil, nil, nil, errors.New("batch not usable")
}
return nil, nil, nil, errors.New("")
}

return ctx, tag, putter, nil
}

func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
ctx, tag, putter, err := s.processUploadRequest(r)
if err != nil {
jsonhttp.BadRequest(w, err.Error())
return
}

if tag != nil {
err = tag.Inc(tags.StateSplit)
if err != nil {
s.logger.Debugf("chunk upload: increment tag: %v", err)
Expand Down Expand Up @@ -85,29 +114,6 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}

batch, err := requestPostageBatchId(r)
if err != nil {
s.logger.Debugf("chunk upload: postage batch id: %v", err)
s.logger.Error("chunk upload: postage batch id")
jsonhttp.BadRequest(w, "invalid postage batch id")
return
}

putter, err := newStamperPutter(s.storer, s.post, s.signer, batch)
if err != nil {
s.logger.Debugf("chunk upload: putter:%v", err)
s.logger.Error("chunk upload: putter")
switch {
case errors.Is(err, postage.ErrNotFound):
jsonhttp.BadRequest(w, "batch not found")
case errors.Is(err, postage.ErrNotUsable):
jsonhttp.BadRequest(w, "batch not usable yet")
default:
jsonhttp.BadRequest(w, nil)
}
return
}

seen, err := putter.Put(ctx, requestModePut(r), chunk)
if err != nil {
s.logger.Debugf("chunk upload: chunk write error: %v, addr %s", err, chunk.Address())
Expand Down
198 changes: 198 additions & 0 deletions pkg/api/chunk_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"context"
"errors"
"net/http"
"strings"
"time"

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/websocket"
)

var successWsMsg = []byte("successful")

func (s *server) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Request) {

ctx, tag, putter, err := s.processUploadRequest(r)
if err != nil {
jsonhttp.BadRequest(w, err.Error())
return
}

upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkSize,
WriteBufferSize: swarm.ChunkSize,
CheckOrigin: s.checkOrigin,
}

c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
s.logger.Debugf("chunk stream handler failed upgrading: %v", err)
s.logger.Error("chunk stream handler: upgrading")
jsonhttp.BadRequest(w, "not a websocket connection")
return
}

s.wsWg.Add(1)
go s.handleUploadStream(
ctx,
c,
tag,
putter,
requestModePut(r),
strings.ToLower(r.Header.Get(SwarmPinHeader)) == "true",
)
}

func (s *server) handleUploadStream(
ctx context.Context,
conn *websocket.Conn,
tag *tags.Tag,
putter storage.Putter,
mode storage.ModePut,
pin bool,
) {
defer s.wsWg.Done()

var (
gone = make(chan struct{})
err error
)
defer func() {
_ = conn.Close()
}()

conn.SetCloseHandler(func(code int, text string) error {
s.logger.Debugf("chunk stream handler: client gone. code %d message %s", code, text)
close(gone)
return nil
})

sendMsg := func(msgType int, buf []byte) error {
err := conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
return err
}
err = conn.WriteMessage(msgType, buf)
if err != nil {
return err
}
return nil
}

sendErrorClose := func(code int, errmsg string) {
err := conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(code, errmsg),
time.Now().Add(writeDeadline),
)
if err != nil {
s.logger.Errorf("chunk stream handler: failed sending close msg")
}
}

for {
select {
case <-s.quit:
// shutdown
sendErrorClose(websocket.CloseGoingAway, "node shutting down")
return
case <-gone:
// client gone
return
default:
// if there is no indication to stop, go ahead and read the next message
}

err = conn.SetReadDeadline(time.Now().Add(readDeadline))
if err != nil {
s.logger.Debugf("chunk stream handler: set read deadline: %v", err)
s.logger.Error("chunk stream handler: set read deadline")
return
}

mt, msg, err := conn.ReadMessage()
if err != nil {
s.logger.Debugf("chunk stream handler: read message error: %v", err)
s.logger.Error("chunk stream handler: read message error")
return
}

if mt != websocket.BinaryMessage {
s.logger.Debug("chunk stream handler: unexpected message received from client", mt)
s.logger.Error("chunk stream handler: unexpected message received from client")
sendErrorClose(websocket.CloseUnsupportedData, "invalid message")
return
}

if len(msg) < swarm.SpanSize {
s.logger.Debug("chunk stream handler: not enough data")
s.logger.Error("chunk stream handler: not enough data")
return
}

chunk, err := cac.NewWithDataSpan(msg)
if err != nil {
s.logger.Debugf("chunk stream handler: create chunk error: %v", err)
s.logger.Error("chunk stream handler: failed creating chunk")
return
}

seen, err := putter.Put(ctx, mode, chunk)
if err != nil {
s.logger.Debugf("chunk stream handler: chunk write error: %v, addr %s", err, chunk.Address())
s.logger.Error("chunk stream handler: chunk write error")
switch {
case errors.Is(err, postage.ErrBucketFull):
sendErrorClose(websocket.CloseInternalServerErr, "batch is overissued")
default:
sendErrorClose(websocket.CloseInternalServerErr, "chunk write error")
}
return
} else if len(seen) > 0 && seen[0] && tag != nil {
err := tag.Inc(tags.StateSeen)
if err != nil {
s.logger.Debugf("chunk stream handler: increment tag", err)
s.logger.Error("chunk stream handler: increment tag")
sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag")
return
}
} else if tag != nil {
// indicate that the chunk is stored
err = tag.Inc(tags.StateStored)
if err != nil {
s.logger.Debugf("chunk stream handler: increment tag", err)
s.logger.Error("chunk stream handler: increment tag")
sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag")
return
}
}

if pin {
if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil {
s.logger.Debugf("chunk stream handler: creation of pin for %q failed: %v", chunk.Address(), err)
s.logger.Error("chunk stream handler: creation of pin failed")
sendErrorClose(websocket.CloseInternalServerErr, "failed creating pin")
return
}
}

err = sendMsg(websocket.TextMessage, successWsMsg)
if err != nil {
s.logger.Debugf("chunk stream handler: failed sending success msg: %v", err)
s.logger.Error("chunk stream handler: failed sending confirmation")
return
}
}
}
Loading