Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve MessagePack model #618

Merged
merged 8 commits into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,18 @@ private[internal] object FormatParsers {
length match {
case 4 =>
requireBytes(4, ctx).map { res =>
res.accumulate(v => MsgpackItem.Timestamp32(v))
res.accumulate(v => MsgpackItem.Timestamp32(v.toInt(false)))
}
case 8 =>
requireBytes(8, ctx).map { res =>
val result = res.result
val seconds = result & hex"00000003ffffffff"
val nanosec = result >> 34

res.toContext.prepend(MsgpackItem.Timestamp64(nanosec.drop(4), seconds.drop(3)))
res.accumulate(v => MsgpackItem.Timestamp64(v.toLong(false)))
}
case 12 =>
for {
res <- requireBytes(4, ctx)
nanosec = res.result
nanosec = res.result.toInt(false)
res <- requireBytes(8, res.toContext)
seconds = res.result
seconds = res.result.toLong(false)
} yield res.toContext.prepend(MsgpackItem.Timestamp96(nanosec, seconds))
case _ => Pull.raiseError(new MsgpackParsingException(s"Invalid timestamp length: ${length}"))
}
Expand Down Expand Up @@ -120,4 +116,64 @@ private[internal] object FormatParsers {
}
}
}

def parseFloat32[F[_]](ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(4, ctx).map {
_.accumulate { v =>
MsgpackItem.Float32 {
val raw = v.toInt(false)
val sign = if ((raw & 0x80000000) == 0x80000000) -1 else 1
val biasedExponent = (raw & 0x7f800000) >>> 23

// subnormal or zero
if (biasedExponent == 0) {
val mantissa = (raw & 0x007fffff).toFloat
if (mantissa == 0) 0F
else sign * Math.pow(2, -126).toFloat * (mantissa / 0x800000)
// Inf or NaN
} else if (biasedExponent == 0xff) {
val mantissa = raw & 0x007fffff
if (mantissa == 0) sign * Float.PositiveInfinity
else Float.NaN
// normal
} else {
val exponent = (biasedExponent - 127).toDouble
val mantissa = (raw & 0x007fffff).toFloat + 0x800000
sign * Math.pow(2, exponent).toFloat * (mantissa / 0x800000)
}
}
}
}
}

