Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move IsStream type class definition out of the low level StreamK module #1223

Merged
merged 6 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .hlint.ignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
src/Streamly/Internal/Data/Stream/Prelude.hs
src/Streamly/Internal/Data/Stream/PreludeCommon.hs
src/Streamly/Internal/Data/Stream/Serial.hs
src/Streamly/Internal/Data/Stream/Zip.hs
src/Streamly/Internal/Data/Stream/StreamK/Type.hs
Expand All @@ -6,7 +8,10 @@ src/Streamly/Internal/Data/Pipe/Type.hs
src/Streamly/Internal/Data/SmallArray/Type.hs
src/Streamly/Internal/Unicode/Stream.hs
src/Streamly/Internal/Data/Array/Prim/Type.hs
src/Streamly/Internal/Data/Array/Prim/Mut/Type.hs
src/Streamly/Internal/Data/Array/Prim/MutTypesInclude.hs
src/Streamly/Internal/Data/Array/Prim/Pinned/Mut/Type.hs
src/Streamly/Internal/Data/Array/Prim/Pinned/Type.hs
src/Streamly/Internal/Data/Array/Prim/TypesInclude.hs
test/Streamly/Test/Common/Array.hs
test/Streamly/Test/Data/Array.hs
Expand Down
43 changes: 27 additions & 16 deletions benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import qualified Prelude as P
import qualified Data.List as List

import qualified Streamly.Internal.Control.Concurrent as S
import qualified Streamly.Internal.Data.Stream.StreamK.Type as S
import qualified Streamly.Internal.Data.Stream.StreamK as S

import Gauge (bench, nfIO, bgroup, Benchmark, defaultMain)
Expand Down Expand Up @@ -127,7 +128,7 @@ unfoldr streamLen n = S.unfoldr step n

