-
Notifications
You must be signed in to change notification settings - Fork 40
Refactor branch defined triggers #7696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,7 @@ | |
| from .models import ( | ||
| ComputedAttrJinja2GraphQL, | ||
| ComputedAttrJinja2GraphQLResponse, | ||
| ComputedAttrJinja2TriggerDefinition, | ||
| PythonTransformTarget, | ||
| ) | ||
|
|
||
|
|
@@ -312,21 +313,46 @@ async def computed_attribute_setup_jinja2( | |
| ) # type: ignore[misc] | ||
| # Configure all ComputedAttrJinja2Trigger in Prefect | ||
|
|
||
| all_triggers = report.triggers_with_type(trigger_type=ComputedAttrJinja2TriggerDefinition) | ||
|
|
||
| # Since we can have multiple trigger per NodeKind | ||
| # we need to extract the list of unique node that should be processed | ||
| unique_nodes: set[tuple[str, str, str]] = { | ||
| (trigger.branch, trigger.computed_attribute.kind, trigger.computed_attribute.attribute.name) # type: ignore[attr-defined] | ||
| for trigger in report.updated + report.created | ||
| } | ||
| for branch, kind, attribute_name in unique_nodes: | ||
| if event_name != BranchDeletedEvent.event_name and branch == branch_name: | ||
| # we need to extract the list of unique node that should be processed, this is done by filtering the triggers that targets_self | ||
| modified_triggers = [ | ||
| trigger | ||
| for trigger in report.modified_triggers_with_type(trigger_type=ComputedAttrJinja2TriggerDefinition) | ||
| if trigger.targets_self | ||
| ] | ||
|
|
||
| for modified_trigger in modified_triggers: | ||
| if event_name != BranchDeletedEvent.event_name and modified_trigger.branch == branch_name: | ||
| if branch_name != registry.default_branch: | ||
| default_branch_triggers = [ | ||
| trigger | ||
| for trigger in all_triggers | ||
| if trigger.branch == registry.default_branch | ||
| and trigger.targets_self | ||
| and trigger.computed_attribute.kind == modified_trigger.computed_attribute.kind | ||
| and trigger.computed_attribute.attribute.name | ||
| == modified_trigger.computed_attribute.attribute.name | ||
| ] | ||
| if ( | ||
| default_branch_triggers | ||
| and len(default_branch_triggers) == 1 | ||
| and default_branch_triggers[0].template_hash == modified_trigger.template_hash | ||
| ): | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This pattern is the same for display_labels and HFIDs, though the fields are a bit different. Instead of always triggering tasks if the trigger has been modified (created or updated, not refreshed) we first check if we're on a non default branch and if so try to find the same trigger on the default branch and if we find it and the hash of the trigger is the same we don't schedule a recalculation to happen. |
||
| log.debug( | ||
| f"Skipping computed attribute updates for {modified_trigger.computed_attribute.kind}." | ||
| f"{modified_trigger.computed_attribute.attribute.name} [{branch_name}], schema is identical to default branch" | ||
| ) | ||
| continue | ||
|
|
||
| await get_workflow().submit_workflow( | ||
| workflow=TRIGGER_UPDATE_JINJA_COMPUTED_ATTRIBUTES, | ||
| context=context, | ||
| parameters={ | ||
| "branch_name": branch, | ||
| "computed_attribute_name": attribute_name, | ||
| "computed_attribute_kind": kind, | ||
| "branch_name": modified_trigger.branch, | ||
| "computed_attribute_name": modified_trigger.computed_attribute.attribute.name, | ||
| "computed_attribute_kind": modified_trigger.computed_attribute.kind, | ||
| }, | ||
| ) | ||
|
Comment on lines
+316
to
357
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chainBranch-aware Jinja2 recompute filtering looks right; verify callers and consider indexing default triggers. The new flow that:
matches the described behavior change and should prevent unnecessary recomputes on schema‑identical branches. Two follow‑ups:
🏁 Script executed: #!/bin/bash
# 1) Check how computed_attribute_setup_jinja2 is invoked.
rg -n "computed_attribute_setup_jinja2\(" -C3
# 2) If needed, identify number of COMPUTED_ATTR_JINJA2 triggers per deployment.
rg -n "ComputedAttrJinja2TriggerDefinition" -C3Length of output: 5699 🏁 Script executed: #!/bin/bash
# Find all callers of computed_attribute_setup_jinja2
rg -n "computed_attribute_setup_jinja2" -B5 -A2 | grep -v "^--$" | head -100Length of output: 1247 🏁 Script executed: #!/bin/bash
# Look for event handlers or workflow triggers that might invoke this function
rg -n "TRIGGER_SETUP_JINJA2_COMPUTED_ATTRIBUTES\|computed.attribute.setup.jinja2" -B3 -A3Length of output: 0 🏁 Script executed: #!/bin/bash
# Check the full function body to understand the branch_name handling
sed -n '299,400p' backend/infrahub/computed_attribute/tasks.pyLength of output: 5019 🏁 Script executed: #!/bin/bash
# Look for how computed_attribute_setup_jinja2 is registered as a flow/task
rg -n "@flow\|@task" backend/infrahub/computed_attribute/tasks.py -A5 | head -50Length of output: 0 🏁 Script executed: #!/bin/bash
# Search for event subscriptions or listeners that trigger this function
rg -n "BranchCreatedEvent\|BranchDeletionStarted\|SchemaBranchUpdate" -B2 -A2 -g '*.py' | head -80Length of output: 0 🏁 Script executed: #!/bin/bash
# Look for setup_triggers calls to understand initial trigger setup
rg -n "setup_triggers" -B3 -A3 | head -100Length of output: 7665 🏁 Script executed: #!/bin/bash
# Check models and understand TriggerBranchDefinition structure
ast-grep --pattern 'class TriggerBranchDefinition {
$$$
}' Length of output: 0 🏁 Script executed: #!/bin/bash
# Verify how branch attribute is set on triggers
rg -n "class ComputedAttrJinja2TriggerDefinition" -A20Length of output: 1857 🏁 Script executed: #!/bin/bash
# Check if branch_name is set at function entry point
sed -n '299,340p' backend/infrahub/computed_attribute/tasks.py | grep -E "branch_name|=|report"Length of output: 1328 🏁 Script executed: #!/bin/bash
# Find where computed_attribute_setup_jinja2 might be invoked from event handlers
rg -n "\.submit_workflow\(.*computed.*attribute.*setup\|computed_attribute_setup_jinja2" -B5 -A5 | head -120Length of output: 0 🏁 Script executed: #!/bin/bash
# Look for orchestration/event handling that calls this
find . -name "*.py" -type f -exec grep -l "computed_attribute_setup_jinja2\|TRIGGER_SETUP_JINJA2" {} \; 2>/dev/null | head -20Length of output: 88 🏁 Script executed: #!/bin/bash
# Check the event payload structure - what branch info is available
rg -n "class.*Event.*:" backend/infrahub/ -A10 | grep -i branch | head -40Length of output: 4023 🏁 Script executed: #!/bin/bash
# Search for where workflows are submitted/triggered
rg -n "submit_workflow\|\.submit_workflow" -B5 -A5 | grep -E "JINJA|computed" -B5 -A5 | head -80Length of output: 0 🏁 Script executed: #!/bin/bash
# Look for event handlers and automation trigger setup
rg -n "branch_created_event_handler\|branch.created\|BranchCreatedEvent" -B3 -A8 | head -120Length of output: 0 🏁 Script executed: #!/bin/bash
# Check task_manager event handlers
rg -n "class.*Event.*Handler\|handle_event" backend/infrahub/task_manager/ -B2 -A15 | head -100Length of output: 0 🏁 Script executed: #!/bin/bash
# Find workflow triggering patterns
rg -n "COMPUTED_ATTRIBUTE" backend/infrahub/ -B2 -A2 | head -80Length of output: 5142 🏁 Script executed: #!/bin/bash
# Check if there's a pattern of always setting branch_name before calling this workflow
rg -n "branch_name\s*=" backend/infrahub/computed_attribute/ -B3 -A3 | head -100Length of output: 7869 🏁 Script executed: #!/bin/bash
# Find where workflows are invoked in trigger definitions
rg -n "TRIGGER_COMPUTED_ATTRIBUTE\|COMPUTED_ATTRIBUTE_SETUP" backend/infrahub/computed_attribute/triggers.py -B5 -A10Length of output: 0 🏁 Script executed: #!/bin/bash
# Check trigger catalogue to see how these workflows are called
rg -n "ComputedAttrJinja2\|COMPUTED_ATTRIBUTE_SETUP_JINJA2" backend/infrahub/trigger/catalogue.py -B5 -A10Length of output: 0 🏁 Script executed: #!/bin/bash
# Look at the actual trigger definitions to see parameters passed
cat backend/infrahub/computed_attribute/triggers.py 2>/dev/null || echo "File not found"Length of output: 2303 🏁 Script executed: #!/bin/bash
# Search for where TRIGGER_COMPUTED_ATTRIBUTE_ALL_SCHEMA is defined and used
rg -n "TRIGGER_COMPUTED_ATTRIBUTE_ALL_SCHEMA" -B3 -A8 | head -100Length of output: 2986 🏁 Script executed: #!/bin/bash
# Find event handlers that trigger computed attribute setup
find backend/infrahub -name "*.py" -type f -exec grep -l "computed_attribute\|COMPUTED_ATTRIBUTE" {} \; | xargs grep -l "event" | head -10Length of output: 362 Add missing branch_name fallback to prevent silent trigger filtering failures. The function is invoked via event-based triggers with The sibling function branch_name = branch_name or registry.default_branchThe secondary concern about O(N²) recomputation of 🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ | |
| from infrahub.events.utils import get_all_events | ||
| from infrahub.trigger.constants import NAME_SEPARATOR | ||
| from infrahub.trigger.models import TriggerType | ||
| from infrahub.trigger.setup import gather_all_automations | ||
|
|
||
| from .models import TelemetryPrefectData, TelemetryWorkPoolData | ||
|
|
||
|
|
@@ -53,7 +54,7 @@ async def count_events(event_name: str) -> int: | |
|
|
||
| @task(name="telemetry-gather-automations", task_run_name="Gather Automations", cache_policy=NONE) | ||
| async def gather_prefect_automations(client: PrefectClient) -> dict[str, Any]: | ||
| automations = await client.read_automations() | ||
| automations = await gather_all_automations(client=client) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These updates are to ensure that we can find more than the default return value for automations in the Prefect API (200) |
||
|
|
||
| data: dict[str, Any] = {} | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,8 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from datetime import timedelta | ||
| from enum import Enum | ||
| from typing import TYPE_CHECKING, Any | ||
| from enum import Enum, StrEnum | ||
| from typing import TYPE_CHECKING, Any, TypeVar | ||
|
|
||
| from prefect.events.actions import RunDeployment | ||
| from prefect.events.schemas.automations import Automation, Posture | ||
|
|
@@ -18,16 +18,78 @@ | |
| if TYPE_CHECKING: | ||
| from uuid import UUID | ||
|
|
||
| T = TypeVar("T", bound="TriggerDefinition") | ||
|
|
||
|
|
||
| class TriggerComparison(StrEnum): | ||
| MATCH = "match" # Expected trigger and actual trigger is identical | ||
| REFRESH = "refresh" # The branch parameters doesn't match, the hash does, refresh in Prefect but don't run triggers | ||
| UPDATE = "update" # Neither branch or other data points match, update in Prefect and run triggers | ||
|
|
||
| @property | ||
| def update_prefect(self) -> bool: | ||
| return self in {TriggerComparison.REFRESH, TriggerComparison.UPDATE} | ||
|
|
||
|
|
||
| class TriggerSetupReport(BaseModel): | ||
| created: list[TriggerDefinition] = Field(default_factory=list) | ||
| refreshed: list[TriggerDefinition] = Field(default_factory=list) | ||
| updated: list[TriggerDefinition] = Field(default_factory=list) | ||
| deleted: list[Automation] = Field(default_factory=list) | ||
| unchanged: list[TriggerDefinition] = Field(default_factory=list) | ||
|
|
||
| @property | ||
| def in_use_count(self) -> int: | ||
| return len(self.created + self.updated + self.unchanged) | ||
| return len(self.created + self.updated + self.unchanged + self.refreshed) | ||
|
|
||
| def add_with_comparison(self, trigger: TriggerDefinition, comparison: TriggerComparison) -> None: | ||
| match comparison: | ||
| case TriggerComparison.UPDATE: | ||
| self.updated.append(trigger) | ||
| case TriggerComparison.REFRESH: | ||
| self.refreshed.append(trigger) | ||
| case TriggerComparison.MATCH: | ||
| self.unchanged.append(trigger) | ||
|
|
||
| def _created_triggers_with_type(self, trigger_type: type[T]) -> list[T]: | ||
| return [trigger for trigger in self.created if isinstance(trigger, trigger_type)] | ||
|
|
||
| def _refreshed_triggers_with_type(self, trigger_type: type[T]) -> list[T]: | ||
| return [trigger for trigger in self.refreshed if isinstance(trigger, trigger_type)] | ||
|
|
||
| def _unchanged_triggers_with_type(self, trigger_type: type[T]) -> list[T]: | ||
| return [trigger for trigger in self.unchanged if isinstance(trigger, trigger_type)] | ||
|
|
||
| def _updated_triggers_with_type(self, trigger_type: type[T]) -> list[T]: | ||
| return [trigger for trigger in self.updated if isinstance(trigger, trigger_type)] | ||
|
|
||
| def triggers_with_type(self, trigger_type: type[T]) -> list[T]: | ||
| """Return all triggers that match the specified type. | ||
|
|
||
| Args: | ||
| trigger_type: A TriggerDefinition class or subclass to filter by | ||
|
|
||
| Returns: | ||
| List of triggers of the specified type from all categories | ||
| """ | ||
| created = self._created_triggers_with_type(trigger_type=trigger_type) | ||
| updated = self._updated_triggers_with_type(trigger_type=trigger_type) | ||
| refreshed = self._refreshed_triggers_with_type(trigger_type=trigger_type) | ||
| unchanged = self._unchanged_triggers_with_type(trigger_type=trigger_type) | ||
| return created + updated + refreshed + unchanged | ||
|
|
||
| def modified_triggers_with_type(self, trigger_type: type[T]) -> list[T]: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously we had some issues where the |
||
| """Return all created and updated triggers that match the specified type. | ||
|
|
||
| Args: | ||
| trigger_type: A TriggerDefinition class or subclass to filter by | ||
|
|
||
| Returns: | ||
| List of triggers of the specified type from both created and updated lists | ||
| """ | ||
| created = self._created_triggers_with_type(trigger_type=trigger_type) | ||
| updated = self._updated_triggers_with_type(trigger_type=trigger_type) | ||
| return created + updated | ||
|
|
||
|
|
||
| class TriggerType(str, Enum): | ||
|
|
@@ -41,6 +103,16 @@ class TriggerType(str, Enum): | |
| HUMAN_FRIENDLY_ID = "human_friendly_id" | ||
| # OBJECT = "object" | ||
|
|
||
| @property | ||
| def is_branch_specific(self) -> bool: | ||
| return self in { | ||
| TriggerType.COMPUTED_ATTR_JINJA2, | ||
| TriggerType.COMPUTED_ATTR_PYTHON, | ||
| TriggerType.COMPUTED_ATTR_PYTHON_QUERY, | ||
| TriggerType.DISPLAY_LABEL_JINJA2, | ||
| TriggerType.HUMAN_FRIENDLY_ID, | ||
| } | ||
|
|
||
|
|
||
| def _match_related_dict() -> dict: | ||
| # Make Mypy happy as match related is a dict[str, Any] | list[dict[str, Any]] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously we created a set to exclude duplicates of computed attributes that spanned multiple node kinds, with this update where we instead check for triggers that target the actual kind we can remove duplicates in a way that instead keep the correct object type of the reports.