Skip to content

Commit

Permalink
Revert default benchmark scheduler changes.
Browse files Browse the repository at this point in the history
This reverts commit 86a1c06.

I apparently accidentally fixed up google#271 into google#270. This reverts the
changes that accidentally made their way into the tree.

Pull Request: google#276
  • Loading branch information
boomanaiden154 committed Dec 31, 2024
1 parent 09a6cde commit 8eff101
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 174 deletions.
75 changes: 0 additions & 75 deletions gematria/datasets/pipelines/benchmark_cpu_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@

import abc
from typing_extensions import override
from collections.abc import Iterable
import os
import re


class BenchmarkScheduler(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -69,75 +66,3 @@ def setup_and_get_benchmark_core(self) -> int | None:
@override
def verify(self):
pass


class DefaultBenchmarkScheduler(BenchmarkScheduler):
  """A BenchmarkScheduler that schedules processes separately.

  DefaultBenchmarkScheduler schedules the main process and the benchmark
  subprocess on separate cores, making sure to reserve the second hyperthread
  on the benchmarking core to prevent interference. It expects that the main
  process is initially given a CPU mask with three active threads, additionally
  assuming that two of the threads are neighboring (part of the same core).
  Errors are raised if these conditions are not met. The benchmarking core
  returned is one of the two neighboring threads. The main process has its
  CPU mask limited to the thread that neighbors neither of the other threads.
  """

  def __init__(self):
    # CPU mask the main process was pinned to by
    # setup_and_get_benchmark_core(); empty until setup has run.
    self._cpu_mask = []

  @staticmethod
  def _get_neighboring_threads(cpu_index: int) -> list[int]:
    """Returns the hardware threads that share a core with `cpu_index`.

    The list is read from sysfs and includes `cpu_index` itself.

    Args:
      cpu_index: The index of the CPU whose sibling threads are requested.

    Returns:
      The CPU indices listed in the CPU's `thread_siblings_list` file.

    Raises:
      OSError: If the sysfs topology file cannot be opened.
      ValueError: If the file contents cannot be parsed as CPU indices.
    """
    with open(
        f'/sys/devices/system/cpu/cpu{cpu_index}/topology/thread_siblings_list'
    ) as thread_sibling_list_handle:
      thread_sibling_list = thread_sibling_list_handle.read().strip()

    # The file uses the kernel's cpulist format: comma-separated entries that
    # are either a single index ("4") or an inclusive range ("0-3"). A range
    # has to be expanded rather than treated as two separate indices,
    # otherwise "0-3" would be misread as the pair [0, 3].
    neighboring_threads = []
    for entry in thread_sibling_list.split(','):
      first, _, last = entry.partition('-')
      if last:
        neighboring_threads.extend(range(int(first), int(last) + 1))
      else:
        neighboring_threads.append(int(first))
    return neighboring_threads

  def _get_aux_core_and_hyperthread_pair(
      self,
      cpu_mask: Iterable[int],
  ) -> tuple[int, list[int]]:
    """Splits a CPU mask into an auxiliary CPU and a hyperthread pair.

    Args:
      cpu_mask: The CPUs available to the current process.

    Returns:
      A tuple of the CPU that is not part of the hyperthread pair and the
      list holding the two neighboring hyperthreads.

    Raises:
      ValueError: If a CPU in the mask does not have exactly two hyperthreads,
        if no pair of neighboring hyperthreads is fully contained in the mask,
        or if the mask has no CPU left over besides the hyperthread pair.
    """
    # Materialize the mask once so that one-shot iterables are supported and
    # the repeated membership tests below do not consume the input.
    available_cpus = list(cpu_mask)

    for cpu_index in available_cpus:
      neighboring_threads = self._get_neighboring_threads(cpu_index)
      if len(neighboring_threads) != 2:
        raise ValueError('Expected two hyperthreads per CPU.')

      if (
          neighboring_threads[0] in available_cpus
          and neighboring_threads[1] in available_cpus
      ):
        cpus = list(available_cpus)
        cpus.remove(neighboring_threads[0])
        cpus.remove(neighboring_threads[1])
        if not cpus:
          raise ValueError(
              'Expected a CPU in addition to the neighboring hyperthreads.'
          )
        return (cpus[0], [neighboring_threads[0], neighboring_threads[1]])
    raise ValueError(
        'Expected a pair of neighboring hyperthreads in the CPU mask.'
    )

  @override
  def setup_and_get_benchmark_core(self) -> int | None:
    """Pins the main process to the auxiliary CPU and picks a benchmark core.

    Returns:
      The index of one of the two neighboring hyperthreads, to be used for
      benchmarking.

    Raises:
      ValueError: If the current CPU mask does not contain exactly three CPUs
        or does not contain a pair of neighboring hyperthreads.
    """
    cpu_mask = os.sched_getaffinity(0)

    if len(cpu_mask) != 3:
      raise ValueError('Expected to have three CPUs.')

    aux_core, hyperthread_pair = self._get_aux_core_and_hyperthread_pair(
        cpu_mask
    )
    os.sched_setaffinity(0, [aux_core])
    self._cpu_mask = [aux_core]

    return hyperthread_pair[0]

  @override
  def verify(self):
    """Checks that the CPU mask still matches the one recorded by setup.

    Raises:
      ValueError: If the current CPU mask differs from the recorded one.
    """
    # Compare as sets so the check does not depend on iteration order.
    if set(self._cpu_mask) != set(os.sched_getaffinity(0)):
      raise ValueError('Expected the CPU mask to not change.')
99 changes: 0 additions & 99 deletions gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from collections.abc import Iterable

from absl.testing import absltest

from gematria.datasets.pipelines import benchmark_cpu_scheduler
Expand All @@ -27,102 +24,6 @@ def test_no_scheduling(self):
self.assertIsNone(scheduler.setup_and_get_benchmark_core())
scheduler.verify()

def test_default_scheduler_get_neighboring_threads(self):
  """Checks that CPU 0 reports two distinct sibling threads."""
  siblings = (
      benchmark_cpu_scheduler.DefaultBenchmarkScheduler()._get_neighboring_threads(
          0
      )
  )

  # Without knowing the topology of the machine running the test, the most
  # that can be verified is that two different CPU ids are returned.
  self.assertLen(siblings, 2)
  first_thread, second_thread = siblings
  self.assertNotEqual(first_thread, second_thread)

@staticmethod
def _set_normal_affinity():
  """Restricts the process to one auxiliary CPU plus a hyperthread pair.

  Returns:
    A tuple of the auxiliary CPU, the list of neighboring hyperthreads, and
    the CPU mask (as a list) that was active before the change, so callers
    can restore it.

  Raises:
    ValueError: If the current CPU mask has no CPU outside of the chosen
      hyperthread pair to act as the auxiliary CPU.
  """
  cpu_mask = os.sched_getaffinity(0)
  cpu_mask_list = list(cpu_mask)
  hyperthread_pair_part = cpu_mask.pop()
  hyperthread_pair = benchmark_cpu_scheduler.DefaultBenchmarkScheduler._get_neighboring_threads(
      hyperthread_pair_part
  )

  # The auxiliary CPU must not be one of the hyperthreads: picking two
  # arbitrary CPUs from the mask could return two siblings of the same core,
  # which would leave only two distinct CPUs in the new mask.
  aux_candidates = [
      cpu for cpu in cpu_mask_list if cpu not in hyperthread_pair
  ]
  if not aux_candidates:
    raise ValueError('Expected a CPU outside of the hyperthread pair.')
  aux_cpu = aux_candidates[0]
  new_cpu_mask = [aux_cpu, *hyperthread_pair]

  os.sched_setaffinity(0, new_cpu_mask)
  return (aux_cpu, hyperthread_pair, cpu_mask_list)

@staticmethod
def _reset_cpu_affinity(cpu_mask: Iterable[int]):
  """Sets the CPU affinity of the current process to `cpu_mask`."""
  # A pid of zero refers to the calling process.
  current_process = 0
  os.sched_setaffinity(current_process, cpu_mask)

def test_default_scheduler_get_cores(self):
  """Checks the aux CPU / hyperthread pair split on a well-formed mask."""
  expected_aux_cpu, expected_hyperthread_pair, old_cpu_mask = (
      self._set_normal_affinity()
  )
  # Register the restore up front so the original affinity comes back even
  # when one of the assertions below fails.
  self.addCleanup(self._reset_cpu_affinity, old_cpu_mask)

  scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
  cpu_mask = os.sched_getaffinity(0)
  aux_cpu, hyperthread_pair = scheduler._get_aux_core_and_hyperthread_pair(
      cpu_mask
  )
  self.assertEqual(aux_cpu, expected_aux_cpu)
  self.assertContainsSubsequence(hyperthread_pair, expected_hyperthread_pair)

def test_default_scheduler_get_cores_no_neighboring_threads(self):
  """Checks that a mask without a sibling pair is rejected."""
  # Pick CPUs from three different cores. Taking three arbitrary CPUs from
  # the mask could include two siblings of one core, in which case no error
  # would be raised and the test would fail spuriously.
  three_cores = []
  covered_threads = set()
  for cpu in sorted(os.sched_getaffinity(0)):
    if cpu in covered_threads:
      continue
    three_cores.append(cpu)
    covered_threads.update(
        benchmark_cpu_scheduler.DefaultBenchmarkScheduler._get_neighboring_threads(
            cpu
        )
    )
    if len(three_cores) == 3:
      break
  if len(three_cores) < 3:
    self.skipTest('Need CPUs from three distinct cores.')

  scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
  with self.assertRaises(ValueError):
    scheduler._get_aux_core_and_hyperthread_pair(three_cores)

def test_default_scheduler_setup(self):
  """Checks that setup pins the process to the aux CPU and returns a sibling."""
  expected_aux_cpu, expected_hyperthread_pair, old_cpu_mask = (
      self._set_normal_affinity()
  )
  # Register the restore up front so the original affinity comes back even
  # when one of the assertions below fails.
  self.addCleanup(self._reset_cpu_affinity, old_cpu_mask)

  scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
  benchmark_core = scheduler.setup_and_get_benchmark_core()
  self.assertIn(benchmark_core, expected_hyperthread_pair)
  set_cpu_mask = os.sched_getaffinity(0)
  self.assertLen(set_cpu_mask, 1)
  self.assertEqual(set_cpu_mask.pop(), expected_aux_cpu)

def test_default_scheduler_not_three_cpus(self):
  """Checks that setup rejects a CPU mask that does not have three CPUs."""
  old_cpu_mask = os.sched_getaffinity(0)
  # Register the restore before changing the affinity so the original mask
  # comes back even when the assertion below fails.
  self.addCleanup(os.sched_setaffinity, 0, old_cpu_mask)

  cpu_mask_list = list(old_cpu_mask)
  os.sched_setaffinity(0, cpu_mask_list[0:2])

  scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
  with self.assertRaises(ValueError):
    scheduler.setup_and_get_benchmark_core()

def test_default_scheduler_verify(self):
  """Checks that verify passes when the CPU mask is untouched after setup."""
  _, _, old_cpu_mask = self._set_normal_affinity()
  # Register the restore up front so the original affinity comes back even
  # when setup or verify raises.
  self.addCleanup(self._reset_cpu_affinity, old_cpu_mask)

  scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
  scheduler.setup_and_get_benchmark_core()
  scheduler.verify()

def test_default_scheduler_verify_mask_changed(self):
  """Checks that verify raises once the CPU mask changes after setup."""
  _, _, old_cpu_mask = self._set_normal_affinity()
  # Register the restore up front so the original affinity comes back even
  # when the assertion below fails.
  self.addCleanup(self._reset_cpu_affinity, old_cpu_mask)

  scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
  scheduler.setup_and_get_benchmark_core()

  cpu_mask_list = list(old_cpu_mask)
  os.sched_setaffinity(0, cpu_mask_list[1:3])
  with self.assertRaises(ValueError):
    scheduler.verify()


# Run the tests through absltest when this file is executed as a script.
if __name__ == '__main__':
absltest.main()

0 comments on commit 8eff101

Please sign in to comment.