Skip to content

Commit

Permalink
feat: gsoc subscribe (#4901)
Browse files Browse the repository at this point in the history
Co-authored-by: Viktor Levente Tóth <toth.viktor.levente@gmail.com>
Co-authored-by: Acha Bill <57879913+acha-bill@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 18, 2024
1 parent 9e388ed commit a276067
Show file tree
Hide file tree
Showing 47 changed files with 1,344 additions and 248 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/beekeeper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ jobs:
- name: Test soc
id: soc
run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-soc
- name: Test gsoc
id: gsoc
run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-gsoc
- name: Test pushsync (chunks)
id: pushsync-chunks-1
run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-pushsync-chunks
Expand Down
24 changes: 22 additions & 2 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,28 @@ paths:
default:
description: Default response

"/gsoc/subscribe/{address}":
get:
summary: Subscribe to GSOC payloads
tags:
- GSOC
- Subscribe
- Websocket
parameters:
- in: path
name: reference
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmReference"
required: true
description: "Single Owner Chunk address (which may have multiple payloads)"
responses:
"200":
description: Returns a WebSocket with a subscription for incoming message data on the requested SOC address.
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response

"/soc/{owner}/{id}":
post:
summary: Upload single owner chunk
Expand Down Expand Up @@ -847,8 +869,6 @@ paths:
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
required: true
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress"
Expand Down
15 changes: 14 additions & 1 deletion pkg/api/accesscontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
resp struct {
Reference swarm.Address `json:"reference"`
}
direct bool
}{
{
name: "bzz",
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
data: bytes.NewReader(sch.WrappedChunk.Data()),
expdata: sch.Chunk().Data(),
contenttype: "binary/octet-stream",
direct: true,
},
}

