Skip to content

Commit

Permalink
[Framework] Improvements to deleting entities in real time (#571)
Browse files Browse the repository at this point in the history
# Description

What - Improvements to deleting entities in real time, avoiding
inefficient cases
Why - previous changes might have resulted in calculating the full
jq mapping of entities that would not be registered in the end, and
also in performing many API requests to the Port API.
How - using the multiple-rules search API when checking entities to delete
and calculating only the jq fields that are relevant for deletion

## Type of change

Please leave one option from the following and delete the rest:

- [X] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.
  • Loading branch information
omby8888 authored Apr 24, 2024
1 parent 746a3ec commit 5e19fd0
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 85 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.5.14 (2024-04-24)

### Improvements

- Implemented real-time entity deletion exclusively for instances that haven't matched any selectors.
- Changed the JQ calculation to process only the identifier and blueprint of raw entities that were not selected during real-time events, so that only the data required for deletion is computed.

## 0.5.13 (2024-04-17)

### Features
Expand Down
44 changes: 25 additions & 19 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,27 +173,33 @@ async def search_entities(
handle_status_code(response)
return [Entity.parse_obj(result) for result in response.json()["entities"]]

async def does_integration_has_ownership_over_entity(
self, entity: Entity, user_agent_type: UserAgentType
) -> bool:
logger.info(f"Validating ownership on entity {entity.identifier}")
found_entities: list[Entity] = await self.search_entities(
async def search_batch_entities(
self, user_agent_type: UserAgentType, entities_to_search: list[Entity]
) -> list[Entity]:
search_rules = []
for entity in entities_to_search:
search_rules.append(
{
"combinator": "and",
"rules": [
{
"property": "$identifier",
"operator": "=",
"value": entity.identifier,
},
{
"property": "$blueprint",
"operator": "=",
"value": entity.blueprint,
},
],
}
)

return await self.search_entities(
user_agent_type,
{
"combinator": "and",
"rules": [
{
"property": "$identifier",
"operator": "contains",
"value": entity.identifier,
},
{
"property": "$blueprint",
"operator": "contains",
"value": entity.blueprint,
},
],
"rules": [{"combinator": "or", "rules": search_rules}],
},
)

return len(found_entities) > 0
6 changes: 3 additions & 3 deletions port_ocean/core/handlers/entity_processor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from port_ocean.core.handlers.port_app_config.models import ResourceConfig
from port_ocean.core.models import Entity
from port_ocean.core.ocean_types import (
RawEntity,
RAW_ITEM,
EntitySelectorDiff,
)

Expand Down Expand Up @@ -38,15 +38,15 @@ class BaseEntityProcessor(BaseHandler):
async def _parse_items(
self,
mapping: ResourceConfig,
raw_data: list[RawEntity],
raw_data: list[RAW_ITEM],
parse_all: bool = False,
) -> EntitySelectorDiff:
pass

async def parse_items(
self,
mapping: ResourceConfig,
raw_data: list[RawEntity],
raw_data: list[RAW_ITEM],
parse_all: bool = False,
) -> EntitySelectorDiff:
"""Public method to parse raw entity data and map it to an EntityDiff.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from port_ocean.core.handlers.port_app_config.models import ResourceConfig
from port_ocean.core.models import Entity
from port_ocean.core.ocean_types import (
RawEntity,
RAW_ITEM,
EntitySelectorDiff,
)
from port_ocean.exceptions.core import EntityProcessorException
Expand Down Expand Up @@ -122,7 +122,7 @@ async def _calculate_entity(
async def _parse_items(
self,
mapping: ResourceConfig,
raw_results: list[RawEntity],
raw_results: list[RAW_ITEM],
parse_all: bool = False,
) -> EntitySelectorDiff:
raw_entity_mappings: dict[str, Any] = mapping.port.entity.mappings.dict(
Expand Down Expand Up @@ -153,4 +153,4 @@ async def _parse_items(
else:
failed_entities.append(parsed_entity)

return {"passed": passed_entities, "failed": failed_entities}
return EntitySelectorDiff(passed=passed_entities, failed=failed_entities)
102 changes: 58 additions & 44 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
RAW_RESULT,
RESYNC_RESULT,
RawEntityDiff,
EntityDiff,
ASYNC_GENERATOR_RESYNC_TYPE,
RawEntity,
RAW_ITEM,
EntitySelectorDiff,
)
from port_ocean.core.utils import zip_and_sum
Expand Down Expand Up @@ -123,7 +122,7 @@ async def _execute_resync_tasks(

async def _calculate_raw(
self,
raw_diff: list[tuple[ResourceConfig, list[RawEntity]]],
raw_diff: list[tuple[ResourceConfig, list[RAW_ITEM]]],
parse_all: bool = False,
) -> list[EntitySelectorDiff]:
return await asyncio.gather(
Expand All @@ -139,40 +138,23 @@ async def _register_resource_raw(
results: list[dict[Any, Any]],
user_agent_type: UserAgentType,
parse_all: bool = False,
) -> list[Entity]:
) -> EntitySelectorDiff:
objects_diff = await self._calculate_raw([(resource, results)], parse_all)
await self.entities_state_applier.upsert(
objects_diff[0].passed, user_agent_type
)

entities_after: list[Entity] = objects_diff[0]["passed"]
await self.entities_state_applier.upsert(entities_after, user_agent_type)

# If an entity didn't pass the JQ selector, we want to delete it if it exists in Port
for entity_to_delete in objects_diff[0]["failed"]:
is_owner = (
await ocean.port_client.does_integration_has_ownership_over_entity(
entity_to_delete, user_agent_type
)
)
if not is_owner:
logger.info(
f"Skipping deletion of entity {entity_to_delete.identifier}, "
f"Couldn't find an entity that's related to the current integration."
)
continue
await self.entities_state_applier.delete(
objects_diff[0]["failed"], user_agent_type
)

return entities_after
return objects_diff[0]

async def _unregister_resource_raw(
self,
resource: ResourceConfig,
results: list[RawEntity],
results: list[RAW_ITEM],
user_agent_type: UserAgentType,
) -> list[Entity]:
objects_diff = await self._calculate_raw([(resource, results)])

entities_after: list[Entity] = objects_diff[0]["passed"]
entities_after: list[Entity] = objects_diff[0].passed
await self.entities_state_applier.delete(entities_after, user_agent_type)
logger.info("Finished unregistering change")
return entities_after
Expand All @@ -189,17 +171,21 @@ async def _register_in_batches(
else:
async_generators.append(result)

entities = await self._register_resource_raw(
resource_config, raw_results, user_agent_type
)
entities = (
await self._register_resource_raw(
resource_config, raw_results, user_agent_type
)
).passed

for generator in async_generators:
try:
async for items in generator:
entities.extend(
await self._register_resource_raw(
resource_config, items, user_agent_type
)
(
await self._register_resource_raw(
resource_config, items, user_agent_type
)
).passed
)
except* OceanAbortException as error:
errors.append(error)
Expand Down Expand Up @@ -233,13 +219,44 @@ async def register_raw(
resource for resource in config.resources if resource.kind == kind
]

return await asyncio.gather(
diffs: list[EntitySelectorDiff] = await asyncio.gather(
*(
self._register_resource_raw(resource, results, user_agent_type, True)
for resource in resource_mappings
)
)

registered_entities, entities_to_delete = zip_and_sum(
(entities_diff.passed, entities_diff.failed) for entities_diff in diffs
)

registered_entities_attributes = {
(entity.identifier, entity.blueprint) for entity in registered_entities
}

filtered_entities_to_delete: list[Entity] = (
await ocean.port_client.search_batch_entities(
user_agent_type,
[
entity
for entity in entities_to_delete
if (entity.identifier, entity.blueprint)
not in registered_entities_attributes
],
)
)

if filtered_entities_to_delete:
logger.info(
f"Deleting {len(filtered_entities_to_delete)} entities that didn't pass any of the selectors"
)

await self.entities_state_applier.delete(
filtered_entities_to_delete, user_agent_type
)

return registered_entities

async def unregister_raw(
self,
kind: str,
Expand Down Expand Up @@ -306,16 +323,13 @@ async def update_raw_diff(
[(mapping, raw_desired_state["after"]) for mapping in resource_mappings]
)

entities_before_flatten = [
item
for sublist in [d["passed"] for d in entities_before]
for item in sublist
]
entities_after_flatten = [
item
for sublist in [d["passed"] for d in entities_after]
for item in sublist
]
entities_before_flatten: list[Entity] = sum(
(entities_diff.passed for entities_diff in entities_before), []
)

entities_after_flatten: list[Entity] = sum(
(entities_diff.passed for entities_diff in entities_after), []
)

await self.entities_state_applier.apply_diff(
{"before": entities_before_flatten, "after": entities_after_flatten},
Expand Down
27 changes: 12 additions & 15 deletions port_ocean/core/ocean_types.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,33 @@
from typing import TypedDict, Any, AsyncIterator, Callable, Awaitable
from typing import TypedDict, Any, AsyncIterator, Callable, Awaitable, NamedTuple

from port_ocean.core.models import Entity


RawEntity = dict[Any, Any]
RAW_ITEM = dict[Any, Any]
RAW_RESULT = list[RAW_ITEM]
ASYNC_GENERATOR_RESYNC_TYPE = AsyncIterator[RAW_RESULT]
RESYNC_RESULT = list[RAW_ITEM | ASYNC_GENERATOR_RESYNC_TYPE]

LISTENER_RESULT = Awaitable[RAW_RESULT] | ASYNC_GENERATOR_RESYNC_TYPE
RESYNC_EVENT_LISTENER = Callable[[str], LISTENER_RESULT]
START_EVENT_LISTENER = Callable[[], Awaitable[None]]


class RawEntityDiff(TypedDict):
before: list[RawEntity]
after: list[RawEntity]
before: list[RAW_ITEM]
after: list[RAW_ITEM]


class EntityDiff(TypedDict):
before: list[Entity]
after: list[Entity]


class EntitySelectorDiff(TypedDict):
class EntitySelectorDiff(NamedTuple):
passed: list[Entity]
failed: list[Entity]


RAW_ITEM = dict[Any, Any]
RAW_RESULT = list[RAW_ITEM]
ASYNC_GENERATOR_RESYNC_TYPE = AsyncIterator[RAW_RESULT]
RESYNC_RESULT = list[RAW_ITEM | ASYNC_GENERATOR_RESYNC_TYPE]

LISTENER_RESULT = Awaitable[RAW_RESULT] | ASYNC_GENERATOR_RESYNC_TYPE
RESYNC_EVENT_LISTENER = Callable[[str], LISTENER_RESULT]
START_EVENT_LISTENER = Callable[[], Awaitable[None]]


class IntegrationEventsCallbacks(TypedDict):
start: list[START_EVENT_LISTENER]
resync: dict[str | None, list[RESYNC_EVENT_LISTENER]]
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.5.13"
version = "0.5.14"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit 5e19fd0

Please sign in to comment.