From 2c28a7f98e1d7d2d677f68215da972ac95d0cfcb Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 14 Jan 2024 22:50:07 +0100 Subject: [PATCH 1/3] add playback server --- apidocs/openapi.yaml | 16 +- go.mod | 3 +- go.sum | 6 +- internal/api/api.go | 14 +- internal/conf/conf.go | 33 +- internal/conf/credential.go | 40 +-- internal/conf/credential_test.go | 4 +- internal/conf/path.go | 44 ++- internal/core/core.go | 65 +++- internal/core/path.go | 77 ++-- internal/core/path_manager.go | 183 ++++------ internal/defs/path.go | 10 +- internal/defs/path_manager.go | 2 +- internal/playback/fmp4.go | 432 +++++++++++++++++++++++ internal/playback/fmp4_test.go | 79 +++++ internal/playback/server.go | 299 ++++++++++++++++ internal/playback/server_test.go | 228 ++++++++++++ internal/record/agent_instance.go | 13 +- internal/record/cleaner.go | 50 +-- internal/record/cleaner_test.go | 2 +- internal/record/format_fmp4_part.go | 10 +- internal/record/format_fmp4_segment.go | 8 +- internal/record/format_mpegts_segment.go | 2 +- internal/record/path.go | 54 ++- internal/record/path_test.go | 12 +- internal/servers/hls/http_server.go | 2 +- internal/servers/webrtc/http_server.go | 2 +- mediamtx.yml | 31 +- 28 files changed, 1381 insertions(+), 340 deletions(-) create mode 100644 internal/playback/fmp4.go create mode 100644 internal/playback/fmp4_test.go create mode 100644 internal/playback/server.go create mode 100644 internal/playback/server_test.go diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 965476e752b..417d353b8f7 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -43,10 +43,6 @@ components: type: integer externalAuthenticationURL: type: string - api: - type: boolean - apiAddress: - type: string metrics: type: boolean metricsAddress: @@ -62,6 +58,18 @@ components: runOnDisconnect: type: string + # API + api: + type: boolean + apiAddress: + type: string + + # Playback server + playback: + type: boolean + playbackAddress: + type: string + # RTSP server rtsp: type: boolean diff --git a/go.mod b/go.mod index 39e52c666e2..b1b1d893bd2 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,7 @@ require ( code.cloudfoundry.org/bytefmt v0.0.0 github.com/abema/go-mp4 v1.2.0 github.com/alecthomas/kong v0.8.1 - github.com/aler9/writerseeker v1.1.0 - github.com/bluenviron/gohlslib v1.2.0 + github.com/bluenviron/gohlslib v1.2.1-0.20240114214154-83fc88edbaad github.com/bluenviron/gortsplib/v4 v4.7.1 github.com/bluenviron/mediacommon v1.8.0 github.com/datarhei/gosrt v0.5.7 diff --git a/go.sum b/go.sum index 3155c4d2ed8..a88a7750213 100644 --- a/go.sum +++ b/go.sum @@ -12,16 +12,14 @@ github.com/aler9/sdp/v3 v3.0.0-20231022165400-33437e07f326 h1:HA7u47vkcxFiHtiOjm github.com/aler9/sdp/v3 v3.0.0-20231022165400-33437e07f326/go.mod h1:I40uD/ZSmK2peI6AdJga5fd55d4bFK0oWOgLS9Q8sVc= github.com/aler9/webrtc/v3 v3.0.0-20231112223655-e402ed2689c6 h1:wMd3D1mLghoYYh31STig8Kwm2qi8QyQKUy09qUUZrVw= github.com/aler9/webrtc/v3 v3.0.0-20231112223655-e402ed2689c6/go.mod h1:1CaT2fcZzZ6VZA+O1i9yK2DU4EOcXVvSbWG9pr5jefs= -github.com/aler9/writerseeker v1.1.0 h1:t+Sm3tjp8scNlqyoa8obpeqwciMNOvdvsxjxEb3Sx3g= -github.com/aler9/writerseeker v1.1.0/go.mod h1:QNCcjSKnLsYoTfMmXkEEfgbz6nNXWxKSaBY+hGJGWDA= github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflxkRsZA= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwfKZ1c= github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= -github.com/bluenviron/gohlslib v1.2.0 h1:Hrx2/n/AcmKKIV+MjZLKs5kmW+O7xCdUSPJQoS39JKw= -github.com/bluenviron/gohlslib v1.2.0/go.mod h1:kG/Sjebsxnf5asMGaGcQ0aSvtFGNChJPgctds2wDHOI= +github.com/bluenviron/gohlslib v1.2.1-0.20240114214154-83fc88edbaad h1:R9Lqf0A2/3TTB4casoU1LC+HRLmsVxNYUTmnbbD8WAE= +github.com/bluenviron/gohlslib v1.2.1-0.20240114214154-83fc88edbaad/go.mod h1:k94WhiVkgJl45Q1WkLw8/GG2AJ1+VU9c/3i4f41xMq8= github.com/bluenviron/gortsplib/v4 v4.7.1 h1:ZiPHjnIsdPDfPGZgfBr2n2xCFZlvmc/5zEqdoJUa1vU= github.com/bluenviron/gortsplib/v4 v4.7.1/go.mod h1:3+IYh85PgIPLHr4D5z7GnRvpu/ogSHMDhsYW/CjrD8E= github.com/bluenviron/mediacommon v1.8.0 h1:z9ZxCkuivs1IN1NrD6lB7d9twqcRNaCTvlG39NL9Sa4= diff --git a/internal/api/api.go b/internal/api/api.go index 44bb6320d52..ee47c122f48 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -274,7 +274,7 @@ func (a *API) writeError(ctx *gin.Context, status int, err error) { // show error in logs a.Log(logger.Error, err.Error()) - // send error in response + // add error to response ctx.JSON(status, &defs.APIError{ Error: err.Error(), }) @@ -303,7 +303,7 @@ func (a *API) onConfigGlobalPatch(ctx *gin.Context) { newConf.PatchGlobal(&c) - err = newConf.Check() + err = newConf.Validate() if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return @@ -341,7 +341,7 @@ func (a *API) onConfigPathDefaultsPatch(ctx *gin.Context) { newConf.PatchPathDefaults(&p) - err = newConf.Check() + err = newConf.Validate() if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return @@ -422,7 +422,7 @@ func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl return } - err = newConf.Check() + err = newConf.Validate() if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return @@ -463,7 +463,7 @@ func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl return } - err = newConf.Check() + err = newConf.Validate() if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return @@ -504,7 +504,7 @@ func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl return } - err = newConf.Check() + err = newConf.Validate() if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return @@ -538,7 +538,7 @@ func (a *API) onConfigPathsDelete(ctx *gin.Context) { return } - err = newConf.Check() + err = newConf.Validate() if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return diff --git a/internal/conf/conf.go b/internal/conf/conf.go index f8226bdd2d8..e952023098b 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -94,8 +94,6 @@ type Conf struct { WriteQueueSize int `json:"writeQueueSize"` UDPMaxPayloadSize int `json:"udpMaxPayloadSize"` ExternalAuthenticationURL string `json:"externalAuthenticationURL"` - API bool `json:"api"` - APIAddress string `json:"apiAddress"` Metrics bool `json:"metrics"` MetricsAddress string `json:"metricsAddress"` PPROF bool `json:"pprof"` @@ -104,6 +102,14 @@ type Conf struct { RunOnConnectRestart bool `json:"runOnConnectRestart"` RunOnDisconnect string `json:"runOnDisconnect"` + // API + API bool `json:"api"` + APIAddress string `json:"apiAddress"` + + // Playback + Playback bool `json:"playback"` + PlaybackAddress string `json:"playbackAddress"` + // RTSP server RTSP bool `json:"rtsp"` RTSPDisable *bool `json:"rtspDisable,omitempty"` // deprecated @@ -195,11 +201,16 @@ func (conf *Conf) setDefaults() { conf.WriteTimeout = 10 * StringDuration(time.Second) conf.WriteQueueSize = 512 conf.UDPMaxPayloadSize = 1472 - conf.APIAddress = "127.0.0.1:9997" conf.MetricsAddress = "127.0.0.1:9998" conf.PPROFAddress = "127.0.0.1:9999" - // RTSP + // API + conf.APIAddress = "127.0.0.1:9997" + + // Playback server + conf.PlaybackAddress = ":9996" + + // RTSP server conf.RTSP = true conf.Protocols = Protocols{ Protocol(gortsplib.TransportUDP): {}, @@ -217,7 +228,7 @@ func (conf *Conf) setDefaults() { conf.ServerCert = "server.crt" conf.AuthMethods = AuthMethods{headers.AuthBasic} - // RTMP + // RTMP server conf.RTMP = true conf.RTMPAddress = ":1935" conf.RTMPSAddress = ":1936" @@ -236,7 +247,7 @@ func (conf *Conf) setDefaults() { conf.HLSSegmentMaxSize = 50 * 1024 * 1024 conf.HLSAllowOrigin = "*" - // WebRTC + // WebRTC server conf.WebRTC = true conf.WebRTCAddress = ":8889" conf.WebRTCServerKey = "server.key" @@ -248,7 +259,7 @@ func (conf *Conf) setDefaults() { conf.WebRTCAdditionalHosts = []string{} conf.WebRTCICEServers2 = []WebRTCICEServer{} - // SRT + // SRT server conf.SRT = true conf.SRTAddress = ":8890" @@ -274,7 +285,7 @@ func Load(fpath string, defaultConfPaths []string) (*Conf, string, error) { return nil, "", err } - err = conf.Check() + err = conf.Validate() if err != nil { return nil, "", err } @@ -337,8 +348,8 @@ func (conf Conf) Clone() *Conf { return &dest } -// Check checks the configuration for errors. -func (conf *Conf) Check() error { +// Validate checks the configuration for errors. +func (conf *Conf) Validate() error { // General if conf.ReadBufferCount != nil { @@ -479,7 +490,7 @@ func (conf *Conf) Check() error { pconf := newPath(&conf.PathDefaults, optional) conf.Paths[name] = pconf - err := pconf.check(conf, name) + err := pconf.validate(conf, name) if err != nil { return err } diff --git a/internal/conf/credential.go b/internal/conf/credential.go index c1683e2908d..7b6d47399fe 100644 --- a/internal/conf/credential.go +++ b/internal/conf/credential.go @@ -39,7 +39,7 @@ func (d *Credential) UnmarshalJSON(b []byte) error { value: in, } - return d.validateConfig() + return d.validate() } // UnmarshalEnv implements env.Unmarshaler. @@ -97,26 +97,24 @@ func (d *Credential) Check(guess string) bool { return d.value == guess } -func (d *Credential) validateConfig() error { - if d.IsEmpty() { - return nil - } - - switch { - case d.IsSha256(): - if !reBase64.MatchString(d.value) { - return fmt.Errorf("credential contains unsupported characters, sha256 hash must be base64 encoded") - } - case d.IsArgon2(): - // TODO: remove matthewhartstonge/argon2 when this PR gets merged into mainline Go: - // https://go-review.googlesource.com/c/crypto/+/502515 - _, err := argon2.Decode([]byte(d.value[len("argon2:"):])) - if err != nil { - return fmt.Errorf("invalid argon2 hash: %w", err) - } - default: - if !rePlainCredential.MatchString(d.value) { - return fmt.Errorf("credential contains unsupported characters. Supported are: %s", plainCredentialSupportedChars) +func (d *Credential) validate() error { + if !d.IsEmpty() { + switch { + case d.IsSha256(): + if !reBase64.MatchString(d.value) { + return fmt.Errorf("credential contains unsupported characters, sha256 hash must be base64 encoded") + } + case d.IsArgon2(): + // TODO: remove matthewhartstonge/argon2 when this PR gets merged into mainline Go: + // https://go-review.googlesource.com/c/crypto/+/502515 + _, err := argon2.Decode([]byte(d.value[len("argon2:"):])) + if err != nil { + return fmt.Errorf("invalid argon2 hash: %w", err) + } + default: + if !rePlainCredential.MatchString(d.value) { + return fmt.Errorf("credential contains unsupported characters. Supported are: %s", plainCredentialSupportedChars) + } } } return nil diff --git a/internal/conf/credential_test.go b/internal/conf/credential_test.go index a8a43c53906..ef325672b91 100644 --- a/internal/conf/credential_test.go +++ b/internal/conf/credential_test.go @@ -102,7 +102,7 @@ func TestCredential(t *testing.T) { assert.False(t, cred.Check("notestuser")) }) - t.Run("validateConfig", func(t *testing.T) { + t.Run("validate", func(t *testing.T) { tests := []struct { name string cred *Credential @@ -155,7 +155,7 @@ func TestCredential(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := tt.cred.validateConfig() + err := tt.cred.validate() if tt.wantErr { assert.Error(t, err) } else { diff --git a/internal/conf/path.go b/internal/conf/path.go index 3633576f6aa..d6da3ee49aa 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -16,8 +16,7 @@ import ( var rePathName = regexp.MustCompile(`^[0-9a-zA-Z_\-/\.~]+$`) -// IsValidPathName checks if a path name is valid. -func IsValidPathName(name string) error { +func isValidPathName(name string) error { if name == "" { return fmt.Errorf("cannot be empty") } @@ -47,6 +46,41 @@ func srtCheckPassphrase(passphrase string) error { } } +// FindPathConf returns the configuration corresponding to the given path name. +func FindPathConf(pathConfs map[string]*Path, name string) (string, *Path, []string, error) { + err := isValidPathName(name) + if err != nil { + return "", nil, nil, fmt.Errorf("invalid path name: %w (%s)", err, name) + } + + // normal path + if pathConf, ok := pathConfs[name]; ok { + return name, pathConf, nil, nil + } + + // regular expression-based path + for pathConfName, pathConf := range pathConfs { + if pathConf.Regexp != nil && pathConfName != "all" && pathConfName != "all_others" { + m := pathConf.Regexp.FindStringSubmatch(name) + if m != nil { + return pathConfName, pathConf, m, nil + } + } + } + + // all_others + for pathConfName, pathConf := range pathConfs { + if pathConfName == "all" || pathConfName == "all_others" { + m := pathConf.Regexp.FindStringSubmatch(name) + if m != nil { + return pathConfName, pathConf, m, nil + } + } + } + + return "", nil, nil, fmt.Errorf("path '%s' is not configured", name) +} + // Path is a path configuration. type Path struct { Regexp *regexp.Regexp `json:"-"` // filled by Check() @@ -212,7 +246,7 @@ func (pconf Path) Clone() *Path { return &dest } -func (pconf *Path) check(conf *Conf, name string) error { +func (pconf *Path) validate(conf *Conf, name string) error { pconf.Name = name switch { @@ -220,7 +254,7 @@ func (pconf *Path) check(conf *Conf, name string) error { pconf.Regexp = regexp.MustCompile("^.*$") case name == "" || name[0] != '~': // normal path - err := IsValidPathName(name) + err := isValidPathName(name) if err != nil { return fmt.Errorf("invalid path name '%s': %w", name, err) } @@ -325,7 +359,7 @@ func (pconf *Path) check(conf *Conf, name string) error { } if pconf.Fallback != "" { if strings.HasPrefix(pconf.Fallback, "/") { - err := IsValidPathName(pconf.Fallback[1:]) + err := isValidPathName(pconf.Fallback[1:]) if err != nil { return fmt.Errorf("'%s': %w", pconf.Fallback, err) } diff --git a/internal/core/core.go b/internal/core/core.go index 688bdadf437..4cb1c333d74 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -22,6 +22,7 @@ import ( "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/metrics" + "github.com/bluenviron/mediamtx/internal/playback" "github.com/bluenviron/mediamtx/internal/pprof" "github.com/bluenviron/mediamtx/internal/record" "github.com/bluenviron/mediamtx/internal/rlimit" @@ -48,7 +49,7 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry { for _, pa := range paths { if pa.Record && pa.RecordDeleteAfter != 0 { entry := record.CleanerEntry{ - PathFormat: pa.RecordPath, + Path: pa.RecordPath, Format: pa.RecordFormat, DeleteAfter: time.Duration(pa.RecordDeleteAfter), } @@ -65,8 +66,8 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry { } sort.Slice(out2, func(i, j int) bool { - if out2[i].PathFormat != out2[j].PathFormat { - return out2[i].PathFormat < out2[j].PathFormat + if out2[i].Path != out2[j].Path { + return out2[i].Path < out2[j].Path } return out2[i].DeleteAfter < out2[j].DeleteAfter }) @@ -90,6 +91,7 @@ type Core struct { metrics *metrics.Metrics pprof *pprof.PPROF recordCleaner *record.Cleaner + playbackServer *playback.Server pathManager *pathManager rtspServer *rtsp.Server rtspsServer *rtsp.Server @@ -312,20 +314,35 @@ func (p *Core) createResources(initial bool) error { p.recordCleaner.Initialize() } + if p.conf.Playback && + p.playbackServer == nil { + p.playbackServer = &playback.Server{ + Address: p.conf.PlaybackAddress, + ReadTimeout: p.conf.ReadTimeout, + PathConfs: p.conf.Paths, + Parent: p, + } + err := p.playbackServer.Initialize() + if err != nil { + return err + } + } + if p.pathManager == nil { - p.pathManager = newPathManager( - p.conf.LogLevel, - p.conf.ExternalAuthenticationURL, - p.conf.RTSPAddress, - p.conf.AuthMethods, - p.conf.ReadTimeout, - p.conf.WriteTimeout, - p.conf.WriteQueueSize, - p.conf.UDPMaxPayloadSize, - p.conf.Paths, - p.externalCmdPool, - p, - ) + p.pathManager = &pathManager{ + logLevel: p.conf.LogLevel, + externalAuthenticationURL: p.conf.ExternalAuthenticationURL, + rtspAddress: p.conf.RTSPAddress, + authMethods: p.conf.AuthMethods, + readTimeout: p.conf.ReadTimeout, + writeTimeout: p.conf.WriteTimeout, + writeQueueSize: p.conf.WriteQueueSize, + udpMaxPayloadSize: p.conf.UDPMaxPayloadSize, + pathConfs: p.conf.Paths, + externalCmdPool: p.externalCmdPool, + parent: p, + } + p.pathManager.initialize() if p.metrics != nil { p.metrics.SetPathManager(p.pathManager) @@ -619,6 +636,15 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { !reflect.DeepEqual(gatherCleanerEntries(newConf.Paths), gatherCleanerEntries(p.conf.Paths)) || closeLogger + closePlaybackServer := newConf == nil || + newConf.Playback != p.conf.Playback || + newConf.PlaybackAddress != p.conf.PlaybackAddress || + newConf.ReadTimeout != p.conf.ReadTimeout || + closeLogger + if !closePlaybackServer && p.playbackServer != nil && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { + p.playbackServer.ReloadPathConfs(newConf.Paths) + } + closePathManager := newConf == nil || newConf.LogLevel != p.conf.LogLevel || newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL || @@ -631,7 +657,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { closeMetrics || closeLogger if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { - p.pathManager.ReloadConf(newConf.Paths) + p.pathManager.ReloadPathConfs(newConf.Paths) } closeRTSPServer := newConf == nil || @@ -865,6 +891,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { p.pathManager = nil } + if closePlaybackServer && p.playbackServer != nil { + p.playbackServer.Close() + p.playbackServer = nil + } + if closeRecorderCleaner && p.recordCleaner != nil { p.recordCleaner.Close() p.recordCleaner = nil diff --git a/internal/core/path.go b/internal/core/path.go index e090e9fa69c..a88cbc8e22f 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -64,6 +64,7 @@ type pathAPIPathsGetReq struct { } type path struct { + parentCtx context.Context logLevel conf.LogLevel rtspAddress string readTimeout conf.StringDuration @@ -115,65 +116,33 @@ type path struct { done chan struct{} } -func newPath( - parentCtx context.Context, - logLevel conf.LogLevel, - rtspAddress string, - readTimeout conf.StringDuration, - writeTimeout conf.StringDuration, - writeQueueSize int, - udpMaxPayloadSize int, - confName string, - cnf *conf.Path, - name string, - matches []string, - wg *sync.WaitGroup, - externalCmdPool *externalcmd.Pool, - parent pathParent, -) *path { - ctx, ctxCancel := context.WithCancel(parentCtx) - - pa := &path{ - logLevel: logLevel, - rtspAddress: rtspAddress, - readTimeout: readTimeout, - writeTimeout: writeTimeout, - writeQueueSize: writeQueueSize, - udpMaxPayloadSize: udpMaxPayloadSize, - confName: confName, - conf: cnf, - name: name, - matches: matches, - wg: wg, - externalCmdPool: externalCmdPool, - parent: parent, - ctx: ctx, - ctxCancel: ctxCancel, - readers: make(map[defs.Reader]struct{}), - onDemandStaticSourceReadyTimer: newEmptyTimer(), - onDemandStaticSourceCloseTimer: newEmptyTimer(), - onDemandPublisherReadyTimer: newEmptyTimer(), - onDemandPublisherCloseTimer: newEmptyTimer(), - chReloadConf: make(chan *conf.Path), - chStaticSourceSetReady: make(chan defs.PathSourceStaticSetReadyReq), - chStaticSourceSetNotReady: make(chan defs.PathSourceStaticSetNotReadyReq), - chDescribe: make(chan defs.PathDescribeReq), - chAddPublisher: make(chan defs.PathAddPublisherReq), - chRemovePublisher: make(chan defs.PathRemovePublisherReq), - chStartPublisher: make(chan defs.PathStartPublisherReq), - chStopPublisher: make(chan defs.PathStopPublisherReq), - chAddReader: make(chan defs.PathAddReaderReq), - chRemoveReader: make(chan defs.PathRemoveReaderReq), - chAPIPathsGet: make(chan pathAPIPathsGetReq), - done: make(chan struct{}), - } +func (pa *path) initialize() { + ctx, ctxCancel := context.WithCancel(pa.parentCtx) + + pa.ctx = ctx + pa.ctxCancel = ctxCancel + pa.readers = make(map[defs.Reader]struct{}) + pa.onDemandStaticSourceReadyTimer = newEmptyTimer() + pa.onDemandStaticSourceCloseTimer = newEmptyTimer() + pa.onDemandPublisherReadyTimer = newEmptyTimer() + pa.onDemandPublisherCloseTimer = newEmptyTimer() + pa.chReloadConf = make(chan *conf.Path) + pa.chStaticSourceSetReady = make(chan defs.PathSourceStaticSetReadyReq) + pa.chStaticSourceSetNotReady = make(chan defs.PathSourceStaticSetNotReadyReq) + pa.chDescribe = make(chan defs.PathDescribeReq) + pa.chAddPublisher = make(chan defs.PathAddPublisherReq) + pa.chRemovePublisher = make(chan defs.PathRemovePublisherReq) + pa.chStartPublisher = make(chan defs.PathStartPublisherReq) + pa.chStopPublisher = make(chan defs.PathStopPublisherReq) + pa.chAddReader = make(chan defs.PathAddReaderReq) + pa.chRemoveReader = make(chan defs.PathRemoveReaderReq) + pa.chAPIPathsGet = make(chan pathAPIPathsGetReq) + pa.done = make(chan struct{}) pa.Log(logger.Debug, "created") pa.wg.Add(1) go pa.run() - - return pa } func (pa *path) close() { diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 944e1090c72..c4a0ff48a1f 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -35,40 +35,6 @@ func pathConfCanBeUpdated(oldPathConf *conf.Path, newPathConf *conf.Path) bool { return newPathConf.Equal(clone) } -func getConfForPath(pathConfs map[string]*conf.Path, name string) (string, *conf.Path, []string, error) { - err := conf.IsValidPathName(name) - if err != nil { - return "", nil, nil, fmt.Errorf("invalid path name: %w (%s)", err, name) - } - - // normal path - if pathConf, ok := pathConfs[name]; ok { - return name, pathConf, nil, nil - } - - // regular expression-based path - for pathConfName, pathConf := range pathConfs { - if pathConf.Regexp != nil && pathConfName != "all" && pathConfName != "all_others" { - m := pathConf.Regexp.FindStringSubmatch(name) - if m != nil { - return pathConfName, pathConf, m, nil - } - } - } - - // process path configuration "all_others" after everything else - for pathConfName, pathConf := range pathConfs { - if pathConfName == "all" || pathConfName == "all_others" { - m := pathConf.Regexp.FindStringSubmatch(name) - if m != nil { - return pathConfName, pathConf, m, nil - } - } - } - - return "", nil, nil, fmt.Errorf("path '%s' is not configured", name) -} - type pathManagerHLSServer interface { PathReady(defs.Path) PathNotReady(defs.Path) @@ -99,62 +65,37 @@ type pathManager struct { pathsByConf map[string]map[*path]struct{} // in - chReloadConf chan map[string]*conf.Path - chSetHLSServer chan pathManagerHLSServer - chClosePath chan *path - chPathReady chan *path - chPathNotReady chan *path - chGetConfForPath chan defs.PathGetConfForPathReq - chDescribe chan defs.PathDescribeReq - chAddReader chan defs.PathAddReaderReq - chAddPublisher chan defs.PathAddPublisherReq - chAPIPathsList chan pathAPIPathsListReq - chAPIPathsGet chan pathAPIPathsGetReq -} - -func newPathManager( - logLevel conf.LogLevel, - externalAuthenticationURL string, - rtspAddress string, - authMethods conf.AuthMethods, - readTimeout conf.StringDuration, - writeTimeout conf.StringDuration, - writeQueueSize int, - udpMaxPayloadSize int, - pathConfs map[string]*conf.Path, - externalCmdPool *externalcmd.Pool, - parent pathManagerParent, -) *pathManager { + chReloadConf chan map[string]*conf.Path + chSetHLSServer chan pathManagerHLSServer + chClosePath chan *path + chPathReady chan *path + chPathNotReady chan *path + chFindPathConf chan defs.PathFindPathConfReq + chDescribe chan defs.PathDescribeReq + chAddReader chan defs.PathAddReaderReq + chAddPublisher chan defs.PathAddPublisherReq + chAPIPathsList chan pathAPIPathsListReq + chAPIPathsGet chan pathAPIPathsGetReq +} + +func (pm *pathManager) initialize() { ctx, ctxCancel := context.WithCancel(context.Background()) - pm := &pathManager{ - logLevel: logLevel, - externalAuthenticationURL: externalAuthenticationURL, - rtspAddress: rtspAddress, - authMethods: authMethods, - readTimeout: readTimeout, - writeTimeout: writeTimeout, - writeQueueSize: writeQueueSize, - udpMaxPayloadSize: udpMaxPayloadSize, - pathConfs: pathConfs, - externalCmdPool: externalCmdPool, - parent: parent, - ctx: ctx, - ctxCancel: ctxCancel, - paths: make(map[string]*path), - pathsByConf: make(map[string]map[*path]struct{}), - chReloadConf: make(chan map[string]*conf.Path), - chSetHLSServer: make(chan pathManagerHLSServer), - chClosePath: make(chan *path), - chPathReady: make(chan *path), - chPathNotReady: make(chan *path), - chGetConfForPath: make(chan defs.PathGetConfForPathReq), - chDescribe: make(chan defs.PathDescribeReq), - chAddReader: make(chan defs.PathAddReaderReq), - chAddPublisher: make(chan defs.PathAddPublisherReq), - chAPIPathsList: make(chan pathAPIPathsListReq), - chAPIPathsGet: make(chan pathAPIPathsGetReq), - } + pm.ctx = ctx + pm.ctxCancel = ctxCancel + pm.paths = make(map[string]*path) + pm.pathsByConf = make(map[string]map[*path]struct{}) + pm.chReloadConf = make(chan map[string]*conf.Path) + pm.chSetHLSServer = make(chan pathManagerHLSServer) + pm.chClosePath = make(chan *path) + pm.chPathReady = make(chan *path) + pm.chPathNotReady = make(chan *path) + pm.chFindPathConf = make(chan defs.PathFindPathConfReq) + pm.chDescribe = make(chan defs.PathDescribeReq) + pm.chAddReader = make(chan defs.PathAddReaderReq) + pm.chAddPublisher = make(chan defs.PathAddPublisherReq) + pm.chAPIPathsList = make(chan pathAPIPathsListReq) + pm.chAPIPathsGet = make(chan pathAPIPathsGetReq) for pathConfName, pathConf := range pm.pathConfs { if pathConf.Regexp == nil { @@ -166,8 +107,6 @@ func newPathManager( pm.wg.Add(1) go pm.run() - - return pm } func (pm *pathManager) close() { @@ -202,8 +141,8 @@ outer: case pa := <-pm.chPathNotReady: pm.doPathNotReady(pa) - case req := <-pm.chGetConfForPath: - pm.doGetConfForPath(req) + case req := <-pm.chFindPathConf: + pm.doFindPathConf(req) case req := <-pm.chDescribe: pm.doDescribe(req) @@ -288,25 +227,25 @@ func (pm *pathManager) doPathNotReady(pa *path) { } } -func (pm *pathManager) doGetConfForPath(req defs.PathGetConfForPathReq) { - _, pathConf, _, err := getConfForPath(pm.pathConfs, req.AccessRequest.Name) +func (pm *pathManager) doFindPathConf(req defs.PathFindPathConfReq) { + _, pathConf, _, err := conf.FindPathConf(pm.pathConfs, req.AccessRequest.Name) if err != nil { - req.Res <- defs.PathGetConfForPathRes{Err: err} + req.Res <- defs.PathFindPathConfRes{Err: err} return } err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, pathConf, req.AccessRequest) if err != nil { - req.Res <- defs.PathGetConfForPathRes{Err: err} + req.Res <- defs.PathFindPathConfRes{Err: err} return } - req.Res <- defs.PathGetConfForPathRes{Conf: pathConf} + req.Res <- defs.PathFindPathConfRes{Conf: pathConf} } func (pm *pathManager) doDescribe(req defs.PathDescribeReq) { - pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.AccessRequest.Name) + pathConfName, pathConf, pathMatches, err := conf.FindPathConf(pm.pathConfs, req.AccessRequest.Name) if err != nil { req.Res <- defs.PathDescribeRes{Err: err} return @@ -328,7 +267,7 @@ func (pm *pathManager) doDescribe(req defs.PathDescribeReq) { } func (pm *pathManager) doAddReader(req defs.PathAddReaderReq) { - pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.AccessRequest.Name) + pathConfName, pathConf, pathMatches, err := conf.FindPathConf(pm.pathConfs, req.AccessRequest.Name) if err != nil { req.Res <- defs.PathAddReaderRes{Err: err} return @@ -352,7 +291,7 @@ func (pm *pathManager) doAddReader(req defs.PathAddReaderReq) { } func (pm *pathManager) doAddPublisher(req defs.PathAddPublisherReq) { - pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.AccessRequest.Name) + pathConfName, pathConf, pathMatches, err := conf.FindPathConf(pm.pathConfs, req.AccessRequest.Name) if err != nil { req.Res <- defs.PathAddPublisherRes{Err: err} return @@ -401,21 +340,23 @@ func (pm *pathManager) createPath( name string, matches []string, ) { - pa := newPath( - pm.ctx, - pm.logLevel, - pm.rtspAddress, - pm.readTimeout, - pm.writeTimeout, - pm.writeQueueSize, - pm.udpMaxPayloadSize, - pathConfName, - pathConf, - name, - matches, - &pm.wg, - pm.externalCmdPool, - pm) + pa := &path{ + parentCtx: pm.ctx, + logLevel: pm.logLevel, + rtspAddress: pm.rtspAddress, + readTimeout: pm.readTimeout, + writeTimeout: pm.writeTimeout, + writeQueueSize: pm.writeQueueSize, + udpMaxPayloadSize: pm.udpMaxPayloadSize, + confName: pathConfName, + conf: pathConf, + name: name, + matches: matches, + wg: &pm.wg, + externalCmdPool: pm.externalCmdPool, + parent: pm, + } + pa.initialize() pm.paths[name] = pa @@ -433,8 +374,8 @@ func (pm *pathManager) removePath(pa *path) { delete(pm.paths, pa.name) } -// ReloadConf is called by core. -func (pm *pathManager) ReloadConf(pathConfs map[string]*conf.Path) { +// ReloadPathConfs is called by core. +func (pm *pathManager) ReloadPathConfs(pathConfs map[string]*conf.Path) { select { case pm.chReloadConf <- pathConfs: case <-pm.ctx.Done(): @@ -469,14 +410,14 @@ func (pm *pathManager) closePath(pa *path) { } // GetConfForPath is called by a reader or publisher. -func (pm *pathManager) GetConfForPath(req defs.PathGetConfForPathReq) defs.PathGetConfForPathRes { - req.Res = make(chan defs.PathGetConfForPathRes) +func (pm *pathManager) FindPathConf(req defs.PathFindPathConfReq) defs.PathFindPathConfRes { + req.Res = make(chan defs.PathFindPathConfRes) select { - case pm.chGetConfForPath <- req: + case pm.chFindPathConf <- req: return <-req.Res case <-pm.ctx.Done(): - return defs.PathGetConfForPathRes{Err: fmt.Errorf("terminated")} + return defs.PathFindPathConfRes{Err: fmt.Errorf("terminated")} } } diff --git a/internal/defs/path.go b/internal/defs/path.go index b1b257a18f1..fb5be3686fa 100644 --- a/internal/defs/path.go +++ b/internal/defs/path.go @@ -52,16 +52,16 @@ type PathAccessRequest struct { RTSPNonce string } -// PathGetConfForPathRes contains the response of GetConfForPath(). -type PathGetConfForPathRes struct { +// PathFindPathConfRes contains the response of FindPathConf(). +type PathFindPathConfRes struct { Conf *conf.Path Err error } -// PathGetConfForPathReq contains arguments of GetConfForPath(). -type PathGetConfForPathReq struct { +// PathFindPathConfReq contains arguments of FindPathConf(). +type PathFindPathConfReq struct { AccessRequest PathAccessRequest - Res chan PathGetConfForPathRes + Res chan PathFindPathConfRes } // PathDescribeRes contains the response of Describe(). diff --git a/internal/defs/path_manager.go b/internal/defs/path_manager.go index b6e5c715bd1..98cd1ca3a29 100644 --- a/internal/defs/path_manager.go +++ b/internal/defs/path_manager.go @@ -2,7 +2,7 @@ package defs // PathManager is a path manager. type PathManager interface { - GetConfForPath(req PathGetConfForPathReq) PathGetConfForPathRes + FindPathConf(req PathFindPathConfReq) PathFindPathConfRes Describe(req PathDescribeReq) PathDescribeRes AddPublisher(req PathAddPublisherReq) PathAddPublisherRes AddReader(req PathAddReaderReq) PathAddReaderRes diff --git a/internal/playback/fmp4.go b/internal/playback/fmp4.go new file mode 100644 index 00000000000..179a90ef7eb --- /dev/null +++ b/internal/playback/fmp4.go @@ -0,0 +1,432 @@ +package playback + +import ( + "bytes" + "errors" + "fmt" + "io" + "os" + "time" + + "github.com/abema/go-mp4" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer" +) + +const ( + sampleFlagIsNonSyncSample = 1 << 16 +) + +func durationGoToMp4(v time.Duration, timeScale uint32) uint64 { + timeScale64 := uint64(timeScale) + secs := v / time.Second + dec := v % time.Second + return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second) +} + +func durationMp4ToGo(v uint64, timeScale uint32) time.Duration { + timeScale64 := uint64(timeScale) + secs := v / timeScale64 + dec := v % timeScale64 + return time.Duration(secs)*time.Second + time.Duration(dec)*time.Second/time.Duration(timeScale64) +} + +var errTerminated = errors.New("terminated") + +func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) { + buf := make([]byte, 8) + _, err := io.ReadFull(r, buf) + if err != nil { + return nil, err + } + + if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) { + return nil, fmt.Errorf("ftyp box not found") + } + + ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + _, err = r.Seek(int64(ftypSize), io.SeekStart) + if err != nil { + return nil, err + } + + _, err = io.ReadFull(r, buf) + if err != nil { + return nil, err + } + + if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) { + return nil, fmt.Errorf("moov box not found") + } + + moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + _, err = r.Seek(0, io.SeekStart) + if err != nil { + return nil, err + } + + buf = make([]byte, ftypSize+moovSize) + + _, err = io.ReadFull(r, buf) + if err != nil { + return nil, err + } + + return buf, nil +} + +func seekAndMuxParts( + r io.ReadSeeker, + init []byte, + minTime time.Duration, + maxTime time.Duration, + w io.Writer, +) (time.Duration, error) { + minTimeMP4 := durationGoToMp4(minTime, 90000) + maxTimeMP4 := durationGoToMp4(maxTime, 90000) + moofOffset := uint64(0) + var tfhd *mp4.Tfhd + var tfdt *mp4.Tfdt + var outPart *fmp4.Part + var outTrack *fmp4.PartTrack + var outBuf seekablebuffer.Buffer + elapsed := uint64(0) + initWritten := false + firstSampleWritten := make(map[uint32]struct{}) + gop := make(map[uint32][]*fmp4.PartSample) + + _, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) { + switch h.BoxInfo.Type.String() { + case "moof": + moofOffset = h.BoxInfo.Offset + outPart = &fmp4.Part{} + return h.Expand() + + case "traf": + return h.Expand() + + case "tfhd": + box, _, err := h.ReadPayload() + if err != nil { + return nil, err + } + tfhd = box.(*mp4.Tfhd) + + case "tfdt": + box, _, err := h.ReadPayload() + if err != nil { + return nil, err + } + tfdt = box.(*mp4.Tfdt) + + if tfdt.BaseMediaDecodeTimeV1 >= maxTimeMP4 { + return nil, errTerminated + } + + outTrack = &fmp4.PartTrack{ID: int(tfhd.TrackID)} + + case "trun": + box, _, err := h.ReadPayload() + if err != nil { + return nil, err + } + trun := box.(*mp4.Trun) + + dataOffset := moofOffset + uint64(trun.DataOffset) + + _, err = r.Seek(int64(dataOffset), io.SeekStart) + if err != nil { + return nil, err + } + + elapsed = tfdt.BaseMediaDecodeTimeV1 + baseTimeSet := false + + for _, e := range trun.Entries { + payload := make([]byte, e.SampleSize) + _, err := io.ReadFull(r, payload) + if err != nil { + return nil, err + } + + if elapsed >= maxTimeMP4 { + break + } + + isRandom := (e.SampleFlags & sampleFlagIsNonSyncSample) == 0 + _, fsw := firstSampleWritten[tfhd.TrackID] + + sa := &fmp4.PartSample{ + Duration: e.SampleDuration, + PTSOffset: e.SampleCompositionTimeOffsetV1, + IsNonSyncSample: !isRandom, + Payload: payload, + } + + if !fsw { + if isRandom { + gop[tfhd.TrackID] = []*fmp4.PartSample{sa} + } else { + gop[tfhd.TrackID] = append(gop[tfhd.TrackID], sa) + } + } + + if elapsed >= minTimeMP4 { + if !baseTimeSet { + outTrack.BaseTime = elapsed - minTimeMP4 + + if !fsw { + if !isRandom { + /* + var nalus [][]byte + for _, sa2 := range gop[tfhd.TrackID] { + auNALUS, err := h264.AVCCUnmarshal(sa2.Payload) + if err != nil { + return nil, err + } + // auNALUS = append([][]byte{{byte(h264.NALUTypeAccessUnitDelimiter), 240}}, auNALUS...) + nalus = append(nalus, auNALUS...) + } + + newPayload, err := h264.AVCCMarshal(nalus) + if err != nil { + return nil, err + } + + sa.Payload = newPayload + // sa.PTSOffset = gop[tfhd.TrackID][0].PTSOffset + sa.IsNonSyncSample = false + + */ + + for _, sa2 := range gop[tfhd.TrackID][:len(gop[tfhd.TrackID])-1] { + sa2.Duration = 0 + sa2.PTSOffset = 0 // sa.PTSOffset // // -90000 //30 * 90000 + outTrack.Samples = append(outTrack.Samples, sa2) + } + } + + delete(gop, tfhd.TrackID) + firstSampleWritten[tfhd.TrackID] = struct{}{} + } + } + + outTrack.Samples = append(outTrack.Samples, sa) + } + + elapsed += uint64(e.SampleDuration) + } + + if outTrack.Samples != nil { + outPart.Tracks = append(outPart.Tracks, outTrack) + } + + outTrack = nil + + case "mdat": + if outPart.Tracks != nil { + if !initWritten { + initWritten = true + _, err := w.Write(init) + if err != nil { + return nil, err + } + } + + err := outPart.Marshal(&outBuf) + if err != nil { + return nil, err + } + + _, err = w.Write(outBuf.Bytes()) + if err != nil { + return nil, err + } + + outBuf.Reset() + } + + outPart = nil + } + return nil, nil + }) + if err != nil && !errors.Is(err, errTerminated) { + return 0, err + } + + if !initWritten { + return 0, errNoSegmentsFound + } + + elapsed -= minTimeMP4 + + return durationMp4ToGo(elapsed, 90000), nil +} + +func muxParts( + r io.ReadSeeker, + startTime time.Duration, + maxTime time.Duration, + w io.Writer, +) (time.Duration, error) { + maxTimeMP4 := durationGoToMp4(maxTime, 90000) + moofOffset := uint64(0) + var tfhd *mp4.Tfhd + var tfdt *mp4.Tfdt + var outPart *fmp4.Part + var outTrack *fmp4.PartTrack + var outBuf seekablebuffer.Buffer + elapsed := uint64(0) + + _, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) { + switch h.BoxInfo.Type.String() { + case "moof": + moofOffset = h.BoxInfo.Offset + outPart = &fmp4.Part{} + return h.Expand() + + case "traf": + return h.Expand() + + case "tfhd": + box, _, err := h.ReadPayload() + if err != nil { + return nil, err + } + tfhd = box.(*mp4.Tfhd) + + case "tfdt": + box, _, err := h.ReadPayload() + if err != nil { + return nil, err + } + tfdt = box.(*mp4.Tfdt) + + if tfdt.BaseMediaDecodeTimeV1 >= maxTimeMP4 { + return nil, errTerminated + } + + outTrack = &fmp4.PartTrack{ + ID: int(tfhd.TrackID), + BaseTime: tfdt.BaseMediaDecodeTimeV1 + durationGoToMp4(startTime, 90000), + } + + case "trun": + box, _, err := h.ReadPayload() + if err != nil { + return nil, err + } + trun := box.(*mp4.Trun) + + dataOffset := moofOffset + uint64(trun.DataOffset) + + _, err = r.Seek(int64(dataOffset), io.SeekStart) + if err != nil { + return nil, err + } + + elapsed = tfdt.BaseMediaDecodeTimeV1 + + for _, e := range trun.Entries { + payload := make([]byte, e.SampleSize) + _, err := io.ReadFull(r, payload) + if err != nil { + return nil, err + } + + if elapsed >= maxTimeMP4 { + break + } + + isRandom := (e.SampleFlags & sampleFlagIsNonSyncSample) == 0 + + sa := &fmp4.PartSample{ + Duration: e.SampleDuration, + PTSOffset: e.SampleCompositionTimeOffsetV1, + IsNonSyncSample: !isRandom, + Payload: payload, + } + + outTrack.Samples = append(outTrack.Samples, sa) + + elapsed += uint64(e.SampleDuration) + } + + if outTrack.Samples != nil { + outPart.Tracks = append(outPart.Tracks, outTrack) + } + + outTrack = nil + + case "mdat": + if outPart.Tracks != nil { + err := outPart.Marshal(&outBuf) + if err != nil { + return nil, err + } + + _, err = w.Write(outBuf.Bytes()) + if err != nil { + return nil, err + } + + outBuf.Reset() + } + + outPart = nil + } + return nil, nil + }) + if err != nil && !errors.Is(err, errTerminated) { + return 0, err + } + + return durationMp4ToGo(elapsed, 90000), nil +} + +func fmp4SeekAndMux( + fpath string, + minTime time.Duration, + maxTime time.Duration, + w io.Writer, +) (time.Duration, error) { + f, err := os.Open(fpath) + if err != nil { + return 0, err + } + defer f.Close() + + init, err := fmp4ReadInit(f) + if err != nil { + return 0, err + } + + elapsed, err := seekAndMuxParts(f, init, minTime, maxTime, w) + if err != nil { + return 0, err + } + + return elapsed, nil +} + +func fmp4Mux( + fpath string, + startTime time.Duration, + maxTime time.Duration, + w io.Writer, +) (time.Duration, error) { + f, err := os.Open(fpath) + if err != nil { + return 0, err + } + defer f.Close() + + elapsed, err := muxParts(f, startTime, maxTime, w) + if err != nil { + return 0, err + } + + return elapsed, nil +} diff --git a/internal/playback/fmp4_test.go b/internal/playback/fmp4_test.go new file mode 100644 index 00000000000..1df5ef195a9 --- /dev/null +++ b/internal/playback/fmp4_test.go @@ -0,0 +1,79 @@ +package playback + +import ( + "io" + "os" + "testing" + + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" +) + +func writeBenchInit(f io.WriteSeeker) { + init := fmp4.Init{ + Tracks: []*fmp4.InitTrack{ + { + ID: 1, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: []byte{ + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, + 0x20, + }, + PPS: []byte{0x08}, + }, + }, + { + ID: 2, + TimeScale: 90000, + Codec: &fmp4.CodecMPEG4Audio{ + Config: mpeg4audio.Config{ + Type: mpeg4audio.ObjectTypeAACLC, + SampleRate: 48000, + ChannelCount: 2, + }, + }, + }, + }, + } + + err := init.Marshal(f) + if err != nil { + panic(err) + } + + _, err = f.Write([]byte{ + 'm', 'o', 'o', 'f', 0x00, 0x00, 0x00, 0x10, + }) + if err != nil { + panic(err) + } +} + +func BenchmarkFMP4ReadInit(b *testing.B) { + f, err := os.CreateTemp(os.TempDir(), "mediamtx-playback-fmp4-") + if err != nil { + panic(err) + } + defer os.Remove(f.Name()) + + writeBenchInit(f) + f.Close() + + for n := 0; n < b.N; n++ { + func() { + f, err := os.Open(f.Name()) + if err != nil { + panic(err) + } + defer f.Close() + + _, err = fmp4ReadInit(f) + if err != nil { + panic(err) + } + }() + } +} diff --git a/internal/playback/server.go b/internal/playback/server.go new file mode 100644 index 00000000000..87513c54c2e --- /dev/null +++ b/internal/playback/server.go @@ -0,0 +1,299 @@ +// Package playback contains the playback server. +package playback + +import ( + "errors" + "fmt" + "io/fs" + "net" + "net/http" + "path/filepath" + "sort" + "strings" + "sync" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/protocols/httpserv" + "github.com/bluenviron/mediamtx/internal/record" + "github.com/bluenviron/mediamtx/internal/restrictnetwork" + "github.com/gin-gonic/gin" +) + +const ( + concatenationTolerance = 1 * time.Second +) + +var errNoSegmentsFound = errors.New("no recording segments found for the given timestamp") + +type writerWrapper struct { + ctx *gin.Context + written bool +} + +func (w *writerWrapper) Write(p []byte) (int, error) { + if !w.written { + w.written = true + w.ctx.Header("Accept-Ranges", "none") + w.ctx.Header("Content-Type", "video/mp4") + } + return w.ctx.Writer.Write(p) +} + +type segment struct { + fpath string + start time.Time +} + +func findSegments( + pathConf *conf.Path, + pathName string, + start time.Time, + duration time.Duration, +) ([]segment, error) { + if !pathConf.Record { + return nil, fmt.Errorf("record is disabled on path '%s'", pathName) + } + + recordPath := record.PathAddExtension( + strings.ReplaceAll(pathConf.RecordPath, "%path", pathName), + pathConf.RecordFormat, + ) + + // we have to convert to absolute paths + // otherwise, recordPath and fpath inside Walk() won't have common elements + recordPath, _ = filepath.Abs(recordPath) + + commonPath := record.CommonPath(recordPath) + end := start.Add(duration) + var segments []segment + + // gather all segments that starts before the end of the playback + err := filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + + if !info.IsDir() { + var pa record.Path + ok := pa.Decode(recordPath, fpath) + if ok && !end.Before(time.Time(pa)) { + segments = append(segments, segment{ + fpath: fpath, + start: time.Time(pa), + }) + } + } + + return nil + }) + if err != nil { + return nil, err + } + + if segments == nil { + return nil, errNoSegmentsFound + } + + sort.Slice(segments, func(i, j int) bool { + return segments[i].start.Before(segments[j].start) + }) + + // find the segment that may contain the start of the playback and remove all previous ones + found := false + for i := 0; i < len(segments)-1; i++ { + if !start.Before(segments[i].start) && start.Before(segments[i+1].start) { + segments = segments[i:] + found = true + break + } + } + + // otherwise, keep the last segment only and check whether it may contain the start of the playback + if !found { + segments = segments[len(segments)-1:] + if segments[len(segments)-1].start.After(start) { + return nil, errNoSegmentsFound + } + } + + return segments, nil +} + +// Server is the playback server. +type Server struct { + Address string + ReadTimeout conf.StringDuration + PathConfs map[string]*conf.Path + Parent logger.Writer + + httpServer *httpserv.WrappedServer + mutex sync.RWMutex +} + +// Initialize initializes API. +func (p *Server) Initialize() error { + router := gin.New() + router.SetTrustedProxies(nil) //nolint:errcheck + + group := router.Group("/") + + group.GET("/get", p.onGet) + + network, address := restrictnetwork.Restrict("tcp", p.Address) + + var err error + p.httpServer, err = httpserv.NewWrappedServer( + network, + address, + time.Duration(p.ReadTimeout), + "", + "", + router, + p, + ) + if err != nil { + return err + } + + p.Log(logger.Info, "listener opened on "+address) + + return nil +} + +// Close closes Server. +func (p *Server) Close() { + p.Log(logger.Info, "listener is closing") + p.httpServer.Close() +} + +// Log implements logger.Writer. +func (p *Server) Log(level logger.Level, format string, args ...interface{}) { + p.Parent.Log(level, "[playback] "+format, args...) +} + +// ReloadPathConfs is called by core.Core. +func (p *Server) ReloadPathConfs(pathConfs map[string]*conf.Path) { + p.mutex.Lock() + defer p.mutex.Unlock() + p.PathConfs = pathConfs +} + +func (p *Server) writeError(ctx *gin.Context, status int, err error) { + // show error in logs + p.Log(logger.Error, err.Error()) + + // add error to response + ctx.String(status, err.Error()) +} + +func (p *Server) safeFindPathConf(name string) (*conf.Path, error) { + p.mutex.RLock() + defer p.mutex.RUnlock() + + _, pathConf, _, err := conf.FindPathConf(p.PathConfs, name) + return pathConf, err +} + +func (p *Server) onGet(ctx *gin.Context) { + pathName := ctx.Query("path") + + start, err := time.Parse(time.RFC3339, ctx.Query("start")) + if err != nil { + p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid start: %w", err)) + return + } + + duration, err := time.ParseDuration(ctx.Query("duration")) + if err != nil { + p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid duration: %w", err)) + return + } + + format := ctx.Query("format") + if format != "fmp4" { + p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid format: %s", format)) + return + } + + pathConf, err := p.safeFindPathConf(pathName) + if err != nil { + p.writeError(ctx, http.StatusBadRequest, err) + return + } + + segments, err := findSegments(pathConf, pathName, start, duration) + if err != nil { + if errors.Is(err, errNoSegmentsFound) { + p.writeError(ctx, http.StatusNotFound, err) + } else { + p.writeError(ctx, http.StatusBadRequest, err) + } + return + } + + if pathConf.RecordFormat != conf.RecordFormatFMP4 { + p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("format of recording segments is not fmp4")) + return + } + + ww := &writerWrapper{ctx: ctx} + minTime := start.Sub(segments[0].start) + maxTime := minTime + duration + + elapsed, err := fmp4SeekAndMux( + segments[0].fpath, + minTime, + maxTime, + ww) + if err != nil { + // user aborted the download + var neterr *net.OpError + if errors.As(err, &neterr) { + return + } + + // nothing has been written yet; send back JSON + if !ww.written { + if errors.Is(err, errNoSegmentsFound) { + p.writeError(ctx, http.StatusNotFound, err) + } else { + p.writeError(ctx, http.StatusBadRequest, err) + } + return + } + + // something has been already written: abort and write to logs only + p.Log(logger.Error, err.Error()) + return + } + + start = start.Add(elapsed) + duration -= elapsed + overallElapsed := elapsed + + for _, seg := range segments[1:] { + // there's a gap between segments; stop serving the recording. + if seg.start.Before(start.Add(-concatenationTolerance)) || seg.start.After(start.Add(concatenationTolerance)) { + return + } + + elapsed, err := fmp4Mux(seg.fpath, overallElapsed, duration, ctx.Writer) + if err != nil { + // user aborted the download + var neterr *net.OpError + if errors.As(err, &neterr) { + return + } + + // something has been already written: abort and write to logs only + p.Log(logger.Error, err.Error()) + return + } + + start = seg.start.Add(elapsed) + duration -= elapsed + overallElapsed += elapsed + } +} diff --git a/internal/playback/server_test.go b/internal/playback/server_test.go new file mode 100644 index 00000000000..e54c0047936 --- /dev/null +++ b/internal/playback/server_test.go @@ -0,0 +1,228 @@ +package playback + +import ( + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "testing" + "time" + + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/stretchr/testify/require" +) + +type nilLogger struct{} + +func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) { +} + +func writeSegment1(t *testing.T, fpath string) { + init := fmp4.Init{ + Tracks: []*fmp4.InitTrack{{ + ID: 1, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: []byte{ + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, + 0x20, + }, + PPS: []byte{0x08}, + }, + }}, + } + + var buf1 seekablebuffer.Buffer + err := init.Marshal(&buf1) + require.NoError(t, err) + + var buf2 seekablebuffer.Buffer + parts := fmp4.Parts{ + { + SequenceNumber: 1, + Tracks: []*fmp4.PartTrack{{ + ID: 1, + BaseTime: 0, + Samples: []*fmp4.PartSample{}, + }}, + }, + { + SequenceNumber: 1, + Tracks: []*fmp4.PartTrack{{ + ID: 1, + BaseTime: 30 * 90000, + Samples: []*fmp4.PartSample{ + { + Duration: 30 * 90000, + IsNonSyncSample: false, + Payload: []byte{1, 2}, + }, + { + Duration: 1 * 90000, + IsNonSyncSample: false, + Payload: []byte{3, 4}, + }, + { + Duration: 1 * 90000, + IsNonSyncSample: true, + Payload: []byte{5, 6}, + }, + }, + }}, + }, + } + err = parts.Marshal(&buf2) + require.NoError(t, err) + + err = os.WriteFile(fpath, append(buf1.Bytes(), buf2.Bytes()...), 0o644) + require.NoError(t, err) +} + +func writeSegment2(t *testing.T, fpath string) { + init := fmp4.Init{ + Tracks: []*fmp4.InitTrack{{ + ID: 1, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: []byte{ + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, + 0x20, + }, + PPS: []byte{0x08}, + }, + }}, + } + + var buf1 seekablebuffer.Buffer + err := init.Marshal(&buf1) + require.NoError(t, err) + + var buf2 seekablebuffer.Buffer + parts := fmp4.Parts{ + { + SequenceNumber: 1, + Tracks: []*fmp4.PartTrack{{ + ID: 1, + BaseTime: 0, + Samples: []*fmp4.PartSample{ + { + Duration: 1 * 90000, + IsNonSyncSample: false, + Payload: []byte{7, 8}, + }, + { + Duration: 1 * 90000, + IsNonSyncSample: false, + Payload: []byte{9, 10}, + }, + }, + }}, + }, + } + err = parts.Marshal(&buf2) + require.NoError(t, err) + + err = os.WriteFile(fpath, append(buf1.Bytes(), buf2.Bytes()...), 0o644) + require.NoError(t, err) +} + +func TestServer(t *testing.T) { + dir, err := os.MkdirTemp("", "mediamtx-playback") + require.NoError(t, err) + defer os.RemoveAll(dir) + + err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755) + require.NoError(t, err) + + writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-000000.mp4")) + writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-000000.mp4")) + + s := &Server{ + Address: "127.0.0.1:9996", + ReadTimeout: conf.StringDuration(10 * time.Second), + PathConfs: map[string]*conf.Path{ + "mypath": { + Record: true, + RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"), + }, + }, + Parent: &nilLogger{}, + } + err = s.Initialize() + require.NoError(t, err) + defer s.Close() + + v := url.Values{} + v.Set("path", "mypath") + v.Set("start", time.Date(2008, 11, 0o7, 11, 23, 1, 0, time.Local).Format(time.RFC3339)) + v.Set("duration", "2s") + v.Set("format", "fmp4") + + u := &url.URL{ + Scheme: "http", + Host: "localhost:9996", + Path: "/get", + RawQuery: v.Encode(), + } + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + require.NoError(t, err) + + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusOK, res.StatusCode) + + buf, err := io.ReadAll(res.Body) + require.NoError(t, err) + + var parts fmp4.Parts + err = parts.Unmarshal(buf) + require.NoError(t, err) + + require.Equal(t, fmp4.Parts{ + { + SequenceNumber: 0, + Tracks: []*fmp4.PartTrack{ + { + ID: 1, + Samples: []*fmp4.PartSample{ + { + Duration: 0, + Payload: []byte{3, 4}, + }, + { + Duration: 90000, + IsNonSyncSample: true, + Payload: []byte{5, 6}, + }, + }, + }, + }, + }, + { + SequenceNumber: 0, + Tracks: []*fmp4.PartTrack{ + { + ID: 1, + BaseTime: 90000, + Samples: []*fmp4.PartSample{ + { + Duration: 90000, + Payload: []byte{7, 8}, + }, + }, + }, + }, + }, + }, parts) +} diff --git a/internal/record/agent_instance.go b/internal/record/agent_instance.go index 7081de99314..8e722a8fa4a 100644 --- a/internal/record/agent_instance.go +++ b/internal/record/agent_instance.go @@ -34,15 +34,10 @@ type agentInstance struct { func (a *agentInstance) initialize() { a.pathFormat = a.agent.PathFormat - a.pathFormat = strings.ReplaceAll(a.pathFormat, "%path", a.agent.PathName) - - switch a.agent.Format { - case conf.RecordFormatMPEGTS: - a.pathFormat += ".ts" - - default: - a.pathFormat += ".mp4" - } + a.pathFormat = PathAddExtension( + strings.ReplaceAll(a.pathFormat, "%path", a.agent.PathName), + a.agent.Format, + ) a.terminate = make(chan struct{}) a.done = make(chan struct{}) diff --git a/internal/record/cleaner.go b/internal/record/cleaner.go index 291116f9936..695b0d1d301 100644 --- a/internal/record/cleaner.go +++ b/internal/record/cleaner.go @@ -5,7 +5,6 @@ import ( "io/fs" "os" "path/filepath" - "strings" "time" "github.com/bluenviron/mediamtx/internal/conf" @@ -14,36 +13,9 @@ import ( var timeNow = time.Now -func commonPath(v string) string { - common := "" - remaining := v - - for { - i := strings.IndexAny(remaining, "\\/") - if i < 0 { - break - } - - var part string - part, remaining = remaining[:i+1], remaining[i+1:] - - if strings.Contains(part, "%") { - break - } - - common += part - } - - if len(common) > 0 { - common = common[:len(common)-1] - } - - return common -} - // CleanerEntry is a cleaner entry. type CleanerEntry struct { - PathFormat string + Path string Format conf.RecordFormat DeleteAfter time.Duration } @@ -108,21 +80,13 @@ func (c *Cleaner) doRun() { } func (c *Cleaner) doRunEntry(e *CleanerEntry) error { - pathFormat := e.PathFormat - - switch e.Format { - case conf.RecordFormatMPEGTS: - pathFormat += ".ts" - - default: - pathFormat += ".mp4" - } + entryPath := PathAddExtension(e.Path, e.Format) // we have to convert to absolute paths - // otherwise, commonPath and fpath inside Walk() won't have common elements - pathFormat, _ = filepath.Abs(pathFormat) + // otherwise, entryPath and fpath inside Walk() won't have common elements + entryPath, _ = filepath.Abs(entryPath) - commonPath := commonPath(pathFormat) + commonPath := CommonPath(entryPath) now := timeNow() filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error { //nolint:errcheck @@ -131,8 +95,8 @@ func (c *Cleaner) doRunEntry(e *CleanerEntry) error { } if !info.IsDir() { - var pa path - ok := pa.decode(pathFormat, fpath) + var pa Path + ok := pa.Decode(entryPath, fpath) if ok { if now.Sub(time.Time(pa)) > e.DeleteAfter { c.Log(logger.Debug, "removing %s", fpath) diff --git a/internal/record/cleaner_test.go b/internal/record/cleaner_test.go index f7e9eb3498b..788a41f637a 100644 --- a/internal/record/cleaner_test.go +++ b/internal/record/cleaner_test.go @@ -32,7 +32,7 @@ func TestCleaner(t *testing.T) { c := &Cleaner{ Entries: []CleanerEntry{{ - PathFormat: filepath.Join(dir, specialChars+"_%path/%Y-%m-%d_%H-%M-%S-%f"), + Path: filepath.Join(dir, specialChars+"_%path/%Y-%m-%d_%H-%M-%S-%f"), Format: conf.RecordFormatFMP4, DeleteAfter: 10 * time.Second, }}, diff --git a/internal/record/format_fmp4_part.go b/internal/record/format_fmp4_part.go index 926499eaca7..dbe8a4a5d41 100644 --- a/internal/record/format_fmp4_part.go +++ b/internal/record/format_fmp4_part.go @@ -6,8 +6,8 @@ import ( "path/filepath" "time" - "github.com/aler9/writerseeker" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer" "github.com/bluenviron/mediamtx/internal/logger" ) @@ -29,13 +29,13 @@ func writePart( Tracks: fmp4PartTracks, } - var ws writerseeker.WriterSeeker - err := part.Marshal(&ws) + var buf seekablebuffer.Buffer + err := part.Marshal(&buf) if err != nil { return err } - _, err = f.Write(ws.Bytes()) + _, err = f.Write(buf.Bytes()) return err } @@ -54,7 +54,7 @@ func (p *formatFMP4Part) initialize() { func (p *formatFMP4Part) close() error { if p.s.fi == nil { - p.s.path = path(p.s.startNTP).encode(p.s.f.a.pathFormat) + p.s.path = Path(p.s.startNTP).Encode(p.s.f.a.pathFormat) p.s.f.a.agent.Log(logger.Debug, "creating segment %s", p.s.path) err := os.MkdirAll(filepath.Dir(p.s.path), 0o755) diff --git a/internal/record/format_fmp4_segment.go b/internal/record/format_fmp4_segment.go index db25da07798..f0695c006c1 100644 --- a/internal/record/format_fmp4_segment.go +++ b/internal/record/format_fmp4_segment.go @@ -5,8 +5,8 @@ import ( "os" "time" - "github.com/aler9/writerseeker" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer" "github.com/bluenviron/mediamtx/internal/logger" ) @@ -21,13 +21,13 @@ func writeInit(f io.Writer, tracks []*formatFMP4Track) error { Tracks: fmp4Tracks, } - var ws writerseeker.WriterSeeker - err := init.Marshal(&ws) + var buf seekablebuffer.Buffer + err := init.Marshal(&buf) if err != nil { return err } - _, err = f.Write(ws.Bytes()) + _, err = f.Write(buf.Bytes()) return err } diff --git a/internal/record/format_mpegts_segment.go b/internal/record/format_mpegts_segment.go index 8bdc230e4fc..a3870241057 100644 --- a/internal/record/format_mpegts_segment.go +++ b/internal/record/format_mpegts_segment.go @@ -43,7 +43,7 @@ func (s *formatMPEGTSSegment) close() error { func (s *formatMPEGTSSegment) Write(p []byte) (int, error) { if s.fi == nil { - s.path = path(s.startNTP).encode(s.f.a.pathFormat) + s.path = Path(s.startNTP).Encode(s.f.a.pathFormat) s.f.a.agent.Log(logger.Debug, "creating segment %s", s.path) err := os.MkdirAll(filepath.Dir(s.path), 0o755) diff --git a/internal/record/path.go b/internal/record/path.go index c3bf2ae2493..1c31a0495a9 100644 --- a/internal/record/path.go +++ b/internal/record/path.go @@ -5,6 +5,8 @@ import ( "strconv" "strings" "time" + + "github.com/bluenviron/mediamtx/internal/conf" ) func leadingZeros(v int, size int) string { @@ -21,9 +23,50 @@ func leadingZeros(v int, size int) string { return out2 + out } -type path time.Time +// PathAddExtension adds the file extension to path. +func PathAddExtension(path string, format conf.RecordFormat) string { + switch format { + case conf.RecordFormatMPEGTS: + return path + ".ts" + + default: + return path + ".mp4" + } +} + +// CommonPath returns the common path between all segments with given recording path. +func CommonPath(v string) string { + common := "" + remaining := v + + for { + i := strings.IndexAny(remaining, "\\/") + if i < 0 { + break + } + + var part string + part, remaining = remaining[:i+1], remaining[i+1:] + + if strings.Contains(part, "%") { + break + } + + common += part + } + + if len(common) > 0 { + common = common[:len(common)-1] + } + + return common +} + +// Path is a record path. +type Path time.Time -func (p *path) decode(format string, v string) bool { +// Decode decodes a Path. +func (p *Path) Decode(format string, v string) bool { re := format for _, ch := range []uint8{ @@ -141,15 +184,16 @@ func (p *path) decode(format string, v string) bool { } if unixSec > 0 { - *p = path(time.Unix(unixSec, 0)) + *p = Path(time.Unix(unixSec, 0)) } else { - *p = path(time.Date(year, month, day, hour, minute, second, micros*1000, time.Local)) + *p = Path(time.Date(year, month, day, hour, minute, second, micros*1000, time.Local)) } return true } -func (p path) encode(format string) string { +// Encode encodes a path. +func (p Path) Encode(format string) string { format = strings.ReplaceAll(format, "%Y", strconv.FormatInt(int64(time.Time(p).Year()), 10)) format = strings.ReplaceAll(format, "%m", leadingZeros(int(time.Time(p).Month()), 2)) format = strings.ReplaceAll(format, "%d", leadingZeros(time.Time(p).Day(), 2)) diff --git a/internal/record/path_test.go b/internal/record/path_test.go index 0acdc66177f..3831561e0f4 100644 --- a/internal/record/path_test.go +++ b/internal/record/path_test.go @@ -10,19 +10,19 @@ import ( var pathCases = []struct { name string format string - dec path + dec Path enc string }{ { "standard", "%path/%Y-%m-%d_%H-%M-%S-%f.mp4", - path(time.Date(2008, 11, 0o7, 11, 22, 4, 123456000, time.Local)), + Path(time.Date(2008, 11, 0o7, 11, 22, 4, 123456000, time.Local)), "%path/2008-11-07_11-22-04-123456.mp4", }, { "unix seconds", "%path/%s.mp4", - path(time.Date(2021, 12, 2, 12, 15, 23, 0, time.UTC).Local()), + Path(time.Date(2021, 12, 2, 12, 15, 23, 0, time.UTC).Local()), "%path/1638447323.mp4", }, } @@ -30,8 +30,8 @@ var pathCases = []struct { func TestPathDecode(t *testing.T) { for _, ca := range pathCases { t.Run(ca.name, func(t *testing.T) { - var dec path - ok := dec.decode(ca.format, ca.enc) + var dec Path + ok := dec.Decode(ca.format, ca.enc) require.Equal(t, true, ok) require.Equal(t, ca.dec, dec) }) @@ -41,7 +41,7 @@ func TestPathDecode(t *testing.T) { func TestPathEncode(t *testing.T) { for _, ca := range pathCases { t.Run(ca.name, func(t *testing.T) { - require.Equal(t, ca.enc, ca.dec.encode(ca.format)) + require.Equal(t, ca.enc, ca.dec.Encode(ca.format)) }) } } diff --git a/internal/servers/hls/http_server.go b/internal/servers/hls/http_server.go index 9241fb6d119..3145f618898 100644 --- a/internal/servers/hls/http_server.go +++ b/internal/servers/hls/http_server.go @@ -147,7 +147,7 @@ func (s *httpServer) onRequest(ctx *gin.Context) { user, pass, hasCredentials := ctx.Request.BasicAuth() - res := s.pathManager.GetConfForPath(defs.PathGetConfForPathReq{ + res := s.pathManager.FindPathConf(defs.PathFindPathConfReq{ AccessRequest: defs.PathAccessRequest{ Name: dir, Query: ctx.Request.URL.RawQuery, diff --git a/internal/servers/webrtc/http_server.go b/internal/servers/webrtc/http_server.go index f76710205b0..10c78bc26e6 100644 --- a/internal/servers/webrtc/http_server.go +++ b/internal/servers/webrtc/http_server.go @@ -112,7 +112,7 @@ func (s *httpServer) checkAuthOutsideSession(ctx *gin.Context, path string, publ remoteAddr := net.JoinHostPort(ip, port) user, pass, hasCredentials := ctx.Request.BasicAuth() - res := s.pathManager.GetConfForPath(defs.PathGetConfForPathReq{ + res := s.pathManager.FindPathConf(defs.PathFindPathConfReq{ AccessRequest: defs.PathAccessRequest{ Name: path, Query: ctx.Request.URL.RawQuery, diff --git a/mediamtx.yml b/mediamtx.yml index 714af4f60da..2cc101bc20f 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -41,11 +41,6 @@ udpMaxPayloadSize: 1472 # it is discarded. externalAuthenticationURL: -# Enable the HTTP API. -api: no -# Address of the API listener. -apiAddress: 127.0.0.1:9997 - # Enable Prometheus-compatible metrics. metrics: no # Address of the metrics listener. @@ -69,10 +64,26 @@ runOnConnectRestart: no # Environment variables are the same of runOnConnect. runOnDisconnect: +############################################### +# Global settings -> API + +# Enable controlling the server through the API. +api: no +# Address of the API listener. +apiAddress: 127.0.0.1:9997 + +############################################### +# Global settings -> Playback server + +# Enable downloading recordings from the playback server. +playback: no +# Address of the playback server listener. +playbackAddress: :9996 + ############################################### # Global settings -> RTSP server -# Allow publishing and reading streams with the RTSP protocol. +# Enable publishing and reading streams with the RTSP protocol. rtsp: yes # List of enabled RTSP transport protocols. # UDP is the most performant, but doesn't work when there's a NAT/firewall between @@ -112,7 +123,7 @@ authMethods: [basic] ############################################### # Global settings -> RTMP server -# Allow publishing and reading streams with the RTMP protocol. +# Enable publishing and reading streams with the RTMP protocol. rtmp: yes # Address of the RTMP listener. This is needed only when encryption is "no" or "optional". rtmpAddress: :1935 @@ -132,7 +143,7 @@ rtmpServerCert: server.crt ############################################### # Global settings -> HLS server -# Allow reading streams with the HLS protocol. +# Enable reading streams with the HLS protocol. hls: yes # Address of the HLS listener. hlsAddress: :8888 @@ -188,7 +199,7 @@ hlsDirectory: '' ############################################### # Global settings -> WebRTC server -# Allow publishing and reading streams with the WebRTC protocol. +# Enable publishing and reading streams with the WebRTC protocol. webrtc: yes # Address of the WebRTC HTTP listener. webrtcAddress: :8889 @@ -236,7 +247,7 @@ webrtcICEServers2: [] ############################################### # Global settings -> SRT server -# Allow publishing and reading streams with the SRT protocol. +# Enable publishing and reading streams with the SRT protocol. srt: yes # Address of the SRT listener. srtAddress: :8890 From 76afe3fdd212ad61330568ae5f42bf3396ffc6ef Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 23 Jan 2024 20:15:59 +0100 Subject: [PATCH 2/3] add playback switch --- apidocs/openapi.yaml | 4 +++- internal/conf/conf.go | 2 +- internal/conf/conf_test.go | 1 + internal/conf/path.go | 6 ++++-- internal/playback/fmp4.go | 24 +----------------------- internal/playback/server.go | 4 ++-- mediamtx.yml | 4 +++- 7 files changed, 15 insertions(+), 30 deletions(-) diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 417d353b8f7..ad82a07bbe5 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -221,9 +221,11 @@ components: fallback: type: string - # Record + # Record and playback record: type: boolean + playback: + type: boolean recordPath: type: string recordFormat: diff --git a/internal/conf/conf.go b/internal/conf/conf.go index e952023098b..30e3df54fab 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -447,7 +447,7 @@ func (conf *Conf) Validate() error { } } - // Record + // Record (deprecated) if conf.Record != nil { conf.PathDefaults.Record = *conf.Record } diff --git a/internal/conf/conf_test.go b/internal/conf/conf_test.go index e61262137b4..6575c545c33 100644 --- a/internal/conf/conf_test.go +++ b/internal/conf/conf_test.go @@ -52,6 +52,7 @@ func TestConfFromFile(t *testing.T) { Source: "publisher", SourceOnDemandStartTimeout: 10 * StringDuration(time.Second), SourceOnDemandCloseAfter: 10 * StringDuration(time.Second), + Playback: true, RecordPath: "./recordings/%path/%Y-%m-%d_%H-%M-%S-%f", RecordFormat: RecordFormatFMP4, RecordPartDuration: 100000000, diff --git a/internal/conf/path.go b/internal/conf/path.go index d6da3ee49aa..a6b840a0c8b 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -96,8 +96,9 @@ type Path struct { SRTReadPassphrase string `json:"srtReadPassphrase"` Fallback string `json:"fallback"` - // Record + // Record and playback Record bool `json:"record"` + Playback bool `json:"playback"` RecordPath string `json:"recordPath"` RecordFormat RecordFormat `json:"recordFormat"` RecordPartDuration StringDuration `json:"recordPartDuration"` @@ -186,7 +187,8 @@ func (pconf *Path) setDefaults() { pconf.SourceOnDemandStartTimeout = 10 * StringDuration(time.Second) pconf.SourceOnDemandCloseAfter = 10 * StringDuration(time.Second) - // Record + // Record and playback + pconf.Playback = true pconf.RecordPath = "./recordings/%path/%Y-%m-%d_%H-%M-%S-%f" pconf.RecordFormat = RecordFormatFMP4 pconf.RecordPartDuration = 100 * StringDuration(time.Millisecond) diff --git a/internal/playback/fmp4.go b/internal/playback/fmp4.go index 179a90ef7eb..a19e41de4e2 100644 --- a/internal/playback/fmp4.go +++ b/internal/playback/fmp4.go @@ -179,31 +179,9 @@ func seekAndMuxParts( if !fsw { if !isRandom { - /* - var nalus [][]byte - for _, sa2 := range gop[tfhd.TrackID] { - auNALUS, err := h264.AVCCUnmarshal(sa2.Payload) - if err != nil { - return nil, err - } - // auNALUS = append([][]byte{{byte(h264.NALUTypeAccessUnitDelimiter), 240}}, auNALUS...) - nalus = append(nalus, auNALUS...) - } - - newPayload, err := h264.AVCCMarshal(nalus) - if err != nil { - return nil, err - } - - sa.Payload = newPayload - // sa.PTSOffset = gop[tfhd.TrackID][0].PTSOffset - sa.IsNonSyncSample = false - - */ - for _, sa2 := range gop[tfhd.TrackID][:len(gop[tfhd.TrackID])-1] { sa2.Duration = 0 - sa2.PTSOffset = 0 // sa.PTSOffset // // -90000 //30 * 90000 + sa2.PTSOffset = 0 outTrack.Samples = append(outTrack.Samples, sa2) } } diff --git a/internal/playback/server.go b/internal/playback/server.go index 87513c54c2e..7aee0a468b9 100644 --- a/internal/playback/server.go +++ b/internal/playback/server.go @@ -52,8 +52,8 @@ func findSegments( start time.Time, duration time.Duration, ) ([]segment, error) { - if !pathConf.Record { - return nil, fmt.Errorf("record is disabled on path '%s'", pathName) + if !pathConf.Playback { + return nil, fmt.Errorf("playback is disabled on path '%s'", pathName) } recordPath := record.PathAddExtension( diff --git a/mediamtx.yml b/mediamtx.yml index 2cc101bc20f..d0d63d1800a 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -303,10 +303,12 @@ pathDefaults: fallback: ############################################### - # Default path settings -> Recording + # Default path settings -> Record and playback # Record streams to disk. record: no + # Enable serving recordings with the playback server. + playback: yes # Path of recording segments. # Extension is added automatically. # Available variables are %path (path name), %Y %m %d %H %M %S %f %s (time in strftime format) From 1c31f18fec067b3ef9b158abebf30f21e373ab4b Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 23 Jan 2024 20:22:31 +0100 Subject: [PATCH 3/3] update readme --- README.md | 40 ++++++++++++++++++++++++++++++++ internal/playback/server_test.go | 2 +- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c1e9ecabd9d..0896b5c42c5 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,7 @@ And can be recorded with: * Streams are automatically converted from a protocol to another * Serve multiple streams at once in separate paths * Record streams to disk +* Playback recordings * Authenticate users; use internal or external authentication * Redirect readers to other RTSP servers (load balancing) * Query and control the server through the API @@ -115,6 +116,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi * [Encrypt the configuration](#encrypt-the-configuration) * [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression) * [Record streams to disk](#record-streams-to-disk) + * [Playback recordings](#playback-recordings) * [Forward streams to other servers](#forward-streams-to-other-servers) * [Proxy requests to other servers](#proxy-requests-to-other-servers) * [On-demand publishing](#on-demand-publishing) @@ -1183,6 +1185,44 @@ To upload recordings to a remote location, you can use _MediaMTX_ together with If you want to delete local segments after they are uploaded, replace `rclone sync` with `rclone move`. +### Playback recordings + +Recordings can be served to users through a dedicated HTTP server, that can be enabled inside the configuration: + +```yml +playback: yes +playbackAddress: :9996 +``` + +The server can be queried for recordings by using the URL: + +``` +http://localhost:9996/get?path=[mypath]&start=[start_date]&duration=[duration]&format=[format] +``` + +Where: + +* [mypath] is the path name +* [start_date] is the start date in RFC3339 format +* [duration] is the maximum duration of the recording in Golang format (example: 20s, 20h) +* [format] must be fmp4 + +All parameters must be [url-encoded](https://www.urlencoder.org/). + +For instance: + +``` +http://localhost:9996/get?path=stream2&start=2024-01-14T16%3A33%3A17%2B00%3A00&duration=200s&format=fmp4 +``` + +The resulting stream is natively compatible with any browser, therefore its URL can be directly inserted into a \