Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blueprints: gate apply and discovery behing pg lock #12385

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 72 additions & 47 deletions authentik/blueprints/v1/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from pathlib import Path
from sys import platform

import pglock
from dacite.core import from_dict
from django.db import DatabaseError, InternalError, ProgrammingError
from django.db import DatabaseError, InternalError, ProgrammingError, connection
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
Expand Down Expand Up @@ -152,15 +153,27 @@
@prefill_task
def blueprints_discovery(self: SystemTask, path: str | None = None):
"""Find blueprints and check if they need to be created in the database"""
count = 0
for blueprint in blueprints_find():
if path and blueprint.path != path:
continue
check_blueprint_v1_file(blueprint)
count += 1
self.set_status(
TaskStatus.SUCCESSFUL, _("Successfully imported {count} files.".format(count=count))
)
with pglock.advisory(
lock_id=f"goauthentik.io/{connection.schema_name}/blueprints/discovery",
timeout=0,
side_effect=pglock.Return,
) as lock_acquired:
if not lock_acquired:
LOGGER.debug("Not running blueprint discovery, lock was not acquired")
self.set_status(

Check warning on line 163 in authentik/blueprints/v1/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/blueprints/v1/tasks.py#L162-L163

Added lines #L162 - L163 were not covered by tests
TaskStatus.SUCCESSFUL,
_("Blueprint discovery lock could not be acquired. Skipping discovery."),
)
return

Check warning on line 167 in authentik/blueprints/v1/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/blueprints/v1/tasks.py#L167

Added line #L167 was not covered by tests
count = 0
for blueprint in blueprints_find():
if path and blueprint.path != path:
continue

Check warning on line 171 in authentik/blueprints/v1/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/blueprints/v1/tasks.py#L171

Added line #L171 was not covered by tests
check_blueprint_v1_file(blueprint)
count += 1
self.set_status(
TaskStatus.SUCCESSFUL, _("Successfully imported {count} files.".format(count=count))
)


def check_blueprint_v1_file(blueprint: BlueprintFile):
Expand Down Expand Up @@ -197,48 +210,60 @@
def apply_blueprint(self: SystemTask, instance_pk: str):
"""Apply single blueprint"""
self.save_on_success = False
instance: BlueprintInstance | None = None
try:
instance: BlueprintInstance = BlueprintInstance.objects.filter(pk=instance_pk).first()
if not instance or not instance.enabled:
with pglock.advisory(
lock_id=f"goauthentik.io/{connection.schema_name}/blueprints/apply/{instance_pk}",
timeout=0,
side_effect=pglock.Return,
) as lock_acquired:
if not lock_acquired:
LOGGER.debug("Not running blueprint discovery, lock was not acquired")
self.set_status(

Check warning on line 220 in authentik/blueprints/v1/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/blueprints/v1/tasks.py#L219-L220

Added lines #L219 - L220 were not covered by tests
TaskStatus.SUCCESSFUL,
_("Blueprint apply lock could not be acquired. Skipping apply."),
)
return
self.set_uid(slugify(instance.name))
blueprint_content = instance.retrieve()
file_hash = sha512(blueprint_content.encode()).hexdigest()
importer = Importer.from_string(blueprint_content, instance.context)
if importer.blueprint.metadata:
instance.metadata = asdict(importer.blueprint.metadata)
valid, logs = importer.validate()
if not valid:
instance.status = BlueprintInstanceStatus.ERROR
instance.save()
self.set_status(TaskStatus.ERROR, *logs)
return
with capture_logs() as logs:
applied = importer.apply()
if not applied:
instance: BlueprintInstance | None = None
try:
instance: BlueprintInstance = BlueprintInstance.objects.filter(pk=instance_pk).first()
if not instance or not instance.enabled:
return
self.set_uid(slugify(instance.name))
blueprint_content = instance.retrieve()
file_hash = sha512(blueprint_content.encode()).hexdigest()
importer = Importer.from_string(blueprint_content, instance.context)
if importer.blueprint.metadata:
instance.metadata = asdict(importer.blueprint.metadata)
valid, logs = importer.validate()
if not valid:
instance.status = BlueprintInstanceStatus.ERROR
instance.save()
self.set_status(TaskStatus.ERROR, *logs)
return
instance.status = BlueprintInstanceStatus.SUCCESSFUL
instance.last_applied_hash = file_hash
instance.last_applied = now()
self.set_status(TaskStatus.SUCCESSFUL)
except (
OSError,
DatabaseError,
ProgrammingError,
InternalError,
BlueprintRetrievalFailed,
EntryInvalidError,
) as exc:
if instance:
instance.status = BlueprintInstanceStatus.ERROR
self.set_error(exc)
finally:
if instance:
instance.save()
with capture_logs() as logs:
applied = importer.apply()
if not applied:
instance.status = BlueprintInstanceStatus.ERROR
instance.save()
self.set_status(TaskStatus.ERROR, *logs)
return

Check warning on line 248 in authentik/blueprints/v1/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/blueprints/v1/tasks.py#L245-L248

Added lines #L245 - L248 were not covered by tests
instance.status = BlueprintInstanceStatus.SUCCESSFUL
instance.last_applied_hash = file_hash
instance.last_applied = now()
self.set_status(TaskStatus.SUCCESSFUL)
except (
OSError,
DatabaseError,
ProgrammingError,
InternalError,
BlueprintRetrievalFailed,
EntryInvalidError,
) as exc:
if instance:
instance.status = BlueprintInstanceStatus.ERROR
self.set_error(exc)
finally:
if instance:
instance.save()


@CELERY_APP.task()
Expand Down
Loading