Skip to content

Commit

Permalink
Merge pull request #9 from LopanovCo/debug
Browse files Browse the repository at this point in the history
Mutexes workaround
  • Loading branch information
LdDl authored Sep 5, 2024
2 parents 392826a + 5627a4c commit 26e9f40
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 21 deletions.
2 changes: 1 addition & 1 deletion example_client/hls_example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
},
"dependencies": {
"core-js": "^3.6.4",
"hls.js": "^0.14.3",
"hls.js": "^1.5.15",
"vue": "^2.6.11"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion example_client/hls_example/src/App.vue
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<div id="app" >
<p>HLS player</p>
<div>
<HLSPlayer schema="http" server="localhost" :port="8090" suuid="a1b8e7cb-09e6-4f3a-b857-30c7b272a744" :verbose="false" />
<HLSPlayer schema="http" server="localhost" :port="8090" suuid="74f17aa1-dd57-4c77-96de-b71668a9a25a" :verbose="false" />
</div>
</div>
</template>
Expand Down
2 changes: 1 addition & 1 deletion example_client/mse_example/src/App.vue
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<div id="app" >
<p>MSE player</p>
<div>
<MSEPlayer schema="ws" server="localhost" :port="8090" suuid="a1b8e7cb-09e6-4f3a-b857-30c7b272a744" :verbose="false" />
<MSEPlayer schema="ws" server="localhost" :port="8090" suuid="74f17aa1-dd57-4c77-96de-b71668a9a25a" :verbose="false" />
</div>
</div>
</template>
Expand Down
17 changes: 16 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/deepch/vdk/format/rtspv2"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)

const (
Expand All @@ -17,6 +18,7 @@ const (

// runStream runs RTSP grabbing process
func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled bool) error {
log.Info().Str("scope", "streaming").Str("event", "stream_dialing").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Msg("Trying to dial")
session, err := rtspv2.Dial(rtspv2.RTSPClientOptions{
URL: url,
DisableAudio: true,
Expand All @@ -28,8 +30,10 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled boo
return errors.Wrapf(err, "Can't connect to stream '%s'", url)
}
defer session.Close()
if session.CodecData != nil {
if len(session.CodecData) != 0 {
log.Info().Str("scope", "streaming").Str("event", "stream_codec_met").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Any("codec_data", session.CodecData).Msg("Found codec. Adding this one")
app.addCodec(streamID, session.CodecData)
log.Info().Str("scope", "streaming").Str("event", "stream_status_update").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Msg("Update stream status")
err = app.updateStreamStatus(streamID, true)
if err != nil {
return errors.Wrapf(err, "Can't update status for stream %s", streamID)
Expand All @@ -38,51 +42,62 @@ func (app *Application) runStream(streamID uuid.UUID, url string, hlsEnabled boo
isAudioOnly := false
if len(session.CodecData) == 1 {
if session.CodecData[0].Type().IsAudio() {
log.Info().Str("scope", "streaming").Str("event", "stream_audio_met").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("hls_enabled", hlsEnabled).Msg("Only audio")
isAudioOnly = true
}
}

var stopHlsCast chan bool
var stopMP4Cast chan bool
if hlsEnabled {
log.Info().Str("scope", "streaming").Str("event", "stream_hls_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Need to start casting for HLS")
stopHlsCast = make(chan bool, 1)
app.startHlsCast(streamID, stopHlsCast)
}
archive := app.getStreamArchive(streamID)
if archive != nil {
log.Info().Str("scope", "streaming").Str("event", "stream_mp4_req").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Need to start casting to MP4 archive")
stopMP4Cast = make(chan bool, 1)
app.startMP4Cast(streamID, stopMP4Cast)
}
pingStream := time.NewTimer(pingDuration)
for {
select {
case <-pingStream.C:
log.Error().Err(ErrStreamHasNoVideo).Str("scope", "streaming").Str("event", "stream_exit_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Stream has no video")
return errors.Wrapf(ErrStreamHasNoVideo, "URL is '%s'", url)
case signals := <-session.Signals:
switch signals {
case rtspv2.SignalCodecUpdate:
log.Info().Str("scope", "streaming").Str("event", "stream_codec_update_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Any("codec_data", session.CodecData).Msg("Recieved update codec signal")
app.addCodec(streamID, session.CodecData)
err = app.updateStreamStatus(streamID, true)
if err != nil {
return errors.Wrapf(err, "Can't update status for stream %s", streamID)
}
case rtspv2.SignalStreamRTPStop:
log.Info().Str("scope", "streaming").Str("event", "stream_stop_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Recieved stop signal")
err = app.updateStreamStatus(streamID, false)
if err != nil {
errors.Wrapf(err, "Can't switch status to False for stream '%s'", url)
}
return errors.Wrapf(ErrStreamDisconnected, "URL is '%s'", url)
}
case packetAV := <-session.OutgoingPacketQueue:
// log.Info().Str("scope", "streaming").Str("event", "stream_packet_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Recieved outgoing packet from queue")
if isAudioOnly || packetAV.IsKeyFrame {
log.Info().Str("scope", "streaming").Str("event", "stream_packet_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("only_audio", isAudioOnly).Bool("is_keyframe", packetAV.IsKeyFrame).Msg("Need to reset ping for stream")
pingStream.Reset(pingDurationRestart)
}
// log.Info().Str("scope", "streaming").Str("event", "stream_packet_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("only_audio", isAudioOnly).Bool("is_keyframe", packetAV.IsKeyFrame).Msg("Casting packet")
err = app.cast(streamID, *packetAV, hlsEnabled)
if err != nil {
if hlsEnabled {
log.Info().Str("scope", "streaming").Str("event", "stream_packet_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("only_audio", isAudioOnly).Bool("is_keyframe", packetAV.IsKeyFrame).Msg("Need to stop HLS cast")
stopHlsCast <- true
}
if archive != nil {
log.Info().Str("scope", "streaming").Str("event", "stream_packet_signal").Str("stream_id", streamID.String()).Str("stream_url", url).Bool("only_audio", isAudioOnly).Bool("is_keyframe", packetAV.IsKeyFrame).Msg("Need to stop MP4 cast")
stopMP4Cast <- true
}
errStatus := app.updateStreamStatus(streamID, false)
Expand Down
5 changes: 3 additions & 2 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ const (
// StartStreams starts all video streams
func (app *Application) StartStreams() {
streamsIDs := app.Streams.getKeys()
for _, k := range streamsIDs {
app.StartStream(k)
for i := range streamsIDs {
app.StartStream(streamsIDs[i])
}
}

Expand All @@ -35,6 +35,7 @@ func (app *Application) RunStream(ctx context.Context, k uuid.UUID) {
func (app *Application) startLoop(ctx context.Context, streamID uuid.UUID, url string, hlsEnabled bool) {
select {
case <-ctx.Done():
log.Info().Str("scope", "streaming").Str("event", "stream_done").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Stream is done")
return
default:
log.Info().Str("scope", "streaming").Str("event", "stream_start").Str("stream_id", streamID.String()).Str("stream_url", url).Msg("Stream must be establishment")
Expand Down
14 changes: 8 additions & 6 deletions streams_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func (streams *StreamsStorage) streamExists(streamID uuid.UUID) bool {
}

func (streams *StreamsStorage) existsWithType(streamID uuid.UUID, streamType StreamType) bool {
streams.RLock()
streams.Lock()
defer streams.Unlock()
curStream, ok := streams.Streams[streamID]
if !ok {
return false
}
supportedTypes := curStream.SupportedOutputTypes
typeEnabled := typeExists(streamType, supportedTypes)
streams.RUnlock()
return ok && typeEnabled
}

Expand All @@ -65,6 +65,8 @@ func (streams *StreamsStorage) addCodec(streamID uuid.UUID, codecs []av.CodecDat
}

func (streams *StreamsStorage) getCodec(streamID uuid.UUID) ([]av.CodecData, error) {
streams.Lock()
defer streams.Unlock()
curStream, ok := streams.Streams[streamID]
if !ok {
return nil, ErrStreamNotFound
Expand All @@ -83,18 +85,19 @@ func (streams *StreamsStorage) getCodec(streamID uuid.UUID) ([]av.CodecData, err

func (streams *StreamsStorage) updateStreamStatus(streamID uuid.UUID, status bool) error {
streams.Lock()
defer streams.Unlock()
curStream, ok := streams.Streams[streamID]
if !ok {
return ErrStreamNotFound
}
curStream.Status = status
streams.Streams[streamID] = curStream
streams.Unlock()
return nil
}

func (streams *StreamsStorage) addClient(streamID uuid.UUID) (uuid.UUID, chan av.Packet, error) {
streams.Lock()
defer streams.Unlock()
curStream, ok := streams.Streams[streamID]
if !ok {
return uuid.UUID{}, nil, ErrStreamNotFound
Expand All @@ -105,7 +108,6 @@ func (streams *StreamsStorage) addClient(streamID uuid.UUID) (uuid.UUID, chan av
}
ch := make(chan av.Packet, 100)
curStream.Clients[clientID] = viewer{c: ch}
streams.Unlock()
return clientID, ch, nil
}

Expand All @@ -117,14 +119,15 @@ func (streams *StreamsStorage) deleteClient(streamID, clientID uuid.UUID) {

func (streams *StreamsStorage) cast(streamID uuid.UUID, pck av.Packet, hlsEnabled bool) error {
streams.Lock()
defer streams.Unlock()
curStream, ok := streams.Streams[streamID]
if !ok {
return ErrStreamNotFound
}
if hlsEnabled {
curStream.hlsChanel <- pck
}
archive := streams.getArchiveStream(streamID)
archive := curStream.archive
if archive != nil {
curStream.mp4Chanel <- pck
}
Expand All @@ -133,7 +136,6 @@ func (streams *StreamsStorage) cast(streamID uuid.UUID, pck av.Packet, hlsEnable
v.c <- pck
}
}
streams.Unlock()
return nil
}

Expand Down
39 changes: 39 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package videoserver

import (
"reflect"
"sync"
"sync/atomic"
)

// https://github.com/trailofbits/go-mutexasserts/blob/master/mutex.go#L15
const mutexLocked = 1

func RWMutexLocked(rw *sync.RWMutex) bool {
// RWMutex has a "w" sync.Mutex field for write lock
state := reflect.ValueOf(rw).Elem().FieldByName("w").FieldByName("state")
return state.Int()&mutexLocked == mutexLocked
}

func MutexLocked(m *sync.Mutex) bool {
state := reflect.ValueOf(m).Elem().FieldByName("state")
return state.Int()&mutexLocked == mutexLocked
}

func RWMutexRLocked(rw *sync.RWMutex) bool {
return readerCount(rw) > 0
}

// Starting in go1.20, readerCount is an atomic int32 value.
// See: https://go-review.googlesource.com/c/go/+/429767
func readerCount(rw *sync.RWMutex) int64 {
// Look up the address of the readerCount field and use it to create a pointer to an atomic.Int32,
// then load the value to return.
rc := (*atomic.Int32)(reflect.ValueOf(rw).Elem().FieldByName("readerCount").Addr().UnsafePointer())
return int64(rc.Load())
}

// Prior to go1.20, readerCount was an int value.
// func readerCount(rw *sync.RWMutex) int64 {
// return reflect.ValueOf(rw).Elem().FieldByName("readerCount").Int()
// }
Loading

0 comments on commit 26e9f40

Please sign in to comment.