diff --git a/examples/tutorials/streaming_api_tutorial.py b/examples/tutorials/streaming_api_tutorial.py index 2a68b5e02f8..f9e29aa064b 100644 --- a/examples/tutorials/streaming_api_tutorial.py +++ b/examples/tutorials/streaming_api_tutorial.py @@ -250,21 +250,24 @@ # When the StreamReader buffered this number of chunks and is asked to pull # more frames, StreamReader drops the old frames/chunks. # - ``stream_index``: The index of the source stream. +# - ``decoder``: If provided, override the decoder. Useful if it fails to detect +# the codec. +# - ``decoder_option``: The option for the decoder. # # For audio output stream, you can provide the following additional # parameters to change the audio properties. # -# - ``sample_rate``: When provided, StreamReader resamples the audio on-the-fly. -# - ``dtype``: By default the StreamReader returns tensor of `float32` dtype, -# with sample values ranging `[-1, 1]`. By providing ``dtype`` argument +# - ``format``: By default the StreamReader returns tensor of `float32` dtype, +# with sample values ranging `[-1, 1]`. By providing ``format`` argument # the resulting dtype and value range is changed. +# - ``sample_rate``: When provided, StreamReader resamples the audio on-the-fly. # # For video output stream, the following parameters are available. # +# - ``format``: Change the image format. # - ``frame_rate``: Change the frame rate by dropping or duplicating # frames. No interpolation is performed. # - ``width``, ``height``: Change the image size. -# - ``format``: Change the image format. # ###################################################################### @@ -298,7 +301,7 @@ # streamer.add_basic_video_stream( # frames_per_chunk=10, # frame_rate=30, -# format="RGB" +# format="rgb24" # ) # # # Stream video from source stream `j`, @@ -310,7 +313,7 @@ # frame_rate=30, # width=128, # height=128, -# format="BGR" +# format="bgr24" # ) # @@ -428,7 +431,7 @@ frame_rate=1, width=960, height=540, - format="RGB", + format="rgb24", ) # Video stream with 320x320 (stretched) at 3 FPS, grayscale @@ -437,7 +440,7 @@ frame_rate=3, width=320, height=320, - format="GRAY", + format="gray", ) # fmt: on diff --git a/test/torchaudio_unittest/common_utils/case_utils.py b/test/torchaudio_unittest/common_utils/case_utils.py index b33146f1970..84c9c0f9bf0 100644 --- a/test/torchaudio_unittest/common_utils/case_utils.py +++ b/test/torchaudio_unittest/common_utils/case_utils.py @@ -36,7 +36,6 @@ def get_base_temp_dir(cls): @classmethod def tearDownClass(cls): - super().tearDownClass() if cls.temp_dir_ is not None: try: cls.temp_dir_.cleanup() @@ -52,6 +51,7 @@ def tearDownClass(cls): # # Following the above thread, we ignore it. pass + super().tearDownClass() def get_temp_path(self, *paths): temp_dir = os.path.join(self.get_base_temp_dir(), self.id()) diff --git a/test/torchaudio_unittest/io/stream_reader_test.py b/test/torchaudio_unittest/io/stream_reader_test.py index f14c003c56d..8acca7f7219 100644 --- a/test/torchaudio_unittest/io/stream_reader_test.py +++ b/test/torchaudio_unittest/io/stream_reader_test.py @@ -1,5 +1,5 @@ import torch -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from torchaudio_unittest.common_utils import ( get_asset_path, get_image, @@ -22,14 +22,46 @@ ) -def get_video_asset(file="nasa_13013.mp4"): - return get_asset_path(file) +################################################################################ +# Helper decorator and Mixin to duplicate the tests for fileobj +_media_source = parameterized_class( + ("test_fileobj",), + [(False,), (True,)], + class_name_func=lambda cls, _, params: f'{cls.__name__}{"_fileobj" if params["test_fileobj"] else "_path"}', +) + + +class _MediaSourceMixin: + def setUp(self): + super().setUp() + self.src = None + + def get_src(self, path): + if not self.test_fileobj: + return path + if self.src is not None: + raise ValueError("get_video_asset can be called only once.") + + self.src = open(path, "rb") + return self.src + + def tearDown(self): + if self.src is not None: + self.src.close() + super().tearDown() + + +################################################################################ @skipIfNoFFmpeg -class StreamReaderInterfaceTest(TempDirMixin, TorchaudioTestCase): +@_media_source +class StreamReaderInterfaceTest(_MediaSourceMixin, TempDirMixin, TorchaudioTestCase): """Test suite for interface behaviors around StreamReader""" + def get_src(self, file="nasa_13013.mp4"): + return super().get_src(get_asset_path(file)) + def test_streamer_invalid_input(self): """StreamReader constructor does not segfault but raise an exception when the input is invalid""" with self.assertRaises(RuntimeError): @@ -48,14 +80,13 @@ def test_streamer_invalid_input(self): def test_streamer_invalide_option(self, invalid_keys, options): """When invalid options are given, StreamReader raises an exception with these keys""" options.update({k: k for k in invalid_keys}) - src = get_video_asset() with self.assertRaises(RuntimeError) as ctx: - StreamReader(src, option=options) + StreamReader(self.get_src(), option=options) assert all(f'"{k}"' in str(ctx.exception) for k in invalid_keys) def test_src_info(self): """`get_src_stream_info` properly fetches information""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) assert s.num_src_streams == 6 expected = [ @@ -112,35 +143,35 @@ def test_src_info(self): bit_rate=None, ), ] - for i, exp in enumerate(expected): - assert exp == s.get_src_stream_info(i) + output = [s.get_src_stream_info(i) for i in range(6)] + assert expected == output def test_src_info_invalid_index(self): """`get_src_stream_info` does not segfault but raise an exception when input is invalid""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) for i in [-1, 6, 7, 8]: - with self.assertRaises(IndexError): + with self.assertRaises(RuntimeError): s.get_src_stream_info(i) def test_default_streams(self): """default stream is not None""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) assert s.default_audio_stream is not None assert s.default_video_stream is not None def test_default_audio_stream_none(self): """default audio stream is None for video without audio""" - s = StreamReader(get_video_asset("nasa_13013_no_audio.mp4")) + s = StreamReader(self.get_src("nasa_13013_no_audio.mp4")) assert s.default_audio_stream is None def test_default_video_stream_none(self): """default video stream is None for video with only audio""" - s = StreamReader(get_video_asset("nasa_13013_no_video.mp4")) + s = StreamReader(self.get_src("nasa_13013_no_video.mp4")) assert s.default_video_stream is None def test_num_out_stream(self): """num_out_streams gives the correct count of output streams""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) n, m = 6, 4 for i in range(n): assert s.num_out_streams == i @@ -158,10 +189,10 @@ def test_num_out_stream(self): def test_basic_audio_stream(self): """`add_basic_audio_stream` constructs a correct filter.""" - s = StreamReader(get_video_asset()) - s.add_basic_audio_stream(frames_per_chunk=-1, dtype=None) + s = StreamReader(self.get_src()) + s.add_basic_audio_stream(frames_per_chunk=-1, format=None) s.add_basic_audio_stream(frames_per_chunk=-1, sample_rate=8000) - s.add_basic_audio_stream(frames_per_chunk=-1, dtype=torch.int16) + s.add_basic_audio_stream(frames_per_chunk=-1, format="s16p") sinfo = s.get_out_stream_info(0) assert sinfo.source_index == s.default_audio_stream @@ -177,11 +208,11 @@ def test_basic_audio_stream(self): def test_basic_video_stream(self): """`add_basic_video_stream` constructs a correct filter.""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) s.add_basic_video_stream(frames_per_chunk=-1, format=None) s.add_basic_video_stream(frames_per_chunk=-1, width=3, height=5) s.add_basic_video_stream(frames_per_chunk=-1, frame_rate=7) - s.add_basic_video_stream(frames_per_chunk=-1, format="BGR") + s.add_basic_video_stream(frames_per_chunk=-1, format="bgr24") sinfo = s.get_out_stream_info(0) assert sinfo.source_index == s.default_video_stream @@ -201,7 +232,7 @@ def test_basic_video_stream(self): def test_remove_streams(self): """`remove_stream` removes the correct output stream""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) s.add_basic_audio_stream(frames_per_chunk=-1, sample_rate=24000) s.add_basic_video_stream(frames_per_chunk=-1, width=16, height=16) s.add_basic_audio_stream(frames_per_chunk=-1, sample_rate=8000) @@ -221,21 +252,21 @@ def test_remove_streams(self): def test_remove_stream_invalid(self): """Attempt to remove invalid output streams raises IndexError""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) for i in range(-3, 3): - with self.assertRaises(IndexError): + with self.assertRaises(RuntimeError): s.remove_stream(i) s.add_audio_stream(frames_per_chunk=-1) for i in range(-3, 3): if i == 0: continue - with self.assertRaises(IndexError): + with self.assertRaises(RuntimeError): s.remove_stream(i) def test_process_packet(self): """`process_packet` method returns 0 while there is a packet in source stream""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) # nasa_1013.mp3 contains 1023 packets. for _ in range(1023): code = s.process_packet() @@ -246,19 +277,19 @@ def test_process_packet(self): def test_pop_chunks_no_output_stream(self): """`pop_chunks` method returns empty list when there is no output stream""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) assert s.pop_chunks() == [] def test_pop_chunks_empty_buffer(self): """`pop_chunks` method returns None when a buffer is empty""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) s.add_basic_audio_stream(frames_per_chunk=-1) s.add_basic_video_stream(frames_per_chunk=-1) assert s.pop_chunks() == [None, None] def test_pop_chunks_exhausted_stream(self): """`pop_chunks` method returns None when the source stream is exhausted""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) # video is 16.57 seconds. # audio streams per 10 second chunk # video streams per 20 second chunk @@ -284,14 +315,14 @@ def test_pop_chunks_exhausted_stream(self): def test_stream_empty(self): """`stream` fails when no output stream is configured""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) with self.assertRaises(RuntimeError): next(s.stream()) def test_stream_smoke_test(self): """`stream` streams chunks fine""" w, h = 256, 198 - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) s.add_basic_audio_stream(frames_per_chunk=2000, sample_rate=8000) s.add_basic_video_stream(frames_per_chunk=15, frame_rate=60, width=w, height=h) for i, (achunk, vchunk) in enumerate(s.stream()): @@ -302,7 +333,7 @@ def test_stream_smoke_test(self): def test_seek(self): """Calling `seek` multiple times should not segfault""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_src()) for i in range(10): s.seek(i) for _ in range(0): @@ -312,13 +343,14 @@ def test_seek(self): def test_seek_negative(self): """Calling `seek` with negative value should raise an exception""" - s = StreamReader(get_video_asset()) - with self.assertRaises(ValueError): + s = StreamReader(self.get_src()) + with self.assertRaises(RuntimeError): s.seek(-1.0) @skipIfNoFFmpeg -class StreamReaderAudioTest(TempDirMixin, TorchaudioTestCase): +@_media_source +class StreamReaderAudioTest(_MediaSourceMixin, TempDirMixin, TorchaudioTestCase): """Test suite for audio streaming""" def _get_reference_wav(self, sample_rate, channels_first=False, **kwargs): @@ -327,9 +359,14 @@ def _get_reference_wav(self, sample_rate, channels_first=False, **kwargs): save_wav(path, data, sample_rate, channels_first=channels_first) return path, data - def _test_wav(self, path, original, dtype): - s = StreamReader(path) - s.add_basic_audio_stream(frames_per_chunk=-1, dtype=dtype) + def get_src(self, *args, **kwargs): + path, data = self._get_reference_wav(*args, **kwargs) + src = super().get_src(path) + return src, data + + def _test_wav(self, src, original, fmt): + s = StreamReader(src) + s.add_basic_audio_stream(frames_per_chunk=-1, format=fmt) s.process_all_packets() (output,) = s.pop_chunks() self.assertEqual(original, output) @@ -340,12 +377,19 @@ def _test_wav(self, path, original, dtype): ) def test_basic_audio_stream(self, dtype, num_channels): """`basic_audio_stream` can load WAV file properly.""" - path, original = self._get_reference_wav(8000, dtype=dtype, num_channels=num_channels) + src, original = self.get_src(8000, dtype=dtype, num_channels=num_channels) + + fmt = { + "uint8": "u8p", + "int16": "s16p", + "int32": "s32p", + }[dtype] # provide the matching dtype - self._test_wav(path, original, getattr(torch, dtype)) - # use the internal dtype ffmpeg picks - self._test_wav(path, original, None) + self._test_wav(src, original, fmt=fmt) + if not self.test_fileobj: + # use the internal dtype ffmpeg picks + self._test_wav(src, original, fmt=None) @nested_params( ["int16", "uint8", "int32"], # "float", "double", "int64"] @@ -353,11 +397,11 @@ def test_basic_audio_stream(self, dtype, num_channels): ) def test_audio_stream(self, dtype, num_channels): """`add_audio_stream` can apply filter""" - path, original = self._get_reference_wav(8000, dtype=dtype, num_channels=num_channels) + src, original = self.get_src(8000, dtype=dtype, num_channels=num_channels) expected = torch.flip(original, dims=(0,)) - s = StreamReader(path) + s = StreamReader(src) s.add_audio_stream(frames_per_chunk=-1, filter_desc="areverse") s.process_all_packets() (output,) = s.pop_chunks() @@ -369,10 +413,13 @@ def test_audio_stream(self, dtype, num_channels): ) def test_audio_seek(self, dtype, num_channels): """`seek` changes the position properly""" - path, original = self._get_reference_wav(1, dtype=dtype, num_channels=num_channels, num_frames=30) + src, original = self.get_src(1, dtype=dtype, num_channels=num_channels, num_frames=30) + for t in range(10, 20): expected = original[t:, :] - s = StreamReader(path) + if self.test_fileobj: + src.seek(0) + s = StreamReader(src) s.add_audio_stream(frames_per_chunk=-1) s.seek(float(t)) s.process_all_packets() @@ -381,9 +428,9 @@ def test_audio_seek(self, dtype, num_channels): def test_audio_seek_multiple(self): """Calling `seek` after streaming is started should change the position properly""" - path, original = self._get_reference_wav(1, dtype="int16", num_channels=2, num_frames=30) + src, original = self.get_src(1, dtype="int16", num_channels=2, num_frames=30) - s = StreamReader(path) + s = StreamReader(src) s.add_audio_stream(frames_per_chunk=-1) ts = list(range(20)) + list(range(20, 0, -1)) + list(range(20)) @@ -405,11 +452,11 @@ def test_audio_seek_multiple(self): def test_audio_frames_per_chunk(self, frame_param, num_channels): """Different chunk parameter covers the source media properly""" num_frames, frames_per_chunk, buffer_chunk_size = frame_param - path, original = self._get_reference_wav( + src, original = self.get_src( 8000, dtype="int16", num_channels=num_channels, num_frames=num_frames, channels_first=False ) - s = StreamReader(path) + s = StreamReader(src) s.add_audio_stream(frames_per_chunk=frames_per_chunk, buffer_chunk_size=buffer_chunk_size) i, outputs = 0, [] for (output,) in s.stream(): @@ -422,13 +469,19 @@ def test_audio_frames_per_chunk(self, frame_param, num_channels): @skipIfNoFFmpeg -class StreamReaderImageTest(TempDirMixin, TorchaudioTestCase): +@_media_source +class StreamReaderImageTest(_MediaSourceMixin, TempDirMixin, TorchaudioTestCase): def _get_reference_png(self, width: int, height: int, grayscale: bool): original = get_image(width, height, grayscale=grayscale) path = self.get_temp_path("ref.png") save_image(path, original, mode="L" if grayscale else "RGB") return path, original + def get_src(self, *args, **kwargs): + path, data = self._get_reference_png(*args, **kwargs) + src = super().get_src(path) + return src, data + def _test_png(self, path, original, format=None): s = StreamReader(path) s.add_basic_video_stream(frames_per_chunk=-1, format=format) @@ -441,9 +494,9 @@ def test_png(self, grayscale): # TODO: # Add test with alpha channel (RGBA, ARGB, BGRA, ABGR) w, h = 32, 18 - path, original = self._get_reference_png(w, h, grayscale=grayscale) + src, original = self.get_src(w, h, grayscale=grayscale) expected = original[None, ...] - self._test_png(path, expected) + self._test_png(src, expected) @parameterized.expand( [ @@ -453,10 +506,10 @@ def test_png(self, grayscale): ) def test_png_effect(self, filter_desc, index): h, w = 111, 250 - path, original = self._get_reference_png(w, h, grayscale=False) + src, original = self.get_src(w, h, grayscale=False) expected = torch.flip(original, dims=(index,))[None, ...] - s = StreamReader(path) + s = StreamReader(src) s.add_video_stream(frames_per_chunk=-1, filter_desc=filter_desc) s.process_all_packets() output = s.pop_chunks()[0] diff --git a/tools/setup_helpers/extension.py b/tools/setup_helpers/extension.py index ad9529d96f8..ed038e22172 100644 --- a/tools/setup_helpers/extension.py +++ b/tools/setup_helpers/extension.py @@ -57,7 +57,12 @@ def get_ext_modules(): ] ) if _USE_FFMPEG: - modules.append(Extension(name="torchaudio.lib.libtorchaudio_ffmpeg", sources=[])) + modules.extend( + [ + Extension(name="torchaudio.lib.libtorchaudio_ffmpeg", sources=[]), + Extension(name="torchaudio._torchaudio_ffmpeg", sources=[]), + ] + ) return modules diff --git a/torchaudio/csrc/CMakeLists.txt b/torchaudio/csrc/CMakeLists.txt index 0b2a0ad33c6..ac81ffd63a3 100644 --- a/torchaudio/csrc/CMakeLists.txt +++ b/torchaudio/csrc/CMakeLists.txt @@ -1,3 +1,9 @@ +# the following line is added in order to export symbols when building on Windows +# this approach has some limitations as documented in https://github.com/pytorch/pytorch/pull/3650 +if (MSVC) + set(CMAKE_WINDOWS_EXPORT_ALL_SYMBOLS ON) +endif() + ################################################################################ # libtorchaudio ################################################################################ @@ -204,11 +210,11 @@ if (BUILD_TORCHAUDIO_PYTHON_EXTENSION) find_package(Python3 ${PYTHON_VERSION} EXACT COMPONENTS Development) set(ADDITIONAL_ITEMS Python3::Python) endif() - function(define_extension name sources libraries definitions) + function(define_extension name sources include_dirs libraries definitions) add_library(${name} SHARED ${sources}) target_compile_definitions(${name} PRIVATE "${definitions}") target_include_directories( - ${name} PRIVATE ${PROJECT_SOURCE_DIR} ${Python_INCLUDE_DIR}) + ${name} PRIVATE ${PROJECT_SOURCE_DIR} ${Python_INCLUDE_DIR} ${include_dirs}) target_link_libraries( ${name} ${libraries} @@ -254,6 +260,7 @@ if (BUILD_TORCHAUDIO_PYTHON_EXTENSION) define_extension( _torchaudio "${EXTENSION_SOURCES}" + "" libtorchaudio "${LIBTORCHAUDIO_COMPILE_DEFINITIONS}" ) @@ -265,8 +272,23 @@ if (BUILD_TORCHAUDIO_PYTHON_EXTENSION) define_extension( _torchaudio_decoder "${DECODER_EXTENSION_SOURCES}" + "" "libtorchaudio_decoder" "${LIBTORCHAUDIO_DECODER_DEFINITIONS}" ) endif() + if(USE_FFMPEG) + set( + FFMPEG_EXTENSION_SOURCES + ffmpeg/pybind/pybind.cpp + ffmpeg/pybind/stream_reader.cpp + ) + define_extension( + _torchaudio_ffmpeg + "${FFMPEG_EXTENSION_SOURCES}" + "${FFMPEG_INCLUDE_DIRS}" + "libtorchaudio_ffmpeg" + "${LIBTORCHAUDIO_DECODER_DEFINITIONS}" + ) + endif() endif() diff --git a/torchaudio/csrc/ffmpeg/ffmpeg.cpp b/torchaudio/csrc/ffmpeg/ffmpeg.cpp index 648d787e890..5814572035f 100644 --- a/torchaudio/csrc/ffmpeg/ffmpeg.cpp +++ b/torchaudio/csrc/ffmpeg/ffmpeg.cpp @@ -66,17 +66,24 @@ std::string join(std::vector vars) { AVFormatContextPtr get_input_format_context( const std::string& src, const c10::optional& device, - const OptionDict& option) { - AVFormatContext* pFormat = NULL; + const OptionDict& option, + AVIOContext* io_ctx) { + AVFormatContext* pFormat = avformat_alloc_context(); + if (!pFormat) { + throw std::runtime_error("Failed to allocate AVFormatContext."); + } + if (io_ctx) { + pFormat->pb = io_ctx; + } - AVINPUT_FORMAT_CONST AVInputFormat* pInput = [&]() -> AVInputFormat* { + auto* pInput = [&]() -> AVINPUT_FORMAT_CONST AVInputFormat* { if (device.has_value()) { std::string device_str = device.value(); AVINPUT_FORMAT_CONST AVInputFormat* p = av_find_input_format(device_str.c_str()); if (!p) { std::ostringstream msg; - msg << "Unsupported device: \"" << device_str << "\""; + msg << "Unsupported device/format: \"" << device_str << "\""; throw std::runtime_error(msg.str()); } return p; @@ -103,6 +110,17 @@ AVFormatContextPtr get_input_format_context( AVFormatContextPtr::AVFormatContextPtr(AVFormatContext* p) : Wrapper(p) {} +//////////////////////////////////////////////////////////////////////////////// +// AVIO +//////////////////////////////////////////////////////////////////////////////// +void AVIOContextDeleter::operator()(AVIOContext* p) { + av_freep(&p->buffer); + av_freep(&p); +}; + +AVIOContextPtr::AVIOContextPtr(AVIOContext* p) + : Wrapper(p) {} + //////////////////////////////////////////////////////////////////////////////// // AVPacket //////////////////////////////////////////////////////////////////////////////// diff --git a/torchaudio/csrc/ffmpeg/ffmpeg.h b/torchaudio/csrc/ffmpeg/ffmpeg.h index a45a26d2580..9d03b5ff906 100644 --- a/torchaudio/csrc/ffmpeg/ffmpeg.h +++ b/torchaudio/csrc/ffmpeg/ffmpeg.h @@ -13,6 +13,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -74,7 +75,19 @@ struct AVFormatContextPtr AVFormatContextPtr get_input_format_context( const std::string& src, const c10::optional& device, - const OptionDict& option); + const OptionDict& option, + AVIOContext* io_ctx = nullptr); + +//////////////////////////////////////////////////////////////////////////////// +// AVIO +//////////////////////////////////////////////////////////////////////////////// +struct AVIOContextDeleter { + void operator()(AVIOContext* p); +}; + +struct AVIOContextPtr : public Wrapper { + explicit AVIOContextPtr(AVIOContext* p); +}; //////////////////////////////////////////////////////////////////////////////// // AVPacket diff --git a/torchaudio/csrc/ffmpeg/prototype.cpp b/torchaudio/csrc/ffmpeg/prototype.cpp index 06ee300e338..0ccf498a1e5 100644 --- a/torchaudio/csrc/ffmpeg/prototype.cpp +++ b/torchaudio/csrc/ffmpeg/prototype.cpp @@ -46,84 +46,70 @@ TORCH_LIBRARY_FRAGMENT(torchaudio, m) { av_log_set_level(AV_LOG_ERROR); }); m.def("torchaudio::ffmpeg_load", load); - m.class_("ffmpeg_Streamer"); - m.def("torchaudio::ffmpeg_streamer_init", init); - m.def("torchaudio::ffmpeg_streamer_num_src_streams", [](S s) { - return s->num_src_streams(); - }); - m.def("torchaudio::ffmpeg_streamer_num_out_streams", [](S s) { - return s->num_out_streams(); - }); - m.def("torchaudio::ffmpeg_streamer_get_src_stream_info", [](S s, int64_t i) { - return s->get_src_stream_info(i); - }); - m.def("torchaudio::ffmpeg_streamer_get_out_stream_info", [](S s, int64_t i) { - return s->get_out_stream_info(i); - }); - m.def("torchaudio::ffmpeg_streamer_find_best_audio_stream", [](S s) { - return s->find_best_audio_stream(); - }); - m.def("torchaudio::ffmpeg_streamer_find_best_video_stream", [](S s) { - return s->find_best_video_stream(); - }); - m.def("torchaudio::ffmpeg_streamer_seek", [](S s, double t) { - return s->seek(t); - }); - m.def( - "torchaudio::ffmpeg_streamer_add_audio_stream", - [](S s, - int64_t i, - int64_t frames_per_chunk, - int64_t num_chunks, - const c10::optional& filter_desc, - const c10::optional& decoder, - const c10::optional>& - decoder_options) { - s->add_audio_stream( - i, - frames_per_chunk, - num_chunks, - filter_desc, - decoder, - map(decoder_options)); - }); - m.def( - "torchaudio::ffmpeg_streamer_add_video_stream", - [](S s, - int64_t i, - int64_t frames_per_chunk, - int64_t num_chunks, - const c10::optional& filter_desc, - const c10::optional& decoder, - const c10::optional>& - decoder_options, - const c10::optional& hw_accel) { - s->add_video_stream( - i, - frames_per_chunk, - num_chunks, - filter_desc, - decoder, - map(decoder_options), - hw_accel); - }); - m.def("torchaudio::ffmpeg_streamer_remove_stream", [](S s, int64_t i) { - s->remove_stream(i); - }); - m.def( - "torchaudio::ffmpeg_streamer_process_packet", - [](S s, const c10::optional& timeout, const double backoff) { - return s->process_packet(timeout, backoff); - }); - m.def("torchaudio::ffmpeg_streamer_process_all_packets", [](S s) { - s->process_all_packets(); - }); - m.def("torchaudio::ffmpeg_streamer_is_buffer_ready", [](S s) { - return s->is_buffer_ready(); - }); - m.def("torchaudio::ffmpeg_streamer_pop_chunks", [](S s) { - return s->pop_chunks(); - }); + m.class_("ffmpeg_Streamer") + .def(torch::init<>(init)) + .def("num_src_streams", [](S self) { return self->num_src_streams(); }) + .def("num_out_streams", [](S self) { return self->num_out_streams(); }) + .def( + "get_src_stream_info", + [](S s, int64_t i) { return s->get_src_stream_info(i); }) + .def( + "get_out_stream_info", + [](S s, int64_t i) { return s->get_out_stream_info(i); }) + .def( + "find_best_audio_stream", + [](S s) { return s->find_best_audio_stream(); }) + .def( + "find_best_video_stream", + [](S s) { return s->find_best_video_stream(); }) + .def("seek", [](S s, double t) { return s->seek(t); }) + .def( + "add_audio_stream", + [](S s, + int64_t i, + int64_t frames_per_chunk, + int64_t num_chunks, + const c10::optional& filter_desc, + const c10::optional& decoder, + const c10::optional>& + decoder_options) { + s->add_audio_stream( + i, + frames_per_chunk, + num_chunks, + filter_desc, + decoder, + map(decoder_options)); + }) + .def( + "add_video_stream", + [](S s, + int64_t i, + int64_t frames_per_chunk, + int64_t num_chunks, + const c10::optional& filter_desc, + const c10::optional& decoder, + const c10::optional>& + decoder_options, + const c10::optional& hw_accel) { + s->add_video_stream( + i, + frames_per_chunk, + num_chunks, + filter_desc, + decoder, + map(decoder_options), + hw_accel); + }) + .def("remove_stream", [](S s, int64_t i) { s->remove_stream(i); }) + .def( + "process_packet", + [](S s, const c10::optional& timeout, const double backoff) { + return s->process_packet(timeout, backoff); + }) + .def("process_all_packets", [](S s) { s->process_all_packets(); }) + .def("is_buffer_ready", [](S s) { return s->is_buffer_ready(); }) + .def("pop_chunks", [](S s) { return s->pop_chunks(); }); } } // namespace diff --git a/torchaudio/csrc/ffmpeg/pybind/pybind.cpp b/torchaudio/csrc/ffmpeg/pybind/pybind.cpp new file mode 100644 index 00000000000..46e633262c1 --- /dev/null +++ b/torchaudio/csrc/ffmpeg/pybind/pybind.cpp @@ -0,0 +1,39 @@ +#include +#include +#include + +namespace torchaudio { +namespace ffmpeg { +namespace { + +PYBIND11_MODULE(_torchaudio_ffmpeg, m) { + py::class_>( + m, "StreamReaderFileObj") + .def(py::init< + py::object, + const c10::optional&, + const c10::optional&, + int64_t>()) + .def("num_src_streams", &StreamReaderFileObj::num_src_streams) + .def("num_out_streams", &StreamReaderFileObj::num_out_streams) + .def( + "find_best_audio_stream", + &StreamReaderFileObj::find_best_audio_stream) + .def( + "find_best_video_stream", + &StreamReaderFileObj::find_best_video_stream) + .def("get_src_stream_info", &StreamReaderFileObj::get_src_stream_info) + .def("get_out_stream_info", &StreamReaderFileObj::get_out_stream_info) + .def("seek", &StreamReaderFileObj::seek) + .def("add_audio_stream", &StreamReaderFileObj::add_audio_stream) + .def("add_video_stream", &StreamReaderFileObj::add_video_stream) + .def("remove_stream", &StreamReaderFileObj::remove_stream) + .def("process_packet", &StreamReaderFileObj::process_packet) + .def("process_all_packets", &StreamReaderFileObj::process_all_packets) + .def("is_buffer_ready", &StreamReaderFileObj::is_buffer_ready) + .def("pop_chunks", &StreamReaderFileObj::pop_chunks); +} + +} // namespace +} // namespace ffmpeg +} // namespace torchaudio diff --git a/torchaudio/csrc/ffmpeg/pybind/stream_reader.cpp b/torchaudio/csrc/ffmpeg/pybind/stream_reader.cpp new file mode 100644 index 00000000000..ede150c774e --- /dev/null +++ b/torchaudio/csrc/ffmpeg/pybind/stream_reader.cpp @@ -0,0 +1,89 @@ +#include +#include + +namespace torchaudio { +namespace ffmpeg { +namespace { + +static int read_function(void* opaque, uint8_t* buf, int buf_size) { + FileObj* fileobj = static_cast(opaque); + buf_size = FFMIN(buf_size, fileobj->buffer_size); + + int num_read = 0; + while (num_read < buf_size) { + int request = buf_size - num_read; + auto chunk = static_cast( + static_cast(fileobj->fileobj.attr("read")(request))); + auto chunk_len = chunk.length(); + if (chunk_len == 0) { + break; + } + if (chunk_len > request) { + std::ostringstream message; + message + << "Requested up to " << request << " bytes but, " + << "received " << chunk_len << " bytes. " + << "The given object does not confirm to read protocol of file object."; + throw std::runtime_error(message.str()); + } + memcpy(buf, chunk.data(), chunk_len); + buf += chunk_len; + num_read += chunk_len; + } + return num_read == 0 ? AVERROR_EOF : num_read; +} + +static int64_t seek_function(void* opaque, int64_t offset, int whence) { + // We do not know the file size. + if (whence == AVSEEK_SIZE) { + return AVERROR(EIO); + } + FileObj* fileobj = static_cast(opaque); + return py::cast(fileobj->fileobj.attr("seek")(offset, whence)); +} + +AVIOContextPtr get_io_context(FileObj* opaque, int buffer_size) { + uint8_t* buffer = static_cast(av_malloc(buffer_size)); + if (!buffer) { + throw std::runtime_error("Failed to allocate buffer."); + } + + // If avio_alloc_context succeeds, then buffer will be cleaned up by + // AVIOContextPtr destructor. + // If avio_alloc_context fails, we need to clean up by ourselves. + AVIOContext* av_io_ctx = avio_alloc_context( + buffer, + buffer_size, + 0, + static_cast(opaque), + &read_function, + nullptr, + py::hasattr(opaque->fileobj, "seek") ? &seek_function : nullptr); + + if (!av_io_ctx) { + av_freep(&buffer); + throw std::runtime_error("Failed to allocate AVIO context."); + } + return AVIOContextPtr{av_io_ctx}; +} +} // namespace + +FileObj::FileObj(py::object fileobj_, int buffer_size) + : fileobj(fileobj_), + buffer_size(buffer_size), + pAVIO(get_io_context(this, buffer_size)) {} + +StreamReaderFileObj::StreamReaderFileObj( + py::object fileobj_, + const c10::optional& format, + const c10::optional& option, + int64_t buffer_size) + : FileObj(fileobj_, static_cast(buffer_size)), + StreamReaderBinding(get_input_format_context( + static_cast(py::str(fileobj_.attr("__str__")())), + format, + option.value_or(OptionDict{}), + pAVIO)) {} + +} // namespace ffmpeg +} // namespace torchaudio diff --git a/torchaudio/csrc/ffmpeg/pybind/stream_reader.h b/torchaudio/csrc/ffmpeg/pybind/stream_reader.h new file mode 100644 index 00000000000..7b12ae5c020 --- /dev/null +++ b/torchaudio/csrc/ffmpeg/pybind/stream_reader.h @@ -0,0 +1,28 @@ +#pragma once +#include +#include + +namespace torchaudio { +namespace ffmpeg { + +struct FileObj { + py::object fileobj; + int buffer_size; + AVIOContextPtr pAVIO; + FileObj(py::object fileobj, int buffer_size); +}; + +// The reason we inherit FileObj instead of making it an attribute +// is so that FileObj is instantiated first. +// AVIOContext must be initialized before AVFormat, and outlive AVFormat. +class StreamReaderFileObj : protected FileObj, public StreamReaderBinding { + public: + StreamReaderFileObj( + py::object fileobj, + const c10::optional& format, + const c10::optional& option, + int64_t buffer_size); +}; + +} // namespace ffmpeg +} // namespace torchaudio diff --git a/torchaudio/csrc/ffmpeg/streamer.cpp b/torchaudio/csrc/ffmpeg/streamer.cpp index 6a7f050beba..18b6037a18b 100644 --- a/torchaudio/csrc/ffmpeg/streamer.cpp +++ b/torchaudio/csrc/ffmpeg/streamer.cpp @@ -21,12 +21,12 @@ void Streamer::validate_open_stream() const { void Streamer::validate_src_stream_index(int i) const { validate_open_stream(); if (i < 0 || i >= static_cast(pFormatContext->nb_streams)) - throw std::out_of_range("Source stream index out of range"); + throw std::runtime_error("Source stream index out of range"); } void Streamer::validate_output_stream_index(int i) const { if (i < 0 || i >= static_cast(stream_indices.size())) - throw std::out_of_range("Output stream index out of range"); + throw std::runtime_error("Output stream index out of range"); } void Streamer::validate_src_stream_type(int i, AVMediaType type) { @@ -81,19 +81,25 @@ SrcStreamInfo Streamer::get_src_stream_info(int i) const { ret.codec_long_name = desc->long_name; } switch (codecpar->codec_type) { - case AVMEDIA_TYPE_AUDIO: - ret.fmt_name = - av_get_sample_fmt_name(static_cast(codecpar->format)); + case AVMEDIA_TYPE_AUDIO: { + AVSampleFormat smp_fmt = static_cast(codecpar->format); + if (smp_fmt != AV_SAMPLE_FMT_NONE) { + ret.fmt_name = av_get_sample_fmt_name(smp_fmt); + } ret.sample_rate = static_cast(codecpar->sample_rate); ret.num_channels = codecpar->channels; break; - case AVMEDIA_TYPE_VIDEO: - ret.fmt_name = - av_get_pix_fmt_name(static_cast(codecpar->format)); + } + case AVMEDIA_TYPE_VIDEO: { + AVPixelFormat pix_fmt = static_cast(codecpar->format); + if (pix_fmt != AV_PIX_FMT_NONE) { + ret.fmt_name = av_get_pix_fmt_name(pix_fmt); + } ret.width = codecpar->width; ret.height = codecpar->height; ret.frame_rate = av_q2d(stream->r_frame_rate); break; + } default:; } return ret; @@ -137,7 +143,7 @@ bool Streamer::is_buffer_ready() const { //////////////////////////////////////////////////////////////////////////////// void Streamer::seek(double timestamp) { if (timestamp < 0) { - throw std::invalid_argument("timestamp must be non-negative."); + throw std::runtime_error("timestamp must be non-negative."); } int64_t ts = static_cast(timestamp * AV_TIME_BASE); @@ -220,6 +226,13 @@ void Streamer::add_stream( validate_src_stream_type(i, media_type); AVStream* stream = pFormatContext->streams[i]; + // When media source is file-like object, it is possible that source codec is + // not detected properly. + if (stream->codecpar->format == -1) { + throw std::runtime_error( + "Failed to detect the source stream format. Please provide the decoder to use."); + } + stream->discard = AVDISCARD_DEFAULT; if (!processors[i]) processors[i] = std::make_unique( diff --git a/torchaudio/io/__init__.py b/torchaudio/io/__init__.py index bb1dcf90acc..65f01e1a957 100644 --- a/torchaudio/io/__init__.py +++ b/torchaudio/io/__init__.py @@ -14,6 +14,7 @@ def _init_extension(): try: torchaudio._extension._load_lib("libtorchaudio_ffmpeg") + import torchaudio._torchaudio_ffmpeg except OSError as err: raise ImportError( "Stream API requires FFmpeg libraries (libavformat and such). Please install FFmpeg 4." diff --git a/torchaudio/io/_stream_reader.py b/torchaudio/io/_stream_reader.py index e3eb00fd3d9..1b4edd64ce6 100644 --- a/torchaudio/io/_stream_reader.py +++ b/torchaudio/io/_stream_reader.py @@ -29,21 +29,21 @@ class StreamReaderSourceStream: Still images, such as PNG and JPEG formats are reported as `video`. """ codec: str - """Short name of the codec. Such as `pcm_s16le` and `h264`.""" + """Short name of the codec. Such as ``"pcm_s16le"`` and ``"h264"``.""" codec_long_name: str """Detailed name of the codec. - Such as `"PCM signed 16-bit little-endian"` and `"H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10"`. + Such as "`PCM signed 16-bit little-endian`" and "`H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10`". """ format: Optional[str] - """Media format. Such as `s16` and `yuv420p`. + """Media format. Such as ``"s16"`` and ``"yuv420p"``. Commonly found audio values are; - - `u8`, `u8p`: Unsigned 8-bit unsigned interger. - - `s16`, `s16p`: 16-bit signed integer. - - `s32`, `s32p`: 32-bit signed integer. - - `flt`, `fltp`: 32-bit floating-point. + - ``"u8"``, ``"u8p"``: Unsigned 8-bit unsigned interger. + - ``"s16"``, ``"s16p"``: 16-bit signed integer. + - ``"s32"``, ``"s32p"``: 32-bit signed integer. + - ``"flt"``, ``"fltp"``: 32-bit floating-point. .. note:: @@ -63,7 +63,7 @@ class StreamReaderSourceAudioStream(StreamReaderSourceStream): The metadata of an audio source stream. - In addition to the attributes reported by :py:func:`SourceStream`, + In addition to the attributes reported by :py:func:`StreamReaderSourceStream`, when the source stream is audio type, then the following additional attributes are reported. """ @@ -80,7 +80,7 @@ class StreamReaderSourceVideoStream(StreamReaderSourceStream): The metadata of a video source stream. - In addition to the attributes reported by :py:func:`SourceStream`, + In addition to the attributes reported by :py:func:`StreamReaderSourceStream`, when the source stream is audio type, then the following additional attributes are reported. """ @@ -154,24 +154,16 @@ def _parse_oi(i): return StreamReaderOutputStream(i[0], i[1]) -def _get_afilter_desc(sample_rate: Optional[int], dtype: torch.dtype): +def _get_afilter_desc(sample_rate: Optional[int], fmt: Optional[str]): descs = [] if sample_rate is not None: descs.append(f"aresample={sample_rate}") - if dtype is not None: - fmt = { - torch.uint8: "u8p", - torch.int16: "s16p", - torch.int32: "s32p", - torch.long: "s64p", - torch.float32: "fltp", - torch.float64: "dblp", - }[dtype] + if fmt is not None: descs.append(f"aformat=sample_fmts={fmt}") return ",".join(descs) if descs else None -def _get_vfilter_desc(frame_rate: Optional[float], width: Optional[int], height: Optional[int], format: Optional[str]): +def _get_vfilter_desc(frame_rate: Optional[float], width: Optional[int], height: Optional[int], fmt: Optional[str]): descs = [] if frame_rate is not None: descs.append(f"fps={frame_rate}") @@ -182,24 +174,95 @@ def _get_vfilter_desc(frame_rate: Optional[float], width: Optional[int], height: scales.append(f"height={height}") if scales: descs.append(f"scale={':'.join(scales)}") - if format is not None: - fmt = { - "RGB": "rgb24", - "BGR": "bgr24", - "YUV": "yuv420p", - "GRAY": "gray", - }[format] + if fmt is not None: descs.append(f"format=pix_fmts={fmt}") return ",".join(descs) if descs else None +def _format_doc(**kwargs): + def decorator(obj): + obj.__doc__ = obj.__doc__.format(**kwargs) + return obj + + return decorator + + +_frames_per_chunk = """Number of frames returned as one chunk. + If the source stream is exhausted before enough frames are buffered, + then the chunk is returned as-is.""" + +_buffer_chunk_size = """Internal buffer size. + When the number of chunks buffered exceeds this number, old frames are + dropped.""" + +_audio_stream_index = """The source audio stream index. + If omitted, :py:attr:`default_audio_stream` is used.""" + + +_video_stream_index = """The source video stream index. + If omitted, :py:attr:`default_video_stream` is used.""" + +_decoder = """The name of the decoder to be used. + When provided, use the specified decoder instead of the default one. + + To list the available decoders, you can use `ffmpeg -decoders` command.""" + +_decoder_option = """Options passed to decoder. + Mapping from str to str. + + To list decoder options for a decoder, you can use + `ffmpeg -h decoder=` command.""" + + +_hw_accel = """Enable hardware acceleration. + + When video is decoded on CUDA hardware, for example + `decode="h264_cuvid"`, passing CUDA device indicator to `hw_accel` + (i.e. `hw_accel="cuda:0"`) will place the resulting frames + directly on the specifiec CUDA device. + + If `None`, the frame will be moved to CPU memory. + Default: ``None``.""" + + +_format_audio_args = _format_doc( + frames_per_chunk=_frames_per_chunk, + buffer_chunk_size=_buffer_chunk_size, + stream_index=_audio_stream_index, + decoder=_decoder, + decoder_option=_decoder_option, +) + + +_format_video_args = _format_doc( + frames_per_chunk=_frames_per_chunk, + buffer_chunk_size=_buffer_chunk_size, + stream_index=_video_stream_index, + decoder=_decoder, + decoder_option=_decoder_option, + hw_accel=_hw_accel, +) + + class StreamReader: """Fetch and decode audio/video streams chunk by chunk. For the detailed usage of this class, please refer to the tutorial. Args: - src (str): Source. Can be a file path, URL, device identifier or filter expression. + src (str or file-like object): The media source. + If string-type, it must be a resource indicator that FFmpeg can + handle. This includes a file path, URL, device identifier or + filter expression. The supported value depends on the FFmpeg found + in the system. + + If file-like object, it must support `read` method with the signature + `read(size: int) -> bytes`. + Additionally, if the file-like object has `seek` method, it uses + the method when parsing media metadata. This improves the reliability + of codec detection. The signagure of `seek` method must be + `seek(offset, whence) -> int`. + format (str or None, optional): Override the input format, or specify the source sound device. Default: ``None`` (no override nor device input). @@ -232,6 +295,11 @@ class StreamReader: You can use this argument to change the input source before it is passed to decoder. Default: ``None``. + + buffer_size (int): + The internal buffer size in byte. Unsed only when `src` is file-like object. + + Default: `4096`. """ def __init__( @@ -239,12 +307,19 @@ def __init__( src: str, format: Optional[str] = None, option: Optional[Dict[str, str]] = None, + buffer_size: int = 4096, ): - self._s = torch.ops.torchaudio.ffmpeg_streamer_init(src, format, option) - i = torch.ops.torchaudio.ffmpeg_streamer_find_best_audio_stream(self._s) - self._i_audio = None if i < 0 else i - i = torch.ops.torchaudio.ffmpeg_streamer_find_best_video_stream(self._s) - self._i_video = None if i < 0 else i + if isinstance(src, str): + self._be = torch.classes.torchaudio.ffmpeg_Streamer(src, format, option) + elif hasattr(src, "read"): + self._be = torchaudio._torchaudio_ffmpeg.StreamReaderFileObj(src, format, option, buffer_size) + else: + raise ValueError("`src` must be either string or file-like object.") + + i = self._be.find_best_audio_stream() + self._default_audio_stream = None if i < 0 else i + i = self._be.find_best_video_stream() + self._default_video_stream = None if i < 0 else i @property def num_src_streams(self): @@ -252,7 +327,7 @@ def num_src_streams(self): :type: int """ - return torch.ops.torchaudio.ffmpeg_streamer_num_src_streams(self._s) + return self._be.num_src_streams() @property def num_out_streams(self): @@ -260,7 +335,7 @@ def num_out_streams(self): :type: int """ - return torch.ops.torchaudio.ffmpeg_streamer_num_out_streams(self._s) + return self._be.num_out_streams() @property def default_audio_stream(self): @@ -268,7 +343,7 @@ def default_audio_stream(self): :type: Optional[int] """ - return self._i_audio + return self._default_audio_stream @property def default_video_stream(self): @@ -276,7 +351,7 @@ def default_video_stream(self): :type: Optional[int] """ - return self._i_video + return self._default_video_stream def get_src_stream_info(self, i: int) -> torchaudio.io.StreamReaderSourceStream: """Get the metadata of source stream @@ -286,7 +361,7 @@ def get_src_stream_info(self, i: int) -> torchaudio.io.StreamReaderSourceStream: Returns: SourceStream """ - return _parse_si(torch.ops.torchaudio.ffmpeg_streamer_get_src_stream_info(self._s, i)) + return _parse_si(self._be.get_src_stream_info(i)) def get_out_stream_info(self, i: int) -> torchaudio.io.StreamReaderOutputStream: """Get the metadata of output stream @@ -296,7 +371,7 @@ def get_out_stream_info(self, i: int) -> torchaudio.io.StreamReaderOutputStream: Returns: OutputStream """ - return _parse_oi(torch.ops.torchaudio.ffmpeg_streamer_get_out_stream_info(self._s, i)) + return _parse_oi(self._be.get_out_stream_info(i)) def seek(self, timestamp: float): """Seek the stream to the given timestamp [second] @@ -304,227 +379,192 @@ def seek(self, timestamp: float): Args: timestamp (float): Target time in second. """ - torch.ops.torchaudio.ffmpeg_streamer_seek(self._s, timestamp) + self._be.seek(timestamp) + @_format_audio_args def add_basic_audio_stream( self, frames_per_chunk: int, buffer_chunk_size: int = 3, stream_index: Optional[int] = None, + decoder: Optional[str] = None, + decoder_option: Optional[Dict[str, str]] = None, + format: Optional[str] = "fltp", sample_rate: Optional[int] = None, - dtype: torch.dtype = torch.float32, ): """Add output audio stream Args: - frames_per_chunk (int): Number of frames returned by StreamReader as a chunk. - If the source stream is exhausted before enough frames are buffered, - then the chunk is returned as-is. + frames_per_chunk (int): {frames_per_chunk} - buffer_chunk_size (int, optional): Internal buffer size. - When this many chunks are created, but - client code does not pop the chunk, if a new frame comes in, the old - chunk is dropped. + buffer_chunk_size (int, optional): {buffer_chunk_size} - stream_index (int or None, optional): The source audio stream index. - If omitted, :py:attr:`default_audio_stream` is used. + stream_index (int or None, optional): {stream_index} - sample_rate (int or None, optional): If provided, resample the audio. + decoder (str or None, optional): {decoder} + + decoder_option (dict or None, optional): {decoder_option} + + format (str, optional): Output sample format (precision). + + If ``None``, the output chunk has dtype corresponding to + the precision of the source audio. - dtype (torch.dtype, optional): If not ``None``, change the output sample precision. - If floating point, then the sample value range is - `[-1, 1]`. + Otherwise, the sample is converted and the output dtype is changed + as following. + + - ``"u8p"``: The output is ``torch.uint8`` type. + - ``"s16p"``: The output is ``torch.int16`` type. + - ``"s32p"``: The output is ``torch.int32`` type. + - ``"s64p"``: The output is ``torch.int64`` type. + - ``"fltp"``: The output is ``torch.float32`` type. + - ``"dblp"``: The output is ``torch.float64`` type. + + sample_rate (int or None, optional): If provided, resample the audio. """ - i = self.default_audio_stream if stream_index is None else stream_index - torch.ops.torchaudio.ffmpeg_streamer_add_audio_stream( - self._s, - i, + self.add_audio_stream( frames_per_chunk, buffer_chunk_size, - _get_afilter_desc(sample_rate, dtype), - None, - None, + stream_index, + decoder, + decoder_option, + _get_afilter_desc(sample_rate, format), ) + @_format_video_args def add_basic_video_stream( self, frames_per_chunk: int, buffer_chunk_size: int = 3, stream_index: Optional[int] = None, + decoder: Optional[str] = None, + decoder_option: Optional[Dict[str, str]] = None, + hw_accel: Optional[str] = None, + format: Optional[str] = "rgb24", frame_rate: Optional[int] = None, width: Optional[int] = None, height: Optional[int] = None, - format: str = "RGB", ): """Add output video stream Args: - frames_per_chunk (int): Number of frames returned by StreamReader as a chunk. - If the source stream is exhausted before enough frames are buffered, - then the chunk is returned as-is. + frames_per_chunk (int): {frames_per_chunk} - buffer_chunk_size (int, optional): Internal buffer size. - When this many chunks are created, but - client code does not pop the chunk, if a new frame comes in, the old - chunk is dropped. + buffer_chunk_size (int, optional): {buffer_chunk_size} - stream_index (int or None, optional): The source video stream index. - If omitted, :py:attr:`default_video_stream` is used. + stream_index (int or None, optional): {stream_index} + + decoder (str or None, optional): {decoder} + + decoder_option (dict or None, optional): {decoder_option} + + hw_accel (str or None, optional): {hw_accel} + + format (str, optional): Change the format of image channels. Valid values are, + + - ``"rgb24"``: 8 bits * 3 channels (R, G, B) + - ``"bgr24"``: 8 bits * 3 channels (B, G, R) + - ``"yuv420p"``: 8 bits * 3 channels (Y, U, V) + - ``"gray"``: 8 bits * 1 channels frame_rate (int or None, optional): If provided, change the frame rate. width (int or None, optional): If provided, change the image width. Unit: Pixel. - height (int or None, optional): If provided, change the image height. Unit: Pixel. - format (str, optional): Change the format of image channels. Valid values are, - - `RGB`: 8 bits * 3 channels - - `BGR`: 8 bits * 3 channels - - `YUV`: 8 bits * 3 channels - - `GRAY`: 8 bits * 1 channels + height (int or None, optional): If provided, change the image height. Unit: Pixel. """ - i = self.default_video_stream if stream_index is None else stream_index - torch.ops.torchaudio.ffmpeg_streamer_add_video_stream( - self._s, - i, + self.add_video_stream( frames_per_chunk, buffer_chunk_size, + stream_index, + decoder, + decoder_option, + hw_accel, _get_vfilter_desc(frame_rate, width, height, format), - None, - None, - None, ) + @_format_audio_args def add_audio_stream( self, frames_per_chunk: int, buffer_chunk_size: int = 3, stream_index: Optional[int] = None, - filter_desc: Optional[str] = None, decoder: Optional[str] = None, - decoder_options: Optional[Dict[str, str]] = None, + decoder_option: Optional[Dict[str, str]] = None, + filter_desc: Optional[str] = None, ): """Add output audio stream Args: - frames_per_chunk (int): Number of frames returned by StreamReader as a chunk. - If the source stream is exhausted before enough frames are buffered, - then the chunk is returned as-is. + frames_per_chunk (int): {frames_per_chunk} + + buffer_chunk_size (int, optional): {buffer_chunk_size} - buffer_chunk_size (int, optional): Internal buffer size. - When this many chunks are created, but - client code does not pop the chunk, if a new frame comes in, the old - chunk is dropped. + stream_index (int or None, optional): {stream_index} - stream_index (int or None, optional): The source audio stream index. - If omitted, :py:attr:`default_audio_stream` is used. + decoder (str or None, optional): {decoder} + + decoder_option (dict or None, optional): {decoder_option} filter_desc (str or None, optional): Filter description. The list of available filters can be found at https://ffmpeg.org/ffmpeg-filters.html Note that complex filters are not supported. - decoder (str or None, optional): The name of the decoder to be used. - When provided, use the specified decoder instead of the default one. - - decoder_options (dict or None, optional): Options passed to decoder. - Mapping from str to str. """ i = self.default_audio_stream if stream_index is None else stream_index - torch.ops.torchaudio.ffmpeg_streamer_add_audio_stream( - self._s, + if i is None: + raise RuntimeError("There is no audio stream.") + self._be.add_audio_stream( i, frames_per_chunk, buffer_chunk_size, filter_desc, decoder, - decoder_options, + decoder_option or {}, ) + @_format_video_args def add_video_stream( self, frames_per_chunk: int, buffer_chunk_size: int = 3, stream_index: Optional[int] = None, - filter_desc: Optional[str] = None, decoder: Optional[str] = None, - decoder_options: Optional[Dict[str, str]] = None, + decoder_option: Optional[Dict[str, str]] = None, hw_accel: Optional[str] = None, + filter_desc: Optional[str] = None, ): """Add output video stream Args: - frames_per_chunk (int): Number of frames returned by StreamReader as a chunk. - If the source stream is exhausted before enough frames are buffered, - then the chunk is returned as-is. + frames_per_chunk (int): {frames_per_chunk} + + buffer_chunk_size (int, optional): {buffer_chunk_size} + + stream_index (int or None, optional): {stream_index} - buffer_chunk_size (int): Internal buffer size. - When this many chunks are created, but - client code does not pop the chunk, if a new frame comes in, the old - chunk is dropped. + decoder (str or None, optional): {decoder} - stream_index (int or None, optional): The source video stream index. - If omitted, :py:attr:`default_video_stream` is used. + decoder_option (dict or None, optional): {decoder_option} + + hw_accel (str or None, optional): {hw_accel} filter_desc (str or None, optional): Filter description. The list of available filters can be found at https://ffmpeg.org/ffmpeg-filters.html Note that complex filters are not supported. - - decoder (str or None, optional): The name of the decoder to be used. - When provided, use the specified decoder instead of the default one. - - decoder_options (dict or None, optional): Options passed to decoder. - Mapping from str to str. - - hw_accel (str or None, optional): Enable hardware acceleration. - - The valid choice is "cuda" or ``None``. - Default: ``None``. (No hardware acceleration.) - - When the following conditions are met, providing `hw_accel="cuda"` - will create Tensor directly from CUDA HW decoder. - - 1. TorchAudio is compiled with CUDA support. - 2. FFmpeg libraries linked dynamically are compiled with NVDEC support. - 3. The codec is supported NVDEC by. (Currently, `"h264_cuvid"` is supported) - - Example - HW decoding:: - - >>> # Decode video with NVDEC, create Tensor on CPU. - >>> streamer = StreamReader(src="input.mp4") - >>> streamer.add_video_stream(10, decoder="h264_cuvid", hw_accel=None) - >>> - >>> chunk, = next(streamer.stream()) - >>> print(chunk.dtype) - ... cpu - - >>> # Decode video with NVDEC, create Tensor directly on CUDA - >>> streamer = StreamReader(src="input.mp4") - >>> streamer.add_video_stream(10, decoder="h264_cuvid", hw_accel="cuda:1") - >>> - >>> chunk, = next(streamer.stream()) - >>> print(chunk.dtype) - ... cuda:1 - - >>> # Decode and resize video with NVDEC, create Tensor directly on CUDA - >>> streamer = StreamReader(src="input.mp4") - >>> streamer.add_video_stream( - >>> 10, decoder="h264_cuvid", - >>> decoder_options={"resize": "240x360"}, hw_accel="cuda:1") - >>> - >>> chunk, = next(streamer.stream()) - >>> print(chunk.dtype) - ... cuda:1 """ i = self.default_video_stream if stream_index is None else stream_index - torch.ops.torchaudio.ffmpeg_streamer_add_video_stream( - self._s, + if i is None: + raise RuntimeError("There is no video stream.") + self._be.add_video_stream( i, frames_per_chunk, buffer_chunk_size, filter_desc, decoder, - decoder_options, + decoder_option or {}, hw_accel, ) @@ -534,7 +574,7 @@ def remove_stream(self, i: int): Args: i (int): Index of the output stream to be removed. """ - torch.ops.torchaudio.ffmpeg_streamer_remove_stream(self._s, i) + self._be.remove_stream(i) def process_packet(self, timeout: Optional[float] = None, backoff: float = 10.0) -> int: """Read the source media and process one packet. @@ -593,15 +633,15 @@ def process_packet(self, timeout: Optional[float] = None, backoff: float = 10.0) flushed the pending frames. The caller should stop calling this method. """ - return torch.ops.torchaudio.ffmpeg_streamer_process_packet(self._s, timeout, backoff) + return self._be.process_packet(timeout, backoff) def process_all_packets(self): """Process packets until it reaches EOF.""" - torch.ops.torchaudio.ffmpeg_streamer_process_all_packets(self._s) + self._be.process_all_packets() def is_buffer_ready(self) -> bool: """Returns true if all the output streams have at least one chunk filled.""" - return torch.ops.torchaudio.ffmpeg_streamer_is_buffer_ready(self._s) + return self._be.is_buffer_ready() def pop_chunks(self) -> Tuple[Optional[torch.Tensor]]: """Pop one chunk from all the output stream buffers. @@ -611,7 +651,7 @@ def pop_chunks(self) -> Tuple[Optional[torch.Tensor]]: Buffer contents. If a buffer does not contain any frame, then `None` is returned instead. """ - return torch.ops.torchaudio.ffmpeg_streamer_pop_chunks(self._s) + return self._be.pop_chunks() def _fill_buffer(self, timeout: Optional[float], backoff: float) -> int: """Keep processing packets until all buffers have at least one chunk