Remove all message interpretation from worker to server #5

Merged
27 changes: 21 additions & 6 deletions server/fishtest/rundb.py
@@ -41,8 +41,8 @@

last_rundb = None

# This is duplicated in worker.py. Any changes here must be mirrored there.
class _RequestTaskErrors(enum.IntFlag):
# Just a bit of low-maintenance, easy-to-read bookkeeping
class _RequestTaskErrors(enum.Flag):
MachineLimit = enum.auto()
LowThreads = enum.auto()
HighThreads = enum.auto()
@@ -51,6 +51,20 @@ class _RequestTaskErrors(enum.IntFlag):
SkipSTC = enum.auto()
ServerSide = enum.auto()

# __private names are not turned into enum members.
# These messages refer to worker command line options, so they must be kept in sync with the worker.
__messages = {MachineLimit: "This user has reached the max machines limit.",
LowThreads: "An available run requires more than CONCURRENCY threads.",
HighThreads: "An available run requires less than MIN_THREADS threads.",
LowMemory: "An available run requires more than MAX_MEMORY memory.",
NoBinary: "This worker has exceeded its GitHub API limit, and has no local binary for an available run.",
SkipSTC: "An available run is at STC, requiring less than CONCURRENCY threads due to cutechess issues. Consider splitting this worker. See Discord.",
ServerSide: "Server error or no active runs. Try again shortly.",
}

def __str__(self):
return self.__messages[self.value]  # __messages is built in the class body, so its keys are the raw values, not members


def get_port():
params = {}
@@ -687,10 +701,11 @@ def request_task(self, worker_info):
self.task_semaphore.release()
else:
print("request_task too busy", flush=True)
return {"errors": int(_RequestTaskErrors.ServerSide)}
return {"error_msg": _msg_sep + str(_RequestTaskErrors.ServerSide)}

def sync_request_task(self, worker_info):
unique_key = worker_info["unique_key"]
_msg_sep = '\n - '

# We get the list of unfinished runs.
# To limit db access the list is cached for
@@ -768,13 +783,13 @@ def priority(run): # lower is better
if connections >= self.userdb.get_machine_limit(worker_info["username"]):
error = f'Request_task: Machine limit reached for user {worker_info["username"]}'
print(error, flush=True)
return {"errors": int(_RequestTaskErrors.MachineLimit)}
return {"error_msg": _msg_sep + str(_RequestTaskErrors.MachineLimit)}

# Now go through the sorted list of unfinished runs.
# We will add a task to the first run that is suitable.

run_found = False
errors = _RequestTaskErrors(0)
errors = _RequestTaskErrors(0) # Ignored when run_found

for run in self.task_runs:
if run["finished"]:
@@ -882,7 +897,7 @@ def priority(run): # lower is better
if not run_found:
if not errors: # No active tasks whatsoever, no fault of the worker
errors = _RequestTaskErrors.ServerSide
return {"errors": int(errors)}
return {"error_msg": _msg_sep + _msg_sep.join(str(e) for e in errors)}

# Now we create a new task for this run.
opening_offset = 0
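For illustration only, a minimal self-contained sketch (not part of the diff; names like MESSAGES and build_error_msg are hypothetical) of the server-side idea: accumulate the reasons a worker was skipped as Flag members, then ship a single pre-formatted error_msg string. Keying the message table by the members themselves, outside the class body, avoids the enum name-mangling and auto() subtleties that the in-class __messages dict depends on.

import enum

class RequestTaskErrors(enum.Flag):
    # Hypothetical stand-in for rundb.py's _RequestTaskErrors
    MachineLimit = enum.auto()
    LowThreads = enum.auto()
    ServerSide = enum.auto()

# Message table keyed by the members themselves, built after the class exists.
MESSAGES = {
    RequestTaskErrors.MachineLimit: "This user has reached the max machines limit.",
    RequestTaskErrors.LowThreads: "An available run requires more than CONCURRENCY threads.",
    RequestTaskErrors.ServerSide: "Server error or no active runs. Try again shortly.",
}

def build_error_msg(errors, sep="\n - "):
    # Iterate over the class and test membership, so this also works before
    # Python 3.11, where composite Flag values are not directly iterable.
    parts = [MESSAGES[e] for e in RequestTaskErrors if e in errors]
    return sep + sep.join(parts)

errors = RequestTaskErrors.MachineLimit | RequestTaskErrors.LowThreads
print({"error_msg": build_error_msg(errors)})
# {'error_msg': '\n - This user has reached the max machines limit.\n - An available run requires more than CONCURRENCY threads.'}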
34 changes: 2 additions & 32 deletions worker/worker.py
@@ -2,7 +2,6 @@
import atexit
import base64
import datetime
import enum
import getpass
import hashlib
import json
@@ -1286,30 +1285,6 @@ def verify_worker_version(remote, username, password):
return True


# Duplicated from server's rundb.py.
# Ideally we would have a common/ folder next to server/ and worker/, to prevent this...
class _RequestTaskErrors(enum.IntFlag):
MachineLimit = enum.auto()
LowThreads = enum.auto()
HighThreads = enum.auto()
LowMemory = enum.auto()
NoBinary = enum.auto()
SkipSTC = enum.auto()
ServerSide = enum.auto()

# __private_names are not enum-ized
__messages = {MachineLimit: "This user has reached the max machines limit.",
LowThreads: "An available run requires more than CONCURRENCY threads.",
HighThreads: "An available run requires less than MIN_THREADS threads.",
LowMemory: "An available run requires more than MAX_MEMORY memory.",
NoBinary: "This worker has exceeded its GitHub API limit, and has no local binary for an available run.",
SkipSTC: "An available run is at STC, requiring less than CONCURRENCY threads due to cutechess issues. Consider splitting this worker. See Discord.",
ServerSide: "Server error or no active runs. Try again shortly.",
}

def __str__(self):
return self.__messages[self]

def fetch_and_handle_task(worker_info, password, remote, lock_file, current_state):
# This function should normally not raise exceptions.
# Unusual conditions are handled by returning False.
@@ -1358,13 +1333,8 @@ def fetch_and_handle_task(worker_info, password, remote, lock_file, current_state):
return False # error message has already been printed

# No tasks ready for us yet, just wait...
if "errors" in req and req["errors"]:
errors = _RequestTaskErrors(req["errors"])
if _RequestTaskErrors.ServerSide in errors:
print(_RequestTaskErrors.ServerSide)
else:
print(f"No active tasks suitable for the worker at this time:\n - {'\n - '.join(str(e) for e in errors)}")
print("Waiting...")
if "error_msg" in req:
print(f"Request task failure:{req['error_msg']}\nWaiting...")
return False

run, task_id = req["run"], req["task_id"]
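The worker-side counterpart, again only an illustrative sketch with a hypothetical req payload: after this change the worker no longer decodes error flags, it just prints whatever text the server placed in error_msg.

# Hypothetical server response; in the real worker this comes from request_task().
req = {"error_msg": "\n - This user has reached the max machines limit."
                    "\n - Server error or no active runs. Try again shortly."}

if "error_msg" in req:
    # No interpretation on the worker side: the server owns the message text.
    print(f"Request task failure:{req['error_msg']}\nWaiting...")

This is what lets the worker diff drop both the enum import and its duplicated copy of _RequestTaskErrors.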