Skip to content

Commit

Permalink
[RSDK-8982, RSDK-9167, RSDK-8979] - Add SetStreamOptions to stream se…
Browse files Browse the repository at this point in the history
…rver (#4530)
  • Loading branch information
seanavery authored Nov 26, 2024
1 parent 07c109b commit 3225263
Show file tree
Hide file tree
Showing 5 changed files with 482 additions and 6 deletions.
14 changes: 10 additions & 4 deletions gostream/webrtc_track.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ type trackLocalStaticRTP struct {
mu sync.RWMutex
bindings []trackBinding
codec webrtc.RTPCodecCapability
sequencer rtp.Sequencer
id, rid, streamID string
}

// newtrackLocalStaticRTP returns a trackLocalStaticRTP.
func newtrackLocalStaticRTP(c webrtc.RTPCodecCapability, id, streamID string) *trackLocalStaticRTP {
return &trackLocalStaticRTP{
codec: c,
bindings: []trackBinding{},
id: id,
streamID: streamID,
codec: c,
bindings: []trackBinding{},
id: id,
sequencer: rtp.NewRandomSequencer(),
streamID: streamID,
}
}

Expand Down Expand Up @@ -126,6 +128,10 @@ func (s *trackLocalStaticRTP) WriteRTP(p *rtp.Packet) error {
for _, b := range s.bindings {
outboundPacket.Header.SSRC = uint32(b.ssrc)
outboundPacket.Header.PayloadType = uint8(b.payloadType)
// We overwrite the sequence number to ensure continuity between packets
// coming from Passthrough sources and those that are packetized by the
// Pion RTP Packetizer in the WriteData method.
outboundPacket.Header.SequenceNumber = s.sequencer.NextSequenceNumber()
if _, err := b.writeStream.WriteRTP(&outboundPacket.Header, outboundPacket.Payload); err != nil {
writeErrs = append(writeErrs, err)
}
Expand Down
104 changes: 104 additions & 0 deletions robot/web/stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ const (
retryDelay = 50 * time.Millisecond
)

const (
optionsCommandResize = iota
optionsCommandReset
optionsCommandUnknown
)

type peerState struct {
streamState *state.StreamState
senders []*webrtc.RTPSender
Expand Down Expand Up @@ -372,6 +378,104 @@ func (server *Server) GetStreamOptions(
}, nil
}

// SetStreamOptions implements part of the StreamServiceServer. It sets the resolution of the stream
// to the given width and height.
func (server *Server) SetStreamOptions(
ctx context.Context,
req *streampb.SetStreamOptionsRequest,
) (*streampb.SetStreamOptionsResponse, error) {
cmd, err := validateSetStreamOptionsRequest(req)
if err != nil {
return nil, err
}
server.mu.Lock()
defer server.mu.Unlock()
switch cmd {
case optionsCommandResize:
err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height))
if err != nil {
return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err)
}
case optionsCommandReset:
err = server.resetVideoSource(req.Name)
if err != nil {
return nil, fmt.Errorf("failed to reset video source for stream %q: %w", req.Name, err)
}
default:
return nil, fmt.Errorf("unknown command type %v", cmd)
}
return &streampb.SetStreamOptionsResponse{}, nil
}

// validateSetStreamOptionsRequest validates the request to set the stream options.
func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) (int, error) {
if req.Name == "" {
return optionsCommandUnknown, errors.New("stream name is required in request")
}
if req.Resolution == nil {
return optionsCommandReset, nil
}
if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 {
return optionsCommandUnknown,
fmt.Errorf(
"invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0",
req.Name, req.Resolution.Width, req.Resolution.Height,
)
}
return optionsCommandResize, nil
}

// resizeVideoSource resizes the video source with the given name.
func (server *Server) resizeVideoSource(name string, width, height int) error {
existing, ok := server.videoSources[name]
if !ok {
return fmt.Errorf("video source %q not found", name)
}
cam, err := camera.FromRobot(server.robot, name)
if err != nil {
server.logger.Errorf("error getting camera %q from robot", name)
return err
}
streamState, ok := server.nameToStreamState[name]
if !ok {
return fmt.Errorf("stream state not found with name %q", name)
}
resizer := gostream.NewResizeVideoSource(cam, width, height)
server.logger.Debugf(
"resizing video source to width %d and height %d",
width, height,
)
existing.Swap(resizer)
err = streamState.Resize()
if err != nil {
return fmt.Errorf("failed to resize stream %q: %w", name, err)
}
return nil
}

// resetVideoSource resets the video source with the given name to the source resolution.
func (server *Server) resetVideoSource(name string) error {
existing, ok := server.videoSources[name]
if !ok {
return fmt.Errorf("video source %q not found", name)
}
cam, err := camera.FromRobot(server.robot, name)
if err != nil {
server.logger.Errorf("error getting camera %q from robot", name)
}
streamState, ok := server.nameToStreamState[name]
if !ok {
return fmt.Errorf("stream state not found with name %q", name)
}
server.logger.Debug("resetting video source")
existing.Swap(cam)
err = streamState.Reset()
if err != nil {
return fmt.Errorf("failed to reset stream %q: %w", name, err)
}
return nil
}

