Move org storage recalculation into background job (#2138)
Fixes #2112 

- Moves org storage recalculation to a background job and modifies the endpoint to return the job id as part of the response
- Updates crawl + QA backend tests that broke due to changes to the https://webrecorder.net website
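
With this change, `POST /orgs/{oid}/recalculate-storage` responds immediately with `{"success": true, "id": "<background job id>"}` (the new `SuccessResponseId` model) rather than blocking until recalculation finishes.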

---------

Co-authored-by: Ilya Kreymer <ikreymer@users.noreply.github.com>
tw4l and ikreymer authored Nov 19, 2024
1 parent 333ab6d commit ba5ca3f
Showing 10 changed files with 288 additions and 93 deletions.
59 changes: 58 additions & 1 deletion backend/btrixcloud/background_jobs.py
@@ -21,6 +21,7 @@
CreateReplicaJob,
DeleteReplicaJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
@@ -320,6 +321,51 @@ async def create_delete_org_job(
print(f"warning: delete org job could not be started: {exc}")
return None

async def create_recalculate_org_stats_job(
self,
org: Organization,
existing_job_id: Optional[str] = None,
) -> Optional[str]:
"""Create background job to recalculate org stats"""

try:
job_id = await self.crawl_manager.run_recalculate_org_stats_job(
oid=str(org.id),
backend_image=os.environ.get("BACKEND_IMAGE", ""),
pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
existing_job_id=existing_job_id,
)
if existing_job_id:
recalculate_job = await self.get_background_job(existing_job_id, org.id)
previous_attempt = {
"started": recalculate_job.started,
"finished": recalculate_job.finished,
}
if recalculate_job.previousAttempts:
recalculate_job.previousAttempts.append(previous_attempt)
else:
recalculate_job.previousAttempts = [previous_attempt]
recalculate_job.started = dt_now()
recalculate_job.finished = None
recalculate_job.success = None
else:
recalculate_job = RecalculateOrgStatsJob(
id=job_id,
oid=org.id,
started=dt_now(),
)

await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": recalculate_job.to_dict()}, upsert=True
)

return job_id
# pylint: disable=broad-exception-caught
except Exception as exc:
# pylint: disable=raise-missing-from
print(f"warning: recalculate org stats job could not be started: {exc}")
return None
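
For illustration, here is a sketch of what the persisted job document might look like after one failed attempt and a retry — field names follow the models below; the concrete values are hypothetical:

```python
# Hypothetical job document stored in the `jobs` collection after a retry.
recalculate_job_doc = {
    "_id": "org-stats-<oid>-1a2b3c4d5e",
    "type": "recalculate-org-stats",
    "oid": "<org uuid>",
    "started": "2024-11-19T12:00:00Z",  # reset to dt_now() when retried
    "finished": None,                   # cleared for the new attempt
    "success": None,                    # cleared for the new attempt
    "previousAttempts": [
        {"started": "2024-11-18T12:00:00Z", "finished": "2024-11-18T12:05:00Z"},
    ],
}
```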

async def job_finished(
self,
job_id: str,
@@ -364,7 +410,9 @@ async def job_finished

async def get_background_job(
self, job_id: str, oid: Optional[UUID] = None
) -> Union[
CreateReplicaJob, DeleteReplicaJob, DeleteOrgJob, RecalculateOrgStatsJob
]:
"""Get background job"""
query: dict[str, object] = {"_id": job_id}
if oid:
@@ -384,6 +432,9 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
if data["type"] == BgJobType.DELETE_REPLICA:
return DeleteReplicaJob.from_dict(data)

if data["type"] == BgJobType.RECALCULATE_ORG_STATS:
return RecalculateOrgStatsJob.from_dict(data)

return DeleteOrgJob.from_dict(data)

async def list_background_jobs(
@@ -518,6 +569,12 @@ async def retry_background_job(
existing_job_id=job_id,
)

if job.type == BgJobType.RECALCULATE_ORG_STATS:
await self.create_recalculate_org_stats_job(
org,
existing_job_id=job_id,
)

return {"success": True}

async def retry_failed_background_jobs(
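Assuming the existing background-jobs retry route (`POST /orgs/{oid}/jobs/{job_id}/retry`), a failed recalculation can now be re-run under the same job id; a minimal sketch with placeholder values:

```python
import requests

API_PREFIX = "http://localhost:30870/api"  # assumed local deployment
org_id = "<org uuid>"                      # placeholder
job_id = "org-stats-<oid>-1a2b3c4d5e"      # placeholder
headers = {"Authorization": "Bearer <access token>"}

# Retrying re-runs the job under its existing id and records the prior
# attempt in previousAttempts, per create_recalculate_org_stats_job above.
r = requests.post(
    f"{API_PREFIX}/orgs/{org_id}/jobs/{job_id}/retry",
    headers=headers,
)
assert r.json()["success"]
```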
40 changes: 38 additions & 2 deletions backend/btrixcloud/crawlmanager.py
@@ -118,18 +118,54 @@ async def run_delete_org_job(
backend_image: str,
pull_policy: str,
existing_job_id: Optional[str] = None,
) -> str:
"""run job to delete org and all of its data"""

if existing_job_id:
job_id = existing_job_id
else:
job_id = f"delete-org-{oid}-{secrets.token_hex(5)}"

return await self._run_bg_job_with_ops_classes(
oid, backend_image, pull_policy, job_id, job_type=BgJobType.DELETE_ORG.value
)

async def run_recalculate_org_stats_job(
self,
oid: str,
backend_image: str,
pull_policy: str,
existing_job_id: Optional[str] = None,
) -> str:
"""run job to recalculate storage stats for the org"""

if existing_job_id:
job_id = existing_job_id
else:
job_id = f"org-stats-{oid}-{secrets.token_hex(5)}"

return await self._run_bg_job_with_ops_classes(
oid,
backend_image,
pull_policy,
job_id,
job_type=BgJobType.RECALCULATE_ORG_STATS.value,
)
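
New job ids embed the org id plus a short random suffix, so fresh runs stay distinguishable while retries reuse `existing_job_id`; for example:

```python
import secrets
from uuid import uuid4

oid = str(uuid4())
job_id = f"org-stats-{oid}-{secrets.token_hex(5)}"
print(job_id)  # e.g. org-stats-<uuid>-9f3a1c0d2e (5 random bytes -> 10 hex chars)
```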

async def _run_bg_job_with_ops_classes(
self,
oid: str,
backend_image: str,
pull_policy: str,
job_id: str,
job_type: str,
) -> str:
"""run background job with access to ops classes"""

params = {
"id": job_id,
"oid": oid,
"job_type": BgJobType.DELETE_ORG.value,
"job_type": job_type,
"backend_image": backend_image,
"pull_policy": pull_policy,
}
28 changes: 19 additions & 9 deletions backend/btrixcloud/main_bg.py
@@ -15,7 +15,7 @@


# ============================================================================
# pylint: disable=too-many-function-args, duplicate-code, too-many-locals, too-many-return-statements
async def main():
"""run background job with access to ops classes"""

@@ -25,22 +25,32 @@ async def main():
"Sorry, the Browsertrix Backend must be run inside a Kubernetes environment.\
Kubernetes not detected (KUBERNETES_SERVICE_HOST is not set), Exiting"
)
return 1

(org_ops, _, _, _, _, _, _, _, _, _, user_manager) = init_ops()

if not oid:
print("Org id missing, quitting")
return 1

org = await org_ops.get_org_by_id(UUID(oid))
if not org:
print("Org id invalid, quitting")
return 1

# Run job
if job_type == BgJobType.DELETE_ORG:
try:
await org_ops.delete_org_and_data(org, user_manager)
return 0
# pylint: disable=broad-exception-caught
except Exception:
traceback.print_exc()
return 1

if job_type == BgJobType.RECALCULATE_ORG_STATS:
try:
await org_ops.recalculate_storage(org)
return 0
# pylint: disable=broad-exception-caught
except Exception:
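Returning exit codes instead of calling `sys.exit()` inside the branches keeps the handlers testable; the module presumably finishes with the usual entrypoint pattern (an assumption — not shown in this hunk):

```python
# Assumed entrypoint pattern for main_bg.py (asyncio and sys imported above).
if __name__ == "__main__":
    return_code = asyncio.run(main())
    sys.exit(return_code)
```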
23 changes: 22 additions & 1 deletion backend/btrixcloud/models.py
@@ -2014,6 +2014,7 @@ class BgJobType(str, Enum):
CREATE_REPLICA = "create-replica"
DELETE_REPLICA = "delete-replica"
DELETE_ORG = "delete-org"
RECALCULATE_ORG_STATS = "recalculate-org-stats"


# ============================================================================
@@ -2059,11 +2060,24 @@ class DeleteOrgJob(BackgroundJob):
type: Literal[BgJobType.DELETE_ORG] = BgJobType.DELETE_ORG


# ============================================================================
class RecalculateOrgStatsJob(BackgroundJob):
"""Model for tracking jobs to recalculate org stats"""

type: Literal[BgJobType.RECALCULATE_ORG_STATS] = BgJobType.RECALCULATE_ORG_STATS


# ============================================================================
# Union of all job types, for response model

AnyJob = RootModel[
Union[
CreateReplicaJob,
DeleteReplicaJob,
BackgroundJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
]
]
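
Since each job model carries a distinct `type` literal, a raw Mongo document can be validated straight into the union — a sketch assuming pydantic v2, with hypothetical values:

```python
from datetime import datetime, timezone

# Hypothetical stored document; the `type` literal steers Union resolution.
doc = {
    "id": "org-stats-<oid>-1a2b3c4d5e",
    "oid": "11111111-2222-3333-4444-555555555555",
    "type": "recalculate-org-stats",
    "started": datetime.now(timezone.utc),
}
job = AnyJob.model_validate(doc)
assert isinstance(job.root, RecalculateOrgStatsJob)
```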


@@ -2219,6 +2233,13 @@ class SuccessResponse(BaseModel):
success: bool


# ============================================================================
class SuccessResponseId(SuccessResponse):
"""Response for API endpoints that return success and a background job id"""

id: Optional[str] = None


# ============================================================================
class SuccessResponseStorageQuota(SuccessResponse):
"""Response for API endpoints that return success and storageQuotaReached"""
9 changes: 6 additions & 3 deletions backend/btrixcloud/orgs.py
@@ -71,7 +71,7 @@
UpdatedResponse,
AddedResponse,
AddedResponseId,
SuccessResponseId,
OrgInviteResponse,
OrgAcceptInviteResponse,
OrgDeleteInviteResponse,
@@ -1588,10 +1588,13 @@ async def update_crawling_defaults(
return {"updated": True}

@router.post(
"/recalculate-storage", tags=["organizations"], response_model=SuccessResponse
"/recalculate-storage",
tags=["organizations"],
response_model=SuccessResponseId,
)
async def recalculate_org_storage(org: Organization = Depends(org_owner_dep)):
job_id = await ops.background_job_ops.create_recalculate_org_stats_job(org)
return {"success": True, "id": job_id}

@router.post("/invite", tags=["invites"], response_model=OrgInviteResponse)
async def invite_user_to_org(
30 changes: 30 additions & 0 deletions backend/test/conftest.py
@@ -258,6 +258,36 @@ def crawler_crawl_id(crawler_auth_headers, default_org_id):
time.sleep(5)


@pytest.fixture(scope="session")
def qa_crawl_id(crawler_auth_headers, default_org_id):
# Start crawl.
crawl_data = {
"runNow": True,
"name": "Crawler User Crawl for Testing QA",
"description": "crawler test crawl for qa",
"config": {"seeds": [{"url": "https://old.webrecorder.net/"}], "limit": 1},
"crawlerChannel": "test",
}
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
headers=crawler_auth_headers,
json=crawl_data,
)
data = r.json()

crawl_id = data["run_now_job"]
# Wait for it to complete and then return crawl ID
while True:
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/replay.json",
headers=crawler_auth_headers,
)
data = r.json()
if data["state"] in FINISHED_STATES:
return crawl_id
time.sleep(5)
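
A hypothetical test consuming the new fixture might look like this (name and assertion illustrative; the fixture only returns once the crawl reaches a finished state):

```python
def test_qa_crawl_finished(crawler_auth_headers, default_org_id, qa_crawl_id):
    r = requests.get(
        f"{API_PREFIX}/orgs/{default_org_id}/crawls/{qa_crawl_id}/replay.json",
        headers=crawler_auth_headers,
    )
    assert r.json()["state"] in FINISHED_STATES
```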


@pytest.fixture(scope="session")
def wr_specs_crawl_id(crawler_auth_headers, default_org_id):
# Start crawl.
(Diffs for the remaining four changed files are not shown.)
