Skip to content

Commit

Permalink
perf: Avoid event serialization roundtrip
Browse files Browse the repository at this point in the history
* journal handles write of SerializedEvent without additional serialization
  • Loading branch information
patriknw committed Aug 24, 2023
1 parent 66e3cdb commit ec33272
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
package akka.persistence.r2dbc.journal

import java.time.Instant

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import akka.Done
import akka.actor.ActorRef
import akka.actor.typed.ActorSystem
Expand All @@ -24,6 +26,7 @@ import akka.persistence.PersistentRepr
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.journal.Tagged
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.typed.EventEnvelope.SerializedEvent
import akka.persistence.r2dbc.ConnectionFactoryProvider
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.InstantFactory
Expand Down Expand Up @@ -121,10 +124,14 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends
val entityType = PersistenceId.extractEntityType(pr.persistenceId)
val slice = persistenceExt.sliceForPersistenceId(pr.persistenceId)

val serialized = serialization.serialize(event).get
val serializer = serialization.findSerializerFor(event)
val manifest = Serializers.manifestFor(serializer, event)
val id: Int = serializer.identifier
val serializedEvent = event match {
case s: SerializedEvent => s // already serialized
case _ =>
val bytes = serialization.serialize(event).get
val serializer = serialization.findSerializerFor(event)
val manifest = Serializers.manifestFor(serializer, event)
SerializedEvent(bytes, serializer.identifier, manifest)
}

val metadata = pr.metadata.map { meta =>
val m = meta.asInstanceOf[AnyRef]
Expand All @@ -142,9 +149,9 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends
pr.sequenceNr,
timestamp,
JournalDao.EmptyDbTimestamp,
Some(serialized),
id,
manifest,
Some(serializedEvent.bytes),
serializedEvent.serializerId,
serializedEvent.serializerManifest,
pr.writerUuid,
tags,
metadata)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.r2dbc.journal

import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.query.typed.EventEnvelope.SerializedEvent
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.TestActors.Persister
import akka.persistence.r2dbc.TestConfig
import akka.persistence.r2dbc.TestData
import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.typed.PersistenceId
import akka.serialization.SerializationExtension
import akka.serialization.Serializers
import org.scalatest.wordspec.AnyWordSpecLike

class PersistSerializedEventSpec
extends ScalaTestWithActorTestKit(TestConfig.config)
with AnyWordSpecLike
with TestDbLifecycle
with TestData
with LogCapturing {

override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))

case class Row(pid: String, seqNr: Long, serializerId: Int, manifest: String)

"Persist SerializedEvent" should {

"be stored with given serialization information" in {
val doneProbe = createTestProbe[Done]()

val entityType = nextEntityType()
val persistenceId = PersistenceId.ofUniqueId(nextPid(entityType))
val ref = spawn(Persister(persistenceId, Set.empty))

// String serialization has no manifest
val event1 = "e1"
val serializer1 = SerializationExtension(system).findSerializerFor(event1)
val serializedEvent1 =
SerializedEvent(
serializer1.toBinary(event1),
serializer1.identifier,
Serializers.manifestFor(serializer1, event1))

// Option serialization has manifest
val event2 = Some("e2")
val serializer2 = SerializationExtension(system).findSerializerFor(event2)
val serializedEvent2 =
SerializedEvent(
serializer2.toBinary(event2),
serializer2.identifier,
Serializers.manifestFor(serializer2, event2))

ref ! Persister.Persist(serializedEvent1)
ref ! Persister.PersistWithAck(serializedEvent2, doneProbe.ref)
doneProbe.expectMessage(Done)
testKit.stop(ref)

val ref2 = spawn(Persister(persistenceId, Set.empty))
val replyProbe = createTestProbe[String]()
ref2 ! Persister.GetState(replyProbe.ref)
replyProbe.expectMessage("e1|Some(e2)")

val rows =
r2dbcExecutor
.select[Row]("test")(
connection =>
connection.createStatement(
s"select * from ${settings.journalTableWithSchema} where persistence_id = '${persistenceId.id}'"),
row => {
Row(
pid = row.get("persistence_id", classOf[String]),
seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
serializerId = row.get[java.lang.Integer]("event_ser_id", classOf[java.lang.Integer]),
manifest = row.get("event_ser_manifest", classOf[String]))
})
.futureValue

rows.foreach { case Row(pid, seqNr, serializerId, manifest) =>
withClue(s"pid [$pid}], seqNr [$seqNr]: ") {
if (seqNr == 1L) {
serializerId shouldBe serializedEvent1.serializerId
manifest shouldBe serializedEvent1.serializerManifest
} else if (seqNr == 2L) {
serializerId shouldBe serializedEvent2.serializerId
manifest shouldBe serializedEvent2.serializerManifest
}
}
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.persistence.r2dbc.journal

import scala.concurrent.duration._

import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
Expand All @@ -14,7 +15,6 @@ import akka.persistence.r2dbc.TestActors.Persister
import akka.persistence.r2dbc.TestConfig
import akka.persistence.r2dbc.TestData
import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.r2dbc.internal.h2.H2Dialect
import akka.persistence.typed.PersistenceId
import org.scalatest.wordspec.AnyWordSpecLike

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ object Dependencies {
val Scala3 = "3.2.2"
val Scala2Versions = Seq(Scala213, Scala212)
val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3
val AkkaVersion = System.getProperty("override.akka.version", "2.8.3")
val AkkaVersion = System.getProperty("override.akka.version", "2.8.4+8-1abb713f-SNAPSHOT")
val AkkaVersionInDocs = AkkaVersion.take(3)
val AkkaPersistenceJdbcVersion = "5.2.0" // only in migration tool tests
val AkkaProjectionVersionInDocs = "current"
Expand Down

0 comments on commit ec33272

Please sign in to comment.