Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Add restart on failure option for event sourced entities #439

Merged
merged 1 commit into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@ private[impl] trait AbstractClientActionContext extends ClientActionContext {

protected def logError(message: String): Unit = ()

final def createClientAction(reply: Optional[JavaPbAny], allowNoReply: Boolean): Option[ClientAction] =
final def createClientAction(reply: Optional[JavaPbAny],
allowNoReply: Boolean,
restartOnFailure: Boolean): Option[ClientAction] =
error match {
case Some(msg) => Some(ClientAction(ClientAction.Action.Failure(Failure(commandId, msg))))
case Some(msg) => Some(ClientAction(ClientAction.Action.Failure(Failure(commandId, msg, restartOnFailure))))
case None =>
if (reply.isPresent) {
if (forward.isDefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class CrdtImpl(system: ActorSystem, services: Map[String, CrdtStatefulService],
ctx.deactivate()
}

val clientAction = ctx.createClientAction(reply, allowNoReply = true)
val clientAction = ctx.createClientAction(reply, allowNoReply = true, restartOnFailure = false)

if (ctx.hasError) {
verifyNoDelta("failed command handling")
Expand Down Expand Up @@ -292,7 +292,7 @@ class CrdtImpl(system: ActorSystem, services: Map[String, CrdtStatefulService],
context.deactivate()
}

val clientAction = context.createClientAction(reply, allowNoReply = true)
val clientAction = context.createClientAction(reply, allowNoReply = true, restartOnFailure = false)

if (context.hasError) {
subscribers -= id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,15 @@ final class EventSourcedImpl(_system: ActorSystem,
ScalaPbAny.toJavaProto(command.payload.getOrElse(throw ProtocolException(command, "No command payload")))
val metadata = new MetadataImpl(command.metadata.map(_.entries.toVector).getOrElse(Nil))
val context =
new CommandContextImpl(thisEntityId, sequence, command.name, command.id, metadata, service.anySupport, log)
new CommandContextImpl(thisEntityId,
sequence,
command.name,
command.id,
metadata,
service.anySupport,
handler,
service.snapshotEvery,
log)

val reply = try {
handler.handleCommand(cmd, context)
Expand All @@ -201,19 +209,12 @@ final class EventSourcedImpl(_system: ActorSystem,
context.deactivate() // Very important!
}

val clientAction = context.createClientAction(reply, false)
val clientAction = context.createClientAction(reply, false, restartOnFailure = context.events.nonEmpty)

if (!context.hasError) {
// apply events from successful command to local entity state
context.events.zipWithIndex.foreach {
case (event, i) =>
handler.handleEvent(ScalaPbAny.toJavaProto(event), new EventContextImpl(thisEntityId, sequence + i + 1))
}

val endSequenceNumber = sequence + context.events.size
val performSnapshot = (endSequenceNumber / service.snapshotEvery) > (sequence / service.snapshotEvery)
val snapshot =
if (performSnapshot) {
if (context.performSnapshot) {
val s = handler.snapshot(new SnapshotContext with AbstractContext {
override def entityId: String = entityId
override def sequenceNumber: Long = endSequenceNumber
Expand Down Expand Up @@ -264,6 +265,8 @@ final class EventSourcedImpl(_system: ActorSystem,
override val commandId: Long,
override val metadata: Metadata,
val anySupport: AnySupport,
val handler: EventSourcedEntityHandler,
val snapshotEvery: Int,
val log: LoggingAdapter)
extends CommandContext
with AbstractContext
Expand All @@ -272,10 +275,15 @@ final class EventSourcedImpl(_system: ActorSystem,
with ActivatableContext {

final var events: Vector[ScalaPbAny] = Vector.empty
final var performSnapshot: Boolean = false

override def emit(event: AnyRef): Unit = {
checkActive()
events :+= anySupport.encodeScala(event)
val encoded = anySupport.encodeScala(event)
val nextSequenceNumber = sequenceNumber + events.size + 1
handler.handleEvent(ScalaPbAny.toJavaProto(encoded), new EventContextImpl(entityId, nextSequenceNumber))
events :+= encoded
performSnapshot = (snapshotEvery > 0) && (performSnapshot || (nextSequenceNumber % snapshotEvery == 0))
}

override protected def logError(message: String): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,19 @@ class EventSourcedImplSpec extends WordSpec with Matchers with BeforeAndAfterAll
entity.send(command(1, "cart", "AddItem", addItem("foo", "bar", -1)))
entity.expect(actionFailure(1, "Cannot add negative quantity of item [foo]"))
entity.send(command(2, "cart", "GetCart"))
entity.expect(reply(2, EmptyCart)) // check emit-then-fail doesn't change entity state
entity.expect(reply(2, EmptyCart)) // check entity state hasn't changed
entity.passivate()
}
}

"fail action when command handler uses context fail with restart for emitted events" in {
service.expectLogError(
"Fail invoked for command [AddItem] for entity [cart]: Cannot add negative quantity of item [foo]"
) {
val entity = protocol.eventSourced.connect()
entity.send(init(ShoppingCart.Name, "cart"))
entity.send(command(1, "cart", "AddItem", addItem("foo", "bar", -42)))
entity.expect(actionFailure(1, "Cannot add negative quantity of item [foo]", restart = true))
entity.passivate()
val reactivated = protocol.eventSourced.connect()
reactivated.send(init(ShoppingCart.Name, "cart"))
Expand Down Expand Up @@ -271,9 +283,12 @@ object EventSourcedImplSpec {

@CommandHandler
def addItem(item: Shoppingcart.AddLineItem, ctx: CommandContext): Empty = {
// emit and then fail on negative quantities, for testing atomicity
ctx.emit(Protocol.itemAdded(item.getProductId, item.getName, item.getQuantity))
if (item.getQuantity == -42) {
// emit and then fail on magic negative quantity, for testing atomicity
ctx.emit(Protocol.itemAdded(item.getProductId, item.getName, item.getQuantity))
}
if (item.getQuantity <= 0) ctx.fail(s"Cannot add negative quantity of item [${item.getProductId}]")
ctx.emit(Protocol.itemAdded(item.getProductId, item.getName, item.getQuantity))
Empty.getDefaultInstance
}

Expand Down
3 changes: 3 additions & 0 deletions protocols/protocol/cloudstate/entity.proto
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ message Failure {

// A description of the error.
string description = 2;

// Whether this failure should trigger an entity restart.
bool restart = 3;
}

message EntitySpec {
Expand Down
26 changes: 26 additions & 0 deletions proxy/core/src/main/scala/akka/cloudstate/EntityStash.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package akka.cloudstate

import akka.actor.{ActorRef, StashSupport}
import akka.dispatch.Envelope

// Access package-private akka stash methods, for custom command unstashing
object EntityStash {
def unstash(stash: StashSupport, message: Any, sender: ActorRef): Unit =
stash.mailbox.enqueueFirst(stash.self, Envelope(message, sender, stash.context.system))
}
2 changes: 1 addition & 1 deletion proxy/core/src/main/scala/io/cloudstate/proxy/Serve.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ object Serve {
case UserFunctionReply(Some(ClientAction(ClientAction.Action.Forward(_), _)), _, _) =>
log.error("Cannot serialize forward reply, this should have been handled by the UserFunctionRouter")
None
case UserFunctionReply(Some(ClientAction(ClientAction.Action.Failure(Failure(_, message, _)), _)), _, _) =>
case UserFunctionReply(Some(ClientAction(ClientAction.Action.Failure(Failure(_, message, _, _)), _)), _, _) =>
log.error("User Function responded with a failure: {}", message)
throw CommandException(message)
case _ =>
Expand Down
Loading