diff --git a/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala index 50777e76..38b82de6 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala @@ -5,12 +5,14 @@ package akka.persistence.r2dbc.journal import java.time.Instant + import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try + import akka.Done import akka.actor.ActorRef import akka.actor.typed.ActorSystem @@ -24,6 +26,7 @@ import akka.persistence.PersistentRepr import akka.persistence.journal.AsyncWriteJournal import akka.persistence.journal.Tagged import akka.persistence.query.PersistenceQuery +import akka.persistence.query.typed.EventEnvelope.SerializedEvent import akka.persistence.r2dbc.ConnectionFactoryProvider import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.InstantFactory @@ -121,10 +124,14 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends val entityType = PersistenceId.extractEntityType(pr.persistenceId) val slice = persistenceExt.sliceForPersistenceId(pr.persistenceId) - val serialized = serialization.serialize(event).get - val serializer = serialization.findSerializerFor(event) - val manifest = Serializers.manifestFor(serializer, event) - val id: Int = serializer.identifier + val serializedEvent = event match { + case s: SerializedEvent => s // already serialized + case _ => + val bytes = serialization.serialize(event).get + val serializer = serialization.findSerializerFor(event) + val manifest = Serializers.manifestFor(serializer, event) + SerializedEvent(bytes, serializer.identifier, manifest) + } val metadata = pr.metadata.map { meta => val m = meta.asInstanceOf[AnyRef] @@ -142,9 +149,9 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends pr.sequenceNr, timestamp, JournalDao.EmptyDbTimestamp, - Some(serialized), - id, - manifest, + Some(serializedEvent.bytes), + serializedEvent.serializerId, + serializedEvent.serializerManifest, pr.writerUuid, tags, metadata) diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala new file mode 100644 index 00000000..971312eb --- /dev/null +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2022 - 2023 Lightbend Inc. + */ + +package akka.persistence.r2dbc.journal + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorSystem +import akka.persistence.query.typed.EventEnvelope.SerializedEvent +import akka.persistence.r2dbc.R2dbcSettings +import akka.persistence.r2dbc.TestActors.Persister +import akka.persistence.r2dbc.TestConfig +import akka.persistence.r2dbc.TestData +import akka.persistence.r2dbc.TestDbLifecycle +import akka.persistence.typed.PersistenceId +import akka.serialization.SerializationExtension +import akka.serialization.Serializers +import org.scalatest.wordspec.AnyWordSpecLike + +class PersistSerializedEventSpec + extends ScalaTestWithActorTestKit(TestConfig.config) + with AnyWordSpecLike + with TestDbLifecycle + with TestData + with LogCapturing { + + override def typedSystem: ActorSystem[_] = system + private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) + + case class Row(pid: String, seqNr: Long, serializerId: Int, manifest: String) + + "Persist SerializedEvent" should { + + "be stored with given serialization information" in { + val doneProbe = createTestProbe[Done]() + + val entityType = nextEntityType() + val persistenceId = PersistenceId.ofUniqueId(nextPid(entityType)) + val ref = spawn(Persister(persistenceId, Set.empty)) + + // String serialization has no manifest + val event1 = "e1" + val serializer1 = SerializationExtension(system).findSerializerFor(event1) + val serializedEvent1 = + SerializedEvent( + serializer1.toBinary(event1), + serializer1.identifier, + Serializers.manifestFor(serializer1, event1)) + + // Option serialization has manifest + val event2 = Some("e2") + val serializer2 = SerializationExtension(system).findSerializerFor(event2) + val serializedEvent2 = + SerializedEvent( + serializer2.toBinary(event2), + serializer2.identifier, + Serializers.manifestFor(serializer2, event2)) + + ref ! Persister.Persist(serializedEvent1) + ref ! Persister.PersistWithAck(serializedEvent2, doneProbe.ref) + doneProbe.expectMessage(Done) + testKit.stop(ref) + + val ref2 = spawn(Persister(persistenceId, Set.empty)) + val replyProbe = createTestProbe[String]() + ref2 ! Persister.GetState(replyProbe.ref) + replyProbe.expectMessage("e1|Some(e2)") + + val rows = + r2dbcExecutor + .select[Row]("test")( + connection => + connection.createStatement( + s"select * from ${settings.journalTableWithSchema} where persistence_id = '${persistenceId.id}'"), + row => { + Row( + pid = row.get("persistence_id", classOf[String]), + seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]), + serializerId = row.get[java.lang.Integer]("event_ser_id", classOf[java.lang.Integer]), + manifest = row.get("event_ser_manifest", classOf[String])) + }) + .futureValue + + rows.foreach { case Row(pid, seqNr, serializerId, manifest) => + withClue(s"pid [$pid}], seqNr [$seqNr]: ") { + if (seqNr == 1L) { + serializerId shouldBe serializedEvent1.serializerId + manifest shouldBe serializedEvent1.serializerManifest + } else if (seqNr == 2L) { + serializerId shouldBe serializedEvent2.serializerId + manifest shouldBe serializedEvent2.serializerManifest + } + } + } + } + + } +} diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala index 7bde5f30..06c5a8e6 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala @@ -5,6 +5,7 @@ package akka.persistence.r2dbc.journal import scala.concurrent.duration._ + import akka.Done import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -14,7 +15,6 @@ import akka.persistence.r2dbc.TestActors.Persister import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle -import akka.persistence.r2dbc.internal.h2.H2Dialect import akka.persistence.typed.PersistenceId import org.scalatest.wordspec.AnyWordSpecLike diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5e0af679..3e5e9416 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -10,7 +10,7 @@ object Dependencies { val Scala3 = "3.2.2" val Scala2Versions = Seq(Scala213, Scala212) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 - val AkkaVersion = System.getProperty("override.akka.version", "2.8.3") + val AkkaVersion = System.getProperty("override.akka.version", "2.8.4+8-1abb713f-SNAPSHOT") val AkkaVersionInDocs = AkkaVersion.take(3) val AkkaPersistenceJdbcVersion = "5.2.0" // only in migration tool tests val AkkaProjectionVersionInDocs = "current"