{-# INLINE unfoldrM #-}
unfoldrM :: S.MonadAsync m => Int -> Int -> Stream m Int
unfoldrM streamLen n = S.unfoldrM step n
unfoldrM streamLen n = S.unfoldrMWith S.consM step n
where
step cnt =
if cnt > n + streamLen
Expand All @@ -148,7 +149,7 @@ replicate = S.replicate

{-# INLINE replicateM #-}
replicateM :: S.MonadAsync m => Int -> Int -> Stream m Int
replicateM streamLen = S.replicateM streamLen . return
replicateM streamLen = S.replicateMWith S.consM streamLen . return

{-# INLINE iterate #-}
iterate :: Int -> Int -> Stream m Int
Expand All @@ -168,8 +169,11 @@ fromFoldableM streamLen n =
Prelude.foldr S.consM S.nil (Prelude.fmap return [n..n+streamLen])

{-# INLINABLE concatMapFoldableWith #-}
concatMapFoldableWith :: (S.IsStream t, Foldable f)
=> (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
concatMapFoldableWith :: Foldable f
=> (Stream m b -> Stream m b -> Stream m b)
-> (a -> Stream m b)
-> f a
-> Stream m b
concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil

{-# INLINE concatMapFoldableSerial #-}
Expand Down Expand Up @@ -202,13 +206,13 @@ uncons s = do
Just (_, t) -> uncons t

{-# INLINE init #-}
init :: (Monad m, S.IsStream t) => t m a -> m ()
init :: Monad m => Stream m a -> m ()
init s = do
t <- S.init s
P.mapM_ S.drain t

{-# INLINE tail #-}
tail :: (Monad m, S.IsStream t) => t m a -> m ()
tail :: Monad m => Stream m a -> m ()
tail s = S.tail s >>= P.mapM_ tail

{-# INLINE nullTail #-}
Expand Down Expand Up @@ -265,7 +269,7 @@ fmapK n = composeN n $ P.fmap (+ 1)

{-# INLINE mapM #-}
mapM :: S.MonadAsync m => Int -> Stream m Int -> m ()
mapM n = composeN n $ S.mapM return
mapM n = composeN n $ S.mapMWith S.consM return

{-# INLINE mapMSerial #-}
mapMSerial :: S.MonadAsync m => Int -> Stream m Int -> m ()
Expand Down Expand Up @@ -339,7 +343,8 @@ iterateSource iterStreamLen g i n = f i (unfoldrM iterStreamLen n)
-- this is quadratic
{-# INLINE iterateScan #-}
iterateScan :: S.MonadAsync m => Int -> Int -> Int -> Stream m Int
iterateScan iterStreamLen maxIters = iterateSource iterStreamLen (S.scanl' (+) 0) (maxIters `div` 10)
iterateScan iterStreamLen maxIters =
iterateSource iterStreamLen (S.scanl' (+) 0) (maxIters `div` 10)

-- this is quadratic
{-# INLINE iterateDropWhileFalse #-}
Expand All @@ -349,23 +354,27 @@ iterateDropWhileFalse streamLen iterStreamLen maxIters =

{-# INLINE iterateMapM #-}
iterateMapM :: S.MonadAsync m => Int -> Int -> Int -> Stream m Int
iterateMapM iterStreamLen maxIters = iterateSource iterStreamLen (S.mapM return) maxIters
iterateMapM iterStreamLen =
iterateSource iterStreamLen (S.mapMWith S.consM return)

{-# INLINE iterateFilterEven #-}
iterateFilterEven :: S.MonadAsync m => Int -> Int -> Int -> Stream m Int
iterateFilterEven iterStreamLen maxIters = iterateSource iterStreamLen (S.filter even) maxIters
iterateFilterEven iterStreamLen = iterateSource iterStreamLen (S.filter even)

{-# INLINE iterateTakeAll #-}
iterateTakeAll :: S.MonadAsync m => Int -> Int -> Int -> Int -> Stream m Int
iterateTakeAll streamLen iterStreamLen maxIters = iterateSource iterStreamLen (S.take streamLen) maxIters
iterateTakeAll streamLen iterStreamLen =
iterateSource iterStreamLen (S.take streamLen)

{-# INLINE iterateDropOne #-}
iterateDropOne :: S.MonadAsync m => Int -> Int -> Int -> Stream m Int
iterateDropOne iterStreamLen maxIters = iterateSource iterStreamLen (S.drop 1) maxIters
iterateDropOne iterStreamLen = iterateSource iterStreamLen (S.drop 1)

{-# INLINE iterateDropWhileTrue #-}
iterateDropWhileTrue :: S.MonadAsync m => Int -> Int -> Int -> Int -> Stream m Int
iterateDropWhileTrue streamLen iterStreamLen maxIters = iterateSource iterStreamLen (S.dropWhile (<= streamLen)) maxIters
iterateDropWhileTrue :: S.MonadAsync m =>
Int -> Int -> Int -> Int -> Stream m Int
iterateDropWhileTrue streamLen iterStreamLen =
iterateSource iterStreamLen (S.dropWhile (<= streamLen))

-------------------------------------------------------------------------------
-- Zipping
Expand Down Expand Up @@ -471,7 +480,7 @@ sourceConcatMapId val n =
{-# INLINE concatMapBySerial #-}
concatMapBySerial :: Int -> Int -> Int -> IO ()
concatMapBySerial outer inner n =
S.drain $ S.concatMapBy S.serial
S.drain $ S.concatMapWith S.serial
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are concatMapWith and concatMapBy same?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

concatMapBy is older name, later renamed to concatMapWith.

(unfoldrM inner)
(unfoldrM outer n)

Expand Down Expand Up @@ -681,7 +690,9 @@ o_1_space_concat streamLen =

-- This is for comparison with concatMapFoldableWith
, benchIOSrc1 "concatMapWithId (n of 1) (fromFoldable)"
(S.drain . S.concatMapBy S.serial id . sourceConcatMapId streamLen)
(S.drain
. S.concatMapWith S.serial id
. sourceConcatMapId streamLen)

, benchIOSrc1 "concatMapBy serial (n of 1)"
(concatMapBySerial streamLen 1)
Expand Down
4 changes: 2 additions & 2 deletions benchmark/Streamly/Benchmark/Prelude/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import Prelude hiding (mapM)

import Streamly.Prelude (fromAsync, async, maxBuffer, maxThreads, fromSerial)
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.StreamK.Type as Internal
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Transform

import Streamly.Benchmark.Common
import Streamly.Benchmark.Prelude
Expand Down Expand Up @@ -48,7 +48,7 @@ foldrSShared :: Int -> Int -> IO ()
foldrSShared count n =
S.drain
$ fromAsync
$ Internal.foldrSShared (\x xs -> S.consM (return x) xs) S.nil
$ Transform.foldrSShared (\x xs -> S.consM (return x) xs) S.nil
$ fromSerial
$ sourceUnfoldrM count n

Expand Down
3 changes: 1 addition & 2 deletions benchmark/Streamly/Benchmark/Prelude/Parallel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import Streamly.Prelude

import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.IsStream as Internal

import Streamly.Benchmark.Common
Expand Down Expand Up @@ -72,7 +71,7 @@ parAppSum src = (S.sum S.|$. src) >>= \x -> seq x (return ())

{-# INLINE tapAsyncS #-}
tapAsyncS :: S.MonadAsync m => Int -> SerialT m Int -> m ()
tapAsyncS n = composeN n $ Par.tapAsync S.sum
tapAsyncS n = composeN n $ Internal.tapAsyncK S.sum

{-# INLINE tapAsync #-}
tapAsync :: S.MonadAsync m => Int -> SerialT m Int -> m ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ iterateStateT n = do
else return x

{-# INLINE iterateState #-}
{-# SPECIALIZE iterateState :: Int -> SerialT (StateT Int IO) Int #-}
iterateState ::
(S.MonadAsync m, MonadState Int m)
=> Int
Expand Down
4 changes: 3 additions & 1 deletion benchmark/lib/Streamly/Benchmark/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import qualified Data.Foldable as F
import qualified Data.List as List
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Pipe as Pipe
import qualified Streamly.Internal.Data.Stream.Serial as Serial

Expand Down Expand Up @@ -89,7 +90,8 @@ sourceUnfoldrM count start = S.unfoldrM step start

{-# INLINE sourceUnfoldrMSerial #-}
sourceUnfoldrMSerial :: (S.IsStream t, Monad m) => Int -> Int -> t m Int
sourceUnfoldrMSerial count start = Serial.unfoldrM step start
sourceUnfoldrMSerial count start =
IsStream.fromSerial $ Serial.unfoldrM step start
where
step cnt =
if cnt > start + count
Expand Down
1 change: 1 addition & 0 deletions bin/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ test_exe_rts_opts () {
Data.Array.Foreign) echo -n "-M128M" ;;
Data.Array.Prim) echo -n "-M128M" ;;
Data.Array.Prim.Pinned) echo -n "-M128M" ;;
Prelude.Rate) echo -n "-M512M" ;;
# For -O0 case writeChunks test fails, maybe we should have a separate flag
# for O0 case?
FileSystem.Handle) echo -n "-K16M" ;;
Expand Down
8 changes: 8 additions & 0 deletions hie.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,18 @@ cradle:
component: "bench:Data.Parser.ParserD"
- path: "./benchmark/Streamly/Benchmark/Data/Parser/ParserK.hs"
component: "bench:Data.Parser.ParserK"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs"
component: "bench:Data.Stream.StreamK"
- path: "./benchmark/Streamly/Benchmark/FileSystem/Handle/Read.hs"
component: "bench:FileSystem.Handle"
- path: "./benchmark/Streamly/Benchmark/FileSystem/Handle/ReadWrite.hs"
component: "bench:FileSystem.Handle"
- path: "./benchmark/Streamly/Benchmark/Prelude/Ahead.hs"
component: "bench:Prelude.Ahead"
- path: "./benchmark/Streamly/Benchmark/Prelude/Async.hs"
component: "bench:Prelude.Async"
- path: "./benchmark/Streamly/Benchmark/Prelude/Parallel.hs"
component: "bench:Prelude.Parallel"
- path: "./benchmark/Streamly/Benchmark/Prelude/Serial.hs"
component: "bench:Prelude.Serial"
- path: "./benchmark/Streamly/Benchmark/Prelude/Serial/Elimination.hs"
Expand Down
25 changes: 12 additions & 13 deletions src/Streamly.hs
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,16 @@ import Data.Semigroup (Semigroup(..))
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.SVar (Rate(..))
import Streamly.Internal.Data.Stream.Ahead
import Streamly.Internal.Data.Stream.Async hiding (mkAsync)
import Streamly.Internal.Data.Stream.Async
import Streamly.Internal.Data.Stream.IsStream.Combinators
import Streamly.Internal.Data.Stream.Parallel
import Streamly.Internal.Data.Stream.Serial
import Streamly.Internal.Data.Stream.StreamK hiding (serial)
import Streamly.Internal.Data.Stream.IsStream.Expand
import Streamly.Internal.Data.Stream.IsStream.Type
import Streamly.Internal.Data.Stream.Serial (StreamT, InterleavedT)
import Streamly.Internal.Data.Stream.Zip

import qualified Streamly.Prelude as P
import qualified Streamly.Internal.Data.Stream.IsStream as IP
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.Async as Async
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Transform

-- XXX provide good succinct examples of pipelining, merging, splitting etc.
-- below.
Expand Down Expand Up @@ -364,7 +363,7 @@ import qualified Streamly.Internal.Data.Stream.Async as Async
-- @since 0.1.0
{-# DEPRECATED runStreaming "Please use runStream instead." #-}
runStreaming :: (Monad m, IsStream t) => t m a -> m ()
runStreaming = P.drain . K.adapt
runStreaming = P.drain . adapt

-- | Same as @runStream@.
--
Expand All @@ -384,35 +383,35 @@ runStream = P.drain
-- @since 0.1.0
{-# DEPRECATED runInterleavedT "Please use 'drain . interleaving' instead." #-}
runInterleavedT :: Monad m => WSerialT m a -> m ()
runInterleavedT = P.drain . K.adapt
runInterleavedT = P.drain . adapt

-- | Same as @drain . fromParallel@.
--
-- @since 0.1.0
{-# DEPRECATED runParallelT "Please use 'drain . fromParallel' instead." #-}
runParallelT :: Monad m => ParallelT m a -> m ()
runParallelT = P.drain . K.adapt
runParallelT = P.drain . adapt

-- | Same as @drain . fromAsync@.
--
-- @since 0.1.0
{-# DEPRECATED runAsyncT "Please use 'drain . fromAsync' instead." #-}
runAsyncT :: Monad m => AsyncT m a -> m ()
runAsyncT = P.drain . K.adapt
runAsyncT = P.drain . adapt

-- | Same as @drain . zipping@.
--
-- @since 0.1.0
{-# DEPRECATED runZipStream "Please use 'drain . fromZipSerial instead." #-}
runZipStream :: Monad m => ZipSerialM m a -> m ()
runZipStream = P.drain . K.adapt
runZipStream = P.drain . adapt

-- | Same as @drain . zippingAsync@.
--
-- @since 0.1.0
{-# DEPRECATED runZipAsync "Please use 'drain . fromZipAsync instead." #-}
runZipAsync :: Monad m => ZipAsyncM m a -> m ()
runZipAsync = P.drain . K.adapt
runZipAsync = P.drain . adapt

{-
-- | Same as "Streamly.Prelude.foldWith".
Expand Down Expand Up @@ -451,7 +450,7 @@ forEachWith = P.forEachWith
-- @since 0.2.0
{-# INLINABLE mkAsync #-}
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
mkAsync = return . Async.mkAsync
mkAsync = return . Transform.mkAsync

------------------------------------------------------------------------------
-- Documentation
Expand Down
17 changes: 8 additions & 9 deletions src/Streamly/Internal/Data/Array.hs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ import GHC.Base (Int(..))
import GHC.IO (unsafePerformIO)

import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.Unfold.Type (Unfold(..))

Expand Down Expand Up @@ -131,13 +130,13 @@ fromStreamD = D.fold write

{-# INLINE fromStreamN #-}
fromStreamN :: MonadIO m => Int -> SerialT m a -> m (Array a)
fromStreamN n m = do
fromStreamN n (SerialT m) = do
when (n < 0) $ error "fromStreamN: negative write count specified"
fromStreamDN n $ D.toStreamD m
fromStreamDN n $ D.fromStreamK m

{-# INLINE fromStream #-}
fromStream :: MonadIO m => SerialT m a -> m (Array a)
fromStream m = fromStreamD $ D.toStreamD m
fromStream (SerialT m) = fromStreamD $ D.fromStreamK m

{-# INLINABLE fromListN #-}
fromListN :: Int -> [a] -> Array a
Expand Down Expand Up @@ -196,12 +195,12 @@ toStreamDRev arr = D.Stream step (length arr - 1)
(# x #) -> D.Yield x (I# i - 1)

{-# INLINE_EARLY toStream #-}
toStream :: (Monad m, IsStream t) => Array a -> t m a
toStream = D.fromStreamD . toStreamD
toStream :: Monad m => Array a -> SerialT m a
toStream = SerialT . D.toStreamK . toStreamD

{-# INLINE_EARLY toStreamRev #-}
toStreamRev :: (Monad m, IsStream t) => Array a -> t m a
toStreamRev = D.fromStreamD . toStreamDRev
toStreamRev :: Monad m => Array a -> SerialT m a
toStreamRev = SerialT . D.toStreamK . toStreamDRev

-------------------------------------------------------------------------------
-- Elimination - using Folds
Expand Down
Loading