Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Setting CPU limit for subtasks #4852

Merged
merged 14 commits into from
Nov 6, 2019
Merged
8 changes: 6 additions & 2 deletions golem/docker/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self,
work_dir: str,
output_dir: str,
stats_dir: str,
cpu_limit: Optional[int] = None,
volumes: Optional[Iterable[str]] = None,
environment: Optional[dict] = None,
host_config: Optional[Dict] = None,
Expand Down Expand Up @@ -123,6 +124,8 @@ def __init__(self,
self.logging_thread = None
self.stop_logging_thread = False

self.cpu_limit = cpu_limit

def _prepare(self):
self.work_dir_mod = self._host_dir_chmod(self.work_dir, "rw")
self.resources_dir_mod = self._host_dir_chmod(self.resources_dir, "rw")
Expand Down Expand Up @@ -157,8 +160,9 @@ def _prepare(self):
self.resources_dir, self.output_dir, self.stats_dir)

def _build_stats_entrypoint(self) -> str:
return f'docker-cgroups-stats -o {self.STATS_DIR}/{self.STATS_FILE} ' \
+ self.entrypoint
limit = f'-l {self.cpu_limit} ' if self.cpu_limit else ''
return f'docker-cgroups-stats {limit}' \
f'-o {self.STATS_DIR}/{self.STATS_FILE} ' + self.entrypoint

def _cleanup(self):
if self.container:
Expand Down
5 changes: 4 additions & 1 deletion golem/docker/task_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, # pylint: disable=too-many-arguments
extra_data: Dict,
dir_mapping: DockerDirMapping,
timeout: int,
cpu_limit: Optional[int] = None,
check_mem: bool = False) -> None:

if not docker_images:
Expand All @@ -101,6 +102,7 @@ def __init__(self, # pylint: disable=too-many-arguments
self.job: Optional[DockerJob] = None
self.check_mem = check_mem
self.dir_mapping = dir_mapping
self.cpu_limit = cpu_limit

# pylint:disable=too-many-arguments
@staticmethod
Expand Down Expand Up @@ -202,7 +204,8 @@ def _run_docker_job(self) -> Optional[int]:
stats_dir=str(self.dir_mapping.stats),
volumes=volumes,
environment=environment,
host_config=host_config
host_config=host_config,
cpu_limit=self.cpu_limit
)

with DockerJob(**params) as job, MemoryChecker(self.check_mem) as mc:
Expand Down
8 changes: 4 additions & 4 deletions golem/task/localcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ def _get_task_thread(self, ctd: ComputeTaskDef) -> DockerTaskThread:
temporary=self.tmp_dir,
)
return DockerTaskThread(
ctd['docker_images'],
ctd['extra_data'],
dir_mapping,
0,
docker_images=ctd['docker_images'],
extra_data=ctd['extra_data'],
dir_mapping=dir_mapping,
timeout=0,
check_mem=self.check_mem,
)

Expand Down
3 changes: 2 additions & 1 deletion golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ def start_computation(self) -> None: # pylint: disable=too-many-locals
return

deadline = min(task_header.deadline, subtask_deadline)
cpu_limit = task_header.subtask_budget
task_timeout = deadline_to_timeout(deadline)

unique_str = str(uuid.uuid4())
Expand All @@ -688,7 +689,7 @@ def start_computation(self) -> None: # pylint: disable=too-many-locals
dir_mapping = DockerTaskThread.generate_dir_mapping(resource_dir,
temp_dir)
tt: TaskThread = DockerTaskThread(
docker_images, extra_data, dir_mapping, task_timeout)
docker_images, extra_data, dir_mapping, task_timeout, cpu_limit)
elif self.support_direct_computation:
tt = PyTaskThread(extra_data, resource_dir, temp_dir,
task_timeout)
Expand Down
9 changes: 5 additions & 4 deletions golem/testutils_app_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,11 @@ def _run_test_job(self, task, root_dir, params):
env = task.ENVIRONMENT_CLASS
image = DockerImage(repository=env.DOCKER_IMAGE, tag=env.DOCKER_TAG)

dtt = DockerTaskThread(docker_images=[image],
extra_data=params,
dir_mapping=dir_mapping,
timeout=task.task_definition.subtask_timeout)
dtt = DockerTaskThread(
docker_images=[image],
extra_data=params,
dir_mapping=dir_mapping,
timeout=task.task_definition.subtask_timeout)

logger.info("Running docker image {} on mock provider".format(image))

Expand Down
23 changes: 21 additions & 2 deletions tests/golem/docker/test_docker_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def tearDown(self):
if self.test_dir:
shutil.rmtree(self.test_dir)

def _create_test_job(self, script=TEST_SCRIPT, params=None):
def _create_test_job(self, script=TEST_SCRIPT, params=None, cpu_limit=None):
self.test_job = DockerJob(
image=self.image,
entrypoint=f'python3 {script}',
Expand Down Expand Up @@ -121,7 +121,8 @@ def _create_test_job(self, script=TEST_SCRIPT, params=None):
"mode": "rw"
}
}
})
},
cpu_limit=cpu_limit)
return self.test_job


Expand Down Expand Up @@ -279,6 +280,24 @@ def test_logs_stderr(self):
print(line)
self.assertTrue(line.find("python3: can't open file") != -1)

def test_stats_entrypoint_no_limit(self):
    """Without a cpu_limit, no -l flag appears in the stats entrypoint."""
    job = self._create_test_job(script='/non/existent')
    with job:
        built = job._build_stats_entrypoint()
        self.assertEqual(
            built,
            'docker-cgroups-stats '
            '-o /golem/stats/stats.json python3 /non/existent'
        )

def test_stats_entrypoint_with_limit(self):
    """A cpu_limit of 1 is forwarded as '-l 1' in the stats entrypoint."""
    job = self._create_test_job(script='/non/existent', cpu_limit=1)
    with job:
        built = job._build_stats_entrypoint()
        self.assertEqual(
            built,
            'docker-cgroups-stats '
            '-l 1 -o /golem/stats/stats.json python3 /non/existent'
        )

def test_wait_timeout(self):
src = "import time\ntime.sleep(10)\n"
with open(path.join(self.resources_dir, "custom.py"), "w") as f:
Expand Down