forked from datarhei/gosrt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
177 lines (145 loc) · 4.06 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package srt
import (
"context"
"fmt"
"io"
"sync"
"github.com/datarhei/gosrt/internal/packet"
)
// PubSub is a publish/subscriber service for SRT connections.
type PubSub interface {
// Publish accepts a SRT connection where it reads from. It blocks
// until the connection closes. The returned error indicates why it
// stopped. There can be only one publisher.
Publish(c Conn) error
// Subscribe accepts a SRT connection where it writes the data from
// the publisher to. It blocks until an error happens. If the publisher
// disconnects, io.EOF is returned. There can be an arbitrary number
// of subscribers.
Subscribe(c Conn) error
}
type packetReadWriter interface {
readPacket() (packet.Packet, error)
writePacket(p packet.Packet) error
}
// pubSub is an implementation of the PubSub interface
type pubSub struct {
incoming chan packet.Packet
ctx context.Context
cancel context.CancelFunc
publish bool
publishLock sync.Mutex
listeners map[uint32]chan packet.Packet
listenersLock sync.Mutex
logger Logger
}
// PubSubConfig is for configuring a new PubSub
type PubSubConfig struct {
Logger Logger // Optional logger
}
// NewPubSub returns a PubSub. After the publishing connection closed
// this PubSub can't be used anymore.
func NewPubSub(config PubSubConfig) PubSub {
pb := &pubSub{
incoming: make(chan packet.Packet, 1024),
listeners: make(map[uint32]chan packet.Packet),
logger: config.Logger,
}
pb.ctx, pb.cancel = context.WithCancel(context.Background())
if pb.logger == nil {
pb.logger = NewLogger(nil)
}
go pb.broadcast()
return pb
}
func (pb *pubSub) broadcast() {
defer func() {
pb.logger.Print("pubsub:close", 0, 1, func() string { return "exiting broadcast loop" })
}()
pb.logger.Print("pubsub:new", 0, 1, func() string { return "starting broadcast loop" })
for {
select {
case <-pb.ctx.Done():
return
case p := <-pb.incoming:
pb.listenersLock.Lock()
for socketId, c := range pb.listeners {
pp := p.Clone()
select {
case c <- pp:
default:
pb.logger.Print("pubsub:error", socketId, 1, func() string { return "broadcast target queue is full" })
}
}
pb.listenersLock.Unlock()
// We don't need this packet anymore
p.Decommission()
}
}
}
func (pb *pubSub) Publish(c Conn) error {
pb.publishLock.Lock()
defer pb.publishLock.Unlock()
if pb.publish {
err := fmt.Errorf("only one publisher is allowed")
pb.logger.Print("pubsub:error", 0, 1, func() string { return err.Error() })
return err
}
var p packet.Packet
var err error
conn, ok := c.(packetReadWriter)
if !ok {
err := fmt.Errorf("the provided connection is not a SRT connection")
pb.logger.Print("pubsub:error", 0, 1, func() string { return err.Error() })
return err
}
socketId := c.SocketId()
pb.logger.Print("pubsub:publish", socketId, 1, func() string { return "new publisher" })
pb.publish = true
for {
p, err = conn.readPacket()
if err != nil {
pb.logger.Print("pubsub:error", socketId, 1, func() string { return err.Error() })
break
}
select {
case pb.incoming <- p:
default:
pb.logger.Print("pubsub:error", socketId, 1, func() string { return "incoming queue is full" })
}
}
pb.cancel()
return err
}
func (pb *pubSub) Subscribe(c Conn) error {
l := make(chan packet.Packet, 1024)
socketId := c.SocketId()
conn, ok := c.(packetReadWriter)
if !ok {
err := fmt.Errorf("the provided connection is not a SRT connection")
pb.logger.Print("pubsub:error", 0, 1, func() string { return err.Error() })
return err
}
pb.logger.Print("pubsub:subscribe", socketId, 1, func() string { return "new subscriber" })
pb.listenersLock.Lock()
pb.listeners[socketId] = l
pb.listenersLock.Unlock()
defer func() {
pb.listenersLock.Lock()
delete(pb.listeners, socketId)
pb.listenersLock.Unlock()
}()
for {
select {
case <-pb.ctx.Done():
return io.EOF
case p := <-l:
err := conn.writePacket(p)
p.Decommission()
if err != nil {
pb.logger.Print("pubsub:error", socketId, 1, func() string { return err.Error() })
return err
}
}
}
}