diff --git a/temporian/beam/evaluation.py b/temporian/beam/evaluation.py index e0817b8c8..6353f5286 100644 --- a/temporian/beam/evaluation.py +++ b/temporian/beam/evaluation.py @@ -126,33 +126,33 @@ def run_multi_io( data = {**inputs} - num_operators = len(schedule.ordered_operators) - for operator_idx, operator in enumerate(schedule.ordered_operators): - operator_def = operator.definition() + num_steps = len(schedule.steps) + for step_idx, step in enumerate(schedule.steps): + operator_def = step.op.definition() print("=============================", file=sys.stderr) print( - f"{operator_idx+1} / {num_operators}: Run {operator}", + f"{step_idx+1} / {num_steps}: Run {step.op}", file=sys.stderr, ) # Construct operator inputs operator_inputs = { input_key: data[input_node] - for input_key, input_node in operator.inputs.items() + for input_key, input_node in step.op.inputs.items() } # Get Beam implementation implementation_cls = implementation_lib.get_implementation_class( operator_def.key ) - implementation = implementation_cls(operator) + implementation = implementation_cls(step.op) # Add implementation to Beam pipeline operator_outputs = implementation(**operator_inputs) # Collect outputs - for output_key, output_node in operator.outputs.items(): + for output_key, output_node in step.op.outputs.items(): data[output_node] = operator_outputs[output_key] return {output: data[output] for output in outputs} diff --git a/temporian/core/evaluation.py b/temporian/core/evaluation.py index 570ba9f19..2ac79deef 100644 --- a/temporian/core/evaluation.py +++ b/temporian/core/evaluation.py @@ -24,7 +24,7 @@ from temporian.implementation.numpy import evaluation as np_eval from temporian.implementation.numpy.data.event_set import EventSet from temporian.core.graph import infer_graph -from temporian.core.schedule import Schedule +from temporian.core.schedule import Schedule, ScheduleStep from temporian.core.operators.leak import LeakOperator EvaluationQuery = Union[ @@ -143,7 +143,7 @@ def run( if verbose == 1: print( - f"Run {len(schedule.ordered_operators)} operators", + f"Run {len(schedule.steps)} operators", file=sys.stderr, ) @@ -208,7 +208,8 @@ def build_schedule( # Operators ready to be computed (i.e. ready to be added to "planned_ops") # as all their inputs are already computed by "planned_ops" or specified by # "inputs". - ready_ops: Set[Operator] = set() + ready_ops: List[Operator] = [] + ready_ops_set: Set[Operator] = set() # "node_to_op[e]" is the list of operators with node "e" as input. node_to_op: Dict[EventSetNode, List[Operator]] = defaultdict(lambda: []) @@ -222,25 +223,50 @@ def build_schedule( for op in graph.operators: num_pending_inputs = 0 for input_node in op.inputs.values(): + node_to_op[input_node].append(op) if input_node in graph.inputs: # This input is already available continue - node_to_op[input_node].append(op) num_pending_inputs += 1 if num_pending_inputs == 0: # Ready to be scheduled - ready_ops.add(op) + ready_ops.append(op) + ready_ops_set.add(op) else: # Some of the inputs are missing. op_to_num_pending_inputs[op] = num_pending_inputs + # Make evaluation order deterministic. + # + # Execute the op with smallest internal ordered id first. + ready_ops.sort(key=lambda op: op._internal_ordered_id, reverse=True) + # Compute the schedule while ready_ops: # Get an op ready to be scheduled op = ready_ops.pop() + ready_ops_set.remove(op) + + # Nodes released after the op is executed + released_nodes = [] + for input in op.inputs.values(): + if input in outputs: + continue + if input not in node_to_op: + continue + # The list of ops that depends on this input (including the current + # op "op"). + input_usage = node_to_op[input] + input_usage.remove(op) + + if not input_usage: + released_nodes.append(input) + del node_to_op[input] # Schedule the op - schedule.ordered_operators.append(op) + schedule.steps.append( + ScheduleStep(op=op, released_nodes=released_nodes) + ) # Update all the ops that depends on "op". Enlist the ones that are # ready to be computed @@ -256,7 +282,8 @@ def build_schedule( if num_missing_inputs == 0: # "new_op" can be computed - ready_ops.add(new_op) + ready_ops.append(new_op) + ready_ops_set.add(new_op) del op_to_num_pending_inputs[new_op] assert not op_to_num_pending_inputs diff --git a/temporian/core/operators/base.py b/temporian/core/operators/base.py index 72a704401..609169ae3 100644 --- a/temporian/core/operators/base.py +++ b/temporian/core/operators/base.py @@ -72,6 +72,8 @@ def __exit__(self, exc_type, exc_val, traceback): class Operator(ABC): """Interface definition and common logic for operators.""" + next_internal_id: int = 0 + def __init__(self): self._inputs: Dict[str, EventSetNode] = {} self._outputs: Dict[str, EventSetNode] = {} @@ -81,10 +83,18 @@ def __init__(self): attr.key: attr.type for attr in self._definition.attributes } + # Id of the operator object such that an operator A instantiated before + # an operator B will have a smaller _internal_ordered_id. + # + # _internal_ordered_id is used to ensure the deterministic graph + # evaluation. + self._internal_ordered_id = Operator.next_internal_id + Operator.next_internal_id += 1 + def __repr__(self): return ( - f"Operator(key={self.definition().key!r}, id={id(self)!r}," - f" attributes={self.attributes!r})" + f"Operator(key={self.definition().key!r}," + f" id={self._internal_ordered_id}, attributes={self.attributes!r})" ) @property diff --git a/temporian/core/schedule.py b/temporian/core/schedule.py index f5924ceef..c9f59c7b0 100644 --- a/temporian/core/schedule.py +++ b/temporian/core/schedule.py @@ -19,7 +19,15 @@ from temporian.core.operators.base import Operator +@dataclass +class ScheduleStep: + op: Operator + + # List of nodes that will not be used anymore after "op" is executed. + released_nodes: List[EventSetNode] + + @dataclass class Schedule: - ordered_operators: List[Operator] = field(default_factory=list) + steps: List[ScheduleStep] = field(default_factory=list) input_nodes: Set[EventSetNode] = field(default_factory=set) diff --git a/temporian/core/test/evaluation_test.py b/temporian/core/test/evaluation_test.py index fd178a1b3..916bee118 100644 --- a/temporian/core/test/evaluation_test.py +++ b/temporian/core/test/evaluation_test.py @@ -29,13 +29,13 @@ def test_schedule_trivial(self): schedule = evaluation.build_schedule( inputs={a}, outputs={b.outputs["output"]} ) - self.assertEqual(schedule.ordered_operators, [b]) + self.assertEqual(schedule.steps, [evaluation.ScheduleStep(b, [a])]) def test_schedule_empty(self): a = utils.create_input_node() schedule = evaluation.build_schedule(inputs={a}, outputs={a}) - self.assertEqual(schedule.ordered_operators, []) + self.assertEqual(schedule.steps, []) def test_schedule_two_delayed_inputs(self): i1 = utils.create_input_node() @@ -46,9 +46,15 @@ def test_schedule_two_delayed_inputs(self): schedule = evaluation.build_schedule( inputs={i1, i2}, outputs={o3.outputs["output"]} ) - self.assertTrue( - (schedule.ordered_operators == [o2, o1, o3]) - or (schedule.ordered_operators == [o1, o2, o3]) + self.assertEqual( + schedule.steps, + [ + evaluation.ScheduleStep(o1, [i1]), + evaluation.ScheduleStep(o2, [i2]), + evaluation.ScheduleStep( + o3, [o1.outputs["output"], o2.outputs["output"]] + ), + ], ) def test_schedule_basic(self): @@ -62,9 +68,16 @@ def test_schedule_basic(self): inputs={i1, i3}, outputs={o5.outputs["output_1"], o4.outputs["output"]}, ) - self.assertTrue( - (schedule.ordered_operators == [o2, o4, o5]) - or (schedule.ordered_operators == [o4, o2, o5]) + + self.assertEqual( + schedule.steps, + [ + evaluation.ScheduleStep(op=o2, released_nodes=[i1]), + evaluation.ScheduleStep( + op=o4, released_nodes=[o2.outputs["output"], i3] + ), + evaluation.ScheduleStep(op=o5, released_nodes=[]), + ], ) def test_schedule_mid_chain(self): @@ -77,7 +90,35 @@ def test_schedule_mid_chain(self): schedule = evaluation.build_schedule( inputs={o3.outputs["output"]}, outputs={o5.outputs["output"]} ) - self.assertEqual(schedule.ordered_operators, [o4, o5]) + + self.assertEqual( + schedule.steps, + [ + evaluation.ScheduleStep( + op=o4, released_nodes=[o3.outputs["output"]] + ), + evaluation.ScheduleStep( + op=o5, released_nodes=[o4.outputs["output"]] + ), + ], + ) + + def test_schedule_interm_results(self): + i1 = utils.create_input_node() + o2 = utils.OpI1O1(i1) + o3 = utils.OpI1O1(o2.outputs["output"]) + + schedule = evaluation.build_schedule( + inputs={i1}, outputs={o3.outputs["output"], o2.outputs["output"]} + ) + + self.assertEqual( + schedule.steps, + [ + evaluation.ScheduleStep(op=o2, released_nodes=[i1]), + evaluation.ScheduleStep(op=o3, released_nodes=[]), + ], + ) def test_run_value(self): i1 = utils.create_input_node() diff --git a/temporian/implementation/numpy/evaluation.py b/temporian/implementation/numpy/evaluation.py index 23da73d74..a9e576c04 100644 --- a/temporian/implementation/numpy/evaluation.py +++ b/temporian/implementation/numpy/evaluation.py @@ -46,9 +46,9 @@ def run_schedule( """ data = {**inputs} - num_operators = len(schedule.ordered_operators) - for operator_idx, operator in enumerate(schedule.ordered_operators): - operator_def = operator.definition() + num_steps = len(schedule.steps) + for step_idx, step in enumerate(schedule.steps): + operator_def = step.op.definition() # Get implementation implementation_cls = implementation_lib.get_implementation_class( @@ -56,28 +56,25 @@ def run_schedule( ) # Instantiate implementation - implementation = implementation_cls(operator) + implementation = implementation_cls(step.op) if verbose == 1: print( - ( - f" {operator_idx+1} / {num_operators}:" - f" {operator.operator_key()}" - ), + f" {step_idx+1} / {num_steps}: {step.op.operator_key()}", file=sys.stderr, end="", ) elif verbose >= 2: print("=============================", file=sys.stderr) print( - f"{operator_idx+1} / {num_operators}: Run {operator}", + f"{step_idx+1} / {num_steps}: Run {step.op}", file=sys.stderr, ) # Construct operator inputs operator_inputs = { input_key: data[input_node] - for input_key, input_node in operator.inputs.items() + for input_key, input_node in step.op.inputs.items() } if verbose >= 2: @@ -98,11 +95,14 @@ def run_schedule( print(f"Duration: {end_time - begin_time} s", file=sys.stderr) # materialize data in output nodes - for output_key, output_node in operator.outputs.items(): + for output_key, output_node in step.op.outputs.items(): output_evset = operator_outputs[output_key] output_evset._internal_node = output_node data[output_node] = output_evset - # TODO: Only return the required data. - # TODO: Un-allocate not used anymore object. + # Release unused memory + for node in step.released_nodes: + assert node in data + del data[node] + return data