Fix worker #354

Merged
merged 3 commits on Jun 7, 2022

40 changes: 26 additions & 14 deletions services/worker/src/worker/main.py
@@ -54,19 +54,27 @@ def process_next_dataset_job() -> bool:
         raise
 
     success = False
+    retry = False
     try:
         logger.info(f"compute dataset '{dataset_name}'")
         split_full_names = refresh_dataset_split_full_names(dataset_name=dataset_name, hf_token=HF_TOKEN)
         success = True
         for split_full_name in split_full_names:
             add_split_job(
-                split_full_name["dataset_name"], split_full_name["config_name"], split_full_name["split_name"]
+                split_full_name["dataset_name"],
+                split_full_name["config_name"],
+                split_full_name["split_name"],
+                retries=0,
             )
     except StatusError as e:
+        if isinstance(e, Status500Error) and retries < MAX_JOB_RETRIES:
+            retry = True
         # in any case: don't raise the StatusError, and go to finally
     finally:
         finish_dataset_job(job_id, success=success)
         result = "success" if success else "error"
         logger.debug(f"job finished with {result}: {job_id} for dataset: {dataset_name}")
-        if isinstance(e, Status500Error) and retries < MAX_JOB_RETRIES:
+        if retry:
             add_dataset_job(dataset_name, retries=retries + 1)
             logger.debug(f"job re-enqueued (retries: {retries}) for dataset: {dataset_name}")
     return True
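
The pattern changed in this hunk is the likely worker bug: the removed line read the `except ... as e` variable inside the `finally` block, but Python unbinds that name once the except block ends and never binds it at all when no exception is raised, so the re-enqueue check could never run; the new `retry` flag records the decision while the exception is still in scope. A minimal standalone sketch of both patterns (Status500Error here is a stand-in class, not the one imported by the worker):

class Status500Error(Exception):
    pass  # stand-in for the real status error class


def old_style(fail: bool) -> None:
    try:
        if fail:
            raise Status500Error("oops")
    except Status500Error as e:
        pass  # handled; `e` only exists inside this block
    finally:
        # BUG: `e` is unbound here whether or not an exception was raised,
        # so this check raises UnboundLocalError instead of deciding anything
        if isinstance(e, Status500Error):
            print("re-enqueue")


def new_style(fail: bool) -> None:
    retry = False
    try:
        if fail:
            raise Status500Error("oops")
    except Status500Error:
        retry = True  # decide while the exception is still in scope
    finally:
        if retry:  # `retry` always exists, so the finally block is safe
            print("re-enqueue")


new_style(True)   # prints "re-enqueue"
new_style(False)  # prints nothing
# old_style(True) and old_style(False) both raise UnboundLocalError from the finally block
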
@@ -90,6 +98,7 @@ def process_next_split_job() -> bool:
         raise
 
     success = False
+    retry = False
     try:
         logger.info(f"compute split '{split_name}' from dataset '{dataset_name}' with config '{config_name}'")
         refresh_split(
@@ -104,13 +113,17 @@
         )
         success = True
     except StatusError as e:
+        if isinstance(e, Status500Error) and retries < MAX_JOB_RETRIES:
+            retry = True
         # in any case: don't raise the StatusError, and go to finally
     finally:
         finish_split_job(job_id, success=success)
         result = "success" if success else "error"
         logger.debug(
             f"job finished with {result}: {job_id} for split '{split_name}' from dataset '{dataset_name}' with"
             f" config '{config_name}'"
         )
-        if isinstance(e, Status500Error) and retries < MAX_JOB_RETRIES:
+        if retry:
             add_split_job(dataset_name, config_name, split_name, retries=retries + 1)
             logger.debug(
                 f"job re-enqueued (retries: {retries}) for split '{split_name}' from dataset '{dataset_name}' with"
@@ -167,17 +180,16 @@ def sleep() -> None:
 
 def loop() -> None:
     logger = logging.getLogger("datasets_server.worker")
-    while True:
-        if has_resources():
-            try:
-                if process_next_job():
-                    # loop immediately to try another job
-                    # see https://github.com/huggingface/datasets-server/issues/265
-                    continue
-            except BaseException as e:
-                logger.critical(f"quit due to an uncaught error while processing the job: {e}")
-                raise
-        sleep()
+    try:
+        while True:
+            if has_resources() and process_next_job():
+                # loop immediately to try another job
+                # see https://github.com/huggingface/datasets-server/issues/265
+                continue
+            sleep()
+    except BaseException as e:
+        logger.critical(f"quit due to an uncaught error while processing the job: {e}")
+        raise
 
 
 if __name__ == "__main__":
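
Read together, these changes bound the retry behaviour: a job is enqueued with retries=0 and re-enqueued with retries + 1 only while retries < MAX_JOB_RETRIES, so a job that keeps failing with Status500Error is attempted at most MAX_JOB_RETRIES + 1 times, while the reworked loop() still logs a critical message and re-raises on any uncaught error. A back-of-the-envelope simulation of that budget (the constant value and the queue below are stand-ins, not the project's real ones):

MAX_JOB_RETRIES = 3  # stand-in value for the illustration


def attempts_for_always_failing_job() -> int:
    attempts = 0
    queue = [0]  # holds the `retries` counter of the queued job; jobs start at retries=0
    while queue:
        retries = queue.pop()
        attempts += 1
        # pretend this attempt fails with a Status500Error...
        if retries < MAX_JOB_RETRIES:
            queue.append(retries + 1)  # mirrors add_dataset_job(dataset_name, retries=retries + 1)
    return attempts


print(attempts_for_always_failing_job())  # 4, i.e. MAX_JOB_RETRIES + 1 attempts in total
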
2 changes: 2 additions & 0 deletions services/worker/tests/_utils.py
@@ -4,10 +4,12 @@
 
 DEFAULT_HF_TOKEN: str = ""
 DEFAULT_MONGO_CACHE_DATABASE: str = "datasets_server_cache_test"
+DEFAULT_MONGO_QUEUE_DATABASE: str = "datasets_server_queue_test"
 DEFAULT_MONGO_URL: str = "mongodb://localhost:27017"
 DEFAULT_ROWS_MAX_NUMBER: int = 5
 
 HF_TOKEN = get_str_value(d=os.environ, key="HF_TOKEN", default=DEFAULT_HF_TOKEN)
 MONGO_CACHE_DATABASE = get_str_value(d=os.environ, key="MONGO_CACHE_DATABASE", default=DEFAULT_MONGO_CACHE_DATABASE)
+MONGO_QUEUE_DATABASE = get_str_value(d=os.environ, key="MONGO_QUEUE_DATABASE", default=DEFAULT_MONGO_QUEUE_DATABASE)
 MONGO_URL = get_str_value(d=os.environ, key="MONGO_URL", default=DEFAULT_MONGO_URL)
 ROWS_MAX_NUMBER = get_int_value(d=os.environ, key="ROWS_MAX_NUMBER", default=DEFAULT_ROWS_MAX_NUMBER)
40 changes: 40 additions & 0 deletions services/worker/tests/test_main.py
@@ -0,0 +1,40 @@
+import pytest
+from libcache.cache import clean_database as clean_cache_database
+from libcache.cache import connect_to_cache
+from libqueue.queue import add_dataset_job, add_split_job
+from libqueue.queue import clean_database as clean_queue_database
+from libqueue.queue import connect_to_queue
+
+from worker.main import process_next_dataset_job, process_next_split_job
+
+from ._utils import MONGO_CACHE_DATABASE, MONGO_QUEUE_DATABASE, MONGO_URL
+
+
+@pytest.fixture(autouse=True, scope="module")
+def safe_guard() -> None:
+    if "test" not in MONGO_CACHE_DATABASE:
+        raise ValueError("Test must be launched on a test mongo database")
+
+
+@pytest.fixture(autouse=True, scope="module")
+def client() -> None:
+    connect_to_cache(database=MONGO_CACHE_DATABASE, host=MONGO_URL)
+    connect_to_queue(database=MONGO_QUEUE_DATABASE, host=MONGO_URL)
+
+
+@pytest.fixture(autouse=True)
+def clean_mongo_database() -> None:
+    clean_cache_database()
+    clean_queue_database()
+
+
+def test_process_next_dataset_job():
+    add_dataset_job("acronym_identification")
+    result = process_next_dataset_job()
+    assert result is True
+
+
+def test_process_next_split_job():
+    add_split_job("acronym_identification", "default", "train")
+    result = process_next_split_job()
+    assert result is True
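
The new tests drive the real job-processing path end to end: they enqueue a job for the public acronym_identification dataset, so they need a reachable MongoDB at MONGO_URL and, in practice, network access to the Hugging Face Hub; the safe_guard fixture refuses to run unless the cache database name contains "test". A hypothetical local invocation (the environment values and pytest flags below are examples, not project tooling):

import os
import sys

import pytest

# a MongoDB server must already be running at MONGO_URL
os.environ.setdefault("MONGO_URL", "mongodb://localhost:27017")
os.environ.setdefault("MONGO_CACHE_DATABASE", "datasets_server_cache_test")
os.environ.setdefault("MONGO_QUEUE_DATABASE", "datasets_server_queue_test")

# run only the new test module, stopping at the first failure
sys.exit(pytest.main(["services/worker/tests/test_main.py", "-x", "-vv"]))
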