Skip to content

Commit

Permalink
Update Subscribe Request format to PR#277
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Nov 8, 2023
1 parent 03ab563 commit 0e24d14
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 148 deletions.
10 changes: 6 additions & 4 deletions examples/chat/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewClient(p *moqtransport.Peer) (*Client, error) {
c.peer.OnAnnouncement(func(s string) error {
return nil
})
c.peer.OnSubscription(func(trackname string, st *moqtransport.SendTrack) (uint64, time.Duration, error) {
c.peer.OnSubscription(func(_, trackname string, st *moqtransport.SendTrack) (uint64, time.Duration, error) {
namespace := strings.SplitN(trackname, "/", 3)
if len(namespace) < 2 {
return 0, 0, errors.New("invalid trackname")
Expand Down Expand Up @@ -89,7 +89,8 @@ func (c *Client) handleCatalogDeltas(roomID, username string, catalogTrack *moqt
if p == username {
continue
}
t, err := c.peer.Subscribe(fmt.Sprintf("moq-chat/%v/%v", roomID, p))
fullname := fmt.Sprintf("moq-chat/%v/%v", roomID, p)
t, err := c.peer.Subscribe(fullname, fullname)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -120,7 +121,7 @@ func (c *Client) joinRoom(roomID, username string) {
if err := c.peer.Announce(fmt.Sprintf("moq-chat/%v/%v", roomID, username)); err != nil {
log.Fatal(err)
}
catalogTrack, err := c.peer.Subscribe(fmt.Sprintf("moq-chat/%v", roomID))
catalogTrack, err := c.peer.Subscribe("", fmt.Sprintf("moq-chat/%v", roomID))
if err != nil {
log.Fatal(err)
}
Expand All @@ -139,7 +140,8 @@ func (c *Client) joinRoom(roomID, username string) {
if p == username {
continue
}
t, err := c.peer.Subscribe(fmt.Sprintf("moq-chat/%v/%v", roomID, p))
fullname := fmt.Sprintf("moq-chat/%v/%v", roomID, p)
t, err := c.peer.Subscribe(fullname, fullname)
if err != nil {
log.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion examples/chat/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func (r *room) join(username string, p *moqtransport.Peer) error {
if _, ok := r.publishers[username]; ok {
return errors.New("username already taken")
}
t, err := p.Subscribe(fmt.Sprintf("moq-chat/%v/%v", r.id, username))
fullname := fmt.Sprintf("moq-chat/%v/%v", r.id, username)
t, err := p.Subscribe(fullname, fullname)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion examples/chat/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *Server) peerHandler() moqtransport.PeerHandlerFunc {
return s.chatRooms[id].join(name, p)
})

p.OnSubscription(func(trackname string, t *moqtransport.SendTrack) (uint64, time.Duration, error) {
p.OnSubscription(func(_, trackname string, t *moqtransport.SendTrack) (uint64, time.Duration, error) {
if len(name) == 0 {
// Subscribe requires a username which has to be announced
// before subscribing
Expand Down
6 changes: 3 additions & 3 deletions examples/date-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func run(addr string) error {
log.Printf("got announcement: %v", s)
return nil
})
p.OnSubscription(func(s string, _ *moqtransport.SendTrack) (uint64, time.Duration, error) {
log.Printf("got subscription attempt: %v", s)
p.OnSubscription(func(namespace, name string, _ *moqtransport.SendTrack) (uint64, time.Duration, error) {
log.Printf("got subscription attempt: %v/%v", namespace, name)
return 0, time.Duration(0), nil
})
go func() {
Expand All @@ -43,7 +43,7 @@ func run(addr string) error {
}
}()
log.Println("subscribing")
rt, err := p.Subscribe("clock/second")
rt, err := p.Subscribe("clock", "second")
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/date-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func run(addr string, wt bool, certFile, keyFile string) error {

func handler() moqtransport.PeerHandlerFunc {
return func(p *moqtransport.Peer) {
p.OnSubscription(func(trackname string, t *moqtransport.SendTrack) (uint64, time.Duration, error) {
if trackname != "clock/second" {
p.OnSubscription(func(namespace, trackname string, t *moqtransport.SendTrack) (uint64, time.Duration, error) {
if fmt.Sprintf("%v/%v", namespace, trackname) != "clock/second" {
return 0, 0, errors.New("unknown track ID")
}
go func() {
Expand Down
99 changes: 55 additions & 44 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"log"
"time"

"github.com/quic-go/quic-go/quicvarint"
Expand Down Expand Up @@ -97,51 +98,47 @@ type messageReader interface {
io.ByteReader
}

func readNext(reader messageReader) (message, error) {
func readNext(reader messageReader) (msg message, err error) {
mt, err := quicvarint.Read(reader)
if err != nil {
return nil, err
}
switch messageType(mt) {
case objectMessageLenType:
msg, err := parseObjectMessage(reader, mt)
return msg, err
msg, err = parseObjectMessage(reader, mt)
case objectMessageNoLenType:
msg, err := parseObjectMessage(reader, mt)
return msg, err
msg, err = parseObjectMessage(reader, mt)
case subscribeRequestMessageType:
srm, err := parseSubscribeMessage(reader)
return srm, err
msg, err = parseSubscribeRequestMessage(reader)
case subscribeOkMessageType:
som, err := parseSubscribeOkMessage(reader)
return som, err
msg, err = parseSubscribeOkMessage(reader)
case subscribeErrorMessageType:
sem, err := parseSubscribeErrorMessage(reader)
return sem, err
msg, err = parseSubscribeErrorMessage(reader)
case announceMessageType:
am, err := parseAnnounceMessage(reader)
return am, err
msg, err = parseAnnounceMessage(reader)
case announceOkMessageType:
aom, err := parseAnnounceOkMessage(reader)
return aom, err
msg, err = parseAnnounceOkMessage(reader)
case announceErrorMessageType:
return parseAnnounceErrorMessage(reader)
msg, err = parseAnnounceErrorMessage(reader)
case unannounceMessageType:
return parseUnannounceMessage(reader)
msg, err = parseUnannounceMessage(reader)
case unsubscribeMessageType:
return parseUnsubscribeMessage(reader)
msg, err = parseUnsubscribeMessage(reader)
case subscribeFinMessageType:
return parseSubscribeFinMessage(reader)
msg, err = parseSubscribeFinMessage(reader)
case subscribeRstMessageType:
return parseSubscribeRstMessage(reader)
msg, err = parseSubscribeRstMessage(reader)
case goAwayMessageType:
return parseGoAwayMessage(reader)
msg, err = parseGoAwayMessage(reader)
case clientSetupMessageType:
return parseClientSetupMessage(reader)
msg, err = parseClientSetupMessage(reader)
case serverSetupMessageType:
return parseServerSetupMessage(reader)
msg, err = parseServerSetupMessage(reader)
default:
return nil, errors.New("unknown message type")
}
return nil, errors.New("unknown message type")
log.Printf("parsed message: %v, err: %v", msg, err)
return
}

type objectMessage struct {
Expand Down Expand Up @@ -249,7 +246,10 @@ type clientSetupMessage struct {
}

func (m clientSetupMessage) String() string {
return clientSetupMessageType.String()
out := clientSetupMessageType.String()
out += fmt.Sprintf("\tSupportedVersions: %v\n", m.supportedVersions)
out += fmt.Sprintf("\tSetupParameters: %v\n", m.setupParameters)
return out
}

func (m *clientSetupMessage) append(buf []byte) []byte {
Expand Down Expand Up @@ -289,7 +289,10 @@ type serverSetupMessage struct {
}

func (m serverSetupMessage) String() string {
return serverSetupMessageType.String()
out := serverSetupMessageType.String()
out += fmt.Sprintf("\tSelectedVersions: %v\n", m.selectedVersion)
out += fmt.Sprintf("\tSetupParameters: %v\n", m.setupParameters)
return out
}

func (m *serverSetupMessage) append(buf []byte) []byte {
Expand Down Expand Up @@ -358,17 +361,19 @@ func parseLocation(r messageReader) (location, error) {
}

type subscribeRequestMessage struct {
fullTrackName string
startGroup location
startObject location
endGroup location
endObject location
parameters parameters
trackNamespace string
trackName string
startGroup location
startObject location
endGroup location
endObject location
parameters parameters
}

func (m subscribeRequestMessage) String() string {
out := subscribeRequestMessageType.String()
out += fmt.Sprintf("\tFullTrackName: %v\n", m.fullTrackName)
out += fmt.Sprintf("\tTrackNamespace: %v\n", m.trackNamespace)
out += fmt.Sprintf("\tTrackName: %v\n", m.trackName)
out += fmt.Sprintf("\tStartGroup: %v\n", m.startGroup)
out += fmt.Sprintf("\tStartObject: %v\n", m.startObject)
out += fmt.Sprintf("\tEndGroup: %v\n", m.endGroup)
Expand All @@ -380,13 +385,14 @@ func (m subscribeRequestMessage) String() string {
func (m subscribeRequestMessage) key() messageKey {
return messageKey{
mt: subscribeRequestMessageType,
id: m.fullTrackName,
id: fmt.Sprintf("%v/%v", m.trackNamespace, m.trackName),
}
}

func (m *subscribeRequestMessage) append(buf []byte) []byte {
buf = quicvarint.Append(buf, uint64(subscribeRequestMessageType))
buf = appendVarIntString(buf, m.fullTrackName)
buf = appendVarIntString(buf, m.trackNamespace)
buf = appendVarIntString(buf, m.trackName)
buf = m.startGroup.append(buf)
buf = m.startObject.append(buf)
buf = m.endGroup.append(buf)
Expand All @@ -398,10 +404,14 @@ func (m *subscribeRequestMessage) append(buf []byte) []byte {
return buf
}

func parseSubscribeMessage(r messageReader) (*subscribeRequestMessage, error) {
func parseSubscribeRequestMessage(r messageReader) (*subscribeRequestMessage, error) {
if r == nil {
return nil, errInvalidMessageReader
}
trackNamespace, err := parseVarIntString(r)
if err != nil {
return nil, err
}
fullTrackName, err := parseVarIntString(r)
if err != nil {
return nil, err
Expand All @@ -427,12 +437,13 @@ func parseSubscribeMessage(r messageReader) (*subscribeRequestMessage, error) {
return nil, err
}
return &subscribeRequestMessage{
fullTrackName: fullTrackName,
startGroup: startGroup,
startObject: startObject,
endGroup: endGroup,
endObject: endObject,
parameters: ps,
trackNamespace: trackNamespace,
trackName: fullTrackName,
startGroup: startGroup,
startObject: startObject,
endGroup: endGroup,
endObject: endObject,
parameters: ps,
}, nil
}

Expand All @@ -455,7 +466,7 @@ func (m subscribeOkMessage) String() string {
func (m subscribeOkMessage) key() messageKey {
return messageKey{
mt: subscribeRequestMessageType,
id: m.trackName,
id: fmt.Sprintf("%v/%v", m.trackNamespace, m.trackName),
}
}

Expand Down Expand Up @@ -515,7 +526,7 @@ func (m subscribeErrorMessage) String() string {
func (m subscribeErrorMessage) key() messageKey {
return messageKey{
mt: subscribeRequestMessageType,
id: m.trackName,
id: fmt.Sprintf("%v/%v", m.trackNamespace, m.trackName),
}
}

Expand Down
Loading

0 comments on commit 0e24d14

Please sign in to comment.