Skip to content

Commit

Permalink
refactor: rename fragment pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
ItsNotGoodName committed Apr 22, 2022
1 parent 6d7a703 commit 7a72e3b
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 86 deletions.
2 changes: 1 addition & 1 deletion cmd/test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func main() {
go upnpsub.ListenAndServe("", controlPoint)

// Subscribe to all radios
fragmentPub := pubsub.NewPub()
fragmentPub := pubsub.NewFragmentPub()
sub := fragmentPub.Subscribe(8, "")
go func() {
for s := range sub.Channel() {
Expand Down
81 changes: 81 additions & 0 deletions core/pubsub/fragment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package pubsub

import (
"sync"

"github.com/ItsNotGoodName/reciva-web-remote/core/state"
)

type FragmentPub struct {
subsMu sync.Mutex
subs map[*FragmentSub]string
}

func NewFragmentPub() *FragmentPub {
return &FragmentPub{
subsMu: sync.Mutex{},
subs: make(map[*FragmentSub]string),
}
}

func (p *FragmentPub) Unsubscribe(sub *FragmentSub) {
p.subsMu.Lock()
delete(p.subs, sub)
sub.close()
p.subsMu.Unlock()
}

func (p *FragmentPub) Subscribe(buffer int, uuid string) *FragmentSub {
sub := newFragmentSub(buffer)

p.subsMu.Lock()
p.subs[sub] = uuid
p.subsMu.Unlock()

return sub
}

func (p *FragmentPub) Publish(frag state.Fragment) {
p.subsMu.Lock()
for sub, uuid := range p.subs {
if uuid == frag.UUID || uuid == "" {
if !sub.send(&frag) {
delete(p.subs, sub)
sub.close()
}
}
}
p.subsMu.Unlock()
}

type FragmentSub struct {
channel chan *state.Fragment
open bool
}

func newFragmentSub(buffer int) *FragmentSub {
return &FragmentSub{
channel: make(chan *state.Fragment, buffer),
open: true,
}
}

func (s *FragmentSub) Channel() <-chan *state.Fragment {
return s.channel
}

func (s *FragmentSub) send(f *state.Fragment) bool {
select {
case s.channel <- f:
return true
default:
return false
}
}

func (s *FragmentSub) close() {
if s.open {
close(s.channel)
s.open = false
}
}
49 changes: 0 additions & 49 deletions core/pubsub/pub.go

This file was deleted.

35 changes: 0 additions & 35 deletions core/pubsub/sub.go

This file was deleted.

1 change: 0 additions & 1 deletion core/radio/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func (rs *RunServiceImpl) Run(radio Radio, s state.State) {
case event := <-radio.subscription.Events():
fragment := state.NewFragment(radio.UUID)
parseEvent(event, &fragment)

handle(fragment)
}
}
Expand Down

0 comments on commit 7a72e3b

Please sign in to comment.