diff --git a/internal/core/path.go b/internal/core/path.go index de34f5b0f56..d9ba8df9be2 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -917,15 +917,15 @@ func (pa *path) setNotReady() { } func (pa *path) startRecording() { - pa.recordAgent = record.NewAgent( - pa.writeQueueSize, - pa.conf.RecordPath, - pa.conf.RecordFormat, - time.Duration(pa.conf.RecordPartDuration), - time.Duration(pa.conf.RecordSegmentDuration), - pa.name, - pa.stream, - func(segmentPath string) { + pa.recordAgent = &record.Agent{ + WriteQueueSize: pa.writeQueueSize, + RecordPath: pa.conf.RecordPath, + Format: pa.conf.RecordFormat, + PartDuration: time.Duration(pa.conf.RecordPartDuration), + SegmentDuration: time.Duration(pa.conf.RecordSegmentDuration), + PathName: pa.name, + Stream: pa.stream, + OnSegmentCreate: func(segmentPath string) { if pa.conf.RunOnRecordSegmentCreate != "" { env := pa.externalCmdEnv() env["MTX_SEGMENT_PATH"] = segmentPath @@ -939,7 +939,7 @@ func (pa *path) startRecording() { nil) } }, - func(segmentPath string) { + OnSegmentComplete: func(segmentPath string) { if pa.conf.RunOnRecordSegmentComplete != "" { env := pa.externalCmdEnv() env["MTX_SEGMENT_PATH"] = segmentPath @@ -953,8 +953,9 @@ func (pa *path) startRecording() { nil) } }, - pa, - ) + Parent: pa, + } + pa.recordAgent.Initialize() } func (pa *path) executeRemoveReader(r reader) { diff --git a/internal/httpserv/wrapped_server_test.go b/internal/httpserv/wrapped_server_test.go index 02b04ff7339..1db5d4e99f8 100644 --- a/internal/httpserv/wrapped_server_test.go +++ b/internal/httpserv/wrapped_server_test.go @@ -14,7 +14,6 @@ import ( type testLogger struct{} func (testLogger) Log(_ logger.Level, _ string, _ ...interface{}) { - // fmt.Printf(format, args...) } func TestFilterEmptyPath(t *testing.T) { diff --git a/internal/record/agent.go b/internal/record/agent.go index f4b37abdcbc..05552f2d488 100644 --- a/internal/record/agent.go +++ b/internal/record/agent.go @@ -1,124 +1,84 @@ package record import ( - "context" - "strings" "time" - "github.com/bluenviron/mediacommon/pkg/formats/fmp4" - - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" ) -// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete -type OnSegmentFunc = func(string) - -type sample struct { - *fmp4.PartSample - dts time.Duration -} - -// Agent saves streams on disk. +// Agent is a record agent. type Agent struct { - path string - partDuration time.Duration - segmentDuration time.Duration - stream *stream.Stream - onSegmentCreate OnSegmentFunc - onSegmentComplete OnSegmentFunc - parent logger.Writer - - ctx context.Context - ctxCancel func() - writer *asyncwriter.Writer - format recFormat - - done chan struct{} + WriteQueueSize int + RecordPath string + Format conf.RecordFormat + PartDuration time.Duration + SegmentDuration time.Duration + PathName string + Stream *stream.Stream + OnSegmentCreate OnSegmentFunc + OnSegmentComplete OnSegmentFunc + Parent logger.Writer + + restartPause time.Duration + + currentInstance *agentInstance + + terminate chan struct{} + done chan struct{} } -// NewAgent allocates an Agent. -func NewAgent( - writeQueueSize int, - path string, - format conf.RecordFormat, - partDuration time.Duration, - segmentDuration time.Duration, - pathName string, - stream *stream.Stream, - onSegmentCreate OnSegmentFunc, - onSegmentComplete OnSegmentFunc, - parent logger.Writer, -) *Agent { - path = strings.ReplaceAll(path, "%path", pathName) - - switch format { - case conf.RecordFormatMPEGTS: - path += ".ts" - - default: - path += ".mp4" +// Initialize initializes Agent. +func (w *Agent) Initialize() { + if w.restartPause == 0 { + w.restartPause = 2 * time.Second } - ctx, ctxCancel := context.WithCancel(context.Background()) - - a := &Agent{ - path: path, - partDuration: partDuration, - segmentDuration: segmentDuration, - stream: stream, - onSegmentCreate: onSegmentCreate, - onSegmentComplete: onSegmentComplete, - parent: parent, - ctx: ctx, - ctxCancel: ctxCancel, - done: make(chan struct{}), - } - - a.writer = asyncwriter.New(writeQueueSize, a) + w.terminate = make(chan struct{}) + w.done = make(chan struct{}) - switch format { - case conf.RecordFormatMPEGTS: - a.format = newRecFormatMPEGTS(a) - - default: - a.format = newRecFormatFMP4(a) + w.currentInstance = &agentInstance{ + wrapper: w, } + w.currentInstance.initialize() - go a.run() - - return a -} - -// Close closes the Agent. -func (a *Agent) Close() { - a.Log(logger.Info, "recording stopped") - - a.ctxCancel() - <-a.done + go w.run() } // Log is the main logging function. -func (a *Agent) Log(level logger.Level, format string, args ...interface{}) { - a.parent.Log(level, "[record] "+format, args...) +func (w *Agent) Log(level logger.Level, format string, args ...interface{}) { + w.Parent.Log(level, "[record] "+format, args...) } -func (a *Agent) run() { - defer close(a.done) - - a.writer.Start() - - select { - case err := <-a.writer.Error(): - a.Log(logger.Error, err.Error()) - a.stream.RemoveReader(a.writer) +// Close closes the agent. +func (w *Agent) Close() { + w.Log(logger.Info, "recording stopped") + close(w.terminate) + <-w.done +} - case <-a.ctx.Done(): - a.stream.RemoveReader(a.writer) - a.writer.Stop() +func (w *Agent) run() { + defer close(w.done) + + for { + select { + case <-w.currentInstance.done: + w.currentInstance.close() + case <-w.terminate: + w.currentInstance.close() + return + } + + select { + case <-time.After(w.restartPause): + case <-w.terminate: + return + } + + w.currentInstance = &agentInstance{ + wrapper: w, + } + w.currentInstance.initialize() } - - a.format.close() } diff --git a/internal/record/agent_instance.go b/internal/record/agent_instance.go new file mode 100644 index 00000000000..be90a2d4917 --- /dev/null +++ b/internal/record/agent_instance.go @@ -0,0 +1,87 @@ +package record + +import ( + "strings" + "time" + + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + + "github.com/bluenviron/mediamtx/internal/asyncwriter" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/logger" +) + +// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete +type OnSegmentFunc = func(string) + +type sample struct { + *fmp4.PartSample + dts time.Duration +} + +type agentInstance struct { + wrapper *Agent + + resolvedPath string + writer *asyncwriter.Writer + format recFormat + + terminate chan struct{} + done chan struct{} +} + +func (a *agentInstance) initialize() { + a.resolvedPath = strings.ReplaceAll(a.wrapper.RecordPath, "%path", a.wrapper.PathName) + + switch a.wrapper.Format { + case conf.RecordFormatMPEGTS: + a.resolvedPath += ".ts" + + default: + a.resolvedPath += ".mp4" + } + + a.terminate = make(chan struct{}) + a.done = make(chan struct{}) + + a.writer = asyncwriter.New(a.wrapper.WriteQueueSize, a.wrapper) + + switch a.wrapper.Format { + case conf.RecordFormatMPEGTS: + a.format = &recFormatMPEGTS{ + a: a, + } + a.format.initialize() + + default: + a.format = &recFormatFMP4{ + a: a, + } + a.format.initialize() + } + + go a.run() +} + +func (a *agentInstance) close() { + close(a.terminate) + <-a.done +} + +func (a *agentInstance) run() { + defer close(a.done) + + a.writer.Start() + + select { + case err := <-a.writer.Error(): + a.wrapper.Log(logger.Error, err.Error()) + a.wrapper.Stream.RemoveReader(a.writer) + + case <-a.terminate: + a.wrapper.Stream.RemoveReader(a.writer) + a.writer.Stop() + } + + a.format.close() +} diff --git a/internal/record/agent_test.go b/internal/record/agent_test.go index 438b298d132..8be45e22faa 100644 --- a/internal/record/agent_test.go +++ b/internal/record/agent_test.go @@ -24,47 +24,111 @@ func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) { } func TestAgent(t *testing.T) { + desc := &description.Session{Medias: []*description.Media{ + { + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H265{ + PayloadTyp: 96, + }}, + }, + { + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, + }, + { + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + }}, + }, + }} + + writeToStream := func(stream *stream.Stream) { + for i := 0; i < 3; i++ { + stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{ + Base: unit.Base{ + PTS: (50 + time.Duration(i)) * time.Second, + }, + AU: [][]byte{ + { // VPS + 0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20, + 0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24, + }, + { // SPS + 0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03, + 0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d, + 0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88, + 0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9, + 0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc, + 0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a, + 0x02, 0x02, 0x02, 0x01, + }, + { // PPS + 0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40, + }, + {byte(h265.NALUType_CRA_NUT) << 1, 0}, // IDR + }, + }) + + stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{ + Base: unit.Base{ + PTS: (50 + time.Duration(i)) * time.Second, + }, + AU: [][]byte{ + { // SPS + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, + }, + { // PPS + 0x08, 0x06, 0x07, 0x08, + }, + {5}, // IDR + }, + }) + + stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{ + Base: unit.Base{ + PTS: (50 + time.Duration(i)) * time.Second, + }, + AUs: [][]byte{{1, 2, 3, 4}}, + }) + } + } + for _, ca := range []string{"fmp4", "mpegts"} { t.Run(ca, func(t *testing.T) { n := 0 timeNow = func() time.Time { n++ - if n >= 2 { - return time.Date(2008, 0o5, 20, 22, 15, 25, 125000, time.UTC) + switch n { + case 1: + return time.Date(2008, 0o5, 20, 22, 15, 25, 0, time.UTC) + + case 2: + return time.Date(2009, 0o5, 20, 22, 15, 25, 0, time.UTC) + + case 3: + return time.Date(2010, 0o5, 20, 22, 15, 25, 0, time.UTC) + + default: + return time.Date(2011, 0o5, 20, 22, 15, 25, 0, time.UTC) } - return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC) } - desc := &description.Session{Medias: []*description.Media{ - { - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H265{ - PayloadTyp: 96, - }}, - }, - { - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H264{ - PayloadTyp: 96, - PacketizationMode: 1, - }}, - }, - { - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, - }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, - }}, - }, - }} - stream, err := stream.New( 1460, desc, @@ -80,8 +144,8 @@ func TestAgent(t *testing.T) { recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") - segCreated := make(chan struct{}, 2) - segDone := make(chan struct{}, 2) + segCreated := make(chan struct{}, 4) + segDone := make(chan struct{}, 4) var f conf.RecordFormat if ca == "fmp4" { @@ -90,83 +154,42 @@ func TestAgent(t *testing.T) { f = conf.RecordFormatMPEGTS } - a := NewAgent( - 1024, - recordPath, - f, - 100*time.Millisecond, - 1*time.Second, - "mypath", - stream, - func(fpath string) { + w := &Agent{ + WriteQueueSize: 1024, + RecordPath: recordPath, + Format: f, + PartDuration: 100 * time.Millisecond, + SegmentDuration: 1 * time.Second, + PathName: "mypath", + Stream: stream, + OnSegmentCreate: func(fpath string) { segCreated <- struct{}{} }, - func(fpath string) { + OnSegmentComplete: func(fpath string) { segDone <- struct{}{} }, - &nilLogger{}, - ) - - for i := 0; i < 3; i++ { - stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{ - Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, - }, - AU: [][]byte{ - { // VPS - 0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20, - 0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03, - 0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24, - }, - { // SPS - 0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03, - 0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, - 0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d, - 0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88, - 0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9, - 0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc, - 0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a, - 0x02, 0x02, 0x02, 0x01, - }, - { // PPS - 0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40, - }, - {byte(h265.NALUType_CRA_NUT) << 1, 0}, // IDR - }, - }) + Parent: &nilLogger{}, + restartPause: 1 * time.Millisecond, + } + w.Initialize() - stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{ - Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, - }, - AU: [][]byte{ - { // SPS - 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, - 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, - 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, - }, - { // PPS - 0x08, 0x06, 0x07, 0x08, - }, - {5}, // IDR - }, - }) + writeToStream(stream) - stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{ - Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, - }, - AUs: [][]byte{{1, 2, 3, 4}}, - }) - } + // simulate a write error + stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{ + Base: unit.Base{ + PTS: 0, + }, + AU: [][]byte{ + {5}, // IDR + }, + }) for i := 0; i < 2; i++ { <-segCreated <-segDone } - a.Close() - var ext string if ca == "fmp4" { ext = "mp4" @@ -174,10 +197,29 @@ func TestAgent(t *testing.T) { ext = "ts" } - _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125."+ext)) + _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext)) + require.NoError(t, err) + + _, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000000."+ext)) + require.NoError(t, err) + + time.Sleep(50 * time.Millisecond) + + writeToStream(stream) + + time.Sleep(50 * time.Millisecond) + + w.Close() + + for i := 0; i < 2; i++ { + <-segCreated + <-segDone + } + + _, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext)) require.NoError(t, err) - _, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427."+ext)) + _, err = os.Stat(filepath.Join(dir, "mypath", "2011-05-20_22-15-25-000000."+ext)) require.NoError(t, err) }) } diff --git a/internal/record/rec_format.go b/internal/record/rec_format.go index 8ec70d90db3..8a0508f9cee 100644 --- a/internal/record/rec_format.go +++ b/internal/record/rec_format.go @@ -1,5 +1,6 @@ package record type recFormat interface { + initialize() close() } diff --git a/internal/record/rec_format_fmp4.go b/internal/record/rec_format_fmp4.go index 5e63a3dc3cc..d2b87d13e34 100644 --- a/internal/record/rec_format_fmp4.go +++ b/internal/record/rec_format_fmp4.go @@ -97,18 +97,15 @@ func jpegExtractSize(image []byte) (int, int, error) { } type recFormatFMP4 struct { - a *Agent + a *agentInstance + tracks []*recFormatFMP4Track hasVideo bool currentSegment *recFormatFMP4Segment nextSequenceNumber uint32 } -func newRecFormatFMP4(a *Agent) recFormat { - f := &recFormatFMP4{ - a: a, - } - +func (f *recFormatFMP4) initialize() { nextID := 1 addTrack := func(codec fmp4.Codec) *recFormatFMP4Track { @@ -135,7 +132,7 @@ func newRecFormatFMP4(a *Agent) recFormat { } } - for _, media := range a.stream.Desc().Medias { + for _, media := range f.a.wrapper.Stream.Desc().Medias { for _, forma := range media.Formats { switch forma := forma.(type) { case *format.AV1: @@ -148,7 +145,7 @@ func newRecFormatFMP4(a *Agent) recFormat { firstReceived := false - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.AV1) if tunit.TU == nil { return nil @@ -205,7 +202,7 @@ func newRecFormatFMP4(a *Agent) recFormat { firstReceived := false - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.VP9) if tunit.Frame == nil { return nil @@ -302,7 +299,7 @@ func newRecFormatFMP4(a *Agent) recFormat { var dtsExtractor *h265.DTSExtractor - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.H265) if tunit.AU == nil { return nil @@ -387,7 +384,7 @@ func newRecFormatFMP4(a *Agent) recFormat { var dtsExtractor *h264.DTSExtractor - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.H264) if tunit.AU == nil { return nil @@ -463,7 +460,7 @@ func newRecFormatFMP4(a *Agent) recFormat { firstReceived := false var lastPTS time.Duration - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG4Video) if tunit.Frame == nil { return nil @@ -515,7 +512,7 @@ func newRecFormatFMP4(a *Agent) recFormat { firstReceived := false var lastPTS time.Duration - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG1Video) if tunit.Frame == nil { return nil @@ -563,7 +560,7 @@ func newRecFormatFMP4(a *Agent) recFormat { parsed := false - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MJPEG) if tunit.Frame == nil { return nil @@ -599,7 +596,7 @@ func newRecFormatFMP4(a *Agent) recFormat { } track := addTrack(codec) - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.Opus) if tunit.Packets == nil { return nil @@ -632,7 +629,7 @@ func newRecFormatFMP4(a *Agent) recFormat { sampleRate := time.Duration(forma.ClockRate()) - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG4Audio) if tunit.AUs == nil { return nil @@ -665,7 +662,7 @@ func newRecFormatFMP4(a *Agent) recFormat { parsed := false - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG1Audio) if tunit.Frames == nil { return nil @@ -719,7 +716,7 @@ func newRecFormatFMP4(a *Agent) recFormat { parsed := false - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.AC3) if tunit.Frames == nil { return nil @@ -785,7 +782,7 @@ func newRecFormatFMP4(a *Agent) recFormat { } track := addTrack(codec) - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.LPCM) if tunit.Samples == nil { return nil @@ -802,7 +799,7 @@ func newRecFormatFMP4(a *Agent) recFormat { } } - a.Log(logger.Info, "recording %d %s", + f.a.wrapper.Log(logger.Info, "recording %d %s", len(f.tracks), func() string { if len(f.tracks) == 1 { @@ -810,8 +807,6 @@ func newRecFormatFMP4(a *Agent) recFormat { } return "tracks" }()) - - return f } func (f *recFormatFMP4) close() { diff --git a/internal/record/rec_format_fmp4_part.go b/internal/record/rec_format_fmp4_part.go index 4d6d313b64c..890f00bc58e 100644 --- a/internal/record/rec_format_fmp4_part.go +++ b/internal/record/rec_format_fmp4_part.go @@ -65,8 +65,8 @@ func newRecFormatFMP4Part( func (p *recFormatFMP4Part) close() error { if p.s.fi == nil { - p.s.fpath = encodeRecordPath(&recordPathParams{time: p.created}, p.s.f.a.path) - p.s.f.a.Log(logger.Debug, "creating segment %s", p.s.fpath) + p.s.fpath = encodeRecordPath(&recordPathParams{time: p.created}, p.s.f.a.resolvedPath) + p.s.f.a.wrapper.Log(logger.Debug, "creating segment %s", p.s.fpath) err := os.MkdirAll(filepath.Dir(p.s.fpath), 0o755) if err != nil { @@ -78,7 +78,7 @@ func (p *recFormatFMP4Part) close() error { return err } - p.s.f.a.onSegmentCreate(p.s.fpath) + p.s.f.a.wrapper.OnSegmentCreate(p.s.fpath) err = writeInit(fi, p.s.f.tracks) if err != nil { diff --git a/internal/record/rec_format_fmp4_segment.go b/internal/record/rec_format_fmp4_segment.go index a2c83a04616..b2af062fb6e 100644 --- a/internal/record/rec_format_fmp4_segment.go +++ b/internal/record/rec_format_fmp4_segment.go @@ -60,14 +60,14 @@ func (s *recFormatFMP4Segment) close() error { } if s.fi != nil { - s.f.a.Log(logger.Debug, "closing segment %s", s.fpath) + s.f.a.wrapper.Log(logger.Debug, "closing segment %s", s.fpath) err2 := s.fi.Close() if err == nil { err = err2 } if err2 == nil { - s.f.a.onSegmentComplete(s.fpath) + s.f.a.wrapper.OnSegmentComplete(s.fpath) } } @@ -78,7 +78,7 @@ func (s *recFormatFMP4Segment) record(track *recFormatFMP4Track, sample *sample) if s.curPart == nil { s.curPart = newRecFormatFMP4Part(s, s.f.nextSequenceNumber, sample.dts) s.f.nextSequenceNumber++ - } else if s.curPart.duration() >= s.f.a.partDuration { + } else if s.curPart.duration() >= s.f.a.wrapper.PartDuration { err := s.curPart.close() s.curPart = nil diff --git a/internal/record/rec_format_fmp4_track.go b/internal/record/rec_format_fmp4_track.go index de0a4a5bc6d..228833753a1 100644 --- a/internal/record/rec_format_fmp4_track.go +++ b/internal/record/rec_format_fmp4_track.go @@ -44,7 +44,7 @@ func (t *recFormatFMP4Track) record(sample *sample) error { if (!t.f.hasVideo || t.initTrack.Codec.IsVideo()) && !t.nextSample.IsNonSyncSample && - (t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.segmentDuration { + (t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.wrapper.SegmentDuration { err := t.f.currentSegment.close() if err != nil { return err diff --git a/internal/record/rec_format_mpegts.go b/internal/record/rec_format_mpegts.go index 2683aa02440..2092e18f7b0 100644 --- a/internal/record/rec_format_mpegts.go +++ b/internal/record/rec_format_mpegts.go @@ -39,7 +39,7 @@ func (d *dynamicWriter) setTarget(w io.Writer) { } type recFormatMPEGTS struct { - a *Agent + a *agentInstance dw *dynamicWriter bw *bufio.Writer @@ -48,11 +48,7 @@ type recFormatMPEGTS struct { currentSegment *recFormatMPEGTSSegment } -func newRecFormatMPEGTS(a *Agent) recFormat { - f := &recFormatMPEGTS{ - a: a, - } - +func (f *recFormatMPEGTS) initialize() { var tracks []*mpegts.Track addTrack := func(codec mpegts.Codec) *mpegts.Track { @@ -63,7 +59,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { return track } - for _, media := range a.stream.Desc().Medias { + for _, media := range f.a.wrapper.Stream.Desc().Medias { for _, forma := range media.Formats { switch forma := forma.(type) { case *format.H265: @@ -71,7 +67,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { var dtsExtractor *h265.DTSExtractor - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.H265) if tunit.AU == nil { return nil @@ -99,7 +95,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { var dtsExtractor *h264.DTSExtractor - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.H264) if tunit.AU == nil { return nil @@ -128,7 +124,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { firstReceived := false var lastPTS time.Duration - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG4Video) if tunit.Frame == nil { return nil @@ -158,7 +154,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { firstReceived := false var lastPTS time.Duration - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG1Video) if tunit.Frame == nil { return nil @@ -192,7 +188,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { }(), }) - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.Opus) if tunit.Packets == nil { return nil @@ -211,7 +207,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { Config: *forma.GetConfig(), }) - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG4Audio) if tunit.AUs == nil { return nil @@ -228,7 +224,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { case *format.MPEG1Audio: track := addTrack(&mpegts.CodecMPEG1Audio{}) - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG1Audio) if tunit.Frames == nil { return nil @@ -247,7 +243,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { sampleRate := time.Duration(forma.SampleRate) - a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error { + f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.AC3) if tunit.Frames == nil { return nil @@ -273,7 +269,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { f.bw = bufio.NewWriterSize(f.dw, mpegtsMaxBufferSize) f.mw = mpegts.NewWriter(f.bw, tracks) - a.Log(logger.Info, "recording %d %s", + f.a.wrapper.Log(logger.Info, "recording %d %s", len(tracks), func() string { if len(tracks) == 1 { @@ -281,8 +277,6 @@ func newRecFormatMPEGTS(a *Agent) recFormat { } return "tracks" }()) - - return f } func (f *recFormatMPEGTS) close() { @@ -298,7 +292,7 @@ func (f *recFormatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAc case (!f.hasVideo || isVideo) && randomAccess && - (dts-f.currentSegment.startDTS) >= f.a.segmentDuration: + (dts-f.currentSegment.startDTS) >= f.a.wrapper.SegmentDuration: err := f.currentSegment.close() if err != nil { return err @@ -306,7 +300,7 @@ func (f *recFormatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAc f.currentSegment = newRecFormatMPEGTSSegment(f, dts) - case (dts - f.currentSegment.lastFlush) >= f.a.partDuration: + case (dts - f.currentSegment.lastFlush) >= f.a.wrapper.PartDuration: err := f.bw.Flush() if err != nil { return err diff --git a/internal/record/rec_format_mpegts_segment.go b/internal/record/rec_format_mpegts_segment.go index 89a1baa5ce8..39f1a0e8cd9 100644 --- a/internal/record/rec_format_mpegts_segment.go +++ b/internal/record/rec_format_mpegts_segment.go @@ -35,14 +35,14 @@ func (s *recFormatMPEGTSSegment) close() error { err := s.f.bw.Flush() if s.fi != nil { - s.f.a.Log(logger.Debug, "closing segment %s", s.fpath) + s.f.a.wrapper.Log(logger.Debug, "closing segment %s", s.fpath) err2 := s.fi.Close() if err == nil { err = err2 } if err2 == nil { - s.f.a.onSegmentComplete(s.fpath) + s.f.a.wrapper.OnSegmentComplete(s.fpath) } } @@ -51,8 +51,8 @@ func (s *recFormatMPEGTSSegment) close() error { func (s *recFormatMPEGTSSegment) Write(p []byte) (int, error) { if s.fi == nil { - s.fpath = encodeRecordPath(&recordPathParams{time: s.created}, s.f.a.path) - s.f.a.Log(logger.Debug, "creating segment %s", s.fpath) + s.fpath = encodeRecordPath(&recordPathParams{time: s.created}, s.f.a.resolvedPath) + s.f.a.wrapper.Log(logger.Debug, "creating segment %s", s.fpath) err := os.MkdirAll(filepath.Dir(s.fpath), 0o755) if err != nil { @@ -64,7 +64,7 @@ func (s *recFormatMPEGTSSegment) Write(p []byte) (int, error) { return 0, err } - s.f.a.onSegmentCreate(s.fpath) + s.f.a.wrapper.OnSegmentCreate(s.fpath) s.fi = fi }