Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler optimization #185

Merged
merged 2 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions temporian/beam/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,33 +126,33 @@ def run_multi_io(

data = {**inputs}

num_operators = len(schedule.ordered_operators)
for operator_idx, operator in enumerate(schedule.ordered_operators):
operator_def = operator.definition()
num_steps = len(schedule.steps)
for step_idx, step in enumerate(schedule.steps):
operator_def = step.op.definition()

print("=============================", file=sys.stderr)
print(
f"{operator_idx+1} / {num_operators}: Run {operator}",
f"{step_idx+1} / {num_steps}: Run {step.op}",
file=sys.stderr,
)

# Construct operator inputs
operator_inputs = {
input_key: data[input_node]
for input_key, input_node in operator.inputs.items()
for input_key, input_node in step.op.inputs.items()
}

# Get Beam implementation
implementation_cls = implementation_lib.get_implementation_class(
operator_def.key
)
implementation = implementation_cls(operator)
implementation = implementation_cls(step.op)

# Add implementation to Beam pipeline
operator_outputs = implementation(**operator_inputs)

# Collect outputs
for output_key, output_node in operator.outputs.items():
for output_key, output_node in step.op.outputs.items():
data[output_node] = operator_outputs[output_key]

return {output: data[output] for output in outputs}
41 changes: 34 additions & 7 deletions temporian/core/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from temporian.implementation.numpy import evaluation as np_eval
from temporian.implementation.numpy.data.event_set import EventSet
from temporian.core.graph import infer_graph
from temporian.core.schedule import Schedule
from temporian.core.schedule import Schedule, ScheduleStep
from temporian.core.operators.leak import LeakOperator

EvaluationQuery = Union[
Expand Down Expand Up @@ -143,7 +143,7 @@ def run(

if verbose == 1:
print(
f"Run {len(schedule.ordered_operators)} operators",
f"Run {len(schedule.steps)} operators",
file=sys.stderr,
)

Expand Down Expand Up @@ -208,7 +208,8 @@ def build_schedule(
# Operators ready to be computed (i.e. ready to be added to "planned_ops")
# as all their inputs are already computed by "planned_ops" or specified by
# "inputs".
ready_ops: Set[Operator] = set()
ready_ops: List[Operator] = []
ready_ops_set: Set[Operator] = set()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's safe to remove ready_ops_set now we got ready_ops. I don't see its purpose.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"ready_ops" and "ready_ops_set" contain the same data. Those two containers have different properties and costs (one is a list, the other is a set). The list works well as a LIFO stack, while the set is efficient for checking whether an item is present.

I think it's safe to remove ready_ops_set now we got ready_ops

You mean there is a situation where this code will not work? If so, can you give details?


# "node_to_op[e]" is the list of operators with node "e" as input.
node_to_op: Dict[EventSetNode, List[Operator]] = defaultdict(lambda: [])
Expand All @@ -222,25 +223,50 @@ def build_schedule(
for op in graph.operators:
num_pending_inputs = 0
for input_node in op.inputs.values():
node_to_op[input_node].append(op)
if input_node in graph.inputs:
# This input is already available
continue
node_to_op[input_node].append(op)
num_pending_inputs += 1
if num_pending_inputs == 0:
# Ready to be scheduled
ready_ops.add(op)
ready_ops.append(op)
ready_ops_set.add(op)
else:
# Some of the inputs are missing.
op_to_num_pending_inputs[op] = num_pending_inputs

# Make evaluation order deterministic.
#
# Execute the op with smallest internal ordered id first.
ready_ops.sort(key=lambda op: op._internal_ordered_id, reverse=True)
Copy link
Collaborator

@DonBraulio DonBraulio Jul 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only affects the ops that depend directly on the inputs (current ops in ready_ops) and executes them in instantiation order, right?
I'm not sure why this is better for memory release.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sort ensures that the execution order of the operators is deterministic. This has no impact on the result. However, we have unit tests that check some of the internals that were affected by this non-deterministic evaluation. With this change, the unit tests are simpler :)

While the results are not impacted, the order of execution could change the speed and RAM usage of executing the graph. Making the order of execution deterministic reduces the risk of flakiness in resource-constrained environments.


# Compute the schedule
while ready_ops:
# Get an op ready to be scheduled
op = ready_ops.pop()
ready_ops_set.remove(op)

# Nodes released after the op is executed
released_nodes = []
for input in op.inputs.values():
if input in outputs:
continue
if input not in node_to_op:
continue
# The list of ops that depends on this input (including the current
# op "op").
input_usage = node_to_op[input]
input_usage.remove(op)

if not input_usage:
released_nodes.append(input)
del node_to_op[input]

# Schedule the op
schedule.ordered_operators.append(op)
schedule.steps.append(
ScheduleStep(op=op, released_nodes=released_nodes)
)

# Update all the ops that depends on "op". Enlist the ones that are
# ready to be computed
Expand All @@ -256,7 +282,8 @@ def build_schedule(

if num_missing_inputs == 0:
# "new_op" can be computed
ready_ops.add(new_op)
ready_ops.append(new_op)
ready_ops_set.add(new_op)
del op_to_num_pending_inputs[new_op]

assert not op_to_num_pending_inputs
Expand Down
14 changes: 12 additions & 2 deletions temporian/core/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def __exit__(self, exc_type, exc_val, traceback):
class Operator(ABC):
"""Interface definition and common logic for operators."""

next_internal_id: int = 0

def __init__(self):
self._inputs: Dict[str, EventSetNode] = {}
self._outputs: Dict[str, EventSetNode] = {}
Expand All @@ -81,10 +83,18 @@ def __init__(self):
attr.key: attr.type for attr in self._definition.attributes
}

# Id of the operator object such that an operator A instantiated before
# an operator B will have a smaller _internal_ordered_id.
#
# _internal_ordered_id is used to ensure the deterministic graph
# evaluation.
self._internal_ordered_id = Operator.next_internal_id
Operator.next_internal_id += 1
Comment on lines +91 to +92
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary? could we instead ensure that graph.operators is always the same order? e.g. by using an ordered set instead of a set in the graph's _operators, _nodes, etc. while building the schedule (ordered set in python is a dict with no values :) ). not sure if this would suffice?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternative solution would be to replace some of the set-inputs in the API and intermediate functions by lists. I don't see that as ideal.

The other benefit of this change is that "id" is now a small number that is easier for people to read in error messages. Ideally, I would like to update all the "ids" with this mechanism.

The third benefit is that an "id()" is not guaranteed to be unique for two objects with non-overlapping lifetimes. This is not an issue for our current code, but this is a property that is not great for what we use it for.

See https://docs.python.org/3/library/functions.html#id


def __repr__(self):
return (
f"Operator(key={self.definition().key!r}, id={id(self)!r},"
f" attributes={self.attributes!r})"
f"Operator(key={self.definition().key!r},"
f" id={self._internal_ordered_id}, attributes={self.attributes!r})"
)

@property
Expand Down
10 changes: 9 additions & 1 deletion temporian/core/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@
from temporian.core.operators.base import Operator


@dataclass
class ScheduleStep:
op: Operator

# List of nodes that will not be used anymore after "op" is executed.
released_nodes: List[EventSetNode]


@dataclass
class Schedule:
ordered_operators: List[Operator] = field(default_factory=list)
steps: List[ScheduleStep] = field(default_factory=list)
input_nodes: Set[EventSetNode] = field(default_factory=set)
59 changes: 50 additions & 9 deletions temporian/core/test/evaluation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ def test_schedule_trivial(self):
schedule = evaluation.build_schedule(
inputs={a}, outputs={b.outputs["output"]}
)
self.assertEqual(schedule.ordered_operators, [b])
self.assertEqual(schedule.steps, [evaluation.ScheduleStep(b, [a])])

def test_schedule_empty(self):
a = utils.create_input_node()

schedule = evaluation.build_schedule(inputs={a}, outputs={a})
self.assertEqual(schedule.ordered_operators, [])
self.assertEqual(schedule.steps, [])

def test_schedule_two_delayed_inputs(self):
i1 = utils.create_input_node()
Expand All @@ -46,9 +46,15 @@ def test_schedule_two_delayed_inputs(self):
schedule = evaluation.build_schedule(
inputs={i1, i2}, outputs={o3.outputs["output"]}
)
self.assertTrue(
(schedule.ordered_operators == [o2, o1, o3])
or (schedule.ordered_operators == [o1, o2, o3])
self.assertEqual(
schedule.steps,
[
evaluation.ScheduleStep(o1, [i1]),
evaluation.ScheduleStep(o2, [i2]),
evaluation.ScheduleStep(
o3, [o1.outputs["output"], o2.outputs["output"]]
),
],
)

def test_schedule_basic(self):
Expand All @@ -62,9 +68,16 @@ def test_schedule_basic(self):
inputs={i1, i3},
outputs={o5.outputs["output_1"], o4.outputs["output"]},
)
self.assertTrue(
(schedule.ordered_operators == [o2, o4, o5])
or (schedule.ordered_operators == [o4, o2, o5])

self.assertEqual(
schedule.steps,
[
evaluation.ScheduleStep(op=o2, released_nodes=[i1]),
evaluation.ScheduleStep(
op=o4, released_nodes=[o2.outputs["output"], i3]
),
evaluation.ScheduleStep(op=o5, released_nodes=[]),
],
)

def test_schedule_mid_chain(self):
Expand All @@ -77,7 +90,35 @@ def test_schedule_mid_chain(self):
schedule = evaluation.build_schedule(
inputs={o3.outputs["output"]}, outputs={o5.outputs["output"]}
)
self.assertEqual(schedule.ordered_operators, [o4, o5])

self.assertEqual(
schedule.steps,
[
evaluation.ScheduleStep(
op=o4, released_nodes=[o3.outputs["output"]]
),
evaluation.ScheduleStep(
op=o5, released_nodes=[o4.outputs["output"]]
),
],
)

def test_schedule_interm_results(self):
i1 = utils.create_input_node()
o2 = utils.OpI1O1(i1)
o3 = utils.OpI1O1(o2.outputs["output"])

schedule = evaluation.build_schedule(
inputs={i1}, outputs={o3.outputs["output"], o2.outputs["output"]}
)

self.assertEqual(
schedule.steps,
[
evaluation.ScheduleStep(op=o2, released_nodes=[i1]),
evaluation.ScheduleStep(op=o3, released_nodes=[]),
],
)

def test_run_value(self):
i1 = utils.create_input_node()
Expand Down
26 changes: 13 additions & 13 deletions temporian/implementation/numpy/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,35 @@ def run_schedule(
"""
data = {**inputs}

num_operators = len(schedule.ordered_operators)
for operator_idx, operator in enumerate(schedule.ordered_operators):
operator_def = operator.definition()
num_steps = len(schedule.steps)
for step_idx, step in enumerate(schedule.steps):
operator_def = step.op.definition()

# Get implementation
implementation_cls = implementation_lib.get_implementation_class(
operator_def.key
)

# Instantiate implementation
implementation = implementation_cls(operator)
implementation = implementation_cls(step.op)

if verbose == 1:
print(
(
f" {operator_idx+1} / {num_operators}:"
f" {operator.operator_key()}"
),
f" {step_idx+1} / {num_steps}: {step.op.operator_key()}",
file=sys.stderr,
end="",
)
elif verbose >= 2:
print("=============================", file=sys.stderr)
print(
f"{operator_idx+1} / {num_operators}: Run {operator}",
f"{step_idx+1} / {num_steps}: Run {step.op}",
file=sys.stderr,
)

# Construct operator inputs
operator_inputs = {
input_key: data[input_node]
for input_key, input_node in operator.inputs.items()
for input_key, input_node in step.op.inputs.items()
}

if verbose >= 2:
Expand All @@ -98,11 +95,14 @@ def run_schedule(
print(f"Duration: {end_time - begin_time} s", file=sys.stderr)

# materialize data in output nodes
for output_key, output_node in operator.outputs.items():
for output_key, output_node in step.op.outputs.items():
output_evset = operator_outputs[output_key]
output_evset._internal_node = output_node
data[output_node] = output_evset

# TODO: Only return the required data.
# TODO: Un-allocate not used anymore object.
# Release unused memory
for node in step.released_nodes:
assert node in data
del data[node]

return data