diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 56cb97eaa0c..6ebc07e0bbc 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -18,6 +18,7 @@ on: [push, pull_request]
# multiple-arch-amd64(2m)
# utest(3m)
# coverage(3m)
+# blackbox(3m)
jobs:
cygwin64-cache:
@@ -182,6 +183,23 @@ jobs:
cd 3rdparty/srs-bench && ./objs/srs_test -test.v && ./objs/srs_gb28181_test -test.v'
runs-on: ubuntu-20.04
+ blackbox:
+ name: blackbox
+ needs:
+ - fast
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v3
+ # Tests
+ - name: Build test image
+ run: docker build --tag srs:test --build-arg MAKEARGS='-j2' -f trunk/Dockerfile.test .
+ # For blackbox-test
+ - name: Run SRS blackbox-test
+ run: |
+ docker run --rm -w /srs/trunk/3rdparty/srs-bench srs:test \
+ ./objs/srs_blackbox_test -test.v -test.parallel 64
+ runs-on: ubuntu-20.04
+
coverage:
name: coverage
needs:
@@ -292,6 +310,7 @@ jobs:
needs:
- cygwin64
- coverage
+ - blackbox
- utest
- build-centos7
- build-ubuntu16
diff --git a/trunk/3rdparty/srs-bench/Makefile b/trunk/3rdparty/srs-bench/Makefile
index 842fb83c989..f7446c14993 100644
--- a/trunk/3rdparty/srs-bench/Makefile
+++ b/trunk/3rdparty/srs-bench/Makefile
@@ -3,27 +3,56 @@
default: bench test
clean:
- rm -f ./objs/srs_bench ./objs/srs_test ./objs/srs_gb28181_test
+ rm -rf ./objs
-.format.txt: *.go srs/*.go vnet/*.go janus/*.go gb28181/*.go
+#########################################################################################################
+# SRS benchmark tool for SRS, janus, GB28181.
+./objs/.format.bench.txt: *.go srs/*.go vnet/*.go janus/*.go gb28181/*.go
gofmt -w .
- echo "done" > .format.txt
+ echo "done" > ./objs/.format.bench.txt
bench: ./objs/srs_bench
-./objs/srs_bench: .format.txt *.go srs/*.go vnet/*.go janus/*.go gb28181/*.go Makefile
+./objs/srs_bench: ./objs/.format.bench.txt *.go srs/*.go vnet/*.go janus/*.go gb28181/*.go Makefile
go build -mod=vendor -o objs/srs_bench .
-test: ./objs/srs_test ./objs/srs_gb28181_test
+#########################################################################################################
+# For all regression tests.
+test: ./objs/srs_test ./objs/srs_gb28181_test ./objs/srs_blackbox_test
-./objs/srs_test: .format.txt *.go srs/*.go vnet/*.go Makefile
+#########################################################################################################
+# For SRS regression test.
+./objs/.format.srs.txt: srs/*.go vnet/*.go
+ gofmt -w srs vnet
+ echo "done" > ./objs/.format.srs.txt
+
+./objs/srs_test: ./objs/.format.srs.txt *.go srs/*.go vnet/*.go Makefile
go test ./srs -mod=vendor -c -o ./objs/srs_test
-./objs/srs_gb28181_test: .format.txt *.go gb28181/*.go Makefile
+#########################################################################################################
+# For gb28181 test.
+./objs/.format.gb28181.txt: gb28181/*.go
+ gofmt -w gb28181
+ echo "done" > ./objs/.format.gb28181.txt
+
+./objs/srs_gb28181_test: ./objs/.format.gb28181.txt *.go gb28181/*.go Makefile
go test ./gb28181 -mod=vendor -c -o ./objs/srs_gb28181_test
+#########################################################################################################
+# For blackbox test.
+./objs/.format.blackbox.txt: blackbox/*.go
+ gofmt -w blackbox
+ echo "done" > ./objs/.format.blackbox.txt
+
+./objs/srs_blackbox_test: ./objs/.format.blackbox.txt *.go blackbox/*.go Makefile
+ go test ./blackbox -mod=vendor -c -o ./objs/srs_blackbox_test
+
+#########################################################################################################
+# Help menu.
help:
- @echo "Usage: make [bench|test]"
+ @echo "Usage: make [default|bench|test|clean]"
+ @echo " default The default entry for make is bench+test"
@echo " bench Make the bench to ./objs/srs_bench"
- @echo " test Make the test tool to ./objs/srs_test and ./objs/srs_gb28181_test"
+ @echo " test Make the test tool to ./objs/srs_test and ./objs/srs_gb28181_test ./objs/srs_blackbox_test"
+ @echo " clean Remove all tools at ./objs"
diff --git a/trunk/3rdparty/srs-bench/README.md b/trunk/3rdparty/srs-bench/README.md
index d4c741d232c..bd9a9d6e109 100644
--- a/trunk/3rdparty/srs-bench/README.md
+++ b/trunk/3rdparty/srs-bench/README.md
@@ -1,17 +1,29 @@
# srs-bench
-WebRTC benchmark on [pion/webrtc](https://github.com/pion/webrtc) for [SRS](https://github.com/ossrs/srs).
+SB(SRS Bench) is a set of benchmark and regression test tools, for SRS and other media servers, supports HTTP-FLV, RTMP,
+HLS, WebRTC and GB28181.
+
+For RTMP/HLS/FLV benchmark, please use branch [master](https://github.com/ossrs/srs-bench/tree/master).
## Usage
-编译和使用:
+下载代码和编译:
```bash
git clone -b feature/rtc https://github.com/ossrs/srs-bench.git &&
-cd srs-bench && make && ./objs/srs_bench -h
+cd srs-bench && make
```
-编译和启动SRS:
+编译会生成下面的工具:
+
+* `./objs/srs_bench` 压测,模拟大量客户端的负载测试,支持SRS、GB28181和Janus三种场景。
+* `./objs/srs_test` 回归测试(SRS),SRS服务器的回归测试。
+* `./objs/srs_gb28181_test` 回归测试(GB28181),GB服务器的回归测试。
+* `./objs/srs_blackbox_test` 黑盒测试(SRS),SRS服务器的黑盒测试,也可以换成其他媒体服务器。
+
+> Note: 查看工具的全部参数请执行`./objs/xx -h`
+
+有些场景,若需要编译和启动SRS:
```bash
git clone https://github.com/ossrs/srs.git &&
@@ -19,7 +31,7 @@ cd srs/trunk && ./configure && make &&
./objs/srs -c conf/console.conf
```
-请按下面的操作启动测试。
+具体场景,请按下面的操作启动测试。
## Player for Live
@@ -71,7 +83,8 @@ ffmpeg -re -i doc/source.200kbps.768x320.flv -c copy -f flv -y rtmp://localhost/
> 备注:URL的变量格式参考Go的`fmt.Sprintf`,比如可以用`webrtc://localhost/live/livestream_%03d`。
-## DVR
+
+## DVR for Benchmark
录制场景,主要是把内容录制下来后,可分析,也可以用于推流。
@@ -173,6 +186,86 @@ make && ./objs/srs_test -test.v -srs-log -test.run TestRtcBasic_PublishPlay
* `-srs-play-pli`,播放时,PLI的间隔,毫秒。默认值:`5000`,即5秒。
* `-srs-dtls-drop-packets`,DTLS丢包测试,丢了多少个包算成功,默认值:`5`
+> Note: 查看全部参数请执行`./objs/srs_test -h`
+
+
+## GB28181 Test
+
+支持GB28181的压测,使用选项`-sfu gb28181`可以查看帮助:
+
+```bash
+make && ./objs/srs_bench -sfu gb28181 --help
+```
+
+运行回归测试用例,更多命令请参考[Regression Test](#regression-test):
+
+```bash
+go test ./gb28181 -mod=vendor -v -count=1
+```
+
+也可以用make编译出重复使用的二进制:
+
+```bash
+make && ./objs/srs_gb28181_test -test.v
+```
+
+支持的参数如下:
+
+* `-srs-sip`,SIP服务器地址。默认值:`tcp://127.0.0.1:5060`
+* `-srs-stream`,GB的user,即流名称,一般会加上随机的后缀。默认值:`3402000000`
+* `-srs-timeout`,每个Case的超时时间,毫秒。默认值:`11000`,即11秒。
+* `-srs-publish-audio`,推流时,使用的音频文件。默认值:`avatar.aac`
+* `-srs-publish-video`,推流时,使用的视频文件。默认值:`avatar.h264`
+* `-srs-publish-video-fps`,推流时,视频文件的FPS。默认值:`25`
+
+其他不常用参数:
+
+* `-srs-log`,是否开启详细日志。默认值:`false`
+
+> Note: 查看全部参数请执行`./objs/srs_gb28181_test -h`
+
+## Blackbox Test
+
+使用FFmpeg作为客户端,对流媒体服务器SRS进行黑盒压测,完全黑盒的回归测试。
+
+运行回归测试用例,如果只跑一次,可以直接运行:
+
+```bash
+go test ./blackbox -mod=vendor -v -count=1
+```
+
+也可以用make编译出重复使用的二进制:
+
+```bash
+make && ./objs/srs_blackbox_test -test.v
+```
+
+支持的参数如下:
+
+* `-srs-binary`,每个测试用例都需要启动一个SRS服务,因此需要设置SRS的位置。默认值:`../../objs/srs`
+* `-srs-ffmpeg`,FFmpeg工具的位置,用来推流和录制。默认值:`ffmpeg`
+* `-srs-ffprobe`,ffprobe工具的位置,用来分析流的信息。默认值:`ffprobe`
+* `-srs-timeout`,每个Case的超时时间,毫秒。默认值:`64000`,即64秒。
+* `-srs-publish-avatar`,测试源文件路径。默认值:`avatar.flv`。
+* `-srs-ffprobe-duration`,每个Case的探测时间,毫秒。默认值:`16000`,即16秒。
+* `-srs-ffprobe-timeout`,每个Case的探测超时时间,毫秒。默认值:`21000`,即21秒。
+
+其他不常用参数:
+
+* `-srs-log`,是否开启详细日志。默认值:`false`
+* `-srs-stdout`,是否开启SRS的stdout详细日志。默认值:`false`
+* `-srs-ffmpeg-stderr`,是否开启FFmpeg的stderr详细日志。默认值:`false`
+* `-srs-dvr-stderr`,是否开启DVR的stderr详细日志。默认值:`false`
+* `-srs-ffprobe-stdout`,是否开启FFprobe的stdout详细日志。默认值:`false`
+
+由于每个黑盒的用例时间都很长,可以开启并行:
+
+```bash
+./objs/srs_blackbox_test -test.v -test.parallel 8
+```
+
+> Note: 查看全部参数请执行`./objs/srs_blackbox_test -h`
+
## GCOVR
本机生成覆盖率时,我们使用工具[gcovr](https://gcovr.com/en/stable/guide.html)。
@@ -227,37 +320,4 @@ make -j10 && ./objs/srs_bench -sfu janus \
-nn 5
```
-## GB28181
-
-支持GB28181的压测,使用选项`-sfu gb28181`可以查看帮助:
-
-```bash
-make && ./objs/srs_bench -sfu gb28181 --help
-```
-
-运行回归测试用例,更多命令请参考[Regression Test](#regression-test):
-
-```bash
-go test ./gb28181 -mod=vendor -v -count=1
-```
-
-也可以用make编译出重复使用的二进制:
-
-```bash
-make && ./objs/srs_gb28181_test -test.v
-```
-
-支持的参数如下:
-
-* `-srs-sip`,SIP服务器地址。默认值:`tcp://127.0.0.1:5060`
-* `-srs-stream`,GB的user,即流名称,一般会加上随机的后缀。默认值:`3402000000`
-* `-srs-timeout`,每个Case的超时时间,毫秒。默认值:`11000`,即11秒。
-* `-srs-publish-audio`,推流时,使用的音频文件。默认值:`avatar.aac`
-* `-srs-publish-video`,推流时,使用的视频文件。默认值:`avatar.h264`
-* `-srs-publish-video-fps`,推流时,视频文件的FPS。默认值:`25`
-
-其他不常用参数:
-
-* `-srs-log`,是否开启详细日志。默认值:`false`
-
2021.01, Winlin
diff --git a/trunk/3rdparty/srs-bench/blackbox/blackbox.go b/trunk/3rdparty/srs-bench/blackbox/blackbox.go
new file mode 100644
index 00000000000..38deb621834
--- /dev/null
+++ b/trunk/3rdparty/srs-bench/blackbox/blackbox.go
@@ -0,0 +1,21 @@
+// The MIT License (MIT)
+//
+// # Copyright (c) 2023 Winlin
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy of
+// this software and associated documentation files (the "Software"), to deal in
+// the Software without restriction, including without limitation the rights to
+// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software is furnished to do so,
+// subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+package blackbox
diff --git a/trunk/3rdparty/srs-bench/blackbox/blackbox_test.go b/trunk/3rdparty/srs-bench/blackbox/blackbox_test.go
new file mode 100644
index 00000000000..fa3e7d1b070
--- /dev/null
+++ b/trunk/3rdparty/srs-bench/blackbox/blackbox_test.go
@@ -0,0 +1,50 @@
+// The MIT License (MIT)
+//
+// # Copyright (c) 2023 Winlin
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy of
+// this software and associated documentation files (the "Software"), to deal in
+// the Software without restriction, including without limitation the rights to
+// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software is furnished to do so,
+// subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+package blackbox
+
+import (
+ "github.com/ossrs/go-oryx-lib/logger"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "testing"
+ "time"
+)
+
+func TestMain(m *testing.M) {
+ if err := prepareTest(); err != nil {
+ logger.Ef(nil, "Prepare test fail, err %+v", err)
+ os.Exit(-1)
+ }
+
+ // Disable the logger during all tests.
+ if *srsLog == false {
+ olw := logger.Switch(ioutil.Discard)
+ defer func() {
+ logger.Switch(olw)
+ }()
+ }
+
+ // Init rand seed.
+ rand.Seed(time.Now().UnixNano())
+
+ os.Exit(m.Run())
+}
diff --git a/trunk/3rdparty/srs-bench/blackbox/rtmp_test.go b/trunk/3rdparty/srs-bench/blackbox/rtmp_test.go
new file mode 100644
index 00000000000..12cbdc9e608
--- /dev/null
+++ b/trunk/3rdparty/srs-bench/blackbox/rtmp_test.go
@@ -0,0 +1,186 @@
+// The MIT License (MIT)
+//
+// # Copyright (c) 2023 Winlin
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy of
+// this software and associated documentation files (the "Software"), to deal in
+// the Software without restriction, including without limitation the rights to
+// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software is furnished to do so,
+// subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+package blackbox
+
+import (
+ "context"
+ "fmt"
+ "github.com/ossrs/go-oryx-lib/errors"
+ "github.com/ossrs/go-oryx-lib/logger"
+ "math/rand"
+ "os"
+ "path"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestRtmpPublish_RtmpPlay_Basic(t *testing.T) {
+ // This case is run in parallel.
+ t.Parallel()
+
+ // Setup the max timeout for this case.
+ ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
+ defer cancel()
+
+ // Check a set of errors.
+ var r0, r1, r2, r3 error
+ defer func(ctx context.Context) {
+ if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil {
+ t.Errorf("Fail for err %+v", err)
+ } else {
+ logger.Tf(ctx, "test done with err %+v", err)
+ }
+ }(ctx)
+
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ // Start SRS server and wait for it to be ready.
+ svr := NewSRSServer(func(v *srsServer) {
+ v.envs = []string{
+ fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen),
+ }
+ })
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ r0 = svr.Run(ctx, cancel)
+ }()
+
+ // Start FFmpeg to publish stream.
+ streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int())
+ streamURL := fmt.Sprintf("rtmp://localhost:%v/live/%v", svr.RTMPPort(), streamID)
+ ffmpeg := NewFFmpeg(func(v *ffmpegClient) {
+ v.args = []string{
+ "-stream_loop", "-1", "-re", "-i", *srsPublishAvatar, "-c", "copy", "-f", "flv", streamURL,
+ }
+ })
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-svr.ReadyCtx().Done()
+ r1 = ffmpeg.Run(ctx, cancel)
+ }()
+
+ // Start FFprobe to detect and verify stream.
+ duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
+ ffprobe := NewFFprobe(func(v *ffprobeClient) {
+ v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.mp4", streamID))
+ v.streamURL, v.duration, v.timeout = streamURL, duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
+ })
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-svr.ReadyCtx().Done()
+ r2 = ffprobe.Run(ctx, cancel)
+ }()
+
+ // Fast quit for probe done.
+ select {
+ case <-ctx.Done():
+ case <-ffprobe.ProbeDoneCtx().Done():
+ defer cancel()
+
+ str, m := ffprobe.Result()
+ if len(m.Streams) != 2 {
+ r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
+ }
+ }
+}
+
+func TestRtmpPublish_FlvPlay_Basic(t *testing.T) {
+ // This case is run in parallel.
+ t.Parallel()
+
+ // Setup the max timeout for this case.
+ ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
+ defer cancel()
+
+ // Check a set of errors.
+ var r0, r1, r2, r3 error
+ defer func(ctx context.Context) {
+ if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil {
+ t.Errorf("Fail for err %+v", err)
+ } else {
+ logger.Tf(ctx, "test done with err %+v", err)
+ }
+ }(ctx)
+
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ // Start SRS server and wait for it to be ready.
+ svr := NewSRSServer(func(v *srsServer) {
+ v.envs = []string{
+ fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen),
+ "SRS_HTTP_SERVER_ENABLED=on",
+ fmt.Sprintf("SRS_HTTP_SERVER_LISTEN=%v", v.httpListen),
+ "SRS_VHOST_HTTP_REMUX_ENABLED=on",
+ }
+ })
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ r0 = svr.Run(ctx, cancel)
+ }()
+
+ // Start FFmpeg to publish stream.
+ streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int())
+ streamURL := fmt.Sprintf("rtmp://localhost:%v/live/%v", svr.RTMPPort(), streamID)
+ ffmpeg := NewFFmpeg(func(v *ffmpegClient) {
+ v.args = []string{
+ "-stream_loop", "-1", "-re", "-i", *srsPublishAvatar, "-c", "copy", "-f", "flv", streamURL,
+ }
+ })
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-svr.ReadyCtx().Done()
+ r1 = ffmpeg.Run(ctx, cancel)
+ }()
+
+ // Start FFprobe to detect and verify stream.
+ duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
+ ffprobe := NewFFprobe(func(v *ffprobeClient) {
+ v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.mp4", streamID))
+ v.streamURL = fmt.Sprintf("http://localhost:%v/live/%v.flv", svr.HTTPPort(), streamID)
+ v.duration, v.timeout = duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
+ })
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-svr.ReadyCtx().Done()
+ r2 = ffprobe.Run(ctx, cancel)
+ }()
+
+ // Fast quit for probe done.
+ select {
+ case <-ctx.Done():
+ case <-ffprobe.ProbeDoneCtx().Done():
+ defer cancel()
+
+ str, m := ffprobe.Result()
+ if len(m.Streams) != 2 {
+ r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
+ }
+ }
+}
diff --git a/trunk/3rdparty/srs-bench/blackbox/util.go b/trunk/3rdparty/srs-bench/blackbox/util.go
new file mode 100644
index 00000000000..b3c328ff3c1
--- /dev/null
+++ b/trunk/3rdparty/srs-bench/blackbox/util.go
@@ -0,0 +1,999 @@
+// The MIT License (MIT)
+//
+// # Copyright (c) 2023 Winlin
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy of
+// this software and associated documentation files (the "Software"), to deal in
+// the Software without restriction, including without limitation the rights to
+// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software is furnished to do so,
+// subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+package blackbox
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "github.com/ossrs/go-oryx-lib/errors"
+ "github.com/ossrs/go-oryx-lib/logger"
+ "io/ioutil"
+ "math/rand"
+ "net/http"
+ "net/url"
+ "os"
+ "os/exec"
+ "path"
+ "strconv"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+)
+
+var srsLog *bool
+var srsStdout *bool
+var srsFFmpegStderr *bool
+var srsDVRStderr *bool
+var srsFFprobeStdout *bool
+
+var srsTimeout *int
+var srsFFprobeDuration *int
+var srsFFprobeTimeout *int
+
+var srsBinary *string
+var srsFFmpeg *string
+var srsFFprobe *string
+
+var srsPublishAvatar *string
+
+func prepareTest() (err error) {
+ srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
+ srsStdout = flag.Bool("srs-stdout", false, "Whether enable the SRS stdout log")
+ srsFFmpegStderr = flag.Bool("srs-ffmpeg-stderr", false, "Whether enable the FFmpeg stderr log")
+ srsDVRStderr = flag.Bool("srs-dvr-stderr", false, "Whether enable the DVR stderr log")
+ srsFFprobeStdout = flag.Bool("srs-ffprobe-stdout", false, "Whether enable the FFprobe stdout log")
+ srsTimeout = flag.Int("srs-timeout", 64000, "For each case, the timeout in ms")
+ srsFFprobeDuration = flag.Int("srs-ffprobe-duration", 16000, "For each case, the duration for ffprobe in ms")
+ srsFFprobeTimeout = flag.Int("srs-ffprobe-timeout", 21000, "For each case, the timeout for ffprobe in ms")
+ srsBinary = flag.String("srs-binary", "../../objs/srs", "The binary to start SRS server")
+ srsFFmpeg = flag.String("srs-ffmpeg", "ffmpeg", "The FFmpeg tool")
+ srsFFprobe = flag.String("srs-ffprobe", "ffprobe", "The FFprobe tool")
+ srsPublishAvatar = flag.String("srs-publish-avatar", "avatar.flv", "The avatar file for publisher.")
+
+ // Parse user options.
+ flag.Parse()
+
+ // Try to locate file.
+ tryOpenFile := func(filename string) (string, error) {
+ // Match if file exists.
+ if _, err := os.Stat(filename); err == nil {
+ return filename, nil
+ }
+
+ // If we run in GoLand, the current directory is in blackbox, so we use parent directory.
+ nFilename := path.Join("../", filename)
+ if _, err := os.Stat(nFilename); err == nil {
+ return nFilename, nil
+ }
+
+ // Try to find file by which if it's a command like ffmpeg.
+ cmd := exec.Command("which", filename)
+ cmd.Env = []string{"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"}
+ if v, err := cmd.Output(); err == nil {
+ return strings.TrimSpace(string(v)), nil
+ }
+
+ return filename, errors.Errorf("file %v not found", filename)
+ }
+
+ // Check and relocate path of tools.
+ if *srsBinary, err = tryOpenFile(*srsBinary); err != nil {
+ return err
+ }
+ if *srsFFmpeg, err = tryOpenFile(*srsFFmpeg); err != nil {
+ return err
+ }
+ if *srsFFprobe, err = tryOpenFile(*srsFFprobe); err != nil {
+ return err
+ }
+ if *srsPublishAvatar, err = tryOpenFile(*srsPublishAvatar); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Filter the test error, ignore context.Canceled
+func filterTestError(errs ...error) error {
+ var filteredErrors []error
+
+ for _, err := range errs {
+ if err == nil || errors.Cause(err) == context.Canceled {
+ continue
+ }
+
+ // If url error, server maybe error, do not print the detail log.
+ if r0 := errors.Cause(err); r0 != nil {
+ if r1, ok := r0.(*url.Error); ok {
+ err = r1
+ }
+ }
+
+ filteredErrors = append(filteredErrors, err)
+ }
+
+ if len(filteredErrors) == 0 {
+ return nil
+ }
+ if len(filteredErrors) == 1 {
+ return filteredErrors[0]
+ }
+
+ var descs []string
+ for i, err := range filteredErrors[1:] {
+ descs = append(descs, fmt.Sprintf("err #%d, %+v", i, err))
+ }
+ return errors.Wrapf(filteredErrors[0], "with %v", strings.Join(descs, ","))
+}
+
+// The SRSPortAllocator is SRS port manager.
+type SRSPortAllocator struct {
+ ports sync.Map
+}
+
+func NewSRSPortAllocator() *SRSPortAllocator {
+ return &SRSPortAllocator{}
+}
+
+func (v *SRSPortAllocator) Allocate() int {
+ for i := 0; i < 1024; i++ {
+ port := 10000 + rand.Int()%50000
+ if _, ok := v.ports.LoadOrStore(port, true); !ok {
+ return port
+ }
+ }
+
+ panic("Allocate port failed")
+}
+
+func (v *SRSPortAllocator) Free(port int) {
+ v.ports.Delete(port)
+}
+
+var allocator *SRSPortAllocator
+
+func init() {
+ allocator = NewSRSPortAllocator()
+}
+
+type backendService struct {
+ // The context for case.
+ caseCtx context.Context
+ caseCtxCancel context.CancelFunc
+
+ // When SRS process started.
+ readyCtx context.Context
+ readyCtxCancel context.CancelFunc
+
+ // Whether already closed.
+ closedCtx context.Context
+ closedCtxCancel context.CancelFunc
+
+ // All goroutines
+ wg sync.WaitGroup
+
+ // The name, args and env for cmd.
+ name string
+ args []string
+ env []string
+
+ // The process stdout and stderr.
+ stdout bytes.Buffer
+ stderr bytes.Buffer
+ // The process error.
+ r0 error
+ // The process pid.
+ pid int
+
+ // Hooks for owner.
+ onBeforeStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
+ onAfterStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
+ onStop func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error
+ onDispose func(ctx context.Context, bs *backendService) error
+}
+
+func newBackendService(opts ...func(v *backendService)) *backendService {
+ v := &backendService{}
+
+ v.readyCtx, v.readyCtxCancel = context.WithCancel(context.Background())
+ v.closedCtx, v.closedCtxCancel = context.WithCancel(context.Background())
+
+ for _, opt := range opts {
+ opt(v)
+ }
+
+ return v
+}
+
+func (v *backendService) Close() error {
+ if v.closedCtx.Err() != nil {
+ return v.r0
+ }
+ v.closedCtxCancel()
+
+ if v.caseCtxCancel != nil {
+ v.caseCtxCancel()
+ }
+ if v.readyCtxCancel != nil {
+ v.readyCtxCancel()
+ }
+
+ v.wg.Wait()
+
+ if v.onDispose != nil {
+ v.onDispose(v.caseCtx, v)
+ }
+
+ logger.Tf(v.caseCtx, "Service is closed, pid=%v, r0=%v", v.pid, v.r0)
+ return nil
+}
+
+func (v *backendService) ReadyCtx() context.Context {
+ return v.readyCtx
+}
+
+func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) error {
+ // Start SRS with -e, which only use environment variables.
+ cmd := exec.Command(v.name, v.args...)
+
+ // If not started, we also need to callback the onStop.
+ var processStarted bool
+ defer func() {
+ if v.onStop != nil && !processStarted {
+ v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr)
+ }
+ }()
+
+ // Ignore if already error.
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+
+ // Save the context of case.
+ v.caseCtx, v.caseCtxCancel = ctx, cancel
+
+ // Setup stdout and stderr.
+ cmd.Stdout = &v.stdout
+ cmd.Stderr = &v.stderr
+ cmd.Env = v.env
+ if v.onBeforeStart != nil {
+ if err := v.onBeforeStart(ctx, v, cmd); err != nil {
+ return errors.Wrapf(err, "onBeforeStart failed")
+ }
+ }
+
+ // Try to start the SRS server.
+ if err := cmd.Start(); err != nil {
+ return err
+ }
+ defer v.Close()
+
+ // Now process started, query the pid.
+ v.pid = cmd.Process.Pid
+ v.readyCtxCancel()
+ processStarted = true
+ if v.onAfterStart != nil {
+ if err := v.onAfterStart(ctx, v, cmd); err != nil {
+ return errors.Wrapf(err, "onAfterStart failed")
+ }
+ }
+
+ // The context for SRS process.
+ processDone, processDoneCancel := context.WithCancel(context.Background())
+
+ // If SRS process terminated, notify case to stop.
+ v.wg.Add(1)
+ go func() {
+ defer v.wg.Done()
+
+ // When SRS quit, also terminate the case.
+ defer cancel()
+
+ // Notify other goroutine, SRS already done.
+ defer processDoneCancel()
+
+ v.r0 = cmd.Wait()
+ if v.onStop != nil {
+ if r1 := v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr); r1 != nil {
+ logger.Wf(ctx, "Process onStop err %v", r1)
+ if v.r0 == nil {
+ v.r0 = r1
+ }
+ }
+ }
+ }()
+
+ // If case terminated, notify SRS process to stop.
+ v.wg.Add(1)
+ go func() {
+ defer v.wg.Done()
+
+ select {
+ case <-ctx.Done():
+ // When case terminated, also terminate the SRS process.
+ cmd.Process.Signal(syscall.SIGINT)
+ case <-processDone.Done():
+ // Ignore if already done.
+ return
+ }
+
+ // Start a goroutine to ensure SRS killed.
+ go func() {
+ time.Sleep(3 * time.Second)
+ if processDone.Err() == nil { // Ignore if already done.
+ cmd.Process.Signal(syscall.SIGKILL)
+ }
+ }()
+ }()
+
+ // Wait for SRS or case done.
+ select {
+ case <-ctx.Done():
+ case <-processDone.Done():
+ }
+
+ return v.r0
+}
+
+// ServiceRunner is an interface to run backend service.
+type ServiceRunner interface {
+ Run(ctx context.Context, cancel context.CancelFunc) error
+}
+
+// ServiceReadyQuerier is an interface to detect whether service is ready.
+type ServiceReadyQuerier interface {
+ ReadyCtx() context.Context
+}
+
+// SRSServer is the interface for SRS server.
+type SRSServer interface {
+ ServiceRunner
+ ServiceReadyQuerier
+ // WorkDir is the current working directory for SRS.
+ WorkDir() string
+ // RTMPPort is the RTMP stream port.
+ RTMPPort() int
+ // HTTPPort is the HTTP stream port.
+ HTTPPort() int
+}
+
+// srsServer is a SRS server instance.
+type srsServer struct {
+ // The backend service process.
+ process *backendService
+
+ // When SRS process started.
+ readyCtx context.Context
+ readyCtxCancel context.CancelFunc
+
+ // SRS server ID.
+ srsID string
+ // SRS workdir.
+ workDir string
+ // SRS PID file, relative to the workdir.
+ srsRelativePidFile string
+ // SRS server ID cache file, relative to the workdir.
+ srsRelativeIDFile string
+
+ // SRS RTMP server listen port.
+ rtmpListen int
+ // HTTP API listen port.
+ apiListen int
+ // HTTP server listen port.
+ httpListen int
+
+ // The envs from user.
+ envs []string
+}
+
+func NewSRSServer(opts ...func(v *srsServer)) SRSServer {
+ rid := fmt.Sprintf("%v-%v", os.Getpid(), rand.Int())
+ v := &srsServer{
+ workDir: "./",
+ srsID: fmt.Sprintf("srs-id-%v", rid),
+ process: newBackendService(),
+ }
+ v.readyCtx, v.readyCtxCancel = context.WithCancel(context.Background())
+
+ // If we run in GoLand, the current directory is in blackbox, so we use parent directory.
+ if _, err := os.Stat("objs"); err != nil {
+ v.workDir = "../"
+ }
+
+ // Do allocate resource.
+ v.srsRelativePidFile = path.Join("objs", fmt.Sprintf("srs-%v.pid", rid))
+ v.srsRelativeIDFile = path.Join("objs", fmt.Sprintf("srs-%v.id", rid))
+ v.rtmpListen = allocator.Allocate()
+ v.apiListen = allocator.Allocate()
+ v.httpListen = allocator.Allocate()
+
+ // Do cleanup.
+ v.process.onDispose = func(ctx context.Context, bs *backendService) error {
+ allocator.Free(v.rtmpListen)
+ allocator.Free(v.apiListen)
+ allocator.Free(v.httpListen)
+
+ pidFile := path.Join(v.workDir, v.srsRelativePidFile)
+ if _, err := os.Stat(pidFile); !os.IsNotExist(err) {
+ os.Remove(pidFile)
+ }
+
+ idFile := path.Join(v.workDir, v.srsRelativeIDFile)
+ if _, err := os.Stat(idFile); !os.IsNotExist(err) {
+ os.Remove(idFile)
+ }
+
+ logger.Tf(ctx, "SRS server is closed, id=%v, pid=%v, r0=%v", v.srsID, bs.pid, bs.r0)
+ return nil
+ }
+
+ for _, opt := range opts {
+ opt(v)
+ }
+
+ return v
+}
+
+func (v *srsServer) ReadyCtx() context.Context {
+ return v.readyCtx
+}
+
+func (v *srsServer) RTMPPort() int {
+ return v.rtmpListen
+}
+
+func (v *srsServer) HTTPPort() int {
+ return v.httpListen
+}
+
+func (v *srsServer) WorkDir() string {
+ return v.workDir
+}
+
+func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error {
+ logger.Tf(ctx, "Starting SRS server, dir=%v, binary=%v, id=%v, pid=%v, rtmp=%v",
+ v.workDir, *srsBinary, v.srsID, v.srsRelativePidFile, v.rtmpListen,
+ )
+
+ // Setup the name and args of process.
+ v.process.name = *srsBinary
+ v.process.args = []string{"-e"}
+
+ // Setup the envrionment variables.
+ v.process.env = []string{
+ // SRS working directory.
+ fmt.Sprintf("SRS_WORK_DIR=%v", v.workDir),
+ // Run in frontend.
+ "SRS_DAEMON=off",
+ // Write logs to stdout and stderr.
+ "SRS_SRS_LOG_FILE=console",
+ // Disable warning for asan.
+ "MallocNanoZone=0",
+ // SRS PID file.
+ fmt.Sprintf("SRS_PID=%v", v.srsRelativePidFile),
+ // SRS ID file.
+ fmt.Sprintf("SRS_SERVER_ID=%v", v.srsID),
+ // HTTP API to detect the service.
+ fmt.Sprintf("SRS_HTTP_API_ENABLED=on"),
+ fmt.Sprintf("SRS_HTTP_API_LISTEN=%v", v.apiListen),
+ // Avoid error for macOS, which ulimit to 256.
+ "SRS_MAX_CONNECTIONS=100",
+ }
+ // Rewrite envs by case.
+ if v.envs != nil {
+ v.process.env = append(v.process.env, v.envs...)
+ }
+ // Allow user to rewrite them.
+ for _, env := range os.Environ() {
+ if strings.HasPrefix(env, "SRS") || strings.HasPrefix(env, "PATH") {
+ v.process.env = append(v.process.env, env)
+ }
+ }
+
+ // Wait for all goroutine to done.
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ // Start a task to detect the HTTP API.
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for ctx.Err() == nil {
+ time.Sleep(100 * time.Millisecond)
+
+ res, err := http.Get(fmt.Sprintf("http://localhost:%v/api/v1/versions", v.apiListen))
+ if err != nil {
+ continue
+ }
+ defer res.Body.Close()
+
+ b, err := ioutil.ReadAll(res.Body)
+ if err != nil {
+ continue
+ }
+
+ logger.Tf(ctx, "SRS API is ready, %v", string(b))
+ v.readyCtxCancel()
+ return
+ }
+ }()
+
+ // Hooks for process.
+ v.process.onBeforeStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
+ logger.Tf(ctx, "SRS id=%v, env=%v, cmd is %v %v",
+ v.srsID, cmd.Env, bs.name, strings.Join(bs.args, " "))
+ return nil
+ }
+ v.process.onAfterStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
+ logger.Tf(ctx, "SRS id=%v, pid=%v", v.srsID, bs.pid)
+ return nil
+ }
+ v.process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
+ // Should be ready when process stop.
+ defer v.readyCtxCancel()
+
+ logger.Tf(ctx, "SRS process pid=%v exit, r0=%v", bs.pid, r0)
+ if *srsStdout == true {
+ logger.Tf(ctx, "SRS process pid=%v, stdout is \n%v", bs.pid, stdout.String())
+ }
+ if stderr.Len() > 0 {
+ logger.Tf(ctx, "SRS process pid=%v, stderr is \n%v", bs.pid, stderr.String())
+ }
+ return nil
+ }
+
+ // Run the process util quit.
+ return v.process.Run(ctx, cancel)
+}
+
+type FFmpegClient interface {
+ ServiceRunner
+ ServiceReadyQuerier
+}
+
+type ffmpegClient struct {
+ // The backend service process.
+ process *backendService
+
+ // FFmpeg cli args, without ffmpeg binary.
+ args []string
+}
+
+func NewFFmpeg(opts ...func(v *ffmpegClient)) FFmpegClient {
+ v := &ffmpegClient{
+ process: newBackendService(),
+ }
+
+ // Do cleanup.
+ v.process.onDispose = func(ctx context.Context, bs *backendService) error {
+ return nil
+ }
+
+ for _, opt := range opts {
+ opt(v)
+ }
+
+ return v
+}
+
+func (v *ffmpegClient) ReadyCtx() context.Context {
+ return v.process.ReadyCtx()
+}
+
+func (v *ffmpegClient) Run(ctx context.Context, cancel context.CancelFunc) error {
+ logger.Tf(ctx, "Starting FFmpeg by %v", strings.Join(v.args, " "))
+
+ v.process.name = *srsFFmpeg
+ v.process.args = v.args
+ v.process.env = os.Environ()
+
+ v.process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
+ logger.Tf(ctx, "FFmpeg process pid=%v exit, r0=%v, stdout=%v", bs.pid, r0, stdout.String())
+ if *srsFFmpegStderr && stderr.Len() > 0 {
+ logger.Tf(ctx, "FFmpeg process pid=%v, stderr is \n%v", bs.pid, stderr.String())
+ }
+ return nil
+ }
+
+ return v.process.Run(ctx, cancel)
+}
+
+type FFprobeClient interface {
+ ServiceRunner
+ // ProbeDoneCtx indicates the probe is done.
+ ProbeDoneCtx() context.Context
+ // Result return the raw string and metadata.
+ Result() (string, *ffprobeObject)
+}
+
+type ffprobeClient struct {
+ // The stream to probe.
+ streamURL string
+
+ // The DVR file for ffprobe. We DVR stream to file, then use ffprobe to detect it.
+ dvrFile string
+ // The duration of video file for DVR.
+ duration time.Duration
+ // The timeout to wait for task to done.
+ timeout time.Duration
+
+ // When probe stream metadata object.
+ doneCtx context.Context
+ doneCancel context.CancelFunc
+ // The metadata object.
+ metadata *ffprobeObject
+ // The raw string of ffprobe.
+ rawString string
+}
+
+func NewFFprobe(opts ...func(v *ffprobeClient)) FFprobeClient {
+ v := &ffprobeClient{
+ metadata: &ffprobeObject{},
+ }
+ v.doneCtx, v.doneCancel = context.WithCancel(context.Background())
+
+ for _, opt := range opts {
+ opt(v)
+ }
+
+ return v
+}
+
+func (v *ffprobeClient) ProbeDoneCtx() context.Context {
+ return v.doneCtx
+}
+
+func (v *ffprobeClient) Result() (string, *ffprobeObject) {
+ return v.rawString, v.metadata
+}
+
+func (v *ffprobeClient) Run(ctxCase context.Context, cancelCase context.CancelFunc) error {
+ ctx, cancel := context.WithTimeout(ctxCase, v.timeout)
+ defer cancel()
+
+ logger.Tf(ctx, "Starting FFprobe for stream=%v, dvr=%v, duration=%v, timeout=%v",
+ v.streamURL, v.dvrFile, v.duration, v.timeout)
+
+ // Try to start a DVR process.
+ for ctx.Err() == nil {
+ // If error, just ignore and retry, because the stream might not be ready. For example, for HLS, the DVR process
+ // might need to wait for a duration of segment, 10s as such.
+ _ = v.doDVR(ctx)
+
+ // Check whether DVR file is ok.
+ if fs, err := os.Stat(v.dvrFile); err == nil && fs.Size() > 1024 {
+ logger.Tf(ctx, "DVR FFprobe file is ok, file=%v, size=%v", v.dvrFile, fs.Size())
+ break
+ }
+ }
+
+ // Ignore if case terminated.
+ if ctxCase.Err() != nil {
+ return nil
+ }
+
+ // Start a probe process for the DVR file.
+ return v.doProbe(ctx, cancelCase)
+}
+
+func (v *ffprobeClient) doDVR(ctx context.Context) error {
+ ctx, cancel := context.WithCancel(ctx)
+
+ process := newBackendService()
+ process.name = *srsFFmpeg
+ process.args = []string{
+ "-t", fmt.Sprintf("%v", int64(v.duration/time.Second)),
+ "-i", v.streamURL, "-c", "copy", "-y", v.dvrFile,
+ }
+ process.env = os.Environ()
+
+ process.onDispose = func(ctx context.Context, bs *backendService) error {
+ return nil
+ }
+ process.onBeforeStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
+ logger.Tf(ctx, "DVR start %v %v", bs.name, strings.Join(bs.args, " "))
+ return nil
+ }
+ process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
+ logger.Tf(ctx, "DVR process pid=%v exit, r0=%v, stdout=%v", bs.pid, r0, stdout.String())
+ if *srsDVRStderr && stderr.Len() > 0 {
+ logger.Tf(ctx, "DVR process pid=%v, stderr is \n%v", bs.pid, stderr.String())
+ }
+ return nil
+ }
+
+ return process.Run(ctx, cancel)
+}
+
+func (v *ffprobeClient) doProbe(ctx context.Context, cancel context.CancelFunc) error {
+ process := newBackendService()
+ process.name = *srsFFprobe
+ process.args = []string{
+ "-show_error", "-show_private_data", "-v", "quiet", "-find_stream_info",
+ "-analyzeduration", fmt.Sprintf("%v", int64(v.duration/time.Microsecond)),
+ "-print_format", "json", "-show_format", "-show_streams", v.dvrFile,
+ }
+ process.env = os.Environ()
+
+ process.onDispose = func(ctx context.Context, bs *backendService) error {
+ if _, err := os.Stat(v.dvrFile); !os.IsNotExist(err) {
+ os.Remove(v.dvrFile)
+ }
+ return nil
+ }
+ process.onBeforeStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
+ logger.Tf(ctx, "FFprobe start %v %v", bs.name, strings.Join(bs.args, " "))
+ return nil
+ }
+ process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
+ logger.Tf(ctx, "FFprobe process pid=%v exit, r0=%v, stderr=%v", bs.pid, r0, stderr.String())
+ if *srsFFprobeStdout && stdout.Len() > 0 {
+ logger.Tf(ctx, "FFprobe process pid=%v, stdout is \n%v", bs.pid, stdout.String())
+ }
+
+ str := stdout.String()
+ v.rawString = str
+
+ if err := json.Unmarshal([]byte(str), v.metadata); err != nil {
+ return err
+ }
+
+ m := v.metadata
+ logger.Tf(ctx, "FFprobe done pid=%v, %v", bs.pid, m.String())
+
+ if ts := 90; m.Format.ProbeScore < ts {
+ return errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str)
+ }
+ if dv := m.Duration(); dv < v.duration {
+ return errors.Errorf("short duration=%v < %v, %v, %v", dv, v.duration, m.String(), str)
+ }
+
+ v.doneCancel()
+ return nil
+ }
+
+ return process.Run(ctx, cancel)
+}
+
+/*
+ "index": 0,
+ "codec_name": "h264",
+ "codec_long_name": "H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10",
+ "profile": "High",
+ "codec_type": "video",
+ "codec_tag_string": "avc1",
+ "codec_tag": "0x31637661",
+ "width": 768,
+ "height": 320,
+ "coded_width": 768,
+ "coded_height": 320,
+ "closed_captions": 0,
+ "film_grain": 0,
+ "has_b_frames": 2,
+ "sample_aspect_ratio": "1:1",
+ "display_aspect_ratio": "12:5",
+ "pix_fmt": "yuv420p",
+ "level": 32,
+ "chroma_location": "left",
+ "field_order": "progressive",
+ "refs": 1,
+ "is_avc": "true",
+ "nal_length_size": "4",
+ "id": "0x1",
+ "r_frame_rate": "25/1",
+ "avg_frame_rate": "25/1",
+ "time_base": "1/16000",
+ "start_pts": 1280,
+ "start_time": "0.080000",
+ "duration_ts": 160000,
+ "duration": "10.000000",
+ "bit_rate": "196916",
+ "bits_per_raw_sample": "8",
+ "nb_frames": "250",
+ "extradata_size": 41,
+ "disposition": {
+ "default": 1,
+ "dub": 0,
+ "original": 0,
+ "comment": 0,
+ "lyrics": 0,
+ "karaoke": 0,
+ "forced": 0,
+ "hearing_impaired": 0,
+ "visual_impaired": 0,
+ "clean_effects": 0,
+ "attached_pic": 0,
+ "timed_thumbnails": 0,
+ "captions": 0,
+ "descriptions": 0,
+ "metadata": 0,
+ "dependent": 0,
+ "still_image": 0
+ },
+ "tags": {
+ "language": "und",
+ "handler_name": "VideoHandler",
+ "vendor_id": "[0][0][0][0]"
+ }
+*/
+/*
+ "index": 1,
+ "codec_name": "aac",
+ "codec_long_name": "AAC (Advanced Audio Coding)",
+ "profile": "LC",
+ "codec_type": "audio",
+ "codec_tag_string": "mp4a",
+ "codec_tag": "0x6134706d",
+ "sample_fmt": "fltp",
+ "sample_rate": "44100",
+ "channels": 2,
+ "channel_layout": "stereo",
+ "bits_per_sample": 0,
+ "id": "0x2",
+ "r_frame_rate": "0/0",
+ "avg_frame_rate": "0/0",
+ "time_base": "1/44100",
+ "start_pts": 132,
+ "start_time": "0.002993",
+ "duration_ts": 441314,
+ "duration": "10.007120",
+ "bit_rate": "29827",
+ "nb_frames": "431",
+ "extradata_size": 2,
+ "disposition": {
+ "default": 1,
+ "dub": 0,
+ "original": 0,
+ "comment": 0,
+ "lyrics": 0,
+ "karaoke": 0,
+ "forced": 0,
+ "hearing_impaired": 0,
+ "visual_impaired": 0,
+ "clean_effects": 0,
+ "attached_pic": 0,
+ "timed_thumbnails": 0,
+ "captions": 0,
+ "descriptions": 0,
+ "metadata": 0,
+ "dependent": 0,
+ "still_image": 0
+ },
+ "tags": {
+ "language": "und",
+ "handler_name": "SoundHandler",
+ "vendor_id": "[0][0][0][0]"
+ }
+*/
+type ffprobeObjectMedia struct {
+ Index int `json:"index"`
+ CodecName string `json:"codec_name"`
+ CodecType string `json:"codec_type"`
+ Timebase string `json:"time_base"`
+ Bitrate string `json:"bit_rate"`
+ Profile string `json:"profile"`
+ Duration string `json:"duration"`
+ CodecTagString string `json:"codec_tag_string"`
+
+ // For video codec.
+ Width int `json:"width"`
+ Height int `json:"height"`
+ CodedWidth int `json:"coded_width"`
+ CodedHeight int `json:"coded_height"`
+ RFramerate string `json:"r_frame_rate"`
+ AvgFramerate string `json:"avg_frame_rate"`
+ PixFmt string `json:"pix_fmt"`
+ Level int `json:"level"`
+
+ // For audio codec.
+ Channels int `json:"channels"`
+ ChannelLayout string `json:"channel_layout"`
+ SampleFmt string `json:"sample_fmt"`
+ SampleRate string `json:"sample_rate"`
+}
+
+func (v *ffprobeObjectMedia) String() string {
+ sb := strings.Builder{}
+
+ sb.WriteString(fmt.Sprintf("index=%v, codec=%v, type=%v, tb=%v, bitrate=%v, profile=%v, duration=%v",
+ v.Index, v.CodecName, v.CodecType, v.Timebase, v.Bitrate, v.Profile, v.Duration))
+ sb.WriteString(fmt.Sprintf(", codects=%v", v.CodecTagString))
+
+ if v.CodecType == "video" {
+ sb.WriteString(fmt.Sprintf(", size=%vx%v, csize=%vx%v, rfr=%v, afr=%v, pix=%v, level=%v",
+ v.Width, v.Height, v.CodedWidth, v.CodedHeight, v.RFramerate, v.AvgFramerate, v.PixFmt, v.Level))
+ } else if v.CodecType == "audio" {
+ sb.WriteString(fmt.Sprintf(", channels=%v, layout=%v, fmt=%v, srate=%v",
+ v.Channels, v.ChannelLayout, v.SampleFmt, v.SampleRate))
+ }
+
+ return sb.String()
+}
+
+/*
+"filename": "../objs/srs-ffprobe-stream-84487-8369019999559815097.mp4",
+"nb_streams": 2,
+"nb_programs": 0,
+"format_name": "mov,mp4,m4a,3gp,3g2,mj2",
+"format_long_name": "QuickTime / MOV",
+"start_time": "0.002993",
+"duration": "10.080000",
+"size": "292725",
+"bit_rate": "232321",
+"probe_score": 100,
+
+ "tags": {
+ "major_brand": "isom",
+ "minor_version": "512",
+ "compatible_brands": "isomiso2avc1mp41",
+ "encoder": "Lavf59.27.100"
+ }
+*/
+type ffprobeObjectFormat struct {
+ Filename string `json:"filename"`
+ Duration string `json:"duration"`
+ NBStream int16 `json:"nb_streams"`
+ Size string `json:"size"`
+ Bitrate string `json:"bit_rate"`
+ ProbeScore int `json:"probe_score"`
+}
+
+func (v *ffprobeObjectFormat) String() string {
+ return fmt.Sprintf("file=%v, duration=%v, score=%v, size=%v, bitrate=%v, streams=%v",
+ v.Filename, v.Duration, v.ProbeScore, v.Size, v.Bitrate, v.NBStream)
+}
+
+/*
+ {
+ "streams": [{ffprobeObjectMedia}, {ffprobeObjectMedia}],
+ "format": {ffprobeObjectFormat}
+ }
+*/
+type ffprobeObject struct {
+ Format ffprobeObjectFormat `json:"format"`
+ Streams []ffprobeObjectMedia `json:"streams"`
+}
+
+func (v *ffprobeObject) String() string {
+ sb := strings.Builder{}
+ sb.WriteString(v.Format.String())
+ sb.WriteString(", [")
+ for _, stream := range v.Streams {
+ sb.WriteString("{")
+ sb.WriteString(stream.String())
+ sb.WriteString("}")
+ }
+ sb.WriteString("]")
+ return sb.String()
+}
+
+func (v *ffprobeObject) Duration() time.Duration {
+ dv, err := strconv.ParseFloat(v.Format.Duration, 10)
+ if err != nil {
+ return time.Duration(0)
+ }
+
+ return time.Duration(dv*1000) * time.Millisecond
+}
diff --git a/trunk/3rdparty/srs-bench/srs/srs_test.go b/trunk/3rdparty/srs-bench/srs/srs_test.go
index 21b4f53b51f..c7f868f8c87 100644
--- a/trunk/3rdparty/srs-bench/srs/srs_test.go
+++ b/trunk/3rdparty/srs-bench/srs/srs_test.go
@@ -23,8 +23,10 @@ package srs
import (
"github.com/ossrs/go-oryx-lib/logger"
"io/ioutil"
+ "math/rand"
"os"
"testing"
+ "time"
)
func TestMain(m *testing.M) {
@@ -41,5 +43,8 @@ func TestMain(m *testing.M) {
}()
}
+ // Init rand seed.
+ rand.Seed(time.Now().UnixNano())
+
os.Exit(m.Run())
}