Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose MTX_SEGMENT_DURATION to runOnRecordSegmentComplete (#3440) (#2983) #3456

Merged
merged 4 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1669,6 +1669,7 @@ pathDefaults:
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
# * MTX_SEGMENT_DURATION: segment duration
runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
```

Expand Down
3 changes: 2 additions & 1 deletion internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,10 +806,11 @@ func (pa *path) startRecording() {
nil)
}
},
OnSegmentComplete: func(segmentPath string) {
OnSegmentComplete: func(segmentPath string, segmentDuration time.Duration) {
if pa.conf.RunOnRecordSegmentComplete != "" {
env := pa.ExternalCmdEnv()
env["MTX_SEGMENT_PATH"] = segmentPath
env["MTX_SEGMENT_DURATION"] = strconv.FormatFloat(segmentDuration.Seconds(), 'f', -1, 64)

pa.Log(logger.Info, "runOnRecordSegmentComplete command launched")
externalcmd.NewCmd(
Expand Down
120 changes: 91 additions & 29 deletions internal/core/path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ func (sh *testServer) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
var _ defs.Path = &path{}

func TestPathRunOnDemand(t *testing.T) {
onDemandFile := filepath.Join(os.TempDir(), "ondemand")
onUnDemandFile := filepath.Join(os.TempDir(), "onundemand")
onDemand := filepath.Join(os.TempDir(), "on_demand")
onUnDemand := filepath.Join(os.TempDir(), "on_undemand")

srcFile := filepath.Join(os.TempDir(), "ondemand.go")
err := os.WriteFile(srcFile,
[]byte(strings.ReplaceAll(runOnDemandSampleScript, "ON_DEMAND_FILE", onDemandFile)), 0o644)
[]byte(strings.ReplaceAll(runOnDemandSampleScript, "ON_DEMAND_FILE", onDemand)), 0o644)
require.NoError(t, err)

execFile := filepath.Join(os.TempDir(), "ondemand_cmd")
Expand All @@ -125,8 +125,8 @@ func TestPathRunOnDemand(t *testing.T) {

for _, ca := range []string{"describe", "setup", "describe and setup"} {
t.Run(ca, func(t *testing.T) {
defer os.Remove(onDemandFile)
defer os.Remove(onUnDemandFile)
defer os.Remove(onDemand)
defer os.Remove(onUnDemand)

p1, ok := newInstance(fmt.Sprintf("rtmp: no\n"+
"hls: no\n"+
Expand All @@ -135,7 +135,7 @@ func TestPathRunOnDemand(t *testing.T) {
" '~^(on)demand$':\n"+
" runOnDemand: %s\n"+
" runOnDemandCloseAfter: 1s\n"+
" runOnUnDemand: touch %s\n", execFile, onUnDemandFile))
" runOnUnDemand: touch %s\n", execFile, onUnDemand))
require.Equal(t, true, ok)
defer p1.Close()

Expand Down Expand Up @@ -204,14 +204,14 @@ func TestPathRunOnDemand(t *testing.T) {
}()

for {
_, err := os.Stat(onUnDemandFile)
_, err := os.Stat(onUnDemand)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}

_, err := os.Stat(onDemandFile)
_, err := os.Stat(onDemand)
require.NoError(t, err)
})
}
Expand All @@ -220,19 +220,19 @@ func TestPathRunOnDemand(t *testing.T) {
func TestPathRunOnConnect(t *testing.T) {
for _, ca := range []string{"rtsp", "rtmp", "srt"} {
t.Run(ca, func(t *testing.T) {
onConnectFile := filepath.Join(os.TempDir(), "onconnect")
defer os.Remove(onConnectFile)
onConnect := filepath.Join(os.TempDir(), "on_connect")
defer os.Remove(onConnect)

onDisconnectFile := filepath.Join(os.TempDir(), "ondisconnect")
defer os.Remove(onDisconnectFile)
onDisconnect := filepath.Join(os.TempDir(), "on_disconnect")
defer os.Remove(onDisconnect)

func() {
p, ok := newInstance(fmt.Sprintf(
"paths:\n"+
" test:\n"+
"runOnConnect: touch %s\n"+
"runOnDisconnect: touch %s\n",
onConnectFile, onDisconnectFile))
onConnect, onDisconnect))
require.Equal(t, true, ok)
defer p.Close()

Expand Down Expand Up @@ -273,21 +273,21 @@ func TestPathRunOnConnect(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}()

_, err := os.Stat(onConnectFile)
_, err := os.Stat(onConnect)
require.NoError(t, err)

_, err = os.Stat(onDisconnectFile)
_, err = os.Stat(onDisconnect)
require.NoError(t, err)
})
}
}

func TestPathRunOnReady(t *testing.T) {
onReadyFile := filepath.Join(os.TempDir(), "onready")
defer os.Remove(onReadyFile)
onReady := filepath.Join(os.TempDir(), "on_ready")
defer os.Remove(onReady)

onNotReadyFile := filepath.Join(os.TempDir(), "onunready")
defer os.Remove(onNotReadyFile)
onNotReady := filepath.Join(os.TempDir(), "on_unready")
defer os.Remove(onNotReady)

func() {
p, ok := newInstance(fmt.Sprintf("rtmp: no\n"+
Expand All @@ -297,7 +297,7 @@ func TestPathRunOnReady(t *testing.T) {
" test:\n"+
" runOnReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+
" runOnNotReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n",
onReadyFile, onNotReadyFile))
onReady, onNotReady))
require.Equal(t, true, ok)
defer p.Close()

Expand All @@ -312,31 +312,31 @@ func TestPathRunOnReady(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}()

byts, err := os.ReadFile(onReadyFile)
byts, err := os.ReadFile(onReady)
require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts))

byts, err = os.ReadFile(onNotReadyFile)
byts, err = os.ReadFile(onNotReady)
require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts))
}

func TestPathRunOnRead(t *testing.T) {
for _, ca := range []string{"rtsp", "rtmp", "srt", "webrtc"} {
t.Run(ca, func(t *testing.T) {
onReadFile := filepath.Join(os.TempDir(), "onread")
defer os.Remove(onReadFile)
onRead := filepath.Join(os.TempDir(), "on_read")
defer os.Remove(onRead)

onUnreadFile := filepath.Join(os.TempDir(), "onunread")
defer os.Remove(onUnreadFile)
onUnread := filepath.Join(os.TempDir(), "on_unread")
defer os.Remove(onUnread)

func() {
p, ok := newInstance(fmt.Sprintf(
"paths:\n"+
" test:\n"+
" runOnRead: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+
" runOnUnread: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n",
onReadFile, onUnreadFile))
onRead, onUnread))
require.Equal(t, true, ok)
defer p.Close()

Expand Down Expand Up @@ -449,17 +449,79 @@ func TestPathRunOnRead(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}()

byts, err := os.ReadFile(onReadFile)
byts, err := os.ReadFile(onRead)
require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts))

byts, err = os.ReadFile(onUnreadFile)
byts, err = os.ReadFile(onUnread)
require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts))
})
}
}

func TestPathRunOnRecordSegment(t *testing.T) {
onRecordSegmentCreate := filepath.Join(os.TempDir(), "on_record_segment_create")
defer os.Remove(onRecordSegmentCreate)

onRecordSegmentComplete := filepath.Join(os.TempDir(), "on_record_segment_complete")
defer os.Remove(onRecordSegmentComplete)

recordDir, err := os.MkdirTemp("", "rtsp-path-record")
require.NoError(t, err)
defer os.RemoveAll(recordDir)

func() {
p, ok := newInstance("record: yes\n" +
"recordPath: " + filepath.Join(recordDir, "%path/%Y-%m-%d_%H-%M-%S-%f") + "\n" +
"paths:\n" +
" test:\n" +
" runOnRecordSegmentCreate: " +
"sh -c 'echo \"$MTX_SEGMENT_PATH\" > " + onRecordSegmentCreate + "'\n" +
" runOnRecordSegmentComplete: " +
"sh -c 'echo \"$MTX_SEGMENT_PATH $MTX_SEGMENT_DURATION\" > " + onRecordSegmentComplete + "'\n")
require.Equal(t, true, ok)
defer p.Close()

media0 := test.UniqueMediaH264()

source := gortsplib.Client{}

err = source.StartRecording(
"rtsp://localhost:8554/test",
&description.Session{Medias: []*description.Media{media0}})
require.NoError(t, err)
defer source.Close()

for i := 0; i < 4; i++ {
err = source.WritePacketRTP(media0, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 1123 + uint16(i),
Timestamp: 45343 + 90000*uint32(i),
SSRC: 563423,
},
Payload: []byte{5},
})
require.NoError(t, err)
}

time.Sleep(500 * time.Millisecond)
}()

byts, err := os.ReadFile(onRecordSegmentCreate)
require.NoError(t, err)
require.Equal(t, true, strings.HasPrefix(string(byts), recordDir))

byts, err = os.ReadFile(onRecordSegmentComplete)
require.NoError(t, err)
parts := strings.Split(string(byts[:len(byts)-1]), " ")
require.Equal(t, true, strings.HasPrefix(parts[0], recordDir))
require.Equal(t, "3", parts[1])
}

func TestPathMaxReaders(t *testing.T) {
p, ok := newInstance("paths:\n" +
" all_others:\n" +
Expand Down
12 changes: 9 additions & 3 deletions internal/record/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import (
"github.com/bluenviron/mediamtx/internal/stream"
)

// OnSegmentCreateFunc is the prototype of the function passed as OnSegmentCreate
type OnSegmentCreateFunc = func(path string)

// OnSegmentCompleteFunc is the prototype of the function passed as OnSegmentComplete
type OnSegmentCompleteFunc = func(path string, duration time.Duration)

// Agent writes recordings to disk.
type Agent struct {
WriteQueueSize int
Expand All @@ -17,8 +23,8 @@ type Agent struct {
SegmentDuration time.Duration
PathName string
Stream *stream.Stream
OnSegmentCreate OnSegmentFunc
OnSegmentComplete OnSegmentFunc
OnSegmentCreate OnSegmentCreateFunc
OnSegmentComplete OnSegmentCompleteFunc
Parent logger.Writer

restartPause time.Duration
Expand All @@ -36,7 +42,7 @@ func (w *Agent) Initialize() {
}
}
if w.OnSegmentComplete == nil {
w.OnSegmentComplete = func(string) {
w.OnSegmentComplete = func(string, time.Duration) {
}
}
if w.restartPause == 0 {
Expand Down
3 changes: 0 additions & 3 deletions internal/record/agent_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"github.com/bluenviron/mediamtx/internal/logger"
)

// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete
type OnSegmentFunc = func(string)

type sample struct {
*fmp4.PartSample
dts time.Duration
Expand Down
Loading
Loading