Skip to content

Commit

Permalink
Add a finalizer action in Fold type
Browse files Browse the repository at this point in the history
This was needed especially for concurrent fold combinators. A fold
combinator that uses concurrent folds needs to wait for the concurrent
folds to finish before it can finish. The finalizing action in folds can
deallocate any resources allocated by the "initial" action and also wait
for folds that it has initialized.

This complicates fold combinators in general. We can potentially
introduce a type for non-failing parsers and support finalization only
in those. The current use cases can be covered by that. Parsers do not
support scanning, which is not required in the use cases where we need
finalization (there is no known use case).
  • Loading branch information
harendra-kumar committed Nov 18, 2023
1 parent 0ed37ff commit 32390c5
Show file tree
Hide file tree
Showing 36 changed files with 762 additions and 477 deletions.
4 changes: 2 additions & 2 deletions benchmark/Streamly/Benchmark/Data/Parser.hs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ split_ value =
-- PR.dropWhile (<= (value * 1 `div` 4)) *> PR.die "alt"
{-# INLINE takeWhileFail #-}
takeWhileFail :: Monad m => (a -> Bool) -> Fold m a b -> Parser a m b
takeWhileFail predicate (Fold fstep finitial fextract) =
takeWhileFail predicate (Fold fstep finitial _ ffinal) =
Parser step initial extract

where
Expand All @@ -369,7 +369,7 @@ takeWhileFail predicate (Fold fstep finitial fextract) =
Fold.Done b -> Done 0 b
else return $ Error "fail"

extract s = fmap (Done 0) (fextract s)
extract s = fmap (Done 0) (ffinal s)

{-# INLINE alt2 #-}
alt2 :: Monad m
Expand Down
4 changes: 2 additions & 2 deletions benchmark/Streamly/Benchmark/Data/ParserK.hs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ sequence_ value =

{-# INLINE takeWhileFailD #-}
takeWhileFailD :: Monad m => (a -> Bool) -> Fold m a b -> Parser a m b
takeWhileFailD predicate (Fold fstep finitial fextract) =
takeWhileFailD predicate (Fold fstep finitial _ ffinal) =
Parser step initial extract

where
Expand All @@ -234,7 +234,7 @@ takeWhileFailD predicate (Fold fstep finitial fextract) =
Fold.Done b -> Done 0 b
else return $ Error "fail"

extract s = fmap (Done 0) (fextract s)
extract s = fmap (Done 0) (ffinal s)

{-# INLINE takeWhileFail #-}
takeWhileFail :: CONSTRAINT =>
Expand Down
19 changes: 18 additions & 1 deletion benchmark/Streamly/Benchmark/Unicode/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE ScopedTypeVariables #-}

#undef FUSION_CHECK
#ifdef FUSION_CHECK
{-# OPTIONS_GHC -ddump-simpl -ddump-to-file -dsuppress-all #-}
#endif

#ifdef __HADDOCK_VERSION__
#undef INSPECTION
#endif
Expand Down Expand Up @@ -280,7 +285,8 @@ _copyStreamUtf8'Fold :: Handle -> Handle -> IO ()
_copyStreamUtf8'Fold inh outh =
Stream.fold (Handle.write outh)
$ Unicode.encodeUtf8
$ Stream.foldMany Unicode.writeCharUtf8'
$ Stream.catRights
$ Stream.parseMany Unicode.writeCharUtf8'
$ Stream.unfold Handle.reader inh

{-# NOINLINE _copyStreamUtf8Parser #-}
Expand Down Expand Up @@ -317,6 +323,7 @@ o_1_space_decode_encode_read env =

main :: IO ()
main = do
#ifndef FUSION_CHECK
env <- mkHandleBenchEnv
defaultMain (allBenchmarks env)

Expand All @@ -329,3 +336,13 @@ main = do
, o_1_space_decode_encode_read env
]
]
#else
-- Enable FUSION_CHECK macro at the beginning of the file
-- Enable one benchmark below, and run the benchmark
-- Check the .dump-simpl output
env <- mkHandleBenchEnv
let mkHandles (RefHandles {bigInH = inh, outputH = outh}) = Handles inh outh
(Handles inh outh) <- getHandles env mkHandles
copyStreamLatin1' inh outh
return ()
#endif
2 changes: 2 additions & 0 deletions benchmark/lib/Streamly/Benchmark/Common/Handle.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ module Streamly.Benchmark.Common.Handle
, isSpace
, isSp
, mkHandleBenchEnv
, Handles(..)
, getHandles
)
where

Expand Down
2 changes: 1 addition & 1 deletion core/src/Streamly/Internal/Data/Array.hs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ writeLastN ::
(Storable a, Unbox a, MonadIO m) => Int -> Fold m a (Array a)
writeLastN n
| n <= 0 = fmap (const mempty) FL.drain
| otherwise = A.unsafeFreeze <$> Fold step initial done
| otherwise = A.unsafeFreeze <$> Fold step initial done done

where

Expand Down
3 changes: 2 additions & 1 deletion core/src/Streamly/Internal/Data/Array/Type.hs
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,11 @@ pinnedWrite = fmap unsafeFreeze MA.pinnedWrite
--
{-# INLINE unsafeMakePure #-}
unsafeMakePure :: Monad m => Fold IO a b -> Fold m a b
unsafeMakePure (Fold step initial extract) =
unsafeMakePure (Fold step initial extract final) =
Fold (\x a -> return $! unsafeInlineIO (step x a))
(return $! unsafePerformIO initial)
(\s -> return $! unsafeInlineIO $ extract s)
(\s -> return $! unsafeInlineIO $ final s)

-- | Convert a pure stream in Identity monad to an immutable array.
--
Expand Down
6 changes: 4 additions & 2 deletions core/src/Streamly/Internal/Data/Fold/Chunked.hs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ newtype ChunkFold m a b = ChunkFold (ParserD.Parser (Array a) m b)
{-# INLINE fromFold #-}
fromFold :: forall m a b. (MonadIO m, Unbox a) =>
Fold.Fold m a b -> ChunkFold m a b
fromFold (Fold.Fold fstep finitial fextract) =
ChunkFold (ParserD.Parser step initial (fmap (Done 0) . fextract))
fromFold (Fold.Fold fstep finitial _ ffinal) =
ChunkFold (ParserD.Parser step initial extract)

where

Expand Down Expand Up @@ -134,6 +134,8 @@ fromFold (Fold.Fold fstep finitial fextract) =
Fold.Partial fs1 ->
goArray SPEC next fs1

extract = fmap (Done 0) . ffinal

-- | Convert an element 'ParserD.Parser' into an array stream fold. If the
-- parser fails the fold would throw an exception.
--
Expand Down
Loading

0 comments on commit 32390c5

Please sign in to comment.