-
Notifications
You must be signed in to change notification settings - Fork 403
/
tasks.py
161 lines (135 loc) · 5.24 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import logging
import typing
from django.utils import timezone
from task_processor.decorators import register_task_handler
from task_processor.models import TaskPriority
from audit.models import AuditLog
from audit.related_object_type import RelatedObjectType
from edge_api.identities.types import IdentityChangeset
from environments.dynamodb import DynamoEnvironmentV2Wrapper
from environments.models import Environment, Webhook
from features.models import Feature, FeatureState
from users.models import FFAdminUser
from util.mappers import map_identity_changeset_to_identity_override_changeset
from webhooks.webhooks import WebhookEventType, call_environment_webhooks
logger = logging.getLogger(__name__)
@register_task_handler()
def call_environment_webhook_for_feature_state_change(
feature_id: int,
environment_api_key: str,
identity_id: typing.Union[id, str],
identity_identifier: str,
timestamp: str,
changed_by_user_id: int = None, # deprecated(use changed_by)
changed_by: str = None,
new_enabled_state: bool = None,
new_value: typing.Union[bool, int, str] = None,
previous_enabled_state: bool = None,
previous_value: typing.Union[bool, int, str] = None,
):
environment = Environment.objects.get(api_key=environment_api_key)
if not environment.webhooks.filter(enabled=True).exists():
logger.debug(
"No webhooks exist for environment %d. Not calling webhooks.",
environment.id,
)
return
feature = Feature.objects.get(id=feature_id)
if changed_by_user_id:
changed_by = FFAdminUser.objects.get(id=changed_by_user_id).email
data = {
"changed_by": changed_by,
"timestamp": timestamp,
"new_state": None,
}
if previous_enabled_state is not None:
data["previous_state"] = Webhook.generate_webhook_feature_state_data(
feature=feature,
environment=environment,
identity_id=identity_id,
identity_identifier=identity_identifier,
enabled=previous_enabled_state,
value=previous_value,
)
if new_enabled_state is not None:
data["new_state"] = Webhook.generate_webhook_feature_state_data(
feature=feature,
environment=environment,
identity_id=identity_id,
identity_identifier=identity_identifier,
enabled=new_enabled_state,
value=new_value,
)
event_type = (
WebhookEventType.FLAG_DELETED
if new_enabled_state is None
else WebhookEventType.FLAG_UPDATED
)
call_environment_webhooks(environment.id, data, event_type=event_type.value)
@register_task_handler(priority=TaskPriority.HIGH)
def sync_identity_document_features(identity_uuid: str):
from .models import EdgeIdentity
identity = EdgeIdentity.from_identity_document(
EdgeIdentity.dynamo_wrapper.get_item_from_uuid(identity_uuid)
)
valid_feature_names = set(
FeatureState.objects.filter(
environment__api_key=identity.environment_api_key
).values_list("feature__name", flat=True)
)
identity.synchronise_features(valid_feature_names)
identity.save()
@register_task_handler()
def generate_audit_log_records(
environment_api_key: str,
identifier: str,
identity_uuid: str,
changes: IdentityChangeset,
user_id: int | None = None,
master_api_key_id: int | None = None,
) -> None:
audit_records = []
feature_override_changes = changes["feature_overrides"]
if not feature_override_changes:
return
environment = Environment.objects.select_related(
"project", "project__organisation"
).get(api_key=environment_api_key)
for feature_name, change_details in feature_override_changes.items():
action = {"+": "created", "-": "deleted", "~": "updated"}.get(
change_details["change_type"]
)
log = f"Feature override {action} for feature '{feature_name}' and identity '{identifier}'"
audit_records.append(
AuditLog(
project=environment.project,
environment=environment,
log=log,
author_id=user_id,
related_object_type=RelatedObjectType.EDGE_IDENTITY.name,
related_object_uuid=identity_uuid,
master_api_key_id=master_api_key_id,
created_date=timezone.now(),
)
)
AuditLog.objects.bulk_create(audit_records)
@register_task_handler()
def update_flagsmith_environments_v2_identity_overrides(
environment_api_key: str,
identity_uuid: str,
identifier: str,
changes: IdentityChangeset,
) -> None:
feature_override_changes = changes["feature_overrides"]
if not feature_override_changes:
return
environment = Environment.objects.get(api_key=environment_api_key)
dynamodb_wrapper_v2 = DynamoEnvironmentV2Wrapper()
identity_override_changeset = map_identity_changeset_to_identity_override_changeset(
identity_changeset=changes,
identity_uuid=identity_uuid,
environment_api_key=environment_api_key,
environment_id=environment.id,
identifier=identifier,
)
dynamodb_wrapper_v2.update_identity_overrides(identity_override_changeset)