Server explains the reasons to the worker if no tasks are available #1582

Open · wants to merge 9 commits into master
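For orientation, here is a minimal sketch of the protocol change this PR proposes, as read from the diff below; the dictionary shapes and the reason text are illustrative, not taken from any fishtest API documentation.

# Before: a worker that cannot be served only learns that no task is waiting.
old_response = {"task_waiting": False}

# After: the server joins one line per reason it had to skip the available runs,
# using the "\n - " separator defined in sync_request_task().
new_response = {
    "error_msg": "\n - An available run requires more than CONCURRENCY threads."
}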
48 changes: 41 additions & 7 deletions server/fishtest/rundb.py
@@ -1,5 +1,6 @@
import configparser
import copy
import enum
import math
import os
import random
@@ -41,6 +42,32 @@
last_rundb = None


# Just a bit of low-maintenance, easy-to-read bookkeeping
class _RequestTaskErrors(enum.Flag):
MachineLimit = enum.auto()
LowThreads = enum.auto()
HighThreads = enum.auto()
LowMemory = enum.auto()
NoBinary = enum.auto()
SkipSTC = enum.auto()
ServerSide = enum.auto()

# __private_names are not enum-ized
# These messages refer to worker command-line options, so they must be kept in sync with the worker.
__messages = {
MachineLimit: "This user has reached the max machines limit.",
LowThreads: "An available run requires more than CONCURRENCY threads.",
HighThreads: "An available run requires fewer than MIN_THREADS threads.",
LowMemory: "An available run requires more memory than MAX_MEMORY.",
NoBinary: "This worker has exceeded its GitHub API limit, and has no local binary for an available run.",
SkipSTC: "An available run is at STC, requiring fewer than CONCURRENCY threads due to cutechess issues. Consider splitting this worker. See Discord.",
ServerSide: "Server error or no active runs. Try again shortly.",
}

def __str__(self):
# The dict above is keyed by the raw auto() values, so index it by value.
return self.__messages[self.value]

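# Usage sketch (illustrative only, not part of this module): while scanning the
# unfinished runs, sync_request_task() ORs in a flag for each reason a run was
# skipped and finally joins the per-flag messages into one string for the worker:
#   errors = _RequestTaskErrors(0)
#   errors |= _RequestTaskErrors.LowThreads
#   errors |= _RequestTaskErrors.LowMemory
#   "\n - ".join(str(e) for e in errors)
# Iterating a composite Flag value like this relies on enum.Flag iteration,
# which is available from Python 3.11 on.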

def get_port():
params = {}
args = sys.argv
@@ -672,10 +699,11 @@ def request_task(self, worker_info):
self.task_semaphore.release()
else:
print("request_task too busy", flush=True)
return {"task_waiting": False}
return {"error_msg": _msg_sep + str(_RequestTaskErrors.ServerSide)}

def sync_request_task(self, worker_info):
unique_key = worker_info["unique_key"]
_msg_sep = "\n - "

# We get the list of unfinished runs.
# To limit db access the list is cached for
@@ -751,16 +779,15 @@ def priority(run): # lower is better
connections = connections + 1

if connections >= self.userdb.get_machine_limit(worker_info["username"]):
error = "Request_task: Machine limit reached for user {}".format(
worker_info["username"]
)
error = f'Request_task: Machine limit reached for user {worker_info["username"]}'
print(error, flush=True)
return {"task_waiting": False, "error": error}
return {"error_msg": _msg_sep + str(_RequestTaskErrors.MachineLimit)}

# Now go through the sorted list of unfinished runs.
# We will add a task to the first run that is suitable.

run_found = False
errors = _RequestTaskErrors(0) # Ignored when run_found

for run in self.task_runs:
if run["finished"]:
@@ -770,9 +797,11 @@ def priority(run): # lower is better
continue

if run["args"]["threads"] > max_threads:
errors |= _RequestTaskErrors.LowThreads
continue

if run["args"]["threads"] < min_threads:
errors |= _RequestTaskErrors.HighThreads
continue

# Check if there aren't already enough workers
@@ -799,14 +828,15 @@ def priority(run): # lower is better
need_tt += get_hash(run["args"]["new_options"])
need_tt += get_hash(run["args"]["base_options"])
need_tt *= max_threads // run["args"]["threads"]
# estime another 10MB per process, 30MB per thread, and 40MB for net as a base memory need besides hash
# estimate another 10MB per process, 30MB per thread, and 40MB for net as a base memory need besides hash
need_base = (
2
* (max_threads // run["args"]["threads"])
* (10 + 40 + 30 * run["args"]["threads"])
)
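# Worked example with hypothetical numbers: max_threads = 8 and a 4-thread run give
# 8 // 4 = 2 concurrent games, so need_base = 2 * 2 * (10 + 40 + 30 * 4) = 680 MB.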

if need_base + need_tt > max_memory:
errors |= _RequestTaskErrors.LowMemory
continue

# Github API limit...
@@ -816,6 +846,7 @@ def priority(run): # lower is better
and run["_id"] in self.worker_runs[unique_key]
)
if not have_binary:
errors |= _RequestTaskErrors.NoBinary
continue

# To avoid time losses in the case of large concurrency and short TC,
@@ -833,6 +864,7 @@ def priority(run): # lower is better
< 1.0
)
if tc_too_short:
errors |= _RequestTaskErrors.SkipSTC
continue

# Limit the number of cores.
@@ -861,7 +893,9 @@ def priority(run): # lower is better

# If there is no suitable run, tell the worker.
if not run_found:
return {"task_waiting": False}
if not errors: # No active tasks whatsoever, no fault of the worker
errors = _RequestTaskErrors.ServerSide
return {"error_msg": _msg_sep + _msg_sep.join(str(e) for e in errors)}

# Now we create a new task for this run.
opening_offset = 0
2 changes: 1 addition & 1 deletion worker/sri.txt
@@ -1 +1 @@
{"__version": 198, "updater.py": "PHFUVXcoxBFiW2hTqFN5q5WdAw2pK8uzFC7hyMUC3cLY5bGZPhL8TBGThtqDmcXd", "worker.py": "lUzZJZvuxA2r37cUI262tUsAqYhhsqadLLr1yM3Rrhq4VHNnUnDBkVhiJYQeiHwH", "games.py": "QE8sBBJrT7bsFrfxW7wj4CeG1okuLQy3WIvgge6gG7Z07GBayucPhW+givsaA7bN"}
{"__version": 198, "updater.py": "PHFUVXcoxBFiW2hTqFN5q5WdAw2pK8uzFC7hyMUC3cLY5bGZPhL8TBGThtqDmcXd", "worker.py": "CwxU69Ga5Uu/EG3JlJbNi9UjB/fd4fYpju7z8tr1DwqR7ypZYKCVkcyNAuxAIUWT", "games.py": "QE8sBBJrT7bsFrfxW7wj4CeG1okuLQy3WIvgge6gG7Z07GBayucPhW+givsaA7bN"}
8 changes: 3 additions & 5 deletions worker/worker.py
@@ -717,6 +717,7 @@ def _get_help_string(self, action):
"-t",
"--min_threads",
dest="min_threads",
metavar="MIN_THREADS",
default=config.getint("parameters", "min_threads"),
type=int,
help="do not accept tasks with fewer threads than MIN_THREADS",
@@ -1331,12 +1332,9 @@ def fetch_and_handle_task(worker_info, password, remote, lock_file, current_stat
except WorkerException:
return False # error message has already been printed

if "error" in req:
return False # likewise

# No tasks ready for us yet, just wait...
if "task_waiting" in req:
print("No tasks available at this time, waiting...")
if "error_msg" in req:
print(f"Request task failure:{req['error_msg']}\nWaiting...")
return False
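# Example of what this now prints when the server returns an error_msg with two
# reasons (reason texts come from the server-side table in rundb.py; the actual
# set depends on which runs were skipped and why):
#   Request task failure:
#    - An available run requires more than CONCURRENCY threads.
#    - This worker has exceeded its GitHub API limit, and has no local binary for an available run.
#   Waiting...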

run, task_id = req["run"], req["task_id"]