From e94073afcc66868a6d480d2e3414932ea9c2e5cb Mon Sep 17 00:00:00 2001 From: "Sergey \"Shnatsel\" Davidoff" Date: Wed, 16 Oct 2024 17:42:44 +0100 Subject: [PATCH] Drop flume to reduce dependency footprint now that std has a good channel in it --- Cargo.toml | 1 - src/block/reader.rs | 7 ++++--- src/block/writer.rs | 9 +++++---- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8b28f2e..d52e346 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ bit_field = "^0.10.1" # exr file version bit flags miniz_oxide = "^0.7.1" # zip compression for pxr24 smallvec = "^1.7.0" # make cache-friendly allocations TODO profile if smallvec is really an improvement! rayon-core = "^1.11.0" # threading for parallel compression TODO make this an optional feature? -flume = { version = "^0.11.0", default-features = false } # crossbeam, but less unsafe code TODO make this an optional feature? zune-inflate = { version = "^0.2.3", default-features = false, features = ["zlib"] } # zip decompression, faster than miniz_oxide [dev-dependencies] diff --git a/src/block/reader.rs b/src/block/reader.rs index bb9888e..3756073 100644 --- a/src/block/reader.rs +++ b/src/block/reader.rs @@ -4,6 +4,7 @@ use std::convert::TryFrom; use std::fmt::Debug; use std::io::{Read, Seek}; +use std::sync::mpsc; use rayon_core::{ThreadPool, ThreadPoolBuildError}; use smallvec::alloc::sync::Arc; @@ -387,8 +388,8 @@ impl SequentialBlockDecompressor { #[derive(Debug)] pub struct ParallelBlockDecompressor { remaining_chunks: R, - sender: flume::Sender>, - receiver: flume::Receiver>, + sender: mpsc::Sender>, + receiver: mpsc::Receiver>, currently_decompressing_count: usize, max_threads: usize, @@ -437,7 +438,7 @@ impl ParallelBlockDecompressor { let max_threads = pool.current_num_threads().max(1).min(chunks.len()) + 2; // ca one block for each thread at all times - let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic? + let (send, recv) = mpsc::channel(); // TODO bounded channel simplifies logic? Ok(Self { shared_meta_data_ref: Arc::new(chunks.meta_data().clone()), diff --git a/src/block/writer.rs b/src/block/writer.rs index 1227c69..6d38621 100644 --- a/src/block/writer.rs +++ b/src/block/writer.rs @@ -5,12 +5,13 @@ use std::fmt::Debug; use std::io::Seek; use std::iter::Peekable; use std::ops::Not; +use std::sync::mpsc; use rayon_core::{ThreadPool, ThreadPoolBuildError}; use smallvec::alloc::collections::BTreeMap; use crate::block::UncompressedBlock; -use crate::block::chunk::{Chunk}; +use crate::block::chunk::Chunk; use crate::compression::Compression; use crate::error::{Error, Result, UnitResult, usize_to_u64}; use crate::io::{Data, Tracking, Write}; @@ -337,8 +338,8 @@ pub struct ParallelBlocksCompressor<'w, W> { meta: &'w MetaData, sorted_writer: SortedBlocksWriter<'w, W>, - sender: flume::Sender>, - receiver: flume::Receiver>, + sender: mpsc::Sender>, + receiver: mpsc::Receiver>, pool: rayon_core::ThreadPool, currently_compressing_count: usize, @@ -379,7 +380,7 @@ impl<'w, W> ParallelBlocksCompressor<'w, W> where W: 'w + ChunksWriter { }; let max_threads = pool.current_num_threads().max(1).min(chunks_writer.total_chunks_count()) + 2; // ca one block for each thread at all times - let (send, recv) = flume::unbounded(); // TODO bounded channel simplifies logic? + let (send, recv) = mpsc::channel(); // TODO bounded channel simplifies logic? Some(Self { sorted_writer: SortedBlocksWriter::new(meta, chunks_writer),