Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logical time and lateness #122

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
424 changes: 424 additions & 0 deletions docs/examples/lateness-full.ipynb

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pipit/partitions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Copyright 2022-2023 Parallel Software and Systems Group, University of
# Maryland. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT

from .event import Event
from .partition import Partition
from .partition_dag import Partition_DAG
30 changes: 30 additions & 0 deletions pipit/partitions/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Set

# Class to store individual events and info on their matching events

# Each event will contain an index
class Event:
# The event_id is the index of the event in the trace.events DF
# Matching Events is a set of matching communication events
# I.e. MPI Send and MPI Recv

def __init__(self, event_id: int, process: int, start_time: float, end_time: float):
"""
Constructor for Event
@param event_id: int - index of event in trace.events DF
@param process: int - process id (rank)
@param start_time: float - time of the event's 'enter' or 'start'
@param end_time: float - time of the event's 'exit' or 'end'
"""
self.event_id = event_id
self.matching_events = Set[int]
# self.partition_id = None
self.process = process
self.start_time = start_time
self.end_time = end_time

def add_matching_event(self, matching_event_id: int):
self.matching_events.add(matching_event_id)

# def set_partition(self, partition_id: int):
# self.partition_id = partition_id
466 changes: 466 additions & 0 deletions pipit/partitions/leap.py

Large diffs are not rendered by default.

199 changes: 199 additions & 0 deletions pipit/partitions/partition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
from .event import Event
from typing import Set, List, Dict

class Partition:
# Each Partition is started as a singular event consisting of an MPI operation
# We will later add in computation events

def create_empty_partition(self, partition_id: int):
"""
Constructor for empty partition
"""
self.partition_id: int = partition_id

# event's set
# each id in events correlates to the trace.events df
self.events: Set[Event] = set()

# for all p in parents, p happens before self
self.parents: Set[Partition] = set()
# for all c in children, c happens after self
self.children: Set[Partition] = set()

self.processes: Set[int] = set()


# variables for leap
self.distance = 0
self.min_event_start: float = float('inf')
self.max_event_end: 0
# self.__calc_min_max_time()

# Variables for Tarjan's algorithm
self.visited = False
self.index = -1
self.low_link = -1




def __init__(self, event: Event):
# if event is an int, then we are creating an empty partition
if isinstance(event, int):
self.create_empty_partition(event)
return
self.partition_id: int = event.event_id

# event's set
# each id in events correlates to the trace.events df
self.events: Set[Event] = set()
self.events.add(event)

# for all p in parents, p happens before self
self.parents: Set[Partition] = set()
# for all c in children, c happens after self
self.children: Set[Partition] = set()

self.processes: Set[int] = set()


# variables for leap
self.distance = 0
self.min_event_start: float = float('inf')
self.max_event_end: float = 0
self.__calc_min_max_time()

# Variables for Tarjan's algorithm
self.visited = False
self.index = -1
self.low_link = -1


def __hash__(self) -> int:
return self.partition_id

def __eq__(self, other):
return self.partition_id == other.partition_id

def __ne__(self, other):
return self.partition_id != other.partition_id

def absorb_partition(self, other: "Partition"):
self.events.update(other.events)

self.parents.update(other.parents)
self.children.update(other.children)
self.processes.update(other.processes)

if self in self.parents:
self.parents.remove(self)
if self in self.children:
self.children.remove(self)
if other in self.parents:
self.parents.remove(other)
if other in self.children:
self.children.remove(other)

return self

def add_processes(self, processes: Set[int]):
self.processes.update(processes)

def __calc_min_max_time(self):
if len(self.events) > 0:
self.min_event_start = min([event.start_time for event in self.events])
self.max_event_end = max([event.end_time for event in self.events])

def initialize_for_tarjan(self):
self.visited = False
self.index = -1
self.low_link = -1

def add_event(self, e : Event):
self.event_dict[e.event_id] = e
e.add_partition(self)
self.events.add(e)
self.processes.add(e.process)

if e.start_time < self.min_event_start:
self.min_event_start = e.start_time

if e.end_time > self.max_event_end:
self.max_event_end = e.end_time

@staticmethod
def tarjan_strongly_connected(graph):
""" Tarjan's Algorithm for finding strongly connected components
Parameters
----------
graph : dict
Dictionary of partition_id -> Partition
Returns
-------
components : list
List of strongly connected components
"""
index = 0
stack = []
components = []
visited = set()

def strong_connect(partition):
nonlocal index, stack, components, visited

partition.index = index
partition.low_link = index
index += 1

stack.append(partition)
visited.add(partition.partition_id)

for child_id in partition.get_children():
if child_id not in visited:
child = graph[child_id]
strong_connect(child)
partition.low_link = min(partition.low_link, child.low_link)
elif graph[child_id] in stack:
partition.low_link = min(partition.low_link, graph[child_id].index)

if partition.low_link == partition.index:
component = []
while stack:
top = stack.pop()
component.append(top.partition_id)
if top == partition:
break
components.append(component)

for _,partition in graph.items():
partition.initialize_for_tarjan()

for partition_id, partition in graph.items():
if partition_id not in visited:
strong_connect(partition)

return components

@staticmethod
def merge_strongly_connected_components(graph, components):
""" Merge strongly connected components into one partition
Parameters
----------
graph : dict
Dictionary of partition_id -> Partition
components : list
List of strongly connected components
Returns
-------
merged_graph : dict
Dictionary of partition_id -> Partition
"""
# Note that the original graph is also modified since merge_partition is inplace so one
# should not use the original graph after calling this function
merged_graph = {}
for component in components:
merged_graph[component[0]] = graph[component[0]]
if len(component) > 1:
for partition_id in component[1:]:
merged_graph[component[0]].merge_partition(graph[partition_id])
return merged_graph
Loading
Loading