Skip to content

Dynamic Fork Join

yogeshnachnani edited this page Feb 25, 2016 · 2 revisions

As mentioned, Flux also helps model traditional Workflows where a sequence of defined tasks need to be executed one after the other.

A special case of such workflows, is to implement a Dynamic Fork-Join where the number of "tasks" to be executed can be determined only at runtime.

Consider a warehousing use case where a big bag of weight W needs to be broken into small bags of weight less than or equal to w. Each of these bags need to be scheduled to be packaged and shipped.

Any one writing a program would try to execute the following steps

  1. Split the Items in the bag into a list of small bags L

  2. for every Item in L

    2` -> (

    • a.create a new Bag
    • b.pack it
    • c.ship it)
  3. Publish bags list information.

In the above scenario number of creation,pick and pack constructs that need to be exectuted are available only during runtime and depend on W.

The following diagram illustrates the above scenario:

Or, alternatively, the same can be expressed using Flux primitives

Create the initial state:

  /* The splitState is our initial state as shown in the diagram */
  final StateDefinition splitState = new StateDefinition(
      "split bags",
      Collections.<EventDefinition>emptySet(),
      new SplitBagTask()
      );

/* Straight forward task to split a large bag to List<SmallBag> */

class SplitBagTask implements Task {
  public Pair<Event, FluxError> execute(Event... events) {
    List<SmallBag> smallBagList;
    /* Populate smallBagList */
    return new Pair<>(new Event("bags split", asJson(smallBagList)), null);
  }
}

Define a State Machine with only the above initial state:

  final StateMachineDefinition bagProcessingMachine =
    new StateMachineDefinition("BagProcessor","BagProcessor",Collections.emptyList(),"split bags");
  bagProcessingMachine.addStateDefinition(splitState);

Create the "process bags" state that would create new states to process each SmallBag and also create the End State

  /* 
     The next state adds more states to the state machine 
     - The states for processing each SmallBag
     - The end state which would be triggered after each SmallBag is processed
  */
  final StateDefinition processBags = new StateDefinition(
      "process bags",
      Collections.singleton(new EventDefinition("bags split")),
      new ProcessBagsTask()
      );
  bagProcessingMachine.addStateDefinition(processBags);
}

/* This task adds new States to the existing state machine */

class ProcessBagsTask implements Task {
  public Pair<Event, FluxError> execute(Event... events) {
    /* Retrieve smallBagList from event data */
    List<SmallBag> smallBagList = fromJson(events[0].getEventData());
    List<Event> eventList = new ArrayList<>();
    /* Create states to process a single SmallBag instance */
    for (int i = 0; i < smallBagList.size(); i++) {
      FluxContext.retrieveStateMachine().addStateDefinition(new StateDefinition(
            "processBag" + i,
            Collections.singleton(new EventDefinition("processBag" + i)),
            new ProcessBagsTask()
            ));
      eventList.add(new Event("processBag" + i, asJson(smallBagList.get(i))));
    }
    
    /* 
       Create the End state.
       End state will depend on all done_processBag raised by each ProcessBagTask created above 
    */
    Set<EventDefinition> endStateDependencies = formEndStateDependencies("done_processBag", eventList.size());
    FluxContext.retrieveStateMachine().addStateDefinition(new StateDefinition(
          "end state",
          endStateDependencies,
          new MarkCompleteTask()
          ));

    /* Finally, raise a 'forkEvent' in order to trigger transitions to "processBag" states created above */
    
    return new Pair<>(new Event("forkEvent", asJson(eventList)));
    
    /* The forkEvent will be processed by flux. The eventList will be unpacked and processed */
  }
}


public Set<EventDefinition> formEndStateDependencies(String eventNamePrefix, int numberOfEventDefinitions) {
  Set<EventDefinition> dependencies = new HashSet<>();
  for (int i = 0 ; i < numberOfEventDefinitions ; i++ ) {
    dependencies.add(new EventDefinition(eventNamePrefix+i));
   }
   return dependencies;
}

Note: The above example assumes a FluxContext. We're working on a DSL that would provide a way to write the same example without the client having a knowledge of FluxContext

Each small bag will be processed by the following task. Each task raises a done_processBag event upon completion

class ProcessSmallBagTask implements Task {
  public Pair<Event, FluxError> execute(Event... events) {
    /* Actual Task of create + pack + ship */
    String eventName = events[0].getName();
    return new Pair<>(new Event("done_"+eventName), null);
  }
}

Finally, the MarkCompleteTask will be executed upon transitioning to the "end state"

class MarkCompleteTask implements Task {
  public Pair<Event, FluxError> execute(Event... events) {
    /* Mark completion */
    return null;
  }
}
Clone this wiki locally