Skip to content

Commit

Permalink
Adds the avro and protobuffer serializers for java.time.Instant (#388)
Browse files Browse the repository at this point in the history
  • Loading branch information
fedefernandez authored Sep 25, 2018
1 parent 756923d commit 7451607
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 24 deletions.
25 changes: 24 additions & 1 deletion modules/internal/src/main/scala/encoders/avro.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package internal.encoders

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.nio.ByteBuffer
import java.time.{LocalDate, LocalDateTime}
import java.time.{Instant, LocalDate, LocalDateTime}

import com.google.common.io.ByteStreams
import freestyle.rpc.internal.util.{BigDecimalUtil, EncoderUtil, JavaTimeUtil}
Expand Down Expand Up @@ -121,6 +121,20 @@ object avro extends AvroMarshallers {
JavaTimeUtil.localDateTimeToLong(value)
}

implicit object instantToSchema extends ToSchema[Instant] {
override val schema: Schema = Schema.create(Schema.Type.LONG)
}

implicit object instantFromValue extends FromValue[Instant] {
def apply(value: Any, field: Field): Instant =
JavaTimeUtil.longToInstant(value.asInstanceOf[Long])
}

implicit object instantToValue extends ToValue[Instant] {
override def apply(value: Instant): Long =
JavaTimeUtil.instantToLong(value)
}

object marshallers {

implicit val localDateMarshaller: Marshaller[LocalDate] = new Marshaller[LocalDate] {
Expand All @@ -142,6 +156,15 @@ object avro extends AvroMarshallers {
JavaTimeUtil.longToLocalDateTime(
EncoderUtil.byteArrayToLong(ByteStreams.toByteArray(stream)))
}

implicit val instantMarshaller: Marshaller[Instant] =
new Marshaller[Instant] {
override def stream(value: Instant): InputStream =
new ByteArrayInputStream(EncoderUtil.longToByteArray(JavaTimeUtil.instantToLong(value)))

override def parse(stream: InputStream): Instant =
JavaTimeUtil.longToInstant(EncoderUtil.byteArrayToLong(ByteStreams.toByteArray(stream)))
}
}

}
Expand Down
12 changes: 11 additions & 1 deletion modules/internal/src/main/scala/encoders/pbd.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package freestyle.rpc
package internal.encoders

import java.io.{ByteArrayInputStream, InputStream}
import java.time.{LocalDate, LocalDateTime}
import java.time.{Instant, LocalDate, LocalDateTime}

import com.google.protobuf.{CodedInputStream, CodedOutputStream}
import freestyle.rpc.internal.util.{BigDecimalUtil, EncoderUtil, JavaTimeUtil}
Expand Down Expand Up @@ -75,5 +75,15 @@ object pbd {
JavaTimeUtil.longToLocalDateTime(EncoderUtil.byteArrayToLong(input.readByteArray()))
}

implicit object InstantWriter extends PBWriter[Instant] {
override def writeTo(index: Int, value: Instant, out: CodedOutputStream): Unit =
out.writeByteArray(index, EncoderUtil.longToByteArray(JavaTimeUtil.instantToLong(value)))
}

implicit object InstantReader extends PBReader[Instant] {
override def read(input: CodedInputStream): Instant =
JavaTimeUtil.longToInstant(EncoderUtil.byteArrayToLong(input.readByteArray()))
}

}
}
8 changes: 6 additions & 2 deletions modules/internal/src/main/scala/util/JavaTimeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ object JavaTimeUtil {
def intToLocalDate(value: Int): LocalDate = LocalDate.ofEpochDay(value.toLong)

def localDateTimeToLong(value: LocalDateTime): Long =
value.toInstant(ZoneOffset.UTC).toEpochMilli
instantToLong(value.toInstant(ZoneOffset.UTC))

def longToLocalDateTime(value: Long): LocalDateTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(value), ZoneOffset.UTC).toLocalDateTime
ZonedDateTime.ofInstant(longToInstant(value), ZoneOffset.UTC).toLocalDateTime

def instantToLong(value: Instant): Long = value.toEpochMilli

def longToInstant(value: Long): Instant = Instant.ofEpochMilli(value)

}
11 changes: 11 additions & 0 deletions modules/internal/src/test/scala/util/JavaTimeUtilTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ class JavaTimeUtilTests extends WordSpec with Matchers with Checkers {
}
}
}