def parseFloat64[F[_]](ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(8, ctx).map {
_.accumulate { v =>
MsgpackItem.Float64 {
val raw = v.toLong(false)
val sign = if ((raw & 0x8000000000000000L) == 0x8000000000000000L) -1 else 1
val biasedExponent = (raw & 0x7ff0000000000000L) >>> 52

// subnormal or zero
if (biasedExponent == 0) {
val mantissa = (raw & 0xfffffffffffffL).toDouble
if (mantissa == 0) 0D
else sign * Math.pow(2, -1022) * (mantissa / 0x10000000000000L)
// Inf or NaN
} else if (biasedExponent == 0x7ff) {
val mantissa = raw & 0xfffffffffffffL
if (mantissa == 0) sign * Double.PositiveInfinity
else Double.NaN
// normal
} else {
val exponent = (biasedExponent - 1023).toDouble
val mantissa = (raw & 0xfffffffffffffL).toDouble + 0x10000000000000L
sign * Math.pow(2, exponent) * (mantissa / 0x10000000000000L)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ private[low] object ItemParser {
case Headers.Ext8 => parsePlainExt(1, ctx)
case Headers.Ext16 => parsePlainExt(2, ctx)
case Headers.Ext32 => parsePlainExt(4, ctx)
case Headers.Float32 => parseSimpleType(MsgpackItem.Float32(_))(4, ctx)
case Headers.Float64 => parseSimpleType(MsgpackItem.Float64(_))(8, ctx)
case Headers.Float32 => parseFloat32(ctx)
case Headers.Float64 => parseFloat64(ctx)
case Headers.Uint8 => parseSimpleType(MsgpackItem.UnsignedInt(_))(1, ctx)
case Headers.Uint16 => parseSimpleType(MsgpackItem.UnsignedInt(_))(2, ctx)
case Headers.Uint32 => parseSimpleType(MsgpackItem.UnsignedInt(_))(4, ctx)
Expand Down
22 changes: 17 additions & 5 deletions msgpack/src/main/scala/fs2/data/msgpack/low/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ object MsgpackItem {
case class SignedInt(bytes: ByteVector) extends MsgpackItem

/** Single precision IEE 754 float */
case class Float32(bytes: ByteVector) extends MsgpackItem
case class Float32(v: Float) extends MsgpackItem

/** Double precision IEE 754 float */
case class Float64(bytes: ByteVector) extends MsgpackItem
case class Float64(v: Double) extends MsgpackItem

/** UTF-8 encoded string */
case class Str(bytes: ByteVector) extends MsgpackItem
Expand All @@ -40,9 +40,21 @@ object MsgpackItem {
case class Extension(tpe: Byte, bytes: ByteVector) extends MsgpackItem

// Predefined extension types
case class Timestamp32(seconds: ByteVector) extends MsgpackItem
case class Timestamp64(nanoseconds: ByteVector, seconds: ByteVector) extends MsgpackItem
case class Timestamp96(nanoseconds: ByteVector, seconds: ByteVector) extends MsgpackItem
case class Timestamp32(seconds: Int) extends MsgpackItem

/** Stores data in a 30-bit [[nanoseconds]] and a 34-bit [[seconds]] fields, both of which are accessible as class
* attributes. To ensure valid data length at the type level, both fields are constructed from a single 64-bit
* [[combined]] variable.
* @param combined [[nanoseconds]] and [[seconds]] combined into a signle 64-bit value
*/
case class Timestamp64(combined: Long) extends MsgpackItem {
/* We are sure that (x: Long) >> 34 fits in an int but we also need to add a mask so that we don't end up with
* a negative number.
*/
val nanoseconds: Int = (0x000000003fffffffL & (combined >> 34)).toInt
val seconds: Long = combined & 0x00000003ffffffffL
}
case class Timestamp96(nanoseconds: Int, seconds: Long) extends MsgpackItem

case object Nil extends MsgpackItem
case object True extends MsgpackItem
Expand Down
76 changes: 68 additions & 8 deletions msgpack/src/test/scala/fs2/data/msgpack/ParserSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scodec.bits.*
import weaver.SimpleIOSuite
import fs2.*
import fs2.data.msgpack.low.MsgpackItem

import java.nio.charset.StandardCharsets

object ParserSpec extends SimpleIOSuite {
Expand Down Expand Up @@ -60,8 +61,8 @@ object ParserSpec extends SimpleIOSuite {
(hex"0xc90000000312123456", List(MsgpackItem.Extension(0x12, hex"123456"))),

// float32, float64
(hex"0xca3e800000", List(MsgpackItem.Float32(hex"3e800000"))),
(hex"0xcb3fd0000000000000", List(MsgpackItem.Float64(hex"0x3fd0000000000000"))),
(hex"0xca3e800000", List(MsgpackItem.Float32(0.25F))),
(hex"0xcb3fd0000000000000", List(MsgpackItem.Float64(0.25))),

// uint8, uint16, uint32, uint64
(hex"0xcc13", List(MsgpackItem.UnsignedInt(hex"13"))),
Expand Down Expand Up @@ -97,9 +98,9 @@ object ParserSpec extends SimpleIOSuite {
(hex"f7", List(MsgpackItem.SignedInt(hex"f7"))), // -9

// timestamp
(hex"0xd6ff12341234", List(MsgpackItem.Timestamp32(hex"12341234"))),
(hex"0xd7ff1234123412341234", List(MsgpackItem.Timestamp64(hex"48D048D", hex"0012341234"))),
(hex"0xc70cff123412341234123412341234", List(MsgpackItem.Timestamp96(hex"12341234", hex"1234123412341234")))
(hex"0xd6ff12341234", List(MsgpackItem.Timestamp32(0x12341234))),
(hex"0xd7ff1234123412341234", List(MsgpackItem.Timestamp64(0x1234123412341234L))),
(hex"0xc70cff123412341234123412341234", List(MsgpackItem.Timestamp96(0x12341234, 0x1234123412341234L)))
)

Stream
Expand Down Expand Up @@ -234,7 +235,7 @@ object ParserSpec extends SimpleIOSuite {
MsgpackItem.Str(ByteVector("int".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.SignedInt(hex"01"),
MsgpackItem.Str(ByteVector("float".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.Float32(hex"3f 00 00 00"), // 0.5 single prec.
MsgpackItem.Float32(0.5F), // 0.5 single prec.
MsgpackItem.Str(ByteVector("boolean".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.True,
MsgpackItem.Str(ByteVector("null".getBytes(StandardCharsets.UTF_8))),
Expand All @@ -250,9 +251,9 @@ object ParserSpec extends SimpleIOSuite {
MsgpackItem.Str(ByteVector("foo".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.SignedInt(hex"01"),
MsgpackItem.Str(ByteVector("baz".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.Float64(hex"3f e0 00 00 00 00 00 00"), // 0.5 double prec.
MsgpackItem.Float64(0.5), // 0.5 double prec.
MsgpackItem.Str(ByteVector("timestamp".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.Timestamp64(hex"004488c", hex"0344556677") // 9:02:47 pm UTC + 280716ns, August 20, 2414
MsgpackItem.Timestamp64(0x0011223344556677L) // 9:02:47 pm UTC + 280716ns, August 20, 2414
))
)

Expand All @@ -272,4 +273,63 @@ object ParserSpec extends SimpleIOSuite {
.compile
.foldMonoid
}

test("Msgpack value parser should correctly parse floating point values") {
val cases = List(
// float32, normal
(hex"ca3f200000", MsgpackItem.Float32(0.625F)),
// float32, subnormal
(hex"ca00400000", MsgpackItem.Float32(5.877472e-39.toFloat)),
// float64 normal
(hex"cb3fc8000000000000", MsgpackItem.Float64(0.1875)),
// float64 subnormal
(hex"cb000ccccccccccccc", MsgpackItem.Float64(1.78005908680576071121966950087e-308)),
// float32, float64 Inf
(hex"ca7f800000", MsgpackItem.Float32(Float.PositiveInfinity)),
(hex"caff800000", MsgpackItem.Float32(Float.NegativeInfinity)),
(hex"cb7ff0000000000000", MsgpackItem.Float64(Double.PositiveInfinity)),
(hex"cbfff0000000000000", MsgpackItem.Float64(Double.NegativeInfinity))
)

val nans = hex"""
ca7f80000a
caff80000a
cb7ff000000000000a
cbfff000000000000a
"""

val s1 =
Stream
.emits(cases)
.evalMap {
case (hex, repr) => {
Stream
.chunk(Chunk.byteVector((hex)))
.through(low.items[IO])
.compile
.toList
.attempt
.map(encoded => expect.same(encoded, Right(List(repr))))
}
}
val s2 =
Stream
.chunk(Chunk.byteVector(nans))
.through(low.items[IO])
.map {
case MsgpackItem.Float32(v) => expect(v.isNaN)
case MsgpackItem.Float64(v) => expect(v.isNaN)
case m => failure(s"Expected NaN but found: ${m}")
}

(s1 ++ s2).compile.foldMonoid
}

pureTest("Timestamp64 nanoseconds field should always be positive") {
val nums = List(
0xffffffffffffffffL,
0x8000000000000000L
)
forEach(nums)(x => expect(MsgpackItem.Timestamp64(x).nanoseconds > 0))
}
}
Loading