Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement and test pipeline/stream support for all platforms and scala versions #1141

Draft
wants to merge 7 commits into
base: series/2.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions zio-json/js/src/main/scala/java/io/BufferedWriter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package java.io

class BufferedWriter(out: Writer, sz: Int) extends Writer {

if (sz <= 0) throw new IllegalArgumentException("Buffer size <= 0")

def this(out: Writer) = this(out, 4096)

private val buffer: Array[Char] = new Array[Char](sz)
private var pos: Int = 0
private var closed: Boolean = false

def close(): Unit = if (!closed) {
flush()
out.close()
closed = true
}

def flush(): Unit = {
ensureOpen()
out.write(buffer, 0, pos)
out.flush()
pos = 0
}

def newLine(): Unit =
write(System.lineSeparator(), 0, System.lineSeparator().length)

override def write(c: Int): Unit =
write(Array(c.toChar), 0, 1)

override def write(s: String, off: Int, len: Int): Unit =
write(s.toCharArray, off, len)

def write(cbuf: Array[Char], off: Int, len: Int): Unit = {
ensureOpen()
val available = sz - pos
if (available >= len) {
System.arraycopy(cbuf, off, buffer, pos, len)
pos += len
if (pos == sz) flush()
} else {
write(cbuf, off, available)
write(cbuf, off + available, len - available)
}
}

private def ensureOpen(): Unit =
if (closed) throw new IOException("Stream closed")
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -115,122 +115,6 @@ object DecoderPlatformSpecificSpec extends ZIOSpecDefault {
testAst("ugh10k")
),
suite("ZIO Streams integration")(
test("decodes a stream of chars") {
for {
int <- JsonDecoder[Int].decodeJsonStream(ZStream('1', '2', '3'))
} yield {
assert(int)(equalTo(123))
}
},
test("decodes an encoded stream of bytes") {
for {
int <- JsonDecoder[Int].decodeJsonStreamInput(ZStream.fromIterable("123".getBytes(StandardCharsets.UTF_8)))
} yield assert(int)(equalTo(123))
},
suite("decodeJsonPipeline")(
suite("Newline delimited")(
test("decodes single elements") {
ZStream
.fromIterable("1001".toSeq)
.via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline))
.runCollect
.map { xs =>
assert(xs)(equalTo(Chunk(1001)))
}
},
test("decodes multiple elements") {
ZStream
.fromIterable("1001\n1002".toSeq)
.via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline))
.runCollect
.map { xs =>
assert(xs)(equalTo(Chunk(1001, 1002)))
}
},
test("decodes multiple elements when fed in smaller chunks") {
ZStream
.fromIterable("1001\n1002".toSeq)
.rechunk(1)
.via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline))
.runCollect
.map { xs =>
assert(xs)(equalTo(Chunk(1001, 1002)))
}
},
test("accepts trailing NL") {
ZStream
.fromIterable("1001\n1002\n".toSeq)
.via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline))
.runCollect
.map { xs =>
assert(xs)(equalTo(Chunk(1001, 1002)))
}
},
test("errors") {
ZStream
.fromIterable("1\nfalse\n3".toSeq)
.via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline))
.runDrain
.exit
.map { exit =>
assert(exit)(fails(anything))
}
},
test("is interruptible") {
(ZStream.fromIterable("1\n2\n3\n4") ++ ZStream.fromZIO(ZIO.interrupt))
.via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Newline))
.runDrain
.exit
.map { exit =>
assert(exit)(isInterrupted)
}
} @@ timeout(2.seconds)
),
suite("Array delimited")(
test("decodes single elements") {
ZStream
.fromIterable("[1001]".toSeq)
.via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Array))
.runCollect
.map { xs =>
assert(xs)(equalTo(Chunk(1001)))
}
},
test("empty array") {
ZStream
.fromIterable("[]".toSeq)
.via(JsonDecoder[String].decodeJsonPipeline(JsonStreamDelimiter.Array))
.runCollect
.map { xs =>
assert(xs)(isEmpty)
}
},
test("decodes multiple elements") {
ZStream
.fromIterable("[ 1001, 1002, 1003 ]".toSeq)
.via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Array))
.runCollect
.map { xs =>
assert(xs)(equalTo(Chunk(1001, 1002, 1003)))
}
},
test("handles whitespace leniently") {
val in =
"""[
1001, 1002,
1003
]"""

ZStream
.fromIterable(in.toSeq)
.via(JsonDecoder[Int].decodeJsonPipeline(JsonStreamDelimiter.Array))
.runCollect
.map { xs =>
assert(xs)(equalTo(Chunk(1001, 1002, 1003)))
}
}
)
),
suite("helpers in zio.json")(
test("readJsonLines reads from files") {
import logEvent._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,67 +24,6 @@ object EncoderPlatformSpecificSpec extends ZIOSpecDefault {
testRoundTrip[List[Tweet]]("twitter_api_response"),
testRoundTrip[GeoJSON]("che.geo")
),
suite("ZIO Streams integration")(
test("encodes into a ZStream of Char") {
val intEncoder = JsonEncoder[Int]
val value = 1234

for {
chars <- intEncoder.encodeJsonStream(value).runCollect
} yield {
assert(chars.mkString)(equalTo("1234"))
}
},
test("encodes values that yield a result of length > DefaultChunkSize") {
val longString = List.fill(ZStream.DefaultChunkSize * 2)('x').mkString

for {
chars <- JsonEncoder[String].encodeJsonStream(longString).runCollect
} yield {
assert(chars)(hasSize(equalTo(ZStream.DefaultChunkSize * 2 + 2))) &&
assert(chars.mkString(""))(equalTo("\"" ++ longString ++ "\""))
}
},
test("encodeJsonLinesPipeline") {
val ints = ZStream(1, 2, 3, 4)

for {
xs <- ints.via(JsonEncoder[Int].encodeJsonLinesPipeline).runCollect
} yield {
assert(xs.mkString)(equalTo("1\n2\n3\n4\n"))
}
},
test("encodeJsonLinesPipeline handles elements which take up > DefaultChunkSize to encode") {
val longString = List.fill(5000)('x').mkString

val ints = ZStream(longString, longString)
val encoder = JsonEncoder[String]

for {
xs <- ints.via(encoder.encodeJsonLinesPipeline).runCollect
} yield {
// leading `"`, trailing `"` and `\n` = 3
assert(xs.size)(equalTo((5000 + 3) * 2))
}
},
test("encodeJsonArrayPipeline XYZ") {
val ints = ZStream(1, 2, 3).map(n => Json.Obj(Chunk("id" -> Json.Num(BigDecimal(n).bigDecimal))))

for {
xs <- ints.via(JsonEncoder[Json].encodeJsonArrayPipeline).runCollect
} yield {
assert(xs.mkString)(equalTo("""[{"id":1},{"id":2},{"id":3}]"""))
}
},
test("encodeJsonArrayPipeline, empty stream") {
val emptyArray = ZStream
.from(List())
.via(JsonEncoder[String].encodeJsonArrayPipeline)
.run(ZSink.mkString)

assertZIO(emptyArray)(equalTo("[]"))
}
),
suite("helpers in zio.json")(
test("writeJsonLines writes JSON lines") {
val path = Files.createTempFile("log", "json")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,88 @@
package zio

trait JsonPackagePlatformSpecific {}
import zio.json.{ JsonDecoder, JsonEncoder, JsonStreamDelimiter, ast }
import zio.stream._

import java.io.{ File, IOException }
import java.net.URL
import java.nio.charset.StandardCharsets
import java.nio.file.{ Path, Paths }

trait JsonPackagePlatformSpecific {
def readJsonAs(file: File): ZStream[Any, Throwable, ast.Json] =
readJsonLinesAs[ast.Json](file)

def readJsonAs(path: Path): ZStream[Any, Throwable, ast.Json] =
readJsonLinesAs[ast.Json](path)

def readJsonAs(path: String): ZStream[Any, Throwable, ast.Json] =
readJsonLinesAs[ast.Json](path)

def readJsonAs(url: URL): ZStream[Any, Throwable, ast.Json] =
readJsonLinesAs[ast.Json](url)

def readJsonLinesAs[A: JsonDecoder](file: File): ZStream[Any, Throwable, A] =
readJsonLinesAs(file.toPath)

def readJsonLinesAs[A: JsonDecoder](path: Path): ZStream[Any, Throwable, A] =
ZStream
.fromPath(path)
.via(
ZPipeline.utf8Decode >>>
stringToChars >>>
JsonDecoder[A].decodeJsonPipeline(JsonStreamDelimiter.Newline)
)

def readJsonLinesAs[A: JsonDecoder](path: String): ZStream[Any, Throwable, A] =
readJsonLinesAs(Paths.get(path))

def readJsonLinesAs[A: JsonDecoder](url: URL): ZStream[Any, Throwable, A] = {
val scoped = ZIO
.fromAutoCloseable(ZIO.attempt(url.openStream()))
.refineToOrDie[IOException]

ZStream
.fromInputStreamScoped(scoped)
.via(
ZPipeline.utf8Decode >>>
stringToChars >>>
JsonDecoder[A].decodeJsonPipeline(JsonStreamDelimiter.Newline)
)
}

def writeJsonLines[R](file: File, stream: ZStream[R, Throwable, ast.Json]): RIO[R, Unit] =
writeJsonLinesAs(file, stream)

def writeJsonLines[R](path: Path, stream: ZStream[R, Throwable, ast.Json]): RIO[R, Unit] =
writeJsonLinesAs(path, stream)

def writeJsonLines[R](path: String, stream: ZStream[R, Throwable, ast.Json]): RIO[R, Unit] =
writeJsonLinesAs(path, stream)

def writeJsonLinesAs[R, A: JsonEncoder](file: File, stream: ZStream[R, Throwable, A]): RIO[R, Unit] =
writeJsonLinesAs(file.toPath, stream)

def writeJsonLinesAs[R, A: JsonEncoder](path: Path, stream: ZStream[R, Throwable, A]): RIO[R, Unit] =
stream
.via(
JsonEncoder[A].encodeJsonLinesPipeline >>>
charsToUtf8
)
.run(ZSink.fromPath(path))
.unit

def writeJsonLinesAs[R, A: JsonEncoder](path: String, stream: ZStream[R, Throwable, A]): RIO[R, Unit] =
writeJsonLinesAs(Paths.get(path), stream)

private def stringToChars: ZPipeline[Any, Nothing, String, Char] =
ZPipeline.mapChunks[String, Char](_.flatMap(_.toCharArray))

private def charsToUtf8: ZPipeline[Any, Nothing, Char, Byte] =
ZPipeline.mapChunksZIO[Any, Nothing, Char, Byte] { chunk =>
ZIO.succeed {
Chunk.fromArray {
new String(chunk.toArray).getBytes(StandardCharsets.UTF_8)
}
}
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import scala.util.control.NoStackTrace
* A `JsonDecoder[A]` instance has the ability to decode JSON to values of type `A`, potentially
* failing with an error if the JSON content does not encode a value of the given type.
*/
trait JsonDecoder[A] extends JsonDecoderPlatformSpecific[A] {
trait JsonDecoder[A] extends JsonStreamDecoder[A] {
self =>

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.annotation._
import scala.collection.{ immutable, mutable }
import scala.reflect.ClassTag

trait JsonEncoder[A] extends JsonEncoderPlatformSpecific[A] {
trait JsonEncoder[A] extends JsonStreamEncoder[A] {
self =>

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import zio.stream.{ Take, ZPipeline, ZStream }
import java.nio.charset.{ Charset, StandardCharsets }
import scala.annotation.tailrec

trait JsonDecoderPlatformSpecific[A] { self: JsonDecoder[A] =>
trait JsonStreamDecoder[A] { self: JsonDecoder[A] =>

private def readAll(reader: java.io.Reader): ZIO[Any, Throwable, A] =
ZIO.attemptBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import zio.json.internal.WriteWriter
import zio.stream._
import zio.{ Chunk, Ref, Unsafe, ZIO }

trait JsonEncoderPlatformSpecific[A] { self: JsonEncoder[A] =>
trait JsonStreamEncoder[A] { self: JsonEncoder[A] =>

/**
* Encodes the specified value into a character stream.
Expand Down
Loading
Loading