"allow to convert Instant to and from long" in {
import com.fortysevendeg.scalacheck.datetime.jdk8.granularity.seconds
check {
forAll(genDateTimeWithinRange(from, range)) { zdt: ZonedDateTime =>
val date = zdt.toInstant
val value = JavaTimeUtil.instantToLong(date)
JavaTimeUtil.longToInstant(value) == date
}
}
}
}

}
101 changes: 81 additions & 20 deletions modules/server/src/test/scala/protocol/RPCJavaTimeTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,36 @@ class RPCJavaTimeTests extends RpcBaseTestSuite with BeforeAndAfterAll with Chec

object RPCDateService {

case class Request(date: LocalDate, dateTime: LocalDateTime, label: String)
case class Request(date: LocalDate, dateTime: LocalDateTime, instant: Instant, label: String)

case class Response(date: LocalDate, dateTime: LocalDateTime, result: String, check: Boolean)
case class Response(
date: LocalDate,
dateTime: LocalDateTime,
instant: Instant,
result: String,
check: Boolean)

@service(Protobuf)
trait ProtoRPCDateServiceDef[F[_]] {
def localDateProto(date: LocalDate): F[LocalDate]
def localDateTimeProto(dateTime: LocalDateTime): F[LocalDateTime]
def instantProto(instant: Instant): F[Instant]
def dateProtoWrapper(req: Request): F[Response]
}

@service(Avro)
trait AvroRPCDateServiceDef[F[_]] {
def localDateAvro(date: LocalDate): F[LocalDate]
def localDateTimeAvro(dateTime: LocalDateTime): F[LocalDateTime]
def instantAvro(instant: Instant): F[Instant]
def dateAvroWrapper(req: Request): F[Response]
}

@service(AvroWithSchema)
trait AvroWithSchemaRPCDateServiceDef[F[_]] {
def localDateAvroWithSchema(date: LocalDate): F[LocalDate]
def localDateTimeAvroWithSchema(dateTime: LocalDateTime): F[LocalDateTime]
def instantAvroWithSchema(instant: Instant): F[Instant]
def dateAvroWrapperWithSchema(req: Request): F[Response]
}

Expand All @@ -70,16 +78,19 @@ class RPCJavaTimeTests extends RpcBaseTestSuite with BeforeAndAfterAll with Chec

def localDateProto(date: LocalDate): F[LocalDate] = date.pure
def localDateTimeProto(dateTime: LocalDateTime): F[LocalDateTime] = dateTime.pure
def instantProto(instant: Instant): F[Instant] = instant.pure
def dateProtoWrapper(req: Request): F[Response] =
Response(req.date, req.dateTime, req.label, check = true).pure
Response(req.date, req.dateTime, req.instant, req.label, check = true).pure
def localDateAvro(date: LocalDate): F[LocalDate] = date.pure
def localDateTimeAvro(dateTime: LocalDateTime): F[LocalDateTime] = dateTime.pure
def instantAvro(instant: Instant): F[Instant] = instant.pure
def dateAvroWrapper(req: Request): F[Response] =
Response(req.date, req.dateTime, req.label, check = true).pure
Response(req.date, req.dateTime, req.instant, req.label, check = true).pure
def localDateAvroWithSchema(date: LocalDate): F[LocalDate] = date.pure
def localDateTimeAvroWithSchema(dateTime: LocalDateTime): F[LocalDateTime] = dateTime.pure
def instantAvroWithSchema(instant: Instant): F[Instant] = instant.pure
def dateAvroWrapperWithSchema(req: Request): F[Response] =
Response(req.date, req.dateTime, req.label, check = true).pure
Response(req.date, req.dateTime, req.instant, req.label, check = true).pure
}

}
Expand Down Expand Up @@ -129,7 +140,24 @@ class RPCJavaTimeTests extends RpcBaseTestSuite with BeforeAndAfterAll with Chec

}

