diff --git a/src/pkg/parser/ffmpeg/ffmpeg.go b/src/pkg/parser/ffmpeg/ffmpeg.go index cb13f683..f8b4f8c1 100644 --- a/src/pkg/parser/ffmpeg/ffmpeg.go +++ b/src/pkg/parser/ffmpeg/ffmpeg.go @@ -33,12 +33,18 @@ func (b *builder) Build(cfg map[string]string) (parser.Parser, error) { if debugFlag, ok := cfg["debug"]; ok && debugFlag != "" { debug = true } + ctx, cancel := context.WithCancel(context.Background()) return &Parser{ - debug: debug, + cmd: &exec.Cmd{}, + cmdStdIn: nil, + cmdStdout: nil, closeOnce: new(sync.Once), + debug: debug, + timeoutInUs: cfg["timeout_in_us"], + ctx: ctx, + cancel: cancel, statusReq: make(chan struct{}, 1), statusResp: make(chan map[string]string, 1), - timeoutInUs: cfg["timeout_in_us"], }, nil } @@ -49,9 +55,10 @@ type Parser struct { closeOnce *sync.Once debug bool timeoutInUs string - - statusReq chan struct{} - statusResp chan map[string]string + ctx context.Context + cancel context.CancelFunc + statusReq chan struct{} + statusResp chan map[string]string } func (p *Parser) scanFFmpegStatus() <-chan []byte { @@ -127,7 +134,8 @@ func (p *Parser) ParseLiveStream(ctx context.Context, url *url.URL, live live.Li if err != nil { return err } - p.cmd = exec.Command( + p.cmd = exec.CommandContext( + p.ctx, ffmpegPath, "-nostats", "-progress", "-", @@ -158,13 +166,7 @@ func (p *Parser) ParseLiveStream(ctx context.Context, url *url.URL, live live.Li return p.cmd.Wait() } -var Locker sync.Mutex - func (p *Parser) Stop() error { - Locker.Lock() - defer Locker.Unlock() - if p.cmd.ProcessState == nil { - p.cmdStdIn.Write([]byte("q")) - } + p.cancel() return nil }