From 60749c704dfd9850f471a3fe121bd5a9d4215fab Mon Sep 17 00:00:00 2001 From: Aiden Grossman Date: Tue, 31 Dec 2024 02:05:31 +0000 Subject: [PATCH] Add default benchmark scheduler This patch adds a default benchmark scheduler. This scheduler assumes the worker performing the benchmarking has three hyperthreads available inside its CPU mask, with two of them being part of a pair (from the same core). It then selects one core from the pair to be used for the benchmarking subprocess, and selects the core outside of the pair to be used to run the main exegesis/python program. Verification is also added to ensure that nothing external to the program changes the process mask, or at least to catch it early if it does happen. Pull Request: https://github.com/google/gematria/pull/271 --- .../pipelines/benchmark_cpu_scheduler.py | 75 ++++++++++++++ .../pipelines/benchmark_cpu_scheduler_test.py | 99 +++++++++++++++++++ 2 files changed, 174 insertions(+) diff --git a/gematria/datasets/pipelines/benchmark_cpu_scheduler.py b/gematria/datasets/pipelines/benchmark_cpu_scheduler.py index 7989db40..39b52815 100644 --- a/gematria/datasets/pipelines/benchmark_cpu_scheduler.py +++ b/gematria/datasets/pipelines/benchmark_cpu_scheduler.py @@ -14,6 +14,9 @@ import abc from typing_extensions import override +from collections.abc import Iterable +import os +import re class BenchmarkScheduler(metaclass=abc.ABCMeta): @@ -66,3 +69,75 @@ def setup_and_get_benchmark_core(self) -> int | None: @override def verify(self): pass + + +class DefaultBenchmarkScheduler(BenchmarkScheduler): + """A BenchmarkScheduler that schedules processes separately. + + DefaultBenchmarkScheduler schedules the main process and the benchmark + subprocess on separate cores, making sure to reserve the second hyperthread + on the benchmarking core to prevent interference. It expects that the main + process is initially given a CPU Mask with three active threads, additionally + assuming that two of the threads are neighboring (part of the same core). + Errors are raised if these conditions are not met. The benchmarking core + returned is one of the two neighboring threads. The main process has its + COU mask limited to the thread that neighbors neither of the other threads. + """ + + def __init__(self): + self._cpu_mask = [] + + @staticmethod + def _get_neighboring_threads(cpu_index: int) -> list[int]: + with open( + f'/sys/devices/system/cpu/cpu{cpu_index}/topology/thread_siblings_list' + ) as thread_sibling_list_handle: + neighboring_threads_strings = re.split( + r'[-,]+', thread_sibling_list_handle.read().strip() + ) + neighboring_threads = [ + int(cpu_index_str) for cpu_index_str in neighboring_threads_strings + ] + return neighboring_threads + + def _get_aux_core_and_hyperthread_pair( + self, + cpu_mask: Iterable[int], + ) -> tuple[int, list[int]]: + for cpu_index in cpu_mask: + neighboring_threads = self._get_neighboring_threads(cpu_index) + if len(neighboring_threads) != 2: + raise ValueError('Expected two hyperthreads per CPU.') + + if ( + neighboring_threads[0] in cpu_mask + and neighboring_threads[1] in cpu_mask + ): + cpus = list(cpu_mask) + cpus.remove(neighboring_threads[0]) + cpus.remove(neighboring_threads[1]) + return (cpus[0], [neighboring_threads[0], neighboring_threads[1]]) + raise ValueError( + 'Expected a pair of neighboring hyperthreads in the CPU mask.' + ) + + @override + def setup_and_get_benchmark_core(self) -> int | None: + cpu_mask = os.sched_getaffinity(0) + + if len(cpu_mask) != 3: + raise ValueError('Expected to have three CPUs.') + + aux_core, hyperthread_pair = self._get_aux_core_and_hyperthread_pair( + cpu_mask + ) + os.sched_setaffinity(0, [aux_core]) + self._cpu_mask = [aux_core] + + return hyperthread_pair[0] + + @override + def verify(self): + cpu_mask = list(os.sched_getaffinity(0)) + if self._cpu_mask != cpu_mask: + raise ValueError('Expected the CPU mask to not change.') diff --git a/gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py b/gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py index 37190bc3..be092cd0 100644 --- a/gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py +++ b/gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +from collections.abc import Iterable + from absl.testing import absltest from gematria.datasets.pipelines import benchmark_cpu_scheduler @@ -24,6 +27,102 @@ def test_no_scheduling(self): self.assertIsNone(scheduler.setup_and_get_benchmark_core()) scheduler.verify() + def test_default_scheduler_get_neighboring_threads(self): + scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler() + neighboring_threads = scheduler._get_neighboring_threads(0) + + # Just check that we get two CPU ids back that are not the same. We cannot + # do much more without knowing more about the system topology, and this + # should be a reasonable enough test. + self.assertLen(neighboring_threads, 2) + self.assertNotEqual(neighboring_threads[0], neighboring_threads[1]) + + @staticmethod + def _set_normal_affinity(): + cpu_mask = os.sched_getaffinity(0) + cpu_mask_list = list(cpu_mask) + aux_cpu = cpu_mask.pop() + hyperthread_pair_part = cpu_mask.pop() + hyperthread_pair = benchmark_cpu_scheduler.DefaultBenchmarkScheduler._get_neighboring_threads( + hyperthread_pair_part + ) + new_cpu_mask = [aux_cpu, *hyperthread_pair] + + os.sched_setaffinity(0, new_cpu_mask) + return (aux_cpu, hyperthread_pair, cpu_mask_list) + + @staticmethod + def _reset_cpu_affinity(cpu_mask: Iterable[int]): + os.sched_setaffinity(0, cpu_mask) + + def test_default_scheduler_get_cores(self): + expected_aux_cpu, expected_hyperthread_pair, old_cpu_mask = ( + self._set_normal_affinity() + ) + scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler() + cpu_mask = os.sched_getaffinity(0) + aux_cpu, hyperthread_pair = scheduler._get_aux_core_and_hyperthread_pair( + cpu_mask + ) + self.assertEqual(aux_cpu, expected_aux_cpu) + self.assertContainsSubsequence(hyperthread_pair, expected_hyperthread_pair) + self._reset_cpu_affinity(old_cpu_mask) + + def test_default_scheduler_get_cores_no_neighboring_threads(self): + cpu_mask = os.sched_getaffinity(0) + three_cores = [cpu_mask.pop(), cpu_mask.pop(), cpu_mask.pop()] + + scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler() + with self.assertRaises(ValueError): + scheduler._get_aux_core_and_hyperthread_pair(three_cores) + + def test_default_scheduler_setup(self): + expected_aux_cpu, expected_hyperthread_pair, old_cpu_mask = ( + self._set_normal_affinity() + ) + + scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler() + benchmark_core = scheduler.setup_and_get_benchmark_core() + self.assertIn(benchmark_core, expected_hyperthread_pair) + set_cpu_mask = os.sched_getaffinity(0) + self.assertLen(set_cpu_mask, 1) + self.assertEqual(set_cpu_mask.pop(), expected_aux_cpu) + + self._reset_cpu_affinity(old_cpu_mask) + + def test_default_scheduler_not_three_cpus(self): + old_cpu_mask = os.sched_getaffinity(0) + cpu_mask_list = list(old_cpu_mask) + os.sched_setaffinity(0, cpu_mask_list[0:2]) + + scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler() + with self.assertRaises(ValueError): + scheduler.setup_and_get_benchmark_core() + + os.sched_setaffinity(0, old_cpu_mask) + + def test_default_scheduler_verify(self): + _, _, old_cpu_mask = self._set_normal_affinity() + + scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler() + scheduler.setup_and_get_benchmark_core() + scheduler.verify() + + self._reset_cpu_affinity(old_cpu_mask) + + def test_default_scheduler_verify_mask_changed(self): + _, _, old_cpu_mask = self._set_normal_affinity() + + scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler() + scheduler.setup_and_get_benchmark_core() + + cpu_mask_list = list(old_cpu_mask) + os.sched_setaffinity(0, cpu_mask_list[1:3]) + with self.assertRaises(ValueError): + scheduler.verify() + + self._reset_cpu_affinity(old_cpu_mask) + if __name__ == '__main__': absltest.main()