Skip to content

Commit

Permalink
restart recordings in case of errors (#2439)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Oct 26, 2023
1 parent 28452ac commit 26e0956
Show file tree
Hide file tree
Showing 12 changed files with 346 additions and 267 deletions.
25 changes: 13 additions & 12 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,15 +917,15 @@ func (pa *path) setNotReady() {
}

func (pa *path) startRecording() {
pa.recordAgent = record.NewAgent(
pa.writeQueueSize,
pa.conf.RecordPath,
pa.conf.RecordFormat,
time.Duration(pa.conf.RecordPartDuration),
time.Duration(pa.conf.RecordSegmentDuration),
pa.name,
pa.stream,
func(segmentPath string) {
pa.recordAgent = &record.Agent{
WriteQueueSize: pa.writeQueueSize,
RecordPath: pa.conf.RecordPath,
Format: pa.conf.RecordFormat,
PartDuration: time.Duration(pa.conf.RecordPartDuration),
SegmentDuration: time.Duration(pa.conf.RecordSegmentDuration),
PathName: pa.name,
Stream: pa.stream,
OnSegmentCreate: func(segmentPath string) {
if pa.conf.RunOnRecordSegmentCreate != "" {
env := pa.externalCmdEnv()
env["MTX_SEGMENT_PATH"] = segmentPath
Expand All @@ -939,7 +939,7 @@ func (pa *path) startRecording() {
nil)
}
},
func(segmentPath string) {
OnSegmentComplete: func(segmentPath string) {
if pa.conf.RunOnRecordSegmentComplete != "" {
env := pa.externalCmdEnv()
env["MTX_SEGMENT_PATH"] = segmentPath
Expand All @@ -953,8 +953,9 @@ func (pa *path) startRecording() {
nil)
}
},
pa,
)
Parent: pa,
}
pa.recordAgent.Initialize()
}

func (pa *path) executeRemoveReader(r reader) {
Expand Down
1 change: 0 additions & 1 deletion internal/httpserv/wrapped_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
type testLogger struct{}

func (testLogger) Log(_ logger.Level, _ string, _ ...interface{}) {
// fmt.Printf(format, args...)
}

func TestFilterEmptyPath(t *testing.T) {
Expand Down
156 changes: 58 additions & 98 deletions internal/record/agent.go
Original file line number Diff line number Diff line change
@@ -1,124 +1,84 @@
package record

import (
"context"
"strings"
"time"

"github.com/bluenviron/mediacommon/pkg/formats/fmp4"

"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
)

// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete
type OnSegmentFunc = func(string)

type sample struct {
*fmp4.PartSample
dts time.Duration
}

// Agent saves streams on disk.
// Agent is a record agent.
type Agent struct {
path string
partDuration time.Duration
segmentDuration time.Duration
stream *stream.Stream
onSegmentCreate OnSegmentFunc
onSegmentComplete OnSegmentFunc
parent logger.Writer

ctx context.Context
ctxCancel func()
writer *asyncwriter.Writer
format recFormat

done chan struct{}
WriteQueueSize int
RecordPath string
Format conf.RecordFormat
PartDuration time.Duration
SegmentDuration time.Duration
PathName string
Stream *stream.Stream
OnSegmentCreate OnSegmentFunc
OnSegmentComplete OnSegmentFunc
Parent logger.Writer

restartPause time.Duration

currentInstance *agentInstance

terminate chan struct{}
done chan struct{}
}

// NewAgent allocates an Agent.
func NewAgent(
writeQueueSize int,
path string,
format conf.RecordFormat,
partDuration time.Duration,
segmentDuration time.Duration,
pathName string,
stream *stream.Stream,
onSegmentCreate OnSegmentFunc,
onSegmentComplete OnSegmentFunc,
parent logger.Writer,
) *Agent {
path = strings.ReplaceAll(path, "%path", pathName)

switch format {
case conf.RecordFormatMPEGTS:
path += ".ts"

default:
path += ".mp4"
// Initialize initializes Agent.
func (w *Agent) Initialize() {
if w.restartPause == 0 {
w.restartPause = 2 * time.Second

Check warning on line 35 in internal/record/agent.go

View check run for this annotation

Codecov / codecov/patch

internal/record/agent.go#L35

Added line #L35 was not covered by tests
}

ctx, ctxCancel := context.WithCancel(context.Background())

a := &Agent{
path: path,
partDuration: partDuration,
segmentDuration: segmentDuration,
stream: stream,
onSegmentCreate: onSegmentCreate,
onSegmentComplete: onSegmentComplete,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
done: make(chan struct{}),
}

a.writer = asyncwriter.New(writeQueueSize, a)
w.terminate = make(chan struct{})
w.done = make(chan struct{})

switch format {
case conf.RecordFormatMPEGTS:
a.format = newRecFormatMPEGTS(a)

default:
a.format = newRecFormatFMP4(a)
w.currentInstance = &agentInstance{
wrapper: w,
}
w.currentInstance.initialize()

go a.run()

return a
}

// Close closes the Agent.
func (a *Agent) Close() {
a.Log(logger.Info, "recording stopped")

a.ctxCancel()
<-a.done
go w.run()
}

// Log is the main logging function.
func (a *Agent) Log(level logger.Level, format string, args ...interface{}) {
a.parent.Log(level, "[record] "+format, args...)
func (w *Agent) Log(level logger.Level, format string, args ...interface{}) {
w.Parent.Log(level, "[record] "+format, args...)
}

func (a *Agent) run() {
defer close(a.done)

a.writer.Start()

select {
case err := <-a.writer.Error():
a.Log(logger.Error, err.Error())
a.stream.RemoveReader(a.writer)
// Close closes the agent.
func (w *Agent) Close() {
w.Log(logger.Info, "recording stopped")
close(w.terminate)
<-w.done
}

case <-a.ctx.Done():
a.stream.RemoveReader(a.writer)
a.writer.Stop()
func (w *Agent) run() {
defer close(w.done)

for {
select {
case <-w.currentInstance.done:
w.currentInstance.close()
case <-w.terminate:
w.currentInstance.close()
return
}

select {
case <-time.After(w.restartPause):
case <-w.terminate:
return

Check warning on line 76 in internal/record/agent.go

View check run for this annotation

Codecov / codecov/patch

internal/record/agent.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}

w.currentInstance = &agentInstance{
wrapper: w,
}
w.currentInstance.initialize()
}

a.format.close()
}
87 changes: 87 additions & 0 deletions internal/record/agent_instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package record

import (
"strings"
"time"

"github.com/bluenviron/mediacommon/pkg/formats/fmp4"

"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
)

// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete
type OnSegmentFunc = func(string)

type sample struct {
*fmp4.PartSample
dts time.Duration
}

type agentInstance struct {
wrapper *Agent

resolvedPath string
writer *asyncwriter.Writer
format recFormat

terminate chan struct{}
done chan struct{}
}

func (a *agentInstance) initialize() {
a.resolvedPath = strings.ReplaceAll(a.wrapper.RecordPath, "%path", a.wrapper.PathName)

switch a.wrapper.Format {
case conf.RecordFormatMPEGTS:
a.resolvedPath += ".ts"

default:
a.resolvedPath += ".mp4"
}

a.terminate = make(chan struct{})
a.done = make(chan struct{})

a.writer = asyncwriter.New(a.wrapper.WriteQueueSize, a.wrapper)

switch a.wrapper.Format {
case conf.RecordFormatMPEGTS:
a.format = &recFormatMPEGTS{
a: a,
}
a.format.initialize()

default:
a.format = &recFormatFMP4{
a: a,
}
a.format.initialize()
}

go a.run()
}

func (a *agentInstance) close() {
close(a.terminate)
<-a.done
}

func (a *agentInstance) run() {
defer close(a.done)

a.writer.Start()

select {
case err := <-a.writer.Error():
a.wrapper.Log(logger.Error, err.Error())
a.wrapper.Stream.RemoveReader(a.writer)

case <-a.terminate:
a.wrapper.Stream.RemoveReader(a.writer)
a.writer.Stop()
}

a.format.close()
}
Loading

0 comments on commit 26e0956

Please sign in to comment.