Skip to content

Commit

Permalink
fix scullxbones#179: ActorRef serialization: transportInformation sho…
Browse files Browse the repository at this point in the history
…uld be set

Signed-off-by: Gaël Bréard <gael.breard@orange.com>
  • Loading branch information
gbrd committed Jan 26, 2018
1 parent 4662e70 commit 64365d6
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
/*
/*
* Copyright (c) 2013-2018 Brian Scully
* Copyright (c) 2018 Gael Breard, Orange: Fix issue #179 about actorRef serialization
*
* Contributions:
* Jean-Francois GUENA: implement "suffixed collection name" feature (issue #39 partially fulfilled)
* ...
Expand Down Expand Up @@ -29,7 +32,9 @@ object CasbahPersistenceSnapshotter {
case o: DBObject =>
obj.put(V2.SERIALIZED, o)
case _ =>
obj.put(V2.SERIALIZED, serialization.serializerFor(classOf[Snapshot]).toBinary(Snapshot(snapshot.snapshot)))
SerializationHelper.withTransportInformation(serialization.system) {
obj.put(V2.SERIALIZED, serialization.serializerFor(classOf[Snapshot]).toBinary(Snapshot(snapshot.snapshot)))
}
}
obj
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2013-2018 Brian Scully
* Copyright (c) 2018 Gael Breard, Orange: Fix issue #179 about actorRef serialization
*/
package akka.contrib.persistence.mongodb

import akka.actor.ActorRef
Expand Down Expand Up @@ -159,17 +163,19 @@ object Payload {
implicit def bytes2payload(buf: Array[Byte]): Bin = Bin(buf, Set.empty[String])

def apply[D](payload: Any, tags: Set[String] = Set.empty)(implicit ser: Serialization, ev: Manifest[D], dt: DocumentType[D], loadClass: LoadClass): Payload = {
payload match {
case tg: Tagged => Payload(tg.payload, tg.tags)
case pr: PersistentRepr => Legacy(pr, tags)
case d: D => Bson(d, tags)
case bytes: Array[Byte] => Bin(bytes, tags)
case str: String => StringPayload(str, tags)
case n: Double => FloatingPointPayload(n, tags)
case n: Long => FixedPointPayload(n, tags)
case b: Boolean => BooleanPayload(b, tags)
case x: AnyRef => Serialized(x, tags)
case x => throw new IllegalArgumentException(s"Type for $x of ${x.getClass} is currently unsupported")
SerializationHelper.withTransportInformation[Payload](ser.system) {
payload match {
case tg: Tagged => Payload(tg.payload, tg.tags)
case pr: PersistentRepr => Legacy(pr, tags)
case d: D => Bson(d, tags)
case bytes: Array[Byte] => Bin(bytes, tags)
case str: String => StringPayload(str, tags)
case n: Double => FloatingPointPayload(n, tags)
case n: Long => FixedPointPayload(n, tags)
case b: Boolean => BooleanPayload(b, tags)
case x: AnyRef => Serialized(x, tags)
case x => throw new IllegalArgumentException(s"Type for $x of ${x.getClass} is currently unsupported")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2018 Gael Breard, Orange: Fix issue #179 about actorRef serialization
*/
package akka.contrib.persistence.mongodb

import akka.actor.ExtendedActorSystem
import akka.serialization.Serialization

object SerializationHelper {
//may be replaced with future API in Serialization : Serialization.withTransportInformation(ser.system)
def withTransportInformation[T](system: ExtendedActorSystem)(f: => T): T = {
val address = system.provider.getDefaultAddress
if (address.hasLocalScope) {
f
} else {
Serialization.currentTransportInformation.withValue(Serialization.Information(address, system)) {
f
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2013-2018 Brian Scully
* Copyright (c) 2018 Gael Breard, Orange: Fix issue #179 about actorRef serialization
*/
package akka.contrib.persistence.mongodb

import akka.actor.{ActorRef, ActorSystem, DynamicAccess, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
Expand Down Expand Up @@ -72,7 +76,9 @@ class RxMongoSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSystem)
case b: BSONDocument =>
b
case _ =>
BSON.write(serialization.serialize(Snapshot(snap.snapshot)).get)
SerializationHelper.withTransportInformation(serialization.system) {
BSON.write(serialization.serialize(Snapshot(snap.snapshot)).get)
}
}
BSONDocument(PROCESSOR_ID -> snap.metadata.persistenceId,
SEQUENCE_NUMBER -> snap.metadata.sequenceNr,
Expand Down

0 comments on commit 64365d6

Please sign in to comment.