Tutorial 9. Actor Behaviors
This chapter covers the broad range of actor behaviors that RestFlow supports. In general, actors take zero or more inputs and produce zero or more outputs each time they step. This allows for considerable variety in the behavioral patterns that actors exhibit.
### Dimensions of actor behavior
The following table summarizes the most important ways in which the behavior of actors can vary. Consider each of these dimensions of actor behavior when examining the examples below.
| Dimension | Question |
| --- | --- |
| Input cardinalities | How many data items does an actor require on each input each time it steps? Are any of the inputs optional? |
| Output cardinalities | How many data items does an actor produce on each output each time it steps? Are any of the outputs optional? |
| Termination rules | Under what condition(s) should the system stop stepping the actor during a particular run? |
| Determinacy | Does the actor produce the same sequence of outputs in two runs in which it receives the same sequence of inputs? |
| Statefulness | Do the outputs or behavior of the actor during one step depend on inputs received during previous steps? |
Note that the remainder of this chapter assumes that a director implementing the data-driven MoC (model of computation) is being employed. The actors below may behave very differently under different MoCs! See Tutorial 7. Choosing a Director for more information about MoCs and directors.
### Functions
The easiest actors to understand are those that behave like functions. These actors require new values on each of their (one or more) inputs for each step, and they produce new values on each output during the same step. The values of such actors' outputs during a particular step depend only on the values of the inputs received during that step.
An example is this Multiplier actor:
```yaml
- id: Multiplier
  type: GroovyActor
  properties:
    step: product=a*b
    inputs:
      a:
      b:
    outputs:
      product:
```
This actor steps whenever the corresponding workflow node receives one new value for each of a and b (cardinality 1 for each input). It computes product from its inputs and always outputs a new product when it steps (cardinality 1 for product). The actor is also deterministic: given the same values for a and b in the future, it will assign the same value to product.
Actors that behave like functions will step once for each set of input data provided to them.
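To make the stepping rule concrete, here is a minimal Python sketch (not RestFlow code) that models a function-like actor. The names `multiplier_step` and `run_function_actor` are hypothetical. The loop stands in for the director, delivering one complete set of inputs per step.

```python
from typing import Iterable, Iterator, Tuple


def multiplier_step(a: float, b: float) -> float:
    """One step of a function-like actor: the output depends only on this step's inputs."""
    return a * b


def run_function_actor(pairs: Iterable[Tuple[float, float]]) -> Iterator[float]:
    # The actor fires once per complete set of inputs (one a and one b)
    # and emits exactly one product per firing.
    for a, b in pairs:
        yield multiplier_step(a, b)


print(list(run_function_actor([(2, 3), (4, 5), (2, 3)])))
# [6, 20, 6]  (the repeated inputs (2, 3) give the same output: determinacy)
```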
### Stateless source actors
Actors that take no inputs but do produce outputs are considered sources. The key question here is this: how many times will a source actor step? The answer is that by default it will step once for each set of values assigned to the node via the constants or sequences properties. The empty set is considered one set, so if no constants or sequences are listed for the node then the actor will step once.
For example, the following node has no inflows and no constants or sequences. So the inlined actor will step once during a run.
```yaml
- id: SayHello
  type: GroovyActorNode
  properties:
    actor.step: message = "Hello!"
    outflows:
      message: /message/
```
Similarly, the inlined actor below will step once during a run, because the node is configured with one constant value:
```yaml
- id: SayGoodMorning
  type: GroovyActorNode
  properties:
    actor.step: message = greeting
    constants:
      greeting: Good morning!
    outflows:
      message: /message/
```
Finally, the inlined actor in this third example will step three times during a run, because the node is configured with a sequence of three greeting values:
```yaml
- id: SayThreeGreetings
  type: GroovyActorNode
  properties:
    actor.step: message = greeting
    sequences:
      greeting:
        - Good morning!
        - Hello!
        - Good night!
    outflows:
      message: /message/
```
Stateless source actors are useful for providing a sequence of constants to other actors in the workflow.
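The stepping rule for stateless sources can be modeled in a few lines of Python. This is an illustrative sketch, not RestFlow's implementation. `run_stateless_source` is a hypothetical helper, and it assumes at most one sequence property per node, because this chapter does not say how several sequences would be combined.

```python
from typing import Callable, Dict, List, Optional, Tuple


def run_stateless_source(
    step: Callable[[Dict[str, object]], object],
    constants: Optional[Dict[str, object]] = None,
    sequence: Optional[Tuple[str, List[object]]] = None,
) -> List[object]:
    """Hypothetical model of how often a stateless source actor steps."""
    base = dict(constants or {})
    if sequence is None:
        # No sequence: the (possibly empty) set of constants counts as one set
        value_sets = [base]
    else:
        # One step per sequence value, each combined with the constants
        name, values = sequence
        value_sets = [{**base, name: v} for v in values]
    return [step(values_for_step) for values_for_step in value_sets]


print(run_stateless_source(lambda v: "Hello!"))
# ['Hello!']
print(run_stateless_source(lambda v: v["greeting"],
                           constants={"greeting": "Good morning!"}))
# ['Good morning!']
print(run_stateless_source(lambda v: v["greeting"],
                           sequence=("greeting", ["Good morning!", "Hello!", "Good night!"])))
# ['Good morning!', 'Hello!', 'Good night!']
```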
### Stateful source actors
The exception to the above rule for source actors applies when an actor is stateful. If an actor is stateless, the director can easily determine how many times to step it from the number of distinct sets of values the node can provide. If the actor is stateful, however, the director cannot know in advance how many times to step it, because the actor may produce different outputs even when it has not been given different inputs. The actor itself must somehow indicate when it has completed its work for the run.
Consider the following definition of the Ramp actor. Ramp counts from min to max in steps of inc. It outputs the next number in the sequence each time the actor steps.
```yaml
- id: Ramp
  type: GroovyActor
  properties:
    initialize: |
      nextNumber = min
    step: |
      if (nextNumber <= max) {
        number = nextNumber
        nextNumber = nextNumber + inc
      } else {
        _status.setOutputEnable('number', false);
      }
    inputs:
      min:
        default: 1
      max:
        default: 1
      inc:
        default: 1
    outputs:
      number:
    state:
      nextNumber:
```
The CountToTen node declared below uses the Ramp actor to generate the first 10 counting numbers. Because CountToTen is configured with only one set of constants, the default rule explained above suggests that it would step the Ramp actor only once during a run. In fact, the node steps the actor 10 times and then stops.
```yaml
- id: CountToTen
  type: Node
  properties:
    actor: !ref Ramp
    constants:
      min: 1
      max: 10
    outflows:
      number: /number/
    endFlowOnNoOutput: true
```
CountToTen steps Ramp more than once because the actor declares that it maintains state in the nextNumber variable (last two lines of the Ramp actor declaration). A stateful source actor will step indefinitely unless it somehow indicates that it should stop. One way an actor can do this is to disable its outputs. Groovy actors do so by calling the setOutputEnable method on the _status object, as shown above.
Disabling an output does not always mean that the actor should not be stepped again. In general it means that the actor produced no value on that output during that step of the actor. This is useful when writing filter actors, as we will see in the next section.
However, a node using a stateful actor can treat a disabled actor output as an indication that the actor should not step again. This behavior is enabled by setting the endFlowOnNoOutput property on the node to true (final line of the CountToTen node declaration). So in the above example, the actor's declaration that it is stateful is what allows it to step more than once during the run. What keeps it from stepping forever is the combination of the actor disabling its output when it is done counting and the node setting endFlowOnNoOutput to true. The sample workflow counttoten.yaml demonstrates this actor.
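The interplay between actor state, disabled outputs, and `endFlowOnNoOutput` can be sketched in Python. In this hypothetical model, returning `None` from `step` stands in for `_status.setOutputEnable('number', false)`. The `max_steps` cap exists only so the sketch cannot loop forever when `end_flow_on_no_output` is off. Neither the class nor the helper is part of RestFlow.

```python
from typing import List, Optional


class Ramp:
    """Hypothetical Python model of the stateful Ramp actor."""

    def __init__(self, min_: int = 1, max_: int = 1, inc: int = 1) -> None:
        self.max = max_
        self.inc = inc
        self.next_number = min_  # the `initialize` block; this is the actor's state

    def step(self) -> Optional[int]:
        if self.next_number <= self.max:
            number = self.next_number
            self.next_number += self.inc
            return number
        # None models _status.setOutputEnable('number', false)
        return None


def run_stateful_source(actor: Ramp, end_flow_on_no_output: bool = True,
                        max_steps: int = 1000) -> List[int]:
    outputs: List[int] = []
    for _ in range(max_steps):  # safety cap for the sketch only
        number = actor.step()
        if number is None:
            if end_flow_on_no_output:
                break  # the node stops stepping the actor
            continue  # otherwise there is simply no output on this step
        outputs.append(number)
    return outputs


print(run_stateful_source(Ramp(1, 10)))
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

Without `end_flow_on_no_output`, the model keeps stepping the exhausted actor until the cap is reached. This corresponds to the "step indefinitely" case described above.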
Stateful source actors are useful for importing data from sources outside the workflow. For example, a TextFileLineReader actor could easily be designed to output one line of a text file each time it steps, and to disable its output when the end of the file is reached.
### Optional outputs: filter and distributor actors
Filters are good examples of actors that do not always output data each time they are stepped. The BandpassFilter actor below takes a number (inNumber) as input each time it is stepped, and outputs the same number if it is greater than or equal to min and less than or equal to max. Otherwise it outputs nothing during that step.
```yaml
- id: BandpassFilter
  type: GroovyActor
  properties:
    step: |
      if (inNumber >= min && inNumber <= max) {
        outNumber = inNumber;
      } else {
        _status.setOutputEnable('outNumber', false);
      }
    inputs:
      min:
      max:
      inNumber:
    outputs:
      outNumber:
```
Similarly, the CutoffAtMinimum actor below distributes input values to either the accept or the reject output, depending on whether they are at or above the minimum value or below it:
```yaml
- id: CutoffAtMinimum
  type: GroovyActor
  properties:
    step: |
      if (number >= minimum) {
        accept = number;
        _status.setOutputEnable('reject', false);
      } else {
        reject = number;
        _status.setOutputEnable('accept', false);
      }
    inputs:
      number:
      minimum:
    outputs:
      accept:
      reject:
```
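Here is a Python sketch of these two behaviors. It assumes that each step's outputs can be modeled as a dictionary containing only the enabled outputs, with an empty dictionary meaning nothing was produced on that step. The function names are hypothetical.

```python
from typing import Dict


def bandpass_filter(in_number: int, min_: int, max_: int) -> Dict[str, int]:
    # Filter: one optional output; an empty dict means the output was disabled
    if min_ <= in_number <= max_:
        return {"outNumber": in_number}
    return {}


def cutoff_at_minimum(number: int, minimum: int) -> Dict[str, int]:
    # Distributor: exactly one of the two outputs is produced per step
    if number >= minimum:
        return {"accept": number}
    return {"reject": number}


values = [3, 8, 12, 5]
print([bandpass_filter(v, 4, 10) for v in values])
# [{}, {'outNumber': 8}, {}, {'outNumber': 5}]
print([cutoff_at_minimum(v, 6) for v in values])
# [{'reject': 3}, {'accept': 8}, {'accept': 12}, {'reject': 5}]
```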
### Merging input flows using optional inputs
Setting an output to null during a step is one way for an actor to produce nothing on a particular output. Actors can also choose not to accept particular inputs during a step. This is useful, for example, when deterministically merging two input data flows:
```yaml
- id: IntegerStreamSortedMerger
  type: GroovyActor
  properties:
    initialize: |
      _status.enableInput("a");
      _status.enableInput("b");
    step: |
      if (a == null) {
        out = b;
        _status.enableInput('b');
        return;
      }
      if (b == null) {
        out = a;
        _status.enableInput('a');
        return;
      }
      if (a < b) {
        out = a;
        _status.enableInput('a');
        return;
      }
      if (b < a) {
        out = b;
        _status.enableInput('b');
        return;
      }
      out = a;
      _status.enableInput('a');
      if (!retainDuplicates) {
        _status.enableInput('b');
      }
    inputs:
      a:
        optional: true
        nullable: true
        defaultReadiness: false
      b:
        optional: true
        nullable: true
        defaultReadiness: false
      retainDuplicates:
        default: false
    outputs:
      out:
```
This actor receives inputs on a and b (retainDuplicates is presumably either left at its default value or set by a parameter on the node using the actor) and outputs the input values one at a time in sorted order, smallest first. It assumes that each input stream is already sorted, so it only needs to merge the two sorted streams while preserving the sort order.
Consider the third block of code in the step property. It compares the latest values received on a and b. If a is less than b, it outputs a and then calls _status.enableInput('a') to indicate that a new value for a should be provided to the actor on the next step. In other words, there will be a new value for a during the next step (replacing the value output during this step), but the current value of b will be retained.
The next code block outputs b if it is less than a, and indicates that a new value for b is required for the next step.
When a and b are equal, the remainder of the step code comes into play. The value of a (equal to b) is output, and a new value for a is requested. If retainDuplicates is false (the default), the final block also requests a new value for b, so the duplicate is discarded. If retainDuplicates is true, the current value of b is retained and will be output on a later step.
The first two blocks in step handle the situations that arise once the node inflows providing values to the actor have received their final values, that is, when the two streams of integers being merged have different lengths. In such situations, the actor will be stepped one or more times with a value of null for the input whose (shorter) stream has already ended. The strategy is to output b if a is null (no value received for a), and vice versa.
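The same merge logic can be sketched in Python to check its behavior on finite streams. This is a hypothetical model, not RestFlow's director. Python iterators stand in for the inflows, reading the next item stands in for `_status.enableInput(...)`, and `None` stands in for the null value delivered once a stream has ended. The assumption that the run ends when both streams are exhausted belongs to this sketch.

```python
from typing import Iterable, List, Optional


def sorted_merge(a_stream: Iterable[int], b_stream: Iterable[int],
                 retain_duplicates: bool = False) -> List[int]:
    a_iter, b_iter = iter(a_stream), iter(b_stream)
    # initialize: both inputs are enabled, so read a first value from each
    a: Optional[int] = next(a_iter, None)
    b: Optional[int] = next(b_iter, None)
    out: List[int] = []
    while a is not None or b is not None:
        if a is None:            # stream a has ended: pass b through
            out.append(b)
            b = next(b_iter, None)
        elif b is None:          # stream b has ended: pass a through
            out.append(a)
            a = next(a_iter, None)
        elif a < b:              # output a; request a new a, keep b
            out.append(a)
            a = next(a_iter, None)
        elif b < a:              # output b; request a new b, keep a
            out.append(b)
            b = next(b_iter, None)
        else:                    # a == b: output once, request a new a
            out.append(a)
            a = next(a_iter, None)
            if not retain_duplicates:
                b = next(b_iter, None)  # discard the duplicate b as well
    return out


print(sorted_merge([1, 3, 5, 7], [2, 3, 6]))
# [1, 2, 3, 5, 6, 7]
print(sorted_merge([1, 3, 5, 7], [2, 3, 6], retain_duplicates=True))
# [1, 2, 3, 3, 5, 6, 7]
```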
### Data flow loops
The example workflow below, loop.yaml, combines an adder (AddOne) and a filter (LowPassFilter) in a data-flow loop to compute the first 10 counting numbers. Note that the AddOne outflow is wired to the LowPassFilter inflow, and the LowPassFilter outflow to the AddOne inflow. Meanwhile, the Print node also takes the outflow of LowPassFilter as its inflow:
```yaml
imports:
  - classpath:/common/types.yaml
  - classpath:/common/directors.yaml

components:
  - id: Loop
    type: Workflow
    properties:
      director: !ref DataDrivenDirector
      nodes:
        - !ref AddOne
        - !ref LowPassFilter
        - !ref Print

  - id: AddOne
    type: GroovyActorNode
    properties:
      actor.step: |
        outNumber = inNumber + 1
      initialValues:
        inNumber: 0
      inflows:
        inNumber: /filtered/
      outflows:
        outNumber: /numbers/

  - id: LowPassFilter
    type: GroovyActorNode
    properties:
      endFlowOnNoOutput: true
      actor.step: |
        if (inNumber <= max) {
          outNumber = inNumber
        } else {
          _status.setOutputEnable('outNumber', false);
        }
      constants:
        max: 10
      inflows:
        inNumber: /numbers/
      outflows:
        outNumber: /filtered/

  - id: Print
    type: GroovyActorNode
    properties:
      actor.step: |
        println number
      inflows:
        number: /filtered/
```
Running the workflow gives the expected results:
```
$ restflow -f loop.yaml
1
2
3
4
5
6
7
8
9
10
$
```
How does the loop start in the first place? Note that AddOne provides an initialValues property for inNumber, in addition to binding inNumber to the /filtered/ inflow. When an initialValues entry and an inflow both provide values to the same input variable of an actor, the former is used as input for the first step of the actor, and values from the inflow are used for subsequent steps. This allows you to 'prime the pump' and so get the loop started. An alternative would be to use optional inputs, as demonstrated by the IntegerStreamSortedMerger actor in the previous section, but this tends to be more complicated and less reusable. Here, the AddOne node could easily employ a generic addition actor defined elsewhere, and there would be no need to adapt that actor for use in a loop.
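The loop can be traced with a small Python simulation. This sketch makes simplifying assumptions: a single queue models the values waiting on AddOne's `inNumber` input, the three nodes are collapsed into one loop body, and `run_loop` is a hypothetical name. The initial queue entry plays the role of the `initialValues` setting.

```python
from collections import deque
from typing import List


def run_loop(max_value: int = 10, initial_value: int = 0) -> List[int]:
    printed: List[int] = []
    # initialValues primes AddOne's inNumber for its first step
    pending = deque([initial_value])
    while pending:
        in_number = pending.popleft()
        out_number = in_number + 1      # AddOne step, sent to /numbers/
        if out_number <= max_value:     # LowPassFilter passes the value to /filtered/
            printed.append(out_number)  # Print consumes /filtered/
            pending.append(out_number)  # AddOne also consumes /filtered/
        else:
            break                       # output disabled + endFlowOnNoOutput ends the run
    return printed


print(run_loop())
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```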
### The Hamming Sequence
The final example in this chapter, hamming.yaml, combines three simple multipliers, two nodes that use the IntegerStreamSortedMerger actor discussed above, and one low-pass filter to compute the Hamming sequence up to 60. The Hamming sequence comprises the regular numbers, i.e. the positive integers whose only prime factors are 2, 3, and 5. This workflow exploits most of the sophisticated actor behaviors discussed above.
```yaml
imports:
  - classpath:/common/types.yaml
  - classpath:/common/directors.yaml

components:
  - id: Hamming
    type: Workflow
    properties:
      director: !ref DataDrivenDirector
      nodes:
        - !ref MultiplyByTwo
        - !ref MultiplyByThree
        - !ref MultiplyByFive
        - !ref MergeProductsFiveAndThree
        - !ref MergeProductsFiveThreeAndTwo
        - !ref DiscardProductsGreaterThanMax
        - !ref RenderHammingSequence

  - id: MultiplyByTwo
    type: GroovyActorNode
    properties:
      actor.step: |
        product = a * 2
      inflows:
        a: /merged/filtered/
      outflows:
        product: /products/two/

  - id: MultiplyByThree
    type: GroovyActorNode
    properties:
      actor.step: |
        product = a * 3
      inflows:
        a: /merged/filtered/
      outflows:
        product: /products/three/

  - id: MultiplyByFive
    type: GroovyActorNode
    properties:
      actor.step: |
        product = a * 5
      inflows:
        a: /merged/filtered/
      outflows:
        product: /products/five/

  - id: MergeProductsFiveAndThree
    type: Node
    properties:
      actor: !ref IntegerStreamSortedMerger
      constants:
        retainDuplicates: false
      inflows:
        a: /products/five/
        b: /products/three/
      outflows:
        out: /merged/five_three/

  - id: MergeProductsFiveThreeAndTwo
    type: Node
    properties:
      actor: !ref IntegerStreamSortedMerger
      constants:
        retainDuplicates: false
      inflows:
        a: /merged/five_three/
        b: /products/two/
      outflows:
        out: /merged/five_three_two/

  - id: DiscardProductsGreaterThanMax
    type: GroovyActorNode
    properties:
      endFlowOnNoOutput: true
      actor.step: |
        if (inNumber <= max) {
          outNumber = inNumber
        } else {
          _status.setOutputEnable('outNumber', false);
        }
      constants:
        max: 60
      initialValues:
        inNumber: 1
      inflows:
        inNumber: /merged/five_three_two/
      outflows:
        outNumber: /merged/filtered/

  - id: RenderHammingSequence
    type: GroovyActorNode
    properties:
      actor.step: |
        println number
      inflows:
        number: /merged/filtered/

  - id: IntegerStreamSortedMerger
    type: GroovyActor
    properties:
      initialize: |
        _status.enableInput("a");
        _status.enableInput("b");
      step: |
        if (a == null) {
          out = b;
          _status.enableInput('b');
          return;
        }
        if (b == null) {
          out = a;
          _status.enableInput('a');
          return;
        }
        if (a < b) {
          out = a;
          _status.enableInput('a');
          return;
        }
        if (b < a) {
          out = b;
          _status.enableInput('b');
          return;
        }
        out = a;
        _status.enableInput('a');
        if (!retainDuplicates) {
          _status.enableInput('b');
        }
      inputs:
        a:
          optional: true
          nullable: true
          defaultReadiness: false
        b:
          optional: true
          nullable: true
          defaultReadiness: false
        retainDuplicates:
          default: false
      outputs:
        out:
```
Running the workflow correctly produces the following:
```
$ restflow -f hamming.yaml
1
2
3
4
5
6
8
9
10
12
15
16
18
20
24
25
27
30
32
36
40
45
48
50
54
60
$
```
Note that all of the code implementing the actors in this workflow is included in hamming.yaml, and all of the actors are inlined except for the two merging actors.
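For comparison, the following Python sketch computes the same sequence using Dijkstra's classic three-pointer formulation rather than a dataflow network. It is a swapped-in technique, not a translation of hamming.yaml, but it mirrors the workflow's structure:

- Each pointer plays the role of one multiplier reading the filtered output.
- Taking the minimum of the three candidates plays the role of the two sorted merges.
- Advancing every pointer that matched plays the role of `retainDuplicates: false`.
- Stopping at the first value above the limit plays the role of the filter with `endFlowOnNoOutput`.

`hamming_up_to` is a hypothetical name.

```python
from typing import List


def hamming_up_to(max_value: int) -> List[int]:
    seq = [1]          # like initialValues priming the filter with 1
    i2 = i3 = i5 = 0   # read positions of the x2, x3, and x5 "multiplier" streams
    while True:
        c2, c3, c5 = 2 * seq[i2], 3 * seq[i3], 5 * seq[i5]
        nxt = min(c2, c3, c5)   # the sorted merge of the three product streams
        if nxt > max_value:     # the sequence is increasing, so stop at the first overflow
            break
        seq.append(nxt)
        # Advance every stream that produced nxt, so duplicates are discarded
        if c2 == nxt:
            i2 += 1
        if c3 == nxt:
            i3 += 1
        if c5 == nxt:
            i5 += 1
    return seq


print(hamming_up_to(60))
# [1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20, 24, 25, 27, 30, 32, 36, 40, 45, 48, 50, 54, 60]
```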