Expand All @@ -183,13 +185,24 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
upTestOpts = append(upTestOpts, jsonhttptest.WithRequestHeader(api.SwarmCollectionHeader, "True"))
}
t.Run(v.name, func(t *testing.T) {
client, _, _, _ := newTestServer(t, testServerOptions{
client, _, _, chanStore := newTestServer(t, testServerOptions{
Storer: storerMock,
Logger: logger,
Post: mockpost.New(mockpost.WithAcceptAll()),
PublicKey: pk.PublicKey,
AccessControl: mockac.New(),
DirectUpload: v.direct,
})

if chanStore != nil {
chanStore.Subscribe(func(chunk swarm.Chunk) {
err := storerMock.Put(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}
})
}

header := jsonhttptest.Request(t, client, http.MethodPost, v.upurl, http.StatusCreated,
upTestOpts...,
)
Expand Down
11 changes: 10 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/pipeline"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/p2p"
Expand Down Expand Up @@ -151,6 +152,7 @@ type Service struct {
storer Storer
resolver resolver.Interface
pss pss.Interface
gsoc gsoc.Listener
steward steward.Interface
logger log.Logger
loggerV1 log.Logger
Expand Down Expand Up @@ -254,6 +256,7 @@ type ExtraOptions struct {
Storer Storer
Resolver resolver.Interface
Pss pss.Interface
Gsoc gsoc.Listener
FeedFactory feeds.Factory
Post postage.Service
AccessControl accesscontrol.Controller
Expand Down Expand Up @@ -337,6 +340,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
s.storer = e.Storer
s.resolver = e.Resolver
s.pss = e.Pss
s.gsoc = e.Gsoc
s.feedFactory = e.FeedFactory
s.post = e.Post
s.accesscontrol = e.AccessControl
Expand Down Expand Up @@ -682,7 +686,12 @@ type putterSessionWrapper struct {
}

func (p *putterSessionWrapper) Put(ctx context.Context, chunk swarm.Chunk) error {
stamp, err := p.stamper.Stamp(chunk.Address())
idAddress, err := storage.IdentityAddress(chunk)
if err != nil {
return err
}

stamp, err := p.stamper.Stamp(chunk.Address(), idAddress)
if err != nil {
return err
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/pipeline"
"github.com/ethersphere/bee/v2/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/log"
p2pmock "github.com/ethersphere/bee/v2/pkg/p2p/mock"
Expand Down Expand Up @@ -93,6 +94,7 @@ type testServerOptions struct {
StateStorer storage.StateStorer
Resolver resolver.Interface
Pss pss.Interface
Gsoc gsoc.Listener
WsPath string
WsPingPeriod time.Duration
Logger log.Logger
Expand Down Expand Up @@ -194,6 +196,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
Storer: o.Storer,
Resolver: o.Resolver,
Pss: o.Pss,
Gsoc: o.Gsoc,
FeedFactory: o.Feeds,
Post: o.Post,
AccessControl: o.AccessControl,
Expand Down Expand Up @@ -630,6 +633,7 @@ type chanStorer struct {
lock sync.Mutex
chunks map[string]struct{}
quit chan struct{}
subs []func(chunk swarm.Chunk)
}

func newChanStore(cc <-chan *pusher.Op) *chanStorer {
Expand All @@ -647,6 +651,9 @@ func (c *chanStorer) drain(cc <-chan *pusher.Op) {
case op := <-cc:
c.lock.Lock()
c.chunks[op.Chunk.Address().ByteString()] = struct{}{}
for _, h := range c.subs {
h(op.Chunk)
}
c.lock.Unlock()
op.Err <- nil
case <-c.quit:
Expand All @@ -667,6 +674,12 @@ func (c *chanStorer) Has(addr swarm.Address) bool {
return ok
}

func (c *chanStorer) Subscribe(f func(chunk swarm.Chunk)) {
c.lock.Lock()
defer c.lock.Unlock()
c.subs = append(c.subs, f)
}

func createRedistributionAgentService(
t *testing.T,
addr swarm.Address,
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *Service) envelopePostHandler(w http.ResponseWriter, r *http.Request) {
return
}

stamp, err := stamper.Stamp(paths.Address)
stamp, err := stamper.Stamp(paths.Address, paths.Address)
if err != nil {
logger.Debug("split write all failed", "error", err)
logger.Error(nil, "split write all failed")
Expand Down
120 changes: 120 additions & 0 deletions pkg/api/gsoc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"net/http"
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)

func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("gsoc_subscribe").Build()

paths := struct {
Address swarm.Address `map:"address,resolve" validate:"required"`
}{}

if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
}

upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkSize,
WriteBufferSize: swarm.ChunkSize,
CheckOrigin: s.checkOrigin,
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Debug("upgrade failed", "error", err)
logger.Error(nil, "upgrade failed")
jsonhttp.InternalServerError(w, "upgrade failed")
return
}

s.wsWg.Add(1)
go s.gsocListeningWs(conn, paths.Address)
}

func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) {
defer s.wsWg.Done()

var (
dataC = make(chan []byte)
gone = make(chan struct{})
ticker = time.NewTicker(s.WsPingPeriod)
err error
)
defer func() {
ticker.Stop()
_ = conn.Close()
}()
cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) {
select {
case dataC <- m:
case <-gone:
return
case <-s.quit:
return
}
})

defer cleanup()

conn.SetCloseHandler(func(code int, text string) error {
s.logger.Debug("gsoc ws: client gone", "code", code, "message", text)
close(gone)
return nil
})

for {
select {
case b := <-dataC:
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}

err = conn.WriteMessage(websocket.BinaryMessage, b)
if err != nil {
s.logger.Debug("gsoc ws: write message failed", "error", err)
return
}

case <-s.quit:
// shutdown
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}
err = conn.WriteMessage(websocket.CloseMessage, []byte{})
if err != nil {
s.logger.Debug("gsoc ws: write close message failed", "error", err)
}
return
case <-gone:
// client gone
return
case <-ticker.C:
err = conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
s.logger.Debug("gsoc ws: set write deadline failed", "error", err)
return
}
if err = conn.WriteMessage(websocket.PingMessage, nil); err != nil {
// error encountered while pinging client. client probably gone
return
}
}
}
}
Loading

0 comments on commit a276067

Please sign in to comment.