// AddNewStreams adds new video and audio streams to the server using the updated set of video and
// audio sources. It refreshes the sources, checks for a valid stream configuration, and starts
// the streams if applicable.
Expand Down
50 changes: 49 additions & 1 deletion robot/web/stream/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type StreamState struct {
streamSource streamSource
// streamSourceSub is only non nil if streamSource == streamSourcePassthrough
streamSourceSub rtppassthrough.Subscription
// isResized indicates whether the stream has been resized by the stream server.
// When set to true, it signals that the passthrough stream should not be restarted.
isResized bool
}

// New returns a new *StreamState.
Expand All @@ -61,6 +64,7 @@ func New(
robot: r,
msgChan: make(chan msg),
tickChan: make(chan struct{}),
isResized: false,
logger: logger,
}

Expand Down Expand Up @@ -90,6 +94,24 @@ func (state *StreamState) Decrement() error {
return state.send(msgTypeDecrement)
}

// Resize notifies that the gostream source has been resized. This will stop and prevent
// the use of the passthrough stream if it is supported.
func (state *StreamState) Resize() error {
if err := state.closedCtx.Err(); err != nil {
return multierr.Combine(ErrClosed, err)
}
return state.send(msgTypeResize)
}

// Reset notifies that the gostream source has been reset to the original resolution.
// This will restart the passthrough stream if it is supported.
func (state *StreamState) Reset() error {
if err := state.closedCtx.Err(); err != nil {
return multierr.Combine(ErrClosed, err)
}
return state.send(msgTypeReset)
}

// Close closes the StreamState.
func (state *StreamState) Close() error {
state.logger.Info("Closing streamState")
Expand Down Expand Up @@ -129,6 +151,8 @@ const (
msgTypeUnknown msgType = iota
msgTypeIncrement
msgTypeDecrement
msgTypeResize
msgTypeReset
)

func (mt msgType) String() string {
Expand All @@ -137,6 +161,10 @@ func (mt msgType) String() string {
return "Increment"
case msgTypeDecrement:
return "Decrement"
case msgTypeResize:
return "Resize"
case msgTypeReset:
return "Reset"
case msgTypeUnknown:
fallthrough
default:
Expand Down Expand Up @@ -185,6 +213,14 @@ func (state *StreamState) sourceEventHandler() {
if state.activeClients == 0 {
state.tick()
}
case msgTypeResize:
state.logger.Debug("resize event received")
state.isResized = true
state.tick()
case msgTypeReset:
state.logger.Debug("reset event received")
state.isResized = false
state.tick()
case msgTypeUnknown:
fallthrough
default:
Expand Down Expand Up @@ -247,6 +283,11 @@ func (state *StreamState) tick() {
// stop stream if there are no active clients
// noop if there is no stream source
state.stopInputStream()
// If streamSource is unknown and resized is true, we do not want to attempt passthrough.
case state.streamSource == streamSourceUnknown && state.isResized:
state.logger.Debug("in a resized state and stream source is unknown, defaulting to GoStream")
state.Stream.Start()
state.streamSource = streamSourceGoStream
case state.streamSource == streamSourceUnknown: // && state.activeClients > 0
// this is the first subscription, attempt passthrough
state.logger.Info("attempting to subscribe to rtp_passthrough")
Expand All @@ -257,6 +298,13 @@ func (state *StreamState) tick() {
state.Stream.Start()
state.streamSource = streamSourceGoStream
}
// If we are currently using passthrough, and the stream state changes to resized
// we need to stop the passthrough stream and restart it through gostream.
case state.streamSource == streamSourcePassthrough && state.isResized:
state.logger.Info("stream resized, stopping passthrough stream")
state.stopInputStream()
state.Stream.Start()
state.streamSource = streamSourceGoStream
case state.streamSource == streamSourcePassthrough && state.streamSourceSub.Terminated.Err() != nil:
// restart stream if there we were using passthrough but the sub is terminated
state.logger.Info("previous subscription terminated attempting to subscribe to rtp_passthrough")
Expand All @@ -271,7 +319,7 @@ func (state *StreamState) tick() {
case state.streamSource == streamSourcePassthrough:
// no op if we are using passthrough & are healthy
state.logger.Debug("still healthy and using h264 passthrough")
case state.streamSource == streamSourceGoStream:
case state.streamSource == streamSourceGoStream && !state.isResized:
// Try to upgrade to passthrough if we are using gostream. We leave logs these as debugs as
// we expect some components to not implement rtp passthrough.
state.logger.Debugw("currently using gostream, trying upgrade to rtp_passthrough")
Expand Down
Loading

0 comments on commit 3225263

Please sign in to comment.