Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move org storage recalculation into background job #2138

Merged
merged 3 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion backend/btrixcloud/background_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
CreateReplicaJob,
DeleteReplicaJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
Expand Down Expand Up @@ -320,6 +321,51 @@ async def create_delete_org_job(
print(f"warning: delete org job could not be started: {exc}")
return None

async def create_recalculate_org_stats_job(
    self,
    org: Organization,
    existing_job_id: Optional[str] = None,
) -> Optional[str]:
    """Launch a background job that recalculates org storage stats.

    Returns the background job id, or None if the job could not be
    started. When ``existing_job_id`` is given, the stored job record is
    reused: its prior run is archived into ``previousAttempts`` and its
    timing/success fields are reset for the new attempt.
    """
    try:
        job_id = await self.crawl_manager.run_recalculate_org_stats_job(
            oid=str(org.id),
            backend_image=os.environ.get("BACKEND_IMAGE", ""),
            pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
            existing_job_id=existing_job_id,
        )

        if not existing_job_id:
            # Fresh job: create a new tracking record.
            job = RecalculateOrgStatsJob(
                id=job_id,
                oid=org.id,
                started=dt_now(),
            )
        else:
            # Retry: archive the previous attempt and reset run state.
            job = await self.get_background_job(existing_job_id, org.id)
            attempts = list(job.previousAttempts or [])
            attempts.append(
                {
                    "started": job.started,
                    "finished": job.finished,
                }
            )
            job.previousAttempts = attempts
            job.started = dt_now()
            job.finished = None
            job.success = None

        # Upsert so both the fresh and the retried record land in mongo.
        await self.jobs.find_one_and_update(
            {"_id": job_id}, {"$set": job.to_dict()}, upsert=True
        )

        return job_id
    # pylint: disable=broad-exception-caught
    except Exception as exc:
        # pylint: disable=raise-missing-from
        print(f"warning: recalculate org stats job could not be started: {exc}")
        return None

async def job_finished(
self,
job_id: str,
Expand Down Expand Up @@ -364,7 +410,9 @@ async def job_finished(

async def get_background_job(
self, job_id: str, oid: Optional[UUID] = None
) -> Union[CreateReplicaJob, DeleteReplicaJob, DeleteOrgJob]:
) -> Union[
CreateReplicaJob, DeleteReplicaJob, DeleteOrgJob, RecalculateOrgStatsJob
]:
"""Get background job"""
query: dict[str, object] = {"_id": job_id}
if oid:
Expand All @@ -384,6 +432,9 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
if data["type"] == BgJobType.DELETE_REPLICA:
return DeleteReplicaJob.from_dict(data)

if data["type"] == BgJobType.RECALCULATE_ORG_STATS:
return RecalculateOrgStatsJob.from_dict(data)

return DeleteOrgJob.from_dict(data)

async def list_background_jobs(
Expand Down Expand Up @@ -518,6 +569,12 @@ async def retry_background_job(
existing_job_id=job_id,
)

if job.type == BgJobType.RECALCULATE_ORG_STATS:
await self.create_recalculate_org_stats_job(
org,
existing_job_id=job_id,
)

return {"success": True}

async def retry_failed_background_jobs(
Expand Down
40 changes: 38 additions & 2 deletions backend/btrixcloud/crawlmanager.py
Original file line number Diff line number Diff line change
async def run_delete_org_job(
    self,
    oid: str,
    backend_image: str,
    pull_policy: str,
    existing_job_id: Optional[str] = None,
) -> str:
    """run job to delete org and all of its data"""
    # Reuse the id when retrying an existing job, otherwise mint a
    # unique one so concurrent deletions never collide.
    job_id = existing_job_id or f"delete-org-{oid}-{secrets.token_hex(5)}"

    return await self._run_bg_job_with_ops_classes(
        oid, backend_image, pull_policy, job_id, job_type=BgJobType.DELETE_ORG.value
    )

async def run_recalculate_org_stats_job(
    self,
    oid: str,
    backend_image: str,
    pull_policy: str,
    existing_job_id: Optional[str] = None,
) -> str:
    """Run background job to recalculate org storage stats.

    :param oid: id of the org whose storage stats are recalculated
    :param backend_image: backend container image the job runs with
    :param pull_policy: image pull policy for the job container
    :param existing_job_id: reuse this id when retrying a prior job
    :returns: the background job id
    """
    # (docstring fixed: it was copy-pasted from run_delete_org_job and
    # wrongly claimed this job deletes the org and its data)
    if existing_job_id:
        job_id = existing_job_id
    else:
        job_id = f"org-stats-{oid}-{secrets.token_hex(5)}"

    return await self._run_bg_job_with_ops_classes(
        oid,
        backend_image,
        pull_policy,
        job_id,
        job_type=BgJobType.RECALCULATE_ORG_STATS.value,
    )

async def _run_bg_job_with_ops_classes(
self,
oid: str,
backend_image: str,
pull_policy: str,
job_id: str,
job_type: str,
) -> str:
"""run background job with access to ops classes"""

params = {
"id": job_id,
"oid": oid,
"job_type": BgJobType.DELETE_ORG.value,
"job_type": job_type,
"backend_image": backend_image,
"pull_policy": pull_policy,
}
Expand Down
28 changes: 19 additions & 9 deletions backend/btrixcloud/main_bg.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


# ============================================================================
# pylint: disable=too-many-function-args, duplicate-code, too-many-locals
# pylint: disable=too-many-function-args, duplicate-code, too-many-locals, too-many-return-statements
async def main():
"""run background job with access to ops classes"""

Expand All @@ -25,22 +25,32 @@ async def main():
"Sorry, the Browsertrix Backend must be run inside a Kubernetes environment.\
Kubernetes not detected (KUBERNETES_SERVICE_HOST is not set), Exiting"
)
sys.exit(1)
return 1

(org_ops, _, _, _, _, _, _, _, _, _, user_manager) = init_ops()

if not oid:
print("Org id missing, quitting")
return 1

org = await org_ops.get_org_by_id(UUID(oid))
if not org:
print("Org id invalid, quitting")
return 1

# Run job
if job_type == BgJobType.DELETE_ORG:
if not oid:
print("Org id missing, quitting")
return 1
org = await org_ops.get_org_by_id(UUID(oid))
if not org:
print("Org id invalid, quitting")
try:
await org_ops.delete_org_and_data(org, user_manager)
return 0
# pylint: disable=broad-exception-caught
except Exception:
traceback.print_exc()
return 1

if job_type == BgJobType.RECALCULATE_ORG_STATS:
try:
await org_ops.delete_org_and_data(org, user_manager)
await org_ops.recalculate_storage(org)
return 0
# pylint: disable=broad-exception-caught
except Exception:
Expand Down
23 changes: 22 additions & 1 deletion backend/btrixcloud/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2014,6 +2014,7 @@ class BgJobType(str, Enum):
CREATE_REPLICA = "create-replica"
DELETE_REPLICA = "delete-replica"
DELETE_ORG = "delete-org"
RECALCULATE_ORG_STATS = "recalculate-org-stats"


# ============================================================================
Expand Down Expand Up @@ -2059,11 +2060,24 @@ class DeleteOrgJob(BackgroundJob):
type: Literal[BgJobType.DELETE_ORG] = BgJobType.DELETE_ORG


# ============================================================================
class RecalculateOrgStatsJob(BackgroundJob):
    """Model for tracking jobs to recalculate org stats"""

    # Discriminator: fixed to the recalculate-org-stats job type so this
    # model can be told apart from other BackgroundJob subtypes.
    type: Literal[BgJobType.RECALCULATE_ORG_STATS] = BgJobType.RECALCULATE_ORG_STATS


# ============================================================================
# Union of all job types, for response model
# NOTE(review): plain BackgroundJob appears before DeleteOrgJob and
# RecalculateOrgStatsJob in this union — presumably pydantic's
# discriminated/smart-union matching still picks the specific subtype;
# verify responses are not coerced to the base model.

AnyJob = RootModel[
    Union[
        CreateReplicaJob,
        DeleteReplicaJob,
        BackgroundJob,
        DeleteOrgJob,
        RecalculateOrgStatsJob,
    ]
]


Expand Down Expand Up @@ -2219,6 +2233,13 @@ class SuccessResponse(BaseModel):
success: bool


# ============================================================================
class SuccessResponseId(SuccessResponse):
    """Response for API endpoints that return success and a background job id"""

    # id of the background job started by the endpoint; None when the
    # job could not be started
    id: Optional[str] = None


# ============================================================================
class SuccessResponseStorageQuota(SuccessResponse):
"""Response for API endpoints that return success and storageQuotaReached"""
Expand Down
9 changes: 6 additions & 3 deletions backend/btrixcloud/orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
UpdatedResponse,
AddedResponse,
AddedResponseId,
SuccessResponse,
SuccessResponseId,
OrgInviteResponse,
OrgAcceptInviteResponse,
OrgDeleteInviteResponse,
Expand Down Expand Up @@ -1588,10 +1588,13 @@ async def update_crawling_defaults(
return {"updated": True}

@router.post(
    "/recalculate-storage",
    tags=["organizations"],
    response_model=SuccessResponseId,
)
async def recalculate_org_storage(org: Organization = Depends(org_owner_dep)):
    # Recalculation now runs as a background job rather than inline in
    # the request; respond with the job id so callers can poll it.
    # job_id is None if the job could not be started.
    job_id = await ops.background_job_ops.create_recalculate_org_stats_job(org)
    return {"success": True, "id": job_id}

@router.post("/invite", tags=["invites"], response_model=OrgInviteResponse)
async def invite_user_to_org(
Expand Down
30 changes: 30 additions & 0 deletions backend/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,36 @@ def crawler_crawl_id(crawler_auth_headers, default_org_id):
time.sleep(5)


@pytest.fixture(scope="session")
def qa_crawl_id(crawler_auth_headers, default_org_id):
    """Create a crawl workflow for QA testing, run it, and block until
    the crawl finishes, yielding the finished crawl id."""
    workflow = {
        "runNow": True,
        "name": "Crawler User Crawl for Testing QA",
        "description": "crawler test crawl for qa",
        "config": {"seeds": [{"url": "https://old.webrecorder.net/"}], "limit": 1},
        "crawlerChannel": "test",
    }
    resp = requests.post(
        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
        headers=crawler_auth_headers,
        json=workflow,
    )
    crawl_id = resp.json()["run_now_job"]

    # Poll replay.json until the crawl reaches a finished state.
    while True:
        status = requests.get(
            f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/replay.json",
            headers=crawler_auth_headers,
        ).json()
        if status["state"] in FINISHED_STATES:
            return crawl_id
        time.sleep(5)


@pytest.fixture(scope="session")
def wr_specs_crawl_id(crawler_auth_headers, default_org_id):
# Start crawl.
Expand Down
Loading
Loading