"be able to serialize and deserialize LocalDate and LocalDateTime in a Request using proto format" in {
"be able to serialize and deserialize Instant using proto format" in {

withServerChannel(ProtoRPCDateServiceDef.bindService[ConcurrentMonad]) { sc =>
val client: ProtoRPCDateServiceDef.Client[ConcurrentMonad] =
ProtoRPCDateServiceDef.clientFromChannel[ConcurrentMonad](sc.channel)

check {
forAll(genDateTimeWithinRange(from, range)) { zdt: ZonedDateTime =>
val instant = zdt.toInstant
client.instantProto(instant).unsafeRunSync() == instant
}
}

}

}

"be able to serialize and deserialize LocalDate, LocalDateTime, and Instant in a Request using proto format" in {

withServerChannel(ProtoRPCDateServiceDef.bindService[ConcurrentMonad]) { sc =>
val client: ProtoRPCDateServiceDef.Client[ConcurrentMonad] =
Expand All @@ -140,11 +168,10 @@ class RPCJavaTimeTests extends RpcBaseTestSuite with BeforeAndAfterAll with Chec
(zdt: ZonedDateTime, s: String) =>
val date = zdt.toLocalDate
val dateTime = zdt.toLocalDateTime
client.dateProtoWrapper(Request(date, dateTime, s)).unsafeRunSync() == Response(
date,
dateTime,
s,
check = true)
val instant = zdt.toInstant
client
.dateProtoWrapper(Request(date, dateTime, instant, s))
.unsafeRunSync() == Response(date, dateTime, instant, s, check = true)
}
}

Expand Down Expand Up @@ -186,7 +213,24 @@ class RPCJavaTimeTests extends RpcBaseTestSuite with BeforeAndAfterAll with Chec

}

"be able to serialize and deserialize LocalDate and LocalDateTime in a Request using avro format" in {
"be able to serialize and deserialize Instant using avro format" in {

withServerChannel(AvroRPCDateServiceDef.bindService[ConcurrentMonad]) { sc =>
val client: AvroRPCDateServiceDef.Client[ConcurrentMonad] =
AvroRPCDateServiceDef.clientFromChannel[ConcurrentMonad](sc.channel)

check {
forAll(genDateTimeWithinRange(from, range)) { zdt: ZonedDateTime =>
val instant = zdt.toInstant
client.instantAvro(instant).unsafeRunSync() == instant
}
}

}

}

"be able to serialize and deserialize LocalDate, LocalDateTime, and Instant in a Request using avro format" in {

withServerChannel(AvroRPCDateServiceDef.bindService[ConcurrentMonad]) { sc =>
val client: AvroRPCDateServiceDef.Client[ConcurrentMonad] =
Expand All @@ -197,11 +241,10 @@ class RPCJavaTimeTests extends RpcBaseTestSuite with BeforeAndAfterAll with Chec
(zdt: ZonedDateTime, s: String) =>
val date = zdt.toLocalDate
val dateTime = zdt.toLocalDateTime
client.dateAvroWrapper(Request(date, dateTime, s)).unsafeRunSync() == Response(
date,
dateTime,
s,
check = true)
val instant = zdt.toInstant
client
.dateAvroWrapper(Request(date, dateTime, instant, s))
.unsafeRunSync() == Response(date, dateTime, instant, s, check = true)
}
}

Expand Down Expand Up @@ -243,7 +286,24 @@ class RPCJavaTimeTests extends RpcBaseTestSuite with BeforeAndAfterAll with Chec

}

"be able to serialize and deserialize LocalDate and LocalDateTime in a Request using avro format with schema" in {
"be able to serialize and deserialize Instant using avro format with schema" in {

withServerChannel(AvroWithSchemaRPCDateServiceDef.bindService[ConcurrentMonad]) { sc =>
val client: AvroWithSchemaRPCDateServiceDef.Client[ConcurrentMonad] =
AvroWithSchemaRPCDateServiceDef.clientFromChannel[ConcurrentMonad](sc.channel)

check {
forAll(genDateTimeWithinRange(from, range)) { zdt: ZonedDateTime =>
val instant = zdt.toInstant
client.instantAvroWithSchema(instant).unsafeRunSync() == instant
}
}

}

}

"be able to serialize and deserialize LocalDate, LocalDateTime, and Instant in a Request using avro format with schema" in {

withServerChannel(AvroWithSchemaRPCDateServiceDef.bindService[ConcurrentMonad]) { sc =>
val client: AvroWithSchemaRPCDateServiceDef.Client[ConcurrentMonad] =
Expand All @@ -254,9 +314,10 @@ class RPCJavaTimeTests extends RpcBaseTestSuite with BeforeAndAfterAll with Chec
(zdt: ZonedDateTime, s: String) =>
val date = zdt.toLocalDate
val dateTime = zdt.toLocalDateTime
val instant = zdt.toInstant
client
.dateAvroWrapperWithSchema(Request(date, dateTime, s))
.unsafeRunSync() == Response(date, dateTime, s, check = true)
.dateAvroWrapperWithSchema(Request(date, dateTime, instant, s))
.unsafeRunSync() == Response(date, dateTime, instant, s, check = true)
}
}

Expand Down

0 comments on commit 7451607

Please sign in to comment.