Skip to content

Commit

Permalink
Limit 1 user to 1 active task at a time (#7106)
Browse files Browse the repository at this point in the history
The PR contains a REST API test that checks that a single user cannot
clog the import queue with his tasks.
  • Loading branch information
Marishka17 committed Nov 22, 2023
1 parent dcaf115 commit fe21844
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 22 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ jobs:
COVERAGE_PROCESS_START: ".coveragerc"
run: |
pytest tests/python/ --cov --cov-report=json
for COVERAGE_FILE in `find -name "coverage*.json" -type f -printf "%f\n"`; do mv ${COVERAGE_FILE} "${COVERAGE_FILE%%.*}_0.json"; done
ONE_RUNNING_JOB_IN_QUEUE_PER_USER="true" pytest tests/python/rest_api/test_queues.py --cov --cov-report=json
- name: Uploading code coverage results as an artifact
uses: actions/upload-artifact@v3.1.1
Expand Down
18 changes: 11 additions & 7 deletions tests/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,21 @@ which are used by containers for the testing system.

**Running tests**

Run all REST API tests:
- Run all REST API tests:

```
pytest ./tests/python
```
```shell
pytest ./tests/python
```

This command will automatically start all necessary docker containers.
This command will automatically start all necessary docker containers.
See the [contributing guide](../../site/content/en/docs/contributing/running-tests.md)
to get more information about tests running.

See the [contributing guide](../../site/content/en/docs/contributing/running-tests.md)
to get more information about tests running.
- Run tests to check the functionality of limiting active jobs in a queue per user:

```shell
ONE_RUNNING_JOB_IN_QUEUE_PER_USER="true" pytest tests/python/rest_api/test_queues.py
```
## How to upgrade testing assets?

When you have a new use case which cannot be expressed using objects already
Expand Down
96 changes: 96 additions & 0 deletions tests/python/rest_api/test_queues.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright (C) 2023 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT


import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from http import HTTPStatus

import pytest

import shared.utils.s3 as s3
from shared.utils.config import make_api_client
from shared.utils.helpers import generate_image_file

from .utils import create_task


@pytest.mark.usefixtures("restore_db_per_function")
@pytest.mark.usefixtures("restore_cvat_data")
@pytest.mark.usefixtures("restore_redis_db_per_function")
class TestRQQueueWorking:
_USER_1 = "admin1"
_USER_2 = "admin2"

@pytest.mark.with_external_services
@pytest.mark.timeout(60)
@pytest.mark.skipif(
os.getenv("ONE_RUNNING_JOB_IN_QUEUE_PER_USER", "false").lower() not in {"true", "yes", "1"},
reason="The server is not configured to enable limit 1 user 1 task at a time",
)
@pytest.mark.parametrize("cloud_storage_id", [2])
def test_user_cannot_clog_import_queue_with_his_tasks(
self, cloud_storage_id: int, cloud_storages, request
):
def _create_task(idx: int, username: str) -> int:
task_spec = {
"name": f"Test task {idx}",
"labels": [
{
"name": "car",
}
],
}

task_data = {
"image_quality": 90,
"server_files": ["dataset/"],
"cloud_storage_id": cloud_storage_id,
"use_cache": False,
}

task_id, _ = create_task(username, task_spec, task_data)
return task_id

s3_client = s3.make_client()
cs_name = cloud_storages[cloud_storage_id]["resource"]
dataset_size = 100

img_content = generate_image_file(size=(1920, 1080)).getvalue()

for i in range(dataset_size):
filename = f"dataset/image_{i}.jpeg"
s3_client.create_file(bucket=cs_name, filename=filename, data=img_content)
request.addfinalizer(
partial(
s3_client.remove_file,
bucket=cs_name,
filename=filename,
)
)

number_of_tasks = 4
users = [self._USER_1] * (number_of_tasks - 1)
users.append(self._USER_2)

futures, task_ids = [], []

with ThreadPoolExecutor(max_workers=number_of_tasks) as executor:
for idx, user in enumerate(users):
futures.append(executor.submit(_create_task, idx, user))

for future in futures:
task_ids.append(future.result())

tasks = []

for idx, task_id in enumerate(task_ids):
with make_api_client(users[idx]) as api_client:
(task, response) = api_client.tasks_api.retrieve(task_id)
assert response.status == HTTPStatus.OK
tasks.append(task)

sorted_tasks = sorted(tasks, key=lambda x: x.updated_date)
assert self._USER_2 in [t.owner.username for t in sorted_tasks[:2]]
29 changes: 14 additions & 15 deletions tests/python/shared/fixtures/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,19 +287,22 @@ def delete_compose_files(container_name_files):
filename.unlink(missing_ok=True)


def wait_for_services(num_secs=300):
def wait_for_services(num_secs: int = 300) -> None:
for i in range(num_secs):
logger.debug(f"waiting for the server to load ... ({i})")
response = requests.get(get_server_url("api/server/health/", format="json"))
if response.status_code == HTTPStatus.OK:
logger.debug("the server has finished loading!")
return
else:
try:
statuses = response.json()
logger.debug(f"server status: \n{statuses}")
except Exception as e:
logger.debug(f"an error occurred during the server status checking: {e}")

try:
statuses = response.json()
logger.debug(f"server status: \n{statuses}")

if response.status_code == HTTPStatus.OK:
logger.debug("the server has finished loading!")
return

except Exception as e:
logger.debug(f"an error occurred during the server status checking: {e}")

sleep(1)

raise Exception(
Expand Down Expand Up @@ -432,11 +435,7 @@ def local_start(
stop_services(dc_files, cvat_root_dir)
pytest.exit("All testing containers are stopped", returncode=0)

if (
not any(set(running_containers()) & {f"{PREFIX}_cvat_server_1", f"{PREFIX}_cvat_db_1"})
or rebuild
):
start_services(dc_files, rebuild, cvat_root_dir)
start_services(dc_files, rebuild, cvat_root_dir)

docker_restore_data_volumes()
docker_cp(cvat_db_dir / "restore.sql", f"{PREFIX}_cvat_db_1:/tmp/restore.sql")
Expand Down

0 comments on commit fe21844

Please sign in to comment.