Skip to content

Commit

Permalink
Implement differenceBySorted API
Browse files Browse the repository at this point in the history
Fix after rebase

Remove UnionBySorted

fix Werror

Add benchmarek

Fix review comments
  • Loading branch information
rnjtranjan committed Mar 14, 2022
1 parent 5e7fb87 commit a9e4a96
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 6 deletions.
5 changes: 5 additions & 0 deletions benchmark/Streamly/Benchmark/Prelude/Serial/NestedStream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,11 @@ o_n_heap_buffering value =
$ joinWith Internal.intersectBy sqrtVal
, benchIOSrc1 "intersectBySorted"
$ joinMapWith (Internal.intersectBySorted compare) halfVal
-- XXX It hangs forever
--, benchIOSrc1 "differenceBy"
-- $ joinMapWith (Internal.differenceBy (==)) halfVal
, benchIOSrc1 "differenceBySorted"
$ joinMapWith (Internal.differenceBySorted compare) sqrtVal
]
]

Expand Down
84 changes: 84 additions & 0 deletions core/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ module Streamly.Internal.Data.Stream.StreamD.Nesting
, splitInnerBy
, splitInnerBySuffix
, intersectBySorted
, differenceBySorted
)
where

Expand Down Expand Up @@ -487,6 +488,25 @@ mergeBy cmp = mergeByM (\a b -> return $ cmp a b)
-- Intersection of sorted streams
-------------------------------------------------------------------------------

data StreamEmptyNess =
LeftEmpty
| RightEmpty
| BothEmpty
| NoneEmpty
deriving (Eq, Show)

data RunOrder =
LeftRun
| RightRun
| CompareRun
| CompareDupRun
| FastFarwardRun
| RightDupRun
| BuffPrepare
| BuffPair
| BuffReset
deriving (Eq, Show)

-- Assuming the streams are sorted in ascending order
{-# INLINE_NORMAL intersectBySorted #-}
intersectBySorted :: Monad m
Expand Down Expand Up @@ -2466,3 +2486,67 @@ splitInnerBySuffix splitter joiner (Stream step1 state1) =

step _ (SplitYielding x next) = return $ Yield x next
step _ SplitFinishing = return Stop

-------------------------------------------------------------------------------
-- Difference of sorted streams -----------------------------------------------
-------------------------------------------------------------------------------
{-# INLINE_NORMAL differenceBySorted #-}
differenceBySorted :: (Monad m) =>
(a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
differenceBySorted cmp (Stream stepa ta) (Stream stepb tb) =
Stream step (Just ta, Just tb, Nothing, Nothing, Nothing)

where
{-# INLINE_LATE step #-}

-- one of the values is missing, and the corresponding stream is running
step gst (Just sa, sb, Nothing, b, Nothing) = do
r <- stepa gst sa
return $ case r of
Yield a sa' -> Skip (Just sa', sb, Just a, b, Nothing)
Skip sa' -> Skip (Just sa', sb, Nothing, b, Nothing)
Stop -> Skip (Nothing, sb, Nothing, b, Nothing)

step gst (sa, Just sb, a, Nothing, Nothing) = do
r <- stepb gst sb
return $ case r of
Yield b sb' -> Skip (sa, Just sb', a, Just b, Nothing)
Skip sb' -> Skip (sa, Just sb', a, Nothing, Nothing)
Stop -> Skip (sa, Nothing, a, Nothing, Nothing)

-- Matching element
step gst (Just sa, Just sb, Nothing, _, Just _) = do
r1 <- stepa gst sa
r2 <- stepb gst sb
return $ case r1 of
Yield a sa' ->
case r2 of
Yield c sb' ->
Skip (Just sa', Just sb', Just a, Just c, Nothing)
Skip sb' ->
Skip (Just sa', Just sb', Just a, Just a, Nothing)
Stop ->
Yield a (Just sa', Just sb, Nothing, Nothing, Just a)
Skip sa' ->
case r2 of
Yield c sb' ->
Skip (Just sa', Just sb', Just c, Just c, Nothing)
Skip sb' ->
Skip (Just sa', Just sb', Nothing, Nothing, Nothing)
Stop ->
Stop
Stop ->
Stop

-- both the values are available
step _ (sa, sb, Just a, Just b, Nothing) = do
let res = cmp a b
return $ case res of
GT -> Skip (sa, sb, Just a, Nothing, Nothing)
LT -> Yield a (sa, sb, Nothing, Just b, Nothing)
EQ -> Skip (sa, sb, Nothing, Just b, Just b)

-- one of the values is missing, corresponding stream is done
step _ (sa, Nothing, Just a, Nothing, Nothing) =
return $ Yield a (sa, Nothing, Nothing, Nothing , Nothing)
step _ (_, _, _, _, _) = return Stop
13 changes: 8 additions & 5 deletions src/Streamly/Internal/Data/Stream/IsStream/Top.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module Streamly.Internal.Data.Stream.IsStream.Top
, intersectBy
, intersectBySorted
, differenceBy
, mergeDifferenceBy
, differenceBySorted
, unionBy
, mergeUnionBy

Expand Down Expand Up @@ -632,11 +632,14 @@ differenceBy eq s1 s2 =
--
-- Space: O(1)
--
-- /Unimplemented/
{-# INLINE mergeDifferenceBy #-}
mergeDifferenceBy :: -- (IsStream t, Monad m) =>
-- /Pre-release/
{-# INLINE differenceBySorted #-}
differenceBySorted :: (IsStream t, MonadIO m) =>
(a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy _eq _s1 _s2 = undefined
differenceBySorted eq s1 =
IsStream.fromStreamD
. StreamD.differenceBySorted eq (IsStream.toStreamD s1)
. IsStream.toStreamD

-- | This is essentially an append operation that appends all the extra
-- occurrences of elements from the second stream that are not already present
Expand Down
22 changes: 21 additions & 1 deletion test/Streamly/Test/Prelude/Top.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Main (main) where

import Data.List (elem, intersect, nub, sort)
import Data.List (elem, intersect, nub, sort, (\\))
import Data.Maybe (isNothing)
import Streamly.Prelude (SerialT)
import Test.QuickCheck
Expand Down Expand Up @@ -196,6 +196,25 @@ intersectBy srt intersectFunc cmp =
let v2 = ls0 `intersect` ls1
assert (sort v1 == sort v2)

differenceBySorted :: Property
differenceBySorted =
forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
monadicIO $ action (sort ls0) (sort ls1)

where

action ls0 ls1 = do
v1 <-
run
$ S.toList
$ Top.differenceBySorted
compare
(S.fromList ls0)
(S.fromList ls1)
let v2 = ls0 \\ ls1
assert (v1 == sort v2)

-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
Expand All @@ -219,3 +238,4 @@ main = hspec $ do
(intersectBy id Top.intersectBy (==))
prop "intersectBySorted"
(intersectBy sort Top.intersectBySorted compare)
prop "differenceBySorted" Main.differenceBySorted
5 changes: 5 additions & 0 deletions test/streamly-tests.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@ test-suite Prelude.Top
type: exitcode-stdio-1.0
main-is: Streamly/Test/Prelude/Top.hs

test-suite Data.Stream.Top
import: test-options
type: exitcode-stdio-1.0
main-is: Streamly/Test/Data/Stream/Top.hs

test-suite Prelude.WAsync
import: test-options
type: exitcode-stdio-1.0
Expand Down

0 comments on commit a9e4a96

Please sign in to comment.