Skip to content

Actor Registration protobuf updated #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 74 additions & 7 deletions protobuf/eigr/actor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,95 @@ message ActorSnapshotStrategy {
}

// A strategy which a user function's entity is passivated.
message ActorDeactivateStrategy {
message ActorDeactivationStrategy {
oneof strategy {
// the timeout strategy.
TimeoutStrategy timeout = 1;
}
}

// A strategy based on a timeout.
// A strategy based on a timeout.
message TimeoutStrategy {
// The timeout in millis
int64 timeout = 1;
}

// A command represents an action that the user can perform on an Actor.
// Commands in supporting languages are represented by functions or methods.
// An Actor command has nothing to do with the semantics of Commands in a CQRS/EventSourced system.
// It just represents an action that supporting languages can invoke.
message Command {

// The name of the function or method in the supporting language that has been registered in Ator.
string name = 1;
}

// A FixedTimerCommand is similar to a regular Command, its main differences are that it is scheduled to run at regular intervals
// and only takes the actor's state as an argument.
// Timer Commands are good for executing loops that manipulate the actor's own state.
// In Elixir or other languages in BEAM it would be similar to invoking Process.send_after(self(), atom, msg, timeout)
message FixedTimerCommand {

// The time to wait until the command is triggered
int32 seconds = 1;

// See Command description Above
Command command = 2;
}

message ActorState {
map<string, string> tags = 1;
google.protobuf.Any state = 2;
}

message Actor {
string name = 1;
// TODO doc here
message Metadata {
// A channel group represents a way to send commands to various actors
// that belong to a certain semantic group.
string channel_group = 1;

map<string, string> tags = 2;
}

message ActorSettings {

// Indicates if actor´s is abstract or non abstract.
bool abstract = 1;

// Indicates whether an actor's state should be persisted in a definitive store.
bool persistent = 2;
ActorState state = 3;
ActorSnapshotStrategy snapshot_strategy = 4;
ActorDeactivateStrategy deactivate_strategy = 5;

// Snapshot strategy
ActorSnapshotStrategy snapshot_strategy = 3;

// Deactivate strategy
ActorDeactivationStrategy deactivation_strategy = 4;
}

message ActorId {
// The name of a Actor Entity.
string name = 1;

// Name of a ActorSystem
string system = 2;
}

message Actor {
// Actor Identification
ActorId id = 1;

// A Actor state.
ActorState state = 2;

// Actor metadata
Metadata metadata = 6;

// Actor settings.
ActorSettings settings = 3;

// The commands registered for an actor
repeated Command commands = 4;

// The registered timer commands for an actor.
repeated FixedTimerCommand timer_commands = 5;
}
100 changes: 87 additions & 13 deletions protobuf/eigr/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ import "google/protobuf/any.proto";
option java_package = "io.eigr.functions.protocol";
option go_package = "github.com/eigr/go-support/eigr/protocol;protocol";

message SpawnRequest {
eigr.actors.ActorSystem actor_system = 2;
}

message SpawnResponse {
RequestStatus status = 1;
}

message RegistrationRequest {

ServiceInfo service_info = 1;
Expand Down Expand Up @@ -144,7 +152,7 @@ message ProxyInfo {
int32 protocol_minor_version = 2;

string proxy_name = 3;

string proxy_version = 4;
}

Expand All @@ -155,29 +163,91 @@ message RegistrationResponse {
ProxyInfo proxy_info = 2;
}

// Context is where current and/or updated state is stored
// Context is where current and/or updated state is stored
// to be transmitted to/from proxy and user function
//
// Params:
// * state: Actor state passed back and forth between proxy and user function.
// * state: Actor state passed back and forth between proxy and user function.
message Context {

google.protobuf.Any state = 1;
}

// When a Host Function is invoked it returns the updated state and return value to the call.
// It can also return a number of side effects to other Actors as a result of its computation.
// These side effects will be forwarded to the respective Actors asynchronously and should not affect the Host Function's response to its caller.
// Internally side effects is just a special kind of InvocationRequest.
// Useful for handle handle `recipient list` and `Composed Message Processor` patterns:
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/RecipientList.html
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/DistributionAggregate.html
message SideEffect {
InvocationRequest request = 1;
}

// Broadcast a message to many Actors
// Useful for handle `recipient list`, `publish-subscribe channel`, and `scatter-gatther` patterns:
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/RecipientList.html
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html
message Broadcast {
// Channel of target Actors
string channel_group = 1;

// Command. Only Actors that have this command will run successfully
string command_name = 2;

// Payload
google.protobuf.Any value = 3;
}

// Sends the output of a command of an Actor to the input of another command of an Actor
// Useful for handle `pipes` pattern:
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/PipesAndFilters.html
message Pipe {
// Target Actor
string actor = 1;

// Command.
string command_name = 2;
}

// Sends the input of a command of an Actor to the input of another command of an Actor
// Useful for handle `content-basead router` pattern
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/ContentBasedRouter.html
message Forward {
// Target Actor
string actor = 1;

// Command.
string command_name = 2;
}

// Container for archicetural message patterns
message Workflow {

Broadcast broadcast = 2;

repeated SideEffect effects = 1;

oneof routing {
Pipe pipe = 3;
Forward forward = 4;
}
}

// The user function when it wants to send a message to an Actor uses the InvocationRequest message type.
//
// Params:
// * system: See ActorStstem message.
// * actor: The target Actor, i.e. the one that the user function is calling to perform some computation.
// * command_name: The function or method on the target Actor that will receive this request
// * command_name: The function or method on the target Actor that will receive this request
// and perform some useful computation with the sent data.
// * value: This is the value sent by the user function to be computed by the request's target Actor command.
// * async: Indicates whether the command should be processed synchronously, where a response should be sent back to the user function,
// * async: Indicates whether the command should be processed synchronously, where a response should be sent back to the user function,
// or whether the command should be processed asynchronously, i.e. no response sent to the caller and no waiting.
message InvocationRequest {

eigr.actors.ActorSystem system =1;
eigr.actors.ActorSystem system = 1;

eigr.actors.Actor actor = 2;

Expand All @@ -188,16 +258,16 @@ message InvocationRequest {
bool async = 5;
}

// ActorInvocation is a translation message between a local invocation made via InvocationRequest
// ActorInvocation is a translation message between a local invocation made via InvocationRequest
// and the real Actor that intends to respond to this invocation and that can be located anywhere in the cluster.
//
// Params:
// actor_name: The name of the Actor handling the InvocationRequest request, also called the target Actor.
// actor_system: The name of ActorSystem registered in Registration step.
// command_name: The function or method on the target Actor that will receive this request
// command_name: The function or method on the target Actor that will receive this request
// and perform some useful computation with the sent data.
// current_context: The current Context with current state value of the target Actor.
// That is, the same as found via matching in %Actor{name: target_actor, state: %ActorState{state: value} = actor_state}.
// current_context: The current Context with current state value of the target Actor.
// That is, the same as found via matching in %Actor{name: target_actor, state: %ActorState{state: value} = actor_state}.
// In this case, the Context type will contain in the value attribute the same `value` as the matching above.
// value: The value to be passed to the function or method corresponding to command_name.
message ActorInvocation {
Expand All @@ -219,7 +289,7 @@ message ActorInvocation {
// actor_name: The name of the Actor handling the InvocationRequest request, also called the target Actor.
// actor_system: The name of ActorSystem registered in Registration step.
// updated_context: The Context with updated state value of the target Actor after user function has processed a request.
// value: The value that the original request proxy will forward in response to the InvocationRequest type request.
// value: The value that the original request proxy will forward in response to the InvocationRequest type request.
// This is the final response from the point of view of the user who invoked the Actor call and its subsequent processing.
message ActorInvocationResponse {

Expand All @@ -230,13 +300,15 @@ message ActorInvocationResponse {
Context updated_context = 3;

google.protobuf.Any value = 4;

Workflow workflow = 5;
}

// InvocationResponse is the response that the proxy that received the InvocationRequest request will forward to the request's original user function.
//
// Params:
// status: Status of request. Could be one of [UNKNOWN, OK, ACTOR_NOT_FOUND, ERROR].
// sytem: The original ActorSystem of the InvocationRequest request.
// system: The original ActorSystem of the InvocationRequest request.
// actor: The target Actor originally sent in the InvocationRequest message.
// value: The value resulting from the request processing that the target Actor made.
// This value must be passed by the user function to the one who requested the initial request in InvocationRequest.
Expand Down Expand Up @@ -267,4 +339,6 @@ message RequestStatus {
Status status = 1;

string message = 2;
}
}


46 changes: 38 additions & 8 deletions spawn/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
"""
from spawn.eigr.actor_pb2 import (
Actor,
ActorDeactivateStrategy,
ActorSnapshotStrategy,
ActorId,
ActorState,
Metadata,
ActorSettings,
Command,
FixedTimerCommand,
ActorSnapshotStrategy,
ActorDeactivationStrategy,
ActorSystem,
Registry,
TimeoutStrategy,
Expand Down Expand Up @@ -59,18 +64,43 @@ def register(self, actors: List[ActorEntity]):

actor_state = ActorState()

deactivate_strategy = ActorDeactivateStrategy()
deactivate_strategy = ActorDeactivationStrategy()
deactivate_strategy.timeout.CopyFrom(deactivate_timeout_strategy)

snaphot_strategy = ActorSnapshotStrategy()
snaphot_strategy.timeout.CopyFrom(snaphot_timeout_strategy)

actor_01 = Actor()
actor_01.name = "user_actor_01"
actor_01.persistent = True

actor_id = ActorId()
actor_id.name = "user_actor_01"
actor_id.system = "spawn-system"

actor_01.id.CopyFrom(actor_id)

actor_01.state.CopyFrom(actor_state)
actor_01.deactivate_strategy.CopyFrom(deactivate_strategy)
actor_01.snapshot_strategy.CopyFrom(snaphot_strategy)

actor_metatdata = Metadata()
actor_metatdata.channel_group = "spawn-python"
actor_metatdata.tags["actor"] = "user_actor_01"

actor_01.metadata.CopyFrom(actor_metatdata)

actor_settings = ActorSettings()
actor_settings.abstract = True
actor_settings.persistent = True
actor_settings.snapshot_strategy.CopyFrom(snaphot_strategy)
actor_settings.deactivation_strategy.CopyFrom(deactivate_strategy)

actor_01.settings.CopyFrom(actor_settings)

actor_command = actor_01.commands.add()
actor_command.name = ""

actor_fixed_timer_command = actor_01.timer_commands.add()

actor_fixed_timer_command.seconds = 1
actor_fixed_timer_command.command.CopyFrom(actor_command)

registry = Registry()
registry.actors.get_or_create("user_actor_01").CopyFrom(actor_01)
Expand Down Expand Up @@ -110,4 +140,4 @@ def register(self, actors: List[ActorEntity]):
logging.info("Actors register response %s", resp)
except Exception as e:
logging.error("ERROR: %s", e)
logging.error("Shit %s", e.__cause__)
logging.error("Shit %s", e.__cause__)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't it best practice to have a newline of every line?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be deleted

Loading