Skip to content

Commit

Permalink
Merge pull request #618 from jarmuszz/main
Browse files Browse the repository at this point in the history
  • Loading branch information
ybasket authored Jul 14, 2024
2 parents 80dd7f4 + 955ec6c commit fef8818
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,18 @@ private[internal] object FormatParsers {
length match {
case 4 =>
requireBytes(4, ctx).map { res =>
res.accumulate(v => MsgpackItem.Timestamp32(v))
res.accumulate(v => MsgpackItem.Timestamp32(v.toInt(false)))
}
case 8 =>
requireBytes(8, ctx).map { res =>
val result = res.result
val seconds = result & hex"00000003ffffffff"
val nanosec = result >> 34

res.toContext.prepend(MsgpackItem.Timestamp64(nanosec.drop(4), seconds.drop(3)))
res.accumulate(v => MsgpackItem.Timestamp64(v.toLong(false)))
}
case 12 =>
for {
res <- requireBytes(4, ctx)
nanosec = res.result
nanosec = res.result.toInt(false)
res <- requireBytes(8, res.toContext)
seconds = res.result
seconds = res.result.toLong(false)
} yield res.toContext.prepend(MsgpackItem.Timestamp96(nanosec, seconds))
case _ => Pull.raiseError(new MsgpackParsingException(s"Invalid timestamp length: ${length}"))
}
Expand Down Expand Up @@ -120,4 +116,64 @@ private[internal] object FormatParsers {
}
}
}

def parseFloat32[F[_]](ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(4, ctx).map {
_.accumulate { v =>
MsgpackItem.Float32 {
val raw = v.toInt(false)
val sign = if ((raw & 0x80000000) == 0x80000000) -1 else 1
val biasedExponent = (raw & 0x7f800000) >>> 23

// subnormal or zero
if (biasedExponent == 0) {
val mantissa = (raw & 0x007fffff).toFloat
if (mantissa == 0) 0F
else sign * Math.pow(2, -126).toFloat * (mantissa / 0x800000)
// Inf or NaN
} else if (biasedExponent == 0xff) {
val mantissa = raw & 0x007fffff
if (mantissa == 0) sign * Float.PositiveInfinity
else Float.NaN
// normal
} else {
val exponent = (biasedExponent - 127).toDouble
val mantissa = (raw & 0x007fffff).toFloat + 0x800000
sign * Math.pow(2, exponent).toFloat * (mantissa / 0x800000)
}
}
}
}
}

