From 494c82ac0fdc709df1edb9011926a7a3acfcf8b4 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 4 Mar 2021 14:03:01 +0800 Subject: [PATCH] Refine the regression test tool, add missing files --- trunk/3rdparty/srs-bench/Makefile | 2 +- trunk/3rdparty/srs-bench/README.md | 4 +- trunk/3rdparty/srs-bench/srs/player.go | 262 +++++++++++++ trunk/3rdparty/srs-bench/srs/publisher.go | 429 +++++++++++++++++++++ trunk/3rdparty/srs-bench/srs/rtc_test.go | 449 ++++++++++++++++++++++ trunk/3rdparty/srs-bench/srs/stat.go | 47 +++ trunk/3rdparty/srs-bench/srs/util.go | 142 +++++++ trunk/conf/regression-test.conf | 1 + 8 files changed, 1333 insertions(+), 3 deletions(-) create mode 100644 trunk/3rdparty/srs-bench/srs/player.go create mode 100644 trunk/3rdparty/srs-bench/srs/publisher.go create mode 100644 trunk/3rdparty/srs-bench/srs/rtc_test.go create mode 100644 trunk/3rdparty/srs-bench/srs/stat.go create mode 100644 trunk/3rdparty/srs-bench/srs/util.go diff --git a/trunk/3rdparty/srs-bench/Makefile b/trunk/3rdparty/srs-bench/Makefile index ea7341c847b..f4ee3f9d014 100644 --- a/trunk/3rdparty/srs-bench/Makefile +++ b/trunk/3rdparty/srs-bench/Makefile @@ -17,7 +17,7 @@ bench: ./objs/srs_bench test: ./objs/srs_test ./objs/srs_test: .format.txt *.go rtc/*.go srs/*.go Makefile - go test ./srs -c -o ./objs/srs_test + go test ./srs -mod=vendor -c -o ./objs/srs_test help: @echo "Usage: make [bench|test]" diff --git a/trunk/3rdparty/srs-bench/README.md b/trunk/3rdparty/srs-bench/README.md index c8ccb3cb11d..061976383b4 100644 --- a/trunk/3rdparty/srs-bench/README.md +++ b/trunk/3rdparty/srs-bench/README.md @@ -113,7 +113,7 @@ fi 然后运行回归测试用例,如果只跑一次,可以直接运行: ```bash -go test ./srs -v +go test ./srs -mod=vendor -v ``` 也可以用make编译出重复使用的二进制: @@ -125,7 +125,7 @@ make test && ./objs/srs_test -test.v 可以给回归测试传参数,这样可以测试不同的序列,比如: ```bash -go test ./srs -v -srs-server=127.0.0.1 +go test ./srs -mod=vendor -v -srs-server=127.0.0.1 # Or make test && ./objs/srs_test -test.v -srs-server=127.0.0.1 ``` diff --git a/trunk/3rdparty/srs-bench/srs/player.go b/trunk/3rdparty/srs-bench/srs/player.go new file mode 100644 index 00000000000..8977248e6e1 --- /dev/null +++ b/trunk/3rdparty/srs-bench/srs/player.go @@ -0,0 +1,262 @@ +package srs + +import ( + "context" + "fmt" + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" + "github.com/pion/interceptor" + "github.com/pion/rtcp" + "github.com/pion/sdp/v3" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" + "github.com/pion/webrtc/v3/pkg/media/h264writer" + "github.com/pion/webrtc/v3/pkg/media/ivfwriter" + "github.com/pion/webrtc/v3/pkg/media/oggwriter" + "strings" + "sync" + "time" +) + +// @see https://github.com/pion/webrtc/blob/master/examples/save-to-disk/main.go +func StartPlay(ctx context.Context, r, dumpAudio, dumpVideo string, enableAudioLevel, enableTWCC bool, pli int) error { + ctx = logger.WithContext(ctx) + + logger.Tf(ctx, "Start play url=%v, audio=%v, video=%v, audio-level=%v, twcc=%v", + r, dumpAudio, dumpVideo, enableAudioLevel, enableTWCC) + + // For audio-level. + webrtcNewPeerConnection := func(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) { + m := &webrtc.MediaEngine{} + if err := m.RegisterDefaultCodecs(); err != nil { + return nil, err + } + + for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.TransportCCURI} { + if extension == sdp.TransportCCURI && !enableTWCC { + continue + } + if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeVideo); err != nil { + return nil, err + } + } + + // https://github.com/pion/ion/issues/130 + // https://github.com/pion/ion-sfu/pull/373/files#diff-6f42c5ac6f8192dd03e5a17e9d109e90cb76b1a4a7973be6ce44a89ffd1b5d18R73 + for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.AudioLevelURI} { + if extension == sdp.AudioLevelURI && !enableAudioLevel { + continue + } + if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeAudio); err != nil { + return nil, err + } + } + + i := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { + return nil, err + } + + api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)) + return api.NewPeerConnection(configuration) + } + + pc, err := webrtcNewPeerConnection(webrtc.Configuration{}) + if err != nil { + return errors.Wrapf(err, "Create PC") + } + defer pc.Close() + + pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }) + pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }) + + offer, err := pc.CreateOffer(nil) + if err != nil { + return errors.Wrapf(err, "Create Offer") + } + + if err := pc.SetLocalDescription(offer); err != nil { + return errors.Wrapf(err, "Set offer %v", offer) + } + + answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP) + if err != nil { + return errors.Wrapf(err, "Api request offer=%v", offer.SDP) + } + + if err := pc.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, SDP: answer, + }); err != nil { + return errors.Wrapf(err, "Set answer %v", answer) + } + + var da media.Writer + var dv_vp8 media.Writer + var dv_h264 media.Writer + defer func() { + if da != nil { + da.Close() + } + if dv_vp8 != nil { + dv_vp8.Close() + } + if dv_h264 != nil { + dv_h264.Close() + } + }() + + handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error { + // Send a PLI on an interval so that the publisher is pushing a keyframe + go func() { + if track.Kind() == webrtc.RTPCodecTypeAudio { + return + } + + if pli <= 0 { + return + } + + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(pli) * time.Second): + _ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ + MediaSSRC: uint32(track.SSRC()), + }}) + } + } + }() + + codec := track.Codec() + + trackDesc := fmt.Sprintf("channels=%v", codec.Channels) + if track.Kind() == webrtc.RTPCodecTypeVideo { + trackDesc = fmt.Sprintf("fmtp=%v", codec.SDPFmtpLine) + } + if headers := receiver.GetParameters().HeaderExtensions; len(headers) > 0 { + trackDesc = fmt.Sprintf("%v, header=%v", trackDesc, headers) + } + logger.Tf(ctx, "Got track %v, pt=%v, tbn=%v, %v", + codec.MimeType, codec.PayloadType, codec.ClockRate, trackDesc) + + if codec.MimeType == "audio/opus" { + if da == nil && dumpAudio != "" { + if da, err = oggwriter.New(dumpAudio, codec.ClockRate, codec.Channels); err != nil { + return errors.Wrapf(err, "New audio dumper") + } + logger.Tf(ctx, "Open ogg writer file=%v, tbn=%v, channels=%v", + dumpAudio, codec.ClockRate, codec.Channels) + } + + if err = writeTrackToDisk(ctx, da, track); err != nil { + return errors.Wrapf(err, "Write audio disk") + } + } else if codec.MimeType == "video/VP8" { + if dumpVideo != "" && !strings.HasSuffix(dumpVideo, ".ivf") { + return errors.Errorf("%v should be .ivf for VP8", dumpVideo) + } + + if dv_vp8 == nil && dumpVideo != "" { + if dv_vp8, err = ivfwriter.New(dumpVideo); err != nil { + return errors.Wrapf(err, "New video dumper") + } + logger.Tf(ctx, "Open ivf writer file=%v", dumpVideo) + } + + if err = writeTrackToDisk(ctx, dv_vp8, track); err != nil { + return errors.Wrapf(err, "Write video disk") + } + } else if codec.MimeType == "video/H264" { + if dumpVideo != "" && !strings.HasSuffix(dumpVideo, ".h264") { + return errors.Errorf("%v should be .h264 for H264", dumpVideo) + } + + if dv_h264 == nil && dumpVideo != "" { + if dv_h264, err = h264writer.New(dumpVideo); err != nil { + return errors.Wrapf(err, "New video dumper") + } + logger.Tf(ctx, "Open h264 writer file=%v", dumpVideo) + } + + if err = writeTrackToDisk(ctx, dv_h264, track); err != nil { + return errors.Wrapf(err, "Write video disk") + } + } else { + logger.Wf(ctx, "Ignore track %v pt=%v", codec.MimeType, codec.PayloadType) + } + return nil + } + + ctx, cancel := context.WithCancel(ctx) + pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + err = handleTrack(ctx, track, receiver) + if err != nil { + codec := track.Codec() + err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType) + cancel() + } + }) + + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + logger.If(ctx, "ICE state %v", state) + + if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed { + if ctx.Err() != nil { + return + } + + logger.Wf(ctx, "Close for ICE state %v", state) + cancel() + } + }) + + // Wait for event from context or tracks. + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Second): + StatRTC.PeerConnection = pc.GetStats() + } + } + }() + + wg.Wait() + return err +} + +func writeTrackToDisk(ctx context.Context, w media.Writer, track *webrtc.TrackRemote) error { + for ctx.Err() == nil { + pkt, _, err := track.ReadRTP() + if err != nil { + if ctx.Err() != nil { + return nil + } + return errors.Wrapf(err, "Read RTP") + } + + if w == nil { + continue + } + + if err := w.WriteRTP(pkt); err != nil { + if len(pkt.Payload) <= 2 { + continue + } + logger.Wf(ctx, "Ignore write RTP %vB err %+v", len(pkt.Payload), err) + } + } + + return ctx.Err() +} diff --git a/trunk/3rdparty/srs-bench/srs/publisher.go b/trunk/3rdparty/srs-bench/srs/publisher.go new file mode 100644 index 00000000000..172ff9e21f0 --- /dev/null +++ b/trunk/3rdparty/srs-bench/srs/publisher.go @@ -0,0 +1,429 @@ +package srs + +import ( + "context" + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" + "github.com/ossrs/srs-bench/rtc" + "github.com/pion/interceptor" + "github.com/pion/rtp" + "github.com/pion/sdp/v3" + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" + "github.com/pion/webrtc/v3/pkg/media/h264reader" + "github.com/pion/webrtc/v3/pkg/media/oggreader" + "io" + "os" + "strings" + "sync" + "time" +) + +// @see https://github.com/pion/webrtc/blob/master/examples/play-from-disk/main.go +func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC bool) error { + ctx = logger.WithContext(ctx) + + logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v, audio-level=%v, twcc=%v", + r, sourceAudio, sourceVideo, fps, enableAudioLevel, enableTWCC) + + // For audio-level. + webrtcNewPeerConnection := func(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) { + m := &webrtc.MediaEngine{} + if err := m.RegisterDefaultCodecs(); err != nil { + return nil, err + } + + for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.TransportCCURI} { + if extension == sdp.TransportCCURI && !enableTWCC { + continue + } + if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeVideo); err != nil { + return nil, err + } + } + + // https://github.com/pion/ion/issues/130 + // https://github.com/pion/ion-sfu/pull/373/files#diff-6f42c5ac6f8192dd03e5a17e9d109e90cb76b1a4a7973be6ce44a89ffd1b5d18R73 + for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.AudioLevelURI} { + if extension == sdp.AudioLevelURI && !enableAudioLevel { + continue + } + if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeAudio); err != nil { + return nil, err + } + } + + i := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { + return nil, err + } + + api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)) + return api.NewPeerConnection(configuration) + } + + pc, err := webrtcNewPeerConnection(webrtc.Configuration{}) + if err != nil { + return errors.Wrapf(err, "Create PC") + } + defer pc.Close() + + var sVideoTrack *rtc.TrackLocalStaticSample + var sVideoSender *webrtc.RTPSender + if sourceVideo != "" { + mimeType, trackID := "video/H264", "video" + if strings.HasSuffix(sourceVideo, ".ivf") { + mimeType = "video/VP8" + } + + sVideoTrack, err = rtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 90000}, trackID, "pion", + ) + if err != nil { + return errors.Wrapf(err, "Create video track") + } + + sVideoSender, err = pc.AddTrack(sVideoTrack) + if err != nil { + return errors.Wrapf(err, "Add video track") + } + sVideoSender.Stop() + } + + var sAudioTrack *rtc.TrackLocalStaticSample + var sAudioSender *webrtc.RTPSender + if sourceAudio != "" { + mimeType, trackID := "audio/opus", "audio" + sAudioTrack, err = rtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 48000, Channels: 2}, trackID, "pion", + ) + if err != nil { + return errors.Wrapf(err, "Create audio track") + } + + sAudioSender, err = pc.AddTrack(sAudioTrack) + if err != nil { + return errors.Wrapf(err, "Add audio track") + } + defer sAudioSender.Stop() + } + + offer, err := pc.CreateOffer(nil) + if err != nil { + return errors.Wrapf(err, "Create Offer") + } + + if err := pc.SetLocalDescription(offer); err != nil { + return errors.Wrapf(err, "Set offer %v", offer) + } + + answer, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP) + if err != nil { + return errors.Wrapf(err, "Api request offer=%v", offer.SDP) + } + + if err := pc.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, SDP: answer, + }); err != nil { + return errors.Wrapf(err, "Set answer %v", answer) + } + + logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState()) + + // ICE state management. + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + logger.Tf(ctx, "ICE state %v", state) + }) + + pc.OnSignalingStateChange(func(state webrtc.SignalingState) { + logger.Tf(ctx, "Signaling state %v", state) + }) + + sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) { + logger.Tf(ctx, "DTLS state %v", state) + }) + + ctx, cancel := context.WithCancel(ctx) + pcDone, pcDoneCancel := context.WithCancel(context.Background()) + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + logger.Tf(ctx, "PC state %v", state) + + if state == webrtc.PeerConnectionStateConnected { + pcDoneCancel() + } + + if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed { + if ctx.Err() != nil { + return + } + + logger.Wf(ctx, "Close for PC state %v", state) + cancel() + } + }) + + // Wait for event from context or tracks. + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + if sAudioSender == nil { + return + } + + select { + case <-ctx.Done(): + case <-pcDone.Done(): + logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start read audio packets") + } + + buf := make([]byte, 1500) + for ctx.Err() == nil { + if _, _, err := sAudioSender.Read(buf); err != nil { + return + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + if sAudioTrack == nil { + return + } + + select { + case <-ctx.Done(): + case <-pcDone.Done(): + logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest audio %v", sourceAudio) + } + + for ctx.Err() == nil { + if err := readAudioTrackFromDisk(ctx, sourceAudio, sAudioSender, sAudioTrack); err != nil { + if errors.Cause(err) == io.EOF { + logger.Tf(ctx, "EOF, restart ingest audio %v", sourceAudio) + continue + } + logger.Wf(ctx, "Ignore audio err %+v", err) + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + if sVideoSender == nil { + return + } + + select { + case <-ctx.Done(): + case <-pcDone.Done(): + logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start read video packets") + } + + buf := make([]byte, 1500) + for ctx.Err() == nil { + if _, _, err := sVideoSender.Read(buf); err != nil { + return + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + if sVideoTrack == nil { + return + } + + select { + case <-ctx.Done(): + case <-pcDone.Done(): + logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo) + } + + for ctx.Err() == nil { + if err := readVideoTrackFromDisk(ctx, sourceVideo, sVideoSender, fps, sVideoTrack); err != nil { + if errors.Cause(err) == io.EOF { + logger.Tf(ctx, "EOF, restart ingest video %v", sourceVideo) + continue + } + logger.Wf(ctx, "Ignore video err %+v", err) + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Second): + StatRTC.PeerConnection = pc.GetStats() + } + } + }() + + wg.Wait() + return nil +} + +func readAudioTrackFromDisk(ctx context.Context, source string, sender *webrtc.RTPSender, track *rtc.TrackLocalStaticSample) error { + f, err := os.Open(source) + if err != nil { + return errors.Wrapf(err, "Open file %v", source) + } + defer f.Close() + + ogg, _, err := oggreader.NewWith(f) + if err != nil { + return errors.Wrapf(err, "Open ogg %v", source) + } + + enc := sender.GetParameters().Encodings[0] + codec := sender.GetParameters().Codecs[0] + headers := sender.GetParameters().HeaderExtensions + logger.Tf(ctx, "Audio %v, tbn=%v, channels=%v, ssrc=%v, pt=%v, header=%v", + codec.MimeType, codec.ClockRate, codec.Channels, enc.SSRC, codec.PayloadType, headers) + + // Whether should encode the audio-level in RTP header. + var audioLevel *webrtc.RTPHeaderExtensionParameter + for _, h := range headers { + if h.URI == sdp.AudioLevelURI { + audioLevel = &h + } + } + + clock := newWallClock() + var lastGranule uint64 + + for ctx.Err() == nil { + pageData, pageHeader, err := ogg.ParseNextPage() + if err == io.EOF { + return nil + } + if err != nil { + return errors.Wrapf(err, "Read ogg") + } + + // The amount of samples is the difference between the last and current timestamp + sampleCount := uint64(pageHeader.GranulePosition - lastGranule) + lastGranule = pageHeader.GranulePosition + sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 * sampleCount / uint64(codec.ClockRate)) + + // For audio-level, set the extensions if negotiated. + track.OnBeforeWritePacket = func(p *rtp.Packet) { + if audioLevel != nil { + if b, err := new(rtp.AudioLevelExtension).Marshal(); err == nil { + p.SetExtension(uint8(audioLevel.ID), b) + } + } + } + + if err = track.WriteSample(media.Sample{Data: pageData, Duration: sampleDuration}); err != nil { + return errors.Wrapf(err, "Write sample") + } + + if d := clock.Tick(sampleDuration); d > 0 { + time.Sleep(d) + } + } + + return nil +} + +func readVideoTrackFromDisk(ctx context.Context, source string, sender *webrtc.RTPSender, fps int, track *rtc.TrackLocalStaticSample) error { + f, err := os.Open(source) + if err != nil { + return errors.Wrapf(err, "Open file %v", source) + } + defer f.Close() + + // TODO: FIXME: Support ivf for vp8. + h264, err := h264reader.NewReader(f) + if err != nil { + return errors.Wrapf(err, "Open h264 %v", source) + } + + enc := sender.GetParameters().Encodings[0] + codec := sender.GetParameters().Codecs[0] + headers := sender.GetParameters().HeaderExtensions + logger.Tf(ctx, "Video %v, tbn=%v, fps=%v, ssrc=%v, pt=%v, header=%v", + codec.MimeType, codec.ClockRate, fps, enc.SSRC, codec.PayloadType, headers) + + clock := newWallClock() + sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 / uint64(fps)) + for ctx.Err() == nil { + var sps, pps *h264reader.NAL + var oFrames []*h264reader.NAL + for ctx.Err() == nil { + frame, err := h264.NextNAL() + if err == io.EOF { + return nil + } + if err != nil { + return errors.Wrapf(err, "Read h264") + } + + oFrames = append(oFrames, frame) + logger.If(ctx, "NALU %v PictureOrderCount=%v, ForbiddenZeroBit=%v, RefIdc=%v, %v bytes", + frame.UnitType.String(), frame.PictureOrderCount, frame.ForbiddenZeroBit, frame.RefIdc, len(frame.Data)) + + if frame.UnitType == h264reader.NalUnitTypeSPS { + sps = frame + } else if frame.UnitType == h264reader.NalUnitTypePPS { + pps = frame + } else { + break + } + } + + var frames []*h264reader.NAL + // Package SPS/PPS to STAP-A + if sps != nil && pps != nil { + stapA := packageAsSTAPA(sps, pps) + frames = append(frames, stapA) + } + // Append other original frames. + for _, frame := range oFrames { + if frame.UnitType != h264reader.NalUnitTypeSPS && frame.UnitType != h264reader.NalUnitTypePPS { + frames = append(frames, frame) + } + } + + // Covert frames to sample(buffers). + for i, frame := range frames { + sample := media.Sample{Data: frame.Data, Duration: sampleDuration} + // Use the sample timestamp for frames. + if i != len(frames)-1 { + sample.Duration = 0 + } + + // For STAP-A, set marker to false, to make Chrome happy. + track.OnBeforeWritePacket = func(p *rtp.Packet) { + if i < len(frames)-1 { + p.Header.Marker = false + } + } + + if err = track.WriteSample(sample); err != nil { + return errors.Wrapf(err, "Write sample") + } + } + + if d := clock.Tick(sampleDuration); d > 0 { + time.Sleep(d) + } + } + + return nil +} diff --git a/trunk/3rdparty/srs-bench/srs/rtc_test.go b/trunk/3rdparty/srs-bench/srs/rtc_test.go new file mode 100644 index 00000000000..da7d89949bb --- /dev/null +++ b/trunk/3rdparty/srs-bench/srs/rtc_test.go @@ -0,0 +1,449 @@ +package srs + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" + "github.com/ossrs/srs-bench/rtc" + "github.com/pion/rtcp" + "github.com/pion/webrtc/v3" + "io" + "io/ioutil" + "net/http" + "os" + "strings" + "sync" + "testing" + "time" +) + +var srsSchema = "http" +var srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API") +var srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to") +var srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play") +var srsLog = flag.Bool("srs-log", false, "Whether enable the detail log") +var srsTimeout = flag.Int("srs-timeout", 3000, "For each case, the timeout in ms") +var srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.") +var srsPlayOKPackets = flag.Int("srs-play-ok-packets", 10, "If got N packets, it's ok, or fail") +var srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.") +var srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.") +var srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.") + +func TestMain(m *testing.M) { + // Should parse it first. + flag.Parse() + + // The stream should starts with /, for example, /rtc/regression + if strings.HasPrefix(*srsStream, "/") { + *srsStream = "/" + *srsStream + } + + // Generate srs protocol from whether use HTTPS. + if *srsHttps { + srsSchema = "https" + } + + // Disable the logger during all tests. + logger.Tf(nil, "sys log %v", *srsLog) + + if *srsLog == false { + olw := logger.Switch(ioutil.Discard) + defer func() { + logger.Switch(olw) + }() + } + + // Run tests. + os.Exit(m.Run()) +} + +func TestRTCServerVersion(t *testing.T) { + api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer) + req, err := http.NewRequest("POST", api, nil) + if err != nil { + t.Errorf("Request %v", api) + return + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + t.Errorf("Do request %v", api) + return + } + + b, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Errorf("Read body of %v", api) + return + } + + obj := struct { + Code int `json:"code"` + Server string `json:"server"` + Data struct { + Major int `json:"major"` + Minor int `json:"minor"` + Revision int `json:"revision"` + Version string `json:"version"` + } `json:"data"` + }{} + if err := json.Unmarshal(b, &obj); err != nil { + t.Errorf("Parse %v", string(b)) + return + } + if obj.Code != 0 { + t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server) + return + } + if obj.Data.Major == 0 && obj.Data.Minor == 0 { + t.Errorf("Invalid version %v", obj.Data) + return + } +} + +func TestRTCServerPublishPlay(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithCancel(ctx) + + r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) + publishReady, publishReadyCancel := context.WithCancel(context.Background()) + + startPlay := func(ctx context.Context) error { + logger.Tf(ctx, "Start play url=%v", r) + + pc, err := webrtc.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return errors.Wrapf(err, "Create PC") + } + defer pc.Close() + + pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }) + pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }) + + offer, err := pc.CreateOffer(nil) + if err != nil { + return errors.Wrapf(err, "Create Offer") + } + + if err := pc.SetLocalDescription(offer); err != nil { + return errors.Wrapf(err, "Set offer %v", offer) + } + + answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP) + if err != nil { + return errors.Wrapf(err, "Api request offer=%v", offer.SDP) + } + + if err := pc.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, SDP: answer, + }); err != nil { + return errors.Wrapf(err, "Set answer %v", answer) + } + + handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error { + // Send a PLI on an interval so that the publisher is pushing a keyframe + go func() { + if track.Kind() == webrtc.RTPCodecTypeAudio { + return + } + + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(*srsPlayPLI) * time.Millisecond): + _ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ + MediaSSRC: uint32(track.SSRC()), + }}) + } + } + }() + + // Try to read packets of track. + for i := 0; i < *srsPlayOKPackets && ctx.Err() == nil; i++ { + _, _, err := track.ReadRTP() + if err != nil { + return errors.Wrapf(err, "Read RTP") + } + } + + // Completed. + cancel() + + return nil + } + + pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + err = handleTrack(ctx, track, receiver) + if err != nil { + codec := track.Codec() + err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType) + cancel() + } + }) + + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed { + err = errors.Errorf("Close for ICE state %v", state) + cancel() + } + }) + + <-ctx.Done() + return err + } + + startPublish := func(ctx context.Context) error { + sourceVideo := *srsPublishVideo + sourceAudio := *srsPublishAudio + fps := *srsPublishVideoFps + + logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v", + r, sourceAudio, sourceVideo, fps) + + pc, err := webrtc.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return errors.Wrapf(err, "Create PC") + } + defer pc.Close() + + var sVideoTrack *rtc.TrackLocalStaticSample + var sVideoSender *webrtc.RTPSender + if sourceVideo != "" { + mimeType, trackID := "video/H264", "video" + if strings.HasSuffix(sourceVideo, ".ivf") { + mimeType = "video/VP8" + } + + sVideoTrack, err = rtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 90000}, trackID, "pion", + ) + if err != nil { + return errors.Wrapf(err, "Create video track") + } + + sVideoSender, err = pc.AddTrack(sVideoTrack) + if err != nil { + return errors.Wrapf(err, "Add video track") + } + sVideoSender.Stop() + } + + var sAudioTrack *rtc.TrackLocalStaticSample + var sAudioSender *webrtc.RTPSender + if sourceAudio != "" { + mimeType, trackID := "audio/opus", "audio" + sAudioTrack, err = rtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 48000, Channels: 2}, trackID, "pion", + ) + if err != nil { + return errors.Wrapf(err, "Create audio track") + } + + sAudioSender, err = pc.AddTrack(sAudioTrack) + if err != nil { + return errors.Wrapf(err, "Add audio track") + } + defer sAudioSender.Stop() + } + + offer, err := pc.CreateOffer(nil) + if err != nil { + return errors.Wrapf(err, "Create Offer") + } + + if err := pc.SetLocalDescription(offer); err != nil { + return errors.Wrapf(err, "Set offer %v", offer) + } + + answer, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP) + if err != nil { + return errors.Wrapf(err, "Api request offer=%v", offer.SDP) + } + + if err := pc.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, SDP: answer, + }); err != nil { + return errors.Wrapf(err, "Set answer %v", answer) + } + + logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState()) + + ctx, cancel := context.WithCancel(ctx) + pcDone, pcDoneCancel := context.WithCancel(context.Background()) + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + logger.Tf(ctx, "PC state %v", state) + + if state == webrtc.PeerConnectionStateConnected { + pcDoneCancel() + publishReadyCancel() + } + + if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed { + err = errors.Errorf("Close for PC state %v", state) + cancel() + } + }) + + // Wait for event from context or tracks. + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + if sAudioSender == nil { + return + } + + select { + case <-ctx.Done(): + case <-pcDone.Done(): + } + + buf := make([]byte, 1500) + for ctx.Err() == nil { + if _, _, err := sAudioSender.Read(buf); err != nil { + return + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + if sAudioTrack == nil { + return + } + + select { + case <-ctx.Done(): + case <-pcDone.Done(): + } + + for ctx.Err() == nil { + if err := readAudioTrackFromDisk(ctx, sourceAudio, sAudioSender, sAudioTrack); err != nil { + if errors.Cause(err) == io.EOF { + logger.Tf(ctx, "EOF, restart ingest audio %v", sourceAudio) + continue + } + logger.Wf(ctx, "Ignore audio err %+v", err) + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + if sVideoSender == nil { + return + } + + select { + case <-ctx.Done(): + case <-pcDone.Done(): + logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start read video packets") + } + + buf := make([]byte, 1500) + for ctx.Err() == nil { + if _, _, err := sVideoSender.Read(buf); err != nil { + return + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + if sVideoTrack == nil { + return + } + + select { + case <-ctx.Done(): + case <-pcDone.Done(): + logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo) + } + + for ctx.Err() == nil { + if err := readVideoTrackFromDisk(ctx, sourceVideo, sVideoSender, fps, sVideoTrack); err != nil { + if errors.Cause(err) == io.EOF { + logger.Tf(ctx, "EOF, restart ingest video %v", sourceVideo) + continue + } + logger.Wf(ctx, "Ignore video err %+v", err) + } + } + }() + + wg.Wait() + return err + } + + var wg sync.WaitGroup + errs := make(chan error, 0) + + wg.Add(1) + go func() { + defer wg.Done() + + // Wait for publisher to start first. + select { + case <-ctx.Done(): + return + case <-publishReady.Done(): + } + + errs <- startPlay(logger.WithContext(ctx)) + cancel() + }() + + wg.Add(1) + go func() { + defer wg.Done() + + errs <- startPublish(logger.WithContext(ctx)) + cancel() + }() + + wg.Add(1) + go func() { + defer wg.Done() + + select { + case <-ctx.Done(): + case <-time.After(time.Duration(*srsTimeout) * time.Millisecond): + errs <- errors.Errorf("timeout for %vms", *srsTimeout) + cancel() + } + }() + + testDone, testDoneCancel := context.WithCancel(context.Background()) + go func() { + wg.Wait() + testDoneCancel() + }() + + // Handle errs, the test result. + for { + select { + case <-testDone.Done(): + return + case err := <-errs: + if err != nil && err != context.Canceled && !t.Failed() { + t.Errorf("err %+v", err) + } + } + } +} diff --git a/trunk/3rdparty/srs-bench/srs/stat.go b/trunk/3rdparty/srs-bench/srs/stat.go new file mode 100644 index 00000000000..35cbe3737c1 --- /dev/null +++ b/trunk/3rdparty/srs-bench/srs/stat.go @@ -0,0 +1,47 @@ +package srs + +import ( + "context" + "encoding/json" + "github.com/ossrs/go-oryx-lib/logger" + "net/http" + "strings" +) + +type statRTC struct { + Publishers struct { + Expect int `json:"expect"` + Alive int `json:"alive"` + } `json:"publishers"` + Subscribers struct { + Expect int `json:"expect"` + Alive int `json:"alive"` + } `json:"subscribers"` + PeerConnection interface{} `json:"random-pc"` +} + +var StatRTC statRTC + +func HandleStat(ctx context.Context, mux *http.ServeMux, l string) { + if strings.HasPrefix(l, ":") { + l = "127.0.0.1" + l + } + + logger.Tf(ctx, "Handle http://%v/api/v1/sb/rtc", l) + mux.HandleFunc("/api/v1/sb/rtc", func(w http.ResponseWriter, r *http.Request) { + res := &struct { + Code int `json:"code"` + Data interface{} `json:"data"` + }{ + 0, &StatRTC, + } + + b, err := json.Marshal(res) + if err != nil { + logger.Wf(ctx, "marshal %v err %+v", res, err) + return + } + + w.Write(b) + }) +} diff --git a/trunk/3rdparty/srs-bench/srs/util.go b/trunk/3rdparty/srs-bench/srs/util.go new file mode 100644 index 00000000000..4e6b2f783a6 --- /dev/null +++ b/trunk/3rdparty/srs-bench/srs/util.go @@ -0,0 +1,142 @@ +package srs + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" + "github.com/pion/webrtc/v3/pkg/media/h264reader" + "io/ioutil" + "net/http" + "net/url" + "strings" + "time" +) + +func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error) { + u, err := url.Parse(r) + if err != nil { + return "", errors.Wrapf(err, "Parse url %v", r) + } + + // Build api url. + host := u.Host + if !strings.Contains(host, ":") { + host += ":1985" + } + + api := fmt.Sprintf("http://%v", host) + if !strings.HasPrefix(apiPath, "/") { + api += "/" + } + api += apiPath + + if !strings.HasSuffix(apiPath, "/") { + api += "/" + } + if u.RawQuery != "" { + api += "?" + u.RawQuery + } + + // Build JSON body. + reqBody := struct { + Api string `json:"api"` + ClientIP string `json:"clientip"` + SDP string `json:"sdp"` + StreamURL string `json:"streamurl"` + }{ + api, "", offer, r, + } + + b, err := json.Marshal(reqBody) + if err != nil { + return "", errors.Wrapf(err, "Marshal body %v", reqBody) + } + logger.If(ctx, "Request url api=%v with %v", api, string(b)) + logger.Tf(ctx, "Request url api=%v with %v bytes", api, len(b)) + + req, err := http.NewRequest("POST", api, strings.NewReader(string(b))) + if err != nil { + return "", errors.Wrapf(err, "HTTP request %v", string(b)) + } + + res, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return "", errors.Wrapf(err, "Do HTTP request %v", string(b)) + } + + b2, err := ioutil.ReadAll(res.Body) + if err != nil { + return "", errors.Wrapf(err, "Read response for %v", string(b)) + } + logger.If(ctx, "Response from %v is %v", api, string(b2)) + logger.Tf(ctx, "Response from %v is %v bytes", api, len(b2)) + + resBody := struct { + Code int `json:"code"` + Session string `json:"sessionid"` + SDP string `json:"sdp"` + }{} + if err := json.Unmarshal(b2, &resBody); err != nil { + return "", errors.Wrapf(err, "Marshal %v", string(b2)) + } + + if resBody.Code != 0 { + return "", errors.Errorf("Server fail code=%v %v", resBody.Code, string(b2)) + } + logger.If(ctx, "Parse response to code=%v, session=%v, sdp=%v", + resBody.Code, resBody.Session, escapeSDP(resBody.SDP)) + logger.Tf(ctx, "Parse response to code=%v, session=%v, sdp=%v bytes", + resBody.Code, resBody.Session, len(resBody.SDP)) + + return string(resBody.SDP), nil +} + +func escapeSDP(sdp string) string { + return strings.ReplaceAll(strings.ReplaceAll(sdp, "\r", "\\r"), "\n", "\\n") +} + +func packageAsSTAPA(frames ...*h264reader.NAL) *h264reader.NAL { + first := frames[0] + + buf := bytes.Buffer{} + buf.WriteByte( + byte(first.RefIdc<<5)&0x60 | byte(24), // STAP-A + ) + + for _, frame := range frames { + buf.WriteByte(byte(len(frame.Data) >> 8)) + buf.WriteByte(byte(len(frame.Data))) + buf.Write(frame.Data) + } + + return &h264reader.NAL{ + PictureOrderCount: first.PictureOrderCount, + ForbiddenZeroBit: false, + RefIdc: first.RefIdc, + UnitType: h264reader.NalUnitType(24), // STAP-A + Data: buf.Bytes(), + } +} + +type wallClock struct { + start time.Time + duration time.Duration +} + +func newWallClock() *wallClock { + return &wallClock{start: time.Now()} +} + +func (v *wallClock) Tick(d time.Duration) time.Duration { + v.duration += d + + wc := time.Now().Sub(v.start) + re := v.duration - wc + if re > 30*time.Millisecond { + return re + } + return 0 +} diff --git a/trunk/conf/regression-test.conf b/trunk/conf/regression-test.conf index fcb348f12e0..bd732952e27 100644 --- a/trunk/conf/regression-test.conf +++ b/trunk/conf/regression-test.conf @@ -2,6 +2,7 @@ listen 1935; max_connections 1000; daemon on; +disable_daemon_for_docker off; srs_log_tank file; http_server {