Skip to content

Tweak Readable util functions #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ Breaking changes:

New features:
- Added event handlers for `Writeable` streams (#49 by @JordanMartinez)
- Added missing APIs (#51 by @JordanMartinez)
- Added missing APIs (#51, #53 by @JordanMartinez)

- readable, readableEnded, readableFlowing, readableHighWaterMark, readableLength
- pipe'
- writeable, writeableEnded, writeableCorked, errored, writeableFinished, writeableHighWaterMark, writeableLength, writeableNeedDrain
- closed, destroyed
- allowHalfOpen
- pipeline
- fromString, fromBuffer
- readableFromString, readableFromBuffer
- newPassThrough
- Integrated `node-streams-aff` into library (#52 by @JordanMartinez)

Expand All @@ -68,6 +68,11 @@ New features:
- fromStringUTF8

The only APIs from the library not added were `newReadable` and `push`.
- Added convenience API for converting `Readable` to `String` or `Buffer` (#53 by @JordanMartinez)

- `readableToStringUtf8`
- `readableToString`
- `readableToBuffers`

Bugfixes:
- Drop misleading comment for `setEncoding` (#51 by @JordanMartinez)
Expand Down
2 changes: 1 addition & 1 deletion src/Node/Stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export const allowHalfOpenImpl = (d) => d.allowHalfOpen;

export const pipelineImpl = (src, transforms, dst, cb) => stream.pipeline([src, ...transforms, dst], cb);

export const readableFromStrImpl = (str) => stream.Readable.from(str, { objectMode: false });
export const readableFromStrImpl = (str, encoding) => stream.Readable.from(str, { encoding, objectMode: false });

export const readableFromBufImpl = (buf) => stream.Readable.from(buf, { objectMode: false });

Expand Down
14 changes: 7 additions & 7 deletions src/Node/Stream.purs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ module Node.Stream
, destroyed
, allowHalfOpen
, pipeline
, fromString
, fromBuffer
, readableFromString
, readableFromBuffer
, newPassThrough
) where

Expand Down Expand Up @@ -493,13 +493,13 @@ pipeline src transforms dest cb = runEffectFn4 pipelineImpl src transforms dest

foreign import pipelineImpl :: forall w r. EffectFn4 (Readable w) (Array Duplex) (Writable r) ((Error -> Effect Unit)) (Unit)

fromString :: String -> Effect (Readable ())
fromString str = runEffectFn1 readableFromStrImpl str
readableFromString :: String -> Encoding -> Effect (Readable ())
readableFromString str enc = runEffectFn2 readableFromStrImpl str (encodingToNode enc)

foreign import readableFromStrImpl :: EffectFn1 (String) (Readable ())
foreign import readableFromStrImpl :: EffectFn2 (String) (String) (Readable ())

fromBuffer :: Buffer -> Effect (Readable ())
fromBuffer buf = runEffectFn1 readableFromBufImpl buf
readableFromBuffer :: Buffer -> Effect (Readable ())
readableFromBuffer buf = runEffectFn1 readableFromBufImpl buf

foreign import readableFromBufImpl :: EffectFn1 (Buffer) (Readable ())

Expand Down
76 changes: 72 additions & 4 deletions src/Node/Stream/Aff.purs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
-- |
-- | #### Implementation
-- |
-- | The reading functions in this module all operate on a `Readable` stream
-- | The `read*` functions (not to be confused with the `readable*` functions)
-- | in this module all operate on a `Readable` stream
-- | in
-- | [“paused mode”](https://nodejs.org/docs/latest/api/stream.html#stream_two_reading_modes).
-- |
Expand Down Expand Up @@ -80,7 +81,10 @@
-- |
-- | If a write fails then it will `throwError` in the `Aff`.
module Node.Stream.Aff
( readSome
( readableToStringUtf8
, readableToString
, readableToBuffers
, readSome
, readAll
, readN
, write
Expand All @@ -105,11 +109,75 @@ import Effect.Exception (catchException)
import Effect.Ref as Ref
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Encoding (Encoding(..))
import Node.Encoding as Encoding
import Node.EventEmitter (once)
import Node.Stream (Readable, Writable, closeH, drainH, endH, errorH, readable, readableH)
import Node.EventEmitter (on, once)
import Node.Stream (Readable, Writable, closeH, dataH, drainH, endH, errorH, readable, readableH)
import Node.Stream as Stream

-- | Works on streams in "flowing" mode.
-- | Reads all of the stream's contents into a buffer
-- | and converts the result into a UTF8-encoded String.
readableToStringUtf8
:: forall m w
. MonadAff m
=> Readable w
-> m String
readableToStringUtf8 r = readableToString r UTF8

-- | Works on streams in "flowing" mode.
-- | Reads all of the stream's contents into a buffer
-- | and converts the result into a String using the provided encoding.
readableToString
:: forall m w
. MonadAff m
=> Readable w
-> Encoding
-> m String
readableToString r enc = do
bufs <- readableToBuffers r
liftEffect $ Buffer.toString enc =<< Buffer.concat bufs

-- | Works on streams in "flowing" mode.
-- | Reads all of the stream's buffered contents into an array.
readableToBuffers
:: forall m w
. MonadAff m
=> Readable w
-> m (Array Buffer)
readableToBuffers r = liftAff $ makeAff \complete -> do
bufs <- liftST $ Array.ST.new
dataRef <- Ref.new (mempty :: Effect Unit)
let removeData = join $ Ref.read dataRef

removeError <- r # once errorH \err -> do
removeData
complete $ Left err

removeClose <- r # once closeH do
-- Don't error, instead return whatever we've read.
removeError
removeData
result <- liftST $ Array.ST.unsafeFreeze bufs
complete $ Right result

removeEnd <- r # once endH do
removeError
removeClose
removeData
result <- liftST $ Array.ST.unsafeFreeze bufs
complete $ Right result

rmData <- r # on dataH \buf ->
void $ liftST $ Array.ST.push buf bufs

Ref.write rmData dataRef
pure $ effectCanceler do
removeError
removeClose
removeEnd
removeData

-- | Works on streams in "paused" mode.
-- | Wait until there is some data available from the stream, then read it.
-- |
Expand Down
9 changes: 5 additions & 4 deletions test/Main1.purs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import Effect.Aff (Aff, Milliseconds(..), launchAff_)
import Effect.Class (liftEffect)
import Node.Buffer (Buffer, concat)
import Node.Buffer as Buffer
import Node.Encoding (Encoding(..))
import Node.Stream (Readable, Writable, destroy, newPassThrough)
import Node.Stream as Stream
import Node.Stream.Aff (end, fromStringUTF8, readAll, readN, readSome, toStringUTF8, write)
Expand Down Expand Up @@ -51,19 +52,19 @@ main = unsafePartial $ do
, void $ readSome s
]
it "reads from a zero-length Readable" do
r <- liftEffect $ Stream.fromString ""
r <- liftEffect $ Stream.readableFromString "" UTF8
-- readSome should return readagain false
shouldEqual { buffers: "", readagain: true } =<< toStringBuffers =<< readSome r
shouldEqual "" =<< toStringUTF8 =<< readAll r
shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readN r 0
it "readN cleans up event handlers" do
s <- liftEffect $ Stream.fromString ""
s <- liftEffect $ Stream.readableFromString "" UTF8
for_ (0 .. 100) \_ -> void $ readN s 0
it "readSome cleans up event handlers" do
s <- liftEffect $ Stream.fromString ""
s <- liftEffect $ Stream.readableFromString "" UTF8
for_ (0 .. 100) \_ -> void $ readSome s
it "readAll cleans up event handlers" do
s <- liftEffect $ Stream.fromString ""
s <- liftEffect $ Stream.readableFromString "" UTF8
for_ (0 .. 100) \_ -> void $ readAll s
it "write cleans up event handlers" do
s <- liftEffect $ newPassThrough
Expand Down