def parseFloat64[F[_]](ctx: ParserContext[F])(implicit
F: RaiseThrowable[F]): Pull[F, MsgpackItem, ParserContext[F]] = {
requireBytes(8, ctx).map {
_.accumulate { v =>
MsgpackItem.Float64 {
val raw = v.toLong(false)
val sign = if ((raw & 0x8000000000000000L) == 0x8000000000000000L) -1 else 1
val biasedExponent = (raw & 0x7ff0000000000000L) >>> 52

// subnormal or zero
if (biasedExponent == 0) {
val mantissa = (raw & 0xfffffffffffffL).toDouble
if (mantissa == 0) 0D
else sign * Math.pow(2, -1022) * (mantissa / 0x10000000000000L)
// Inf or NaN
} else if (biasedExponent == 0x7ff) {
val mantissa = raw & 0xfffffffffffffL
if (mantissa == 0) sign * Double.PositiveInfinity
else Double.NaN
// normal
} else {
val exponent = (biasedExponent - 1023).toDouble
val mantissa = (raw & 0xfffffffffffffL).toDouble + 0x10000000000000L
sign * Math.pow(2, exponent) * (mantissa / 0x10000000000000L)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ private[low] object ItemParser {
case Headers.Ext8 => parsePlainExt(1, ctx)
case Headers.Ext16 => parsePlainExt(2, ctx)
case Headers.Ext32 => parsePlainExt(4, ctx)
case Headers.Float32 => parseSimpleType(MsgpackItem.Float32(_))(4, ctx)
case Headers.Float64 => parseSimpleType(MsgpackItem.Float64(_))(8, ctx)
case Headers.Float32 => parseFloat32(ctx)
case Headers.Float64 => parseFloat64(ctx)
case Headers.Uint8 => parseSimpleType(MsgpackItem.UnsignedInt(_))(1, ctx)
case Headers.Uint16 => parseSimpleType(MsgpackItem.UnsignedInt(_))(2, ctx)
case Headers.Uint32 => parseSimpleType(MsgpackItem.UnsignedInt(_))(4, ctx)
Expand Down
22 changes: 17 additions & 5 deletions msgpack/src/main/scala/fs2/data/msgpack/low/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ object MsgpackItem {
case class SignedInt(bytes: ByteVector) extends MsgpackItem

/** Single precision IEE 754 float */
case class Float32(bytes: ByteVector) extends MsgpackItem
case class Float32(v: Float) extends MsgpackItem

/** Double precision IEE 754 float */
case class Float64(bytes: ByteVector) extends MsgpackItem
case class Float64(v: Double) extends MsgpackItem

/** UTF-8 encoded string */
case class Str(bytes: ByteVector) extends MsgpackItem
Expand All @@ -40,9 +40,21 @@ object MsgpackItem {
case class Extension(tpe: Byte, bytes: ByteVector) extends MsgpackItem

// Predefined extension types
case class Timestamp32(seconds: ByteVector) extends MsgpackItem
case class Timestamp64(nanoseconds: ByteVector, seconds: ByteVector) extends MsgpackItem
case class Timestamp96(nanoseconds: ByteVector, seconds: ByteVector) extends MsgpackItem
case class Timestamp32(seconds: Int) extends MsgpackItem

/** Stores data in a 30-bit [[nanoseconds]] and a 34-bit [[seconds]] fields, both of which are accessible as class
* attributes. To ensure valid data length at the type level, both fields are constructed from a single 64-bit
* [[combined]] variable.
* @param combined [[nanoseconds]] and [[seconds]] combined into a signle 64-bit value
*/
case class Timestamp64(combined: Long) extends MsgpackItem {
/* We are sure that (x: Long) >> 34 fits in an int but we also need to add a mask so that we don't end up with
* a negative number.
*/
val nanoseconds: Int = (0x000000003fffffffL & (combined >> 34)).toInt
val seconds: Long = combined & 0x00000003ffffffffL
}
case class Timestamp96(nanoseconds: Int, seconds: Long) extends MsgpackItem

case object Nil extends MsgpackItem
case object True extends MsgpackItem
Expand Down
76 changes: 68 additions & 8 deletions msgpack/src/test/scala/fs2/data/msgpack/ParserSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scodec.bits.*
import weaver.SimpleIOSuite
import fs2.*
import fs2.data.msgpack.low.MsgpackItem

import java.nio.charset.StandardCharsets

object ParserSpec extends SimpleIOSuite {
Expand Down Expand Up @@ -60,8 +61,8 @@ object ParserSpec extends SimpleIOSuite {
(hex"0xc90000000312123456", List(MsgpackItem.Extension(0x12, hex"123456"))),

// float32, float64
(hex"0xca3e800000", List(MsgpackItem.Float32(hex"3e800000"))),
(hex"0xcb3fd0000000000000", List(MsgpackItem.Float64(hex"0x3fd0000000000000"))),
(hex"0xca3e800000", List(MsgpackItem.Float32(0.25F))),
(hex"0xcb3fd0000000000000", List(MsgpackItem.Float64(0.25))),

// uint8, uint16, uint32, uint64
(hex"0xcc13", List(MsgpackItem.UnsignedInt(hex"13"))),
Expand Down Expand Up @@ -97,9 +98,9 @@ object ParserSpec extends SimpleIOSuite {
(hex"f7", List(MsgpackItem.SignedInt(hex"f7"))), // -9

// timestamp
(hex"0xd6ff12341234", List(MsgpackItem.Timestamp32(hex"12341234"))),
(hex"0xd7ff1234123412341234", List(MsgpackItem.Timestamp64(hex"48D048D", hex"0012341234"))),
(hex"0xc70cff123412341234123412341234", List(MsgpackItem.Timestamp96(hex"12341234", hex"1234123412341234")))
(hex"0xd6ff12341234", List(MsgpackItem.Timestamp32(0x12341234))),
(hex"0xd7ff1234123412341234", List(MsgpackItem.Timestamp64(0x1234123412341234L))),
(hex"0xc70cff123412341234123412341234", List(MsgpackItem.Timestamp96(0x12341234, 0x1234123412341234L)))
)

Stream
Expand Down Expand Up @@ -234,7 +235,7 @@ object ParserSpec extends SimpleIOSuite {
MsgpackItem.Str(ByteVector("int".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.SignedInt(hex"01"),
MsgpackItem.Str(ByteVector("float".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.Float32(hex"3f 00 00 00"), // 0.5 single prec.
MsgpackItem.Float32(0.5F), // 0.5 single prec.
MsgpackItem.Str(ByteVector("boolean".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.True,
MsgpackItem.Str(ByteVector("null".getBytes(StandardCharsets.UTF_8))),
Expand All @@ -250,9 +251,9 @@ object ParserSpec extends SimpleIOSuite {
MsgpackItem.Str(ByteVector("foo".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.SignedInt(hex"01"),
MsgpackItem.Str(ByteVector("baz".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.Float64(hex"3f e0 00 00 00 00 00 00"), // 0.5 double prec.
MsgpackItem.Float64(0.5), // 0.5 double prec.
MsgpackItem.Str(ByteVector("timestamp".getBytes(StandardCharsets.UTF_8))),
MsgpackItem.Timestamp64(hex"004488c", hex"0344556677") // 9:02:47 pm UTC + 280716ns, August 20, 2414
MsgpackItem.Timestamp64(0x0011223344556677L) // 9:02:47 pm UTC + 280716ns, August 20, 2414
))
)

Expand All @@ -272,4 +273,63 @@ object ParserSpec extends SimpleIOSuite {
.compile
.foldMonoid
}

test("Msgpack value parser should correctly parse floating point values") {
val cases = List(
// float32, normal
(hex"ca3f200000", MsgpackItem.Float32(0.625F)),
// float32, subnormal
(hex"ca00400000", MsgpackItem.Float32(5.877472e-39.toFloat)),
// float64 normal
(hex"cb3fc8000000000000", MsgpackItem.Float64(0.1875)),
// float64 subnormal
(hex"cb000ccccccccccccc", MsgpackItem.Float64(1.78005908680576071121966950087e-308)),
// float32, float64 Inf
(hex"ca7f800000", MsgpackItem.Float32(Float.PositiveInfinity)),
(hex"caff800000", MsgpackItem.Float32(Float.NegativeInfinity)),
(hex"cb7ff0000000000000", MsgpackItem.Float64(Double.PositiveInfinity)),
(hex"cbfff0000000000000", MsgpackItem.Float64(Double.NegativeInfinity))
)

val nans = hex"""
ca7f80000a
caff80000a
cb7ff000000000000a
cbfff000000000000a
"""

val s1 =
Stream
.emits(cases)
.evalMap {
case (hex, repr) => {
Stream
.chunk(Chunk.byteVector((hex)))
.through(low.items[IO])
.compile
.toList
.attempt
.map(encoded => expect.same(encoded, Right(List(repr))))
}
}
val s2 =
Stream
.chunk(Chunk.byteVector(nans))
.through(low.items[IO])
.map {
case MsgpackItem.Float32(v) => expect(v.isNaN)
case MsgpackItem.Float64(v) => expect(v.isNaN)
case m => failure(s"Expected NaN but found: ${m}")
}

(s1 ++ s2).compile.foldMonoid
}

pureTest("Timestamp64 nanoseconds field should always be positive") {
val nums = List(
0xffffffffffffffffL,
0x8000000000000000L
)
forEach(nums)(x => expect(MsgpackItem.Timestamp64(x).nanoseconds > 0))
}
}

0 comments on commit fef8818

Please sign in to comment.