Skip to content

Commit

Permalink
progress
Browse files Browse the repository at this point in the history
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
  • Loading branch information
BeryJu committed Dec 11, 2024
1 parent b31b41e commit 2813634
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
4 changes: 4 additions & 0 deletions authentik/enterprise/providers/ssf/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,7 @@ class UserStreamSubject(models.Model):

def __str__(self) -> str:
return f"Stream subject {self.stream_id} to {self.user_id}"

class StreamEvent(models.Model):

uuid = models.UUIDField(default=uuid4, primary_key=True, editable=False)
12 changes: 4 additions & 8 deletions authentik/enterprise/providers/ssf/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
UserTypes,
)
from authentik.enterprise.providers.ssf.models import (
DeliveryMethods,
EventTypes,
SSFProvider,
Stream,
)
from authentik.enterprise.providers.ssf.tasks import send_ssf_event, ssf_push_request
from authentik.enterprise.providers.ssf.tasks import send_single_ssf_event, send_ssf_event
from authentik.events.middleware import audit_ignore
from authentik.events.utils import get_user

Expand Down Expand Up @@ -56,19 +55,16 @@ def ssf_providers_post_save(sender: type[Model], instance: SSFProvider, created:

@receiver(post_save, sender=Stream)
def ssf_stream_post_create(sender: type[Model], instance: Stream, created: bool, **_):
"""Send a verification event when a push stream is created"""
"""Send a verification event when a stream is created"""
if not created:
return
if instance.delivery_method != DeliveryMethods.RISC_PUSH:
return
ssf_push_request.delay(
send_single_ssf_event.delay(
str(instance.uuid),
instance.endpoint_url,
{
"jti": uuid4().hex,
# TODO: Figure out how to get iss
"iss": "https://ak.beryju.dev/.well-known/ssf-configuration/abm-ssf/8",
"aud": instance.aud[0],
"aud": instance.aud,
"iat": int(datetime.now().timestamp()),
"sub_id": {"format": "opaque", "id": str(instance.uuid)},
"events": {
Expand Down
22 changes: 14 additions & 8 deletions authentik/enterprise/providers/ssf/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,30 @@


@CELERY_APP.task(bind=True)
def send_ssf_event(event_type: EventTypes, subject):
def send_ssf_event(event_type: EventTypes, data: dict):
tasks = []
for stream in Stream.objects.filter(
delivery_method=DeliveryMethods.RISC_PUSH,
events_requested__in=[event_type],
):
tasks.append(ssf_push_request.si(str(stream.uuid), stream.endpoint_url, {}))
for stream in Stream.objects.filter(events_requested__in=[event_type]):
tasks.append(send_single_ssf_event.si(str(stream.uuid), data))
main_task = group(*tasks)
main_task()


@CELERY_APP.task(bind=True, autoretry=True, autoretry_for=(RequestException,), retry_backoff=True)
def ssf_push_request(self, stream_id: str, endpoint_url: str, data: dict):
def send_single_ssf_event(self, stream_id: str, data: dict):
stream = Stream.objects.filter(pk=stream_id).first()
if not stream:
return
if stream.delivery_method == DeliveryMethods.RISC_PUSH:
ssf_push_request.delay(stream_id, data)


@CELERY_APP.task(bind=True, autoretry=True, autoretry_for=(RequestException,), retry_backoff=True)
def ssf_push_request(self, stream_id: str, data: dict):
stream = Stream.objects.filter(pk=stream_id).first()
if not stream:
return
response = session.post(
endpoint_url,
stream.endpoint_url,
data=stream.encode(data),
headers={"Content-Type": "application/secevent+jwt", "Accept": "application/json"},
)
Expand Down

0 comments on commit 2813634

Please sign in to comment.