Merged
Binary file removed state-manager/.coverage
Binary file not shown.
49 changes: 49 additions & 0 deletions state-manager/app/controller/manual_retry_state.py
@@ -0,0 +1,49 @@
from pymongo.errors import DuplicateKeyError
from app.models.manual_retry import ManualRetryRequestModel, ManualRetryResponseModel
from beanie import PydanticObjectId
from app.singletons.logs_manager import LogsManager
from app.models.state_status_enum import StateStatusEnum
from fastapi import HTTPException, status
from app.models.db.state import State


logger = LogsManager().get_logger()

async def manual_retry_state(namespace_name: str, state_id: PydanticObjectId, body: ManualRetryRequestModel, x_exosphere_request_id: str):
    try:
        logger.info(f"Manual retry state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)

        state = await State.find_one(State.id == state_id, State.namespace_name == namespace_name)
        if not state:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="State not found")

        try:
            retry_state = State(
                node_name=state.node_name,
                namespace_name=state.namespace_name,
                identifier=state.identifier,
                graph_name=state.graph_name,
                run_id=state.run_id,
                status=StateStatusEnum.CREATED,
                inputs=state.inputs,
                outputs={},
                error=None,
                parents=state.parents,
                does_unites=state.does_unites,
                fanout_id=body.fanout_id # this will ensure that multiple unwanted retries are not formed because of index in database
            )
Comment on lines +21 to +34

⚠️ Potential issue

Blocker: fingerprint collision for does_unites states; set retry_count on the new retry state.
Without bumping retry_count, the retry state’s fingerprint can equal the original (unique index uniq_state_fingerprint_unites), causing DuplicateKeyError even on first retry. Also aligns with the unique (node, ns, graph, identifier, run_id, retry_count, fanout_id) index.

Apply:

             retry_state = State(
                 node_name=state.node_name,
                 namespace_name=state.namespace_name,
                 identifier=state.identifier,
                 graph_name=state.graph_name,
                 run_id=state.run_id,
                 status=StateStatusEnum.CREATED,
                 inputs=state.inputs,
                 outputs={},
                 error=None,
                 parents=state.parents,
                 does_unites=state.does_unites,
+                retry_count=(getattr(state, "retry_count", 0) + 1),
                 fanout_id=body.fanout_id # this will ensure that multiple unwanted retries are not formed because of index in database
             )
🤖 Prompt for AI Agents
In state-manager/app/controller/manual_retry_state.py around lines 21 to 34, the
newly created retry State does not set retry_count which can produce a
fingerprint collision against the uniq_state_fingerprint_unites index; set
retry_count on the new State to (state.retry_count or 0) + 1 so the retry
fingerprint differs from the original (handle None by treating as 0), e.g.
assign retry_count before persisting the new State.
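
To make the collision concrete, here is a minimal, self-contained sketch that models a unique index as a set of tuples. `FakeState`, `UniqueIndex`, and `DuplicateKey` are hypothetical stand-ins, not the project's `State` model, MongoDB, or `DuplicateKeyError`. The key fields follow the (node, ns, graph, identifier, run_id, retry_count, fanout_id) index named in the comment, and the sketch assumes the retry reuses the original's `fanout_id`.

```python
from dataclasses import dataclass, replace


class DuplicateKey(Exception):
    """Hypothetical stand-in for pymongo.errors.DuplicateKeyError."""


@dataclass(frozen=True)
class FakeState:
    """Hypothetical, reduced version of a state document."""
    node_name: str
    namespace_name: str
    graph_name: str
    identifier: str
    run_id: str
    fanout_id: str
    retry_count: int = 0

    def unique_key(self) -> tuple:
        # Field order mirrors the index described in the review comment.
        return (self.node_name, self.namespace_name, self.graph_name,
                self.identifier, self.run_id, self.retry_count, self.fanout_id)


class UniqueIndex:
    """In-memory model of a unique index: rejects a key it has already seen."""

    def __init__(self) -> None:
        self._keys: set[tuple] = set()

    def insert(self, state: FakeState) -> None:
        key = state.unique_key()
        if key in self._keys:
            raise DuplicateKey(key)
        self._keys.add(key)


def try_insert(index: UniqueIndex, state: FakeState) -> bool:
    """Return True if the insert succeeded, False on a duplicate key."""
    try:
        index.insert(state)
        return True
    except DuplicateKey:
        return False


index = UniqueIndex()
original = FakeState("node", "ns", "graph", "id-1", "run-1", fanout_id="f-1")
index.insert(original)

# A retry that copies every field, including retry_count, has the same key.
collides = not try_insert(index, replace(original))
# Bumping retry_count changes the key, so the insert goes through.
bumped_ok = try_insert(index, replace(original, retry_count=original.retry_count + 1))
```

Whether the real fingerprint index behaves this way depends on which fields it hashes, which this page does not show.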

            retry_state = await retry_state.insert()
            logger.info(f"Retry state {retry_state.id} created for state {state_id}", x_exosphere_request_id=x_exosphere_request_id)

            state.status = StateStatusEnum.RETRY_CREATED
            await state.save()

            return ManualRetryResponseModel(id=str(retry_state.id), status=retry_state.status)
Comment on lines +36 to +41

🧹 Nitpick

Consider atomicity between insert and original state update.
If save() fails after insert(), you end up with a retry state but original not marked RETRY_CREATED. Prefer a Mongo transaction or a compensating update with retry/backoff.

🤖 Prompt for AI Agents
In state-manager/app/controller/manual_retry_state.py around lines 36 to 41, the
code inserts a retry_state then sets state.status and calls state.save()
separately, which can leave data inconsistent if save() fails; wrap both the
insert and the original state update in a single MongoDB transaction (start a
client session and use with_transaction to insert the retry document and update
the original state status to RETRY_CREATED within the same transaction) so both
succeed or fail together; if transactions are not available in the deployment,
implement a compensating flow: after insert() attempt the state.save() with an
exponential backoff retry loop, and if all retries fail, delete the created
retry_state (or mark it as failed) to restore consistency, and ensure all
operations log errors with context and surface failures to the caller.
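
The compensating flow described above (for deployments without transactions) could look roughly like the following sketch. It runs against a hypothetical in-memory `FakeStore` rather than Beanie or MongoDB. The method names, the `"ERRORED"` starting status, and the retry parameters are assumptions made for illustration only.

```python
import asyncio


class FakeStore:
    """Hypothetical in-memory stand-in for the database."""

    def __init__(self, save_failures: int) -> None:
        self.retry_states: dict[str, dict] = {}
        self.original_status = "ERRORED"  # assumed starting status
        self._save_failures = save_failures

    async def insert_retry(self, retry_id: str) -> None:
        self.retry_states[retry_id] = {"status": "CREATED"}

    async def mark_original_retry_created(self) -> None:
        # Fail the first `save_failures` calls to simulate a flaky save().
        if self._save_failures > 0:
            self._save_failures -= 1
            raise ConnectionError("simulated save failure")
        self.original_status = "RETRY_CREATED"

    async def delete_retry(self, retry_id: str) -> None:
        self.retry_states.pop(retry_id, None)


async def create_retry_with_compensation(
    store: FakeStore, retry_id: str, attempts: int = 3, base_delay: float = 0.01
) -> bool:
    """Insert the retry, then update the original with exponential backoff.

    If every update attempt fails, delete the inserted retry so the two
    records do not disagree. Returns True on success, False after rollback.
    """
    await store.insert_retry(retry_id)
    for attempt in range(attempts):
        try:
            await store.mark_original_retry_created()
            return True
        except ConnectionError:
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    await store.delete_retry(retry_id)
    return False


recovered = FakeStore(save_failures=2)
recovered_ok = asyncio.run(create_retry_with_compensation(recovered, "r1"))

broken = FakeStore(save_failures=5)
broken_ok = asyncio.run(create_retry_with_compensation(broken, "r2"))
```

The compensating delete can itself fail, so this narrows the inconsistency window rather than closing it; a transaction remains the stronger option where the deployment supports one.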

        except DuplicateKeyError:
            logger.info(f"Duplicate retry state detected for state {state_id}. A retry state with the same unique key already exists.", x_exosphere_request_id=x_exosphere_request_id)
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Duplicate retry state detected")


    except Exception as _:
        logger.error(f"Error manual retry state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
        raise
Comment on lines +47 to +49

⚠️ Potential issue

Don’t log HTTPException as server errors; use logger.exception for unexpected failures.
Current blanket except re-logs 404/409 as errors. Keep HTTPException untouched and add stacktrace for real failures.

Apply:

-    except Exception as _:
-        logger.error(f"Error manual retry state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
-        raise
+    except HTTPException:
+        # propagate expected HTTP errors without error severity logging
+        raise
+    except Exception:
+        logger.exception(
+            f"Error creating manual retry state {state_id} for namespace {namespace_name}",
+            x_exosphere_request_id=x_exosphere_request_id,
+        )
+        raise
🤖 Prompt for AI Agents
In state-manager/app/controller/manual_retry_state.py around lines 47 to 49, the
current blanket except catches HTTPException and logs 404/409 as server errors;
change the handler to let HTTPException pass through unmodified (re-raise
immediately) and for all other exceptions use logger.exception to log the error
with stacktrace and context (include state_id, namespace_name,
x_exosphere_request_id), then re-raise the exception.
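
The effect of the proposed handler ordering can be checked in isolation with the standard library alone. In this sketch, `HTTPError` is a hypothetical stand-in for `fastapi.HTTPException`, `run_handler` is an illustrative wrapper, and the stdlib `logging` module replaces the project's `LogsManager` logger, which may accept different keyword arguments.

```python
import logging

logger = logging.getLogger("manual_retry_sketch")


class HTTPError(Exception):
    """Hypothetical stand-in for fastapi.HTTPException."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def run_handler(action):
    """Run `action`, logging only unexpected failures."""
    try:
        return action()
    except HTTPError:
        # Expected client-facing errors (404, 409) pass through unlogged.
        raise
    except Exception:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception("Unexpected error in handler")
        raise


class ListHandler(logging.Handler):
    """Collects log records in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


capture = ListHandler()
logger.addHandler(capture)
logger.propagate = False


def not_found():
    raise HTTPError(404, "State not found")


def db_down():
    raise RuntimeError("database unavailable")


for action in (not_found, db_down):
    try:
        run_handler(action)
    except Exception:
        pass  # both errors still reach the caller; only logging differs

logged = [record.getMessage() for record in capture.records]
```

Only the `RuntimeError` produces a log record; the 404 reaches the caller without being logged as a server error.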

11 changes: 11 additions & 0 deletions state-manager/app/models/manual_retry.py
@@ -0,0 +1,11 @@
from pydantic import BaseModel, Field
from .state_status_enum import StateStatusEnum


class ManualRetryRequestModel(BaseModel):
    fanout_id: str = Field(..., description="Fanout ID of the state")


class ManualRetryResponseModel(BaseModel):
    id: str = Field(..., description="ID of the state")
    status: StateStatusEnum = Field(..., description="Status of the state")
22 changes: 22 additions & 0 deletions state-manager/app/routes.py
@@ -50,6 +50,10 @@
from .models.signal_models import ReEnqueueAfterRequestModel
from .controller.re_queue_after_signal import re_queue_after_signal

# manual_retry
from .models.manual_retry import ManualRetryRequestModel, ManualRetryResponseModel
from .controller.manual_retry_state import manual_retry_state


logger = LogsManager().get_logger()

@@ -176,6 +180,24 @@ async def re_enqueue_after_state_route(namespace_name: str, state_id: str, body:

    return await re_queue_after_signal(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)

@router.post(
    "/state/{state_id}/manual-retry",
    response_model=ManualRetryResponseModel,
    status_code=status.HTTP_200_OK,
    response_description="State manual retry successfully",
    tags=["state"]
)
async def manual_retry_state_route(namespace_name: str, state_id: str, body: ManualRetryRequestModel, request: Request, api_key: str = Depends(check_api_key)):
    x_exosphere_request_id = getattr(request.state, "x_exosphere_request_id", str(uuid4()))

    if api_key:
        logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
    else:
        logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")

    return await manual_retry_state(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)


@router.put(
"/graph/{graph_name}",
17 changes: 16 additions & 1 deletion state-manager/tests/README.md
@@ -14,6 +14,7 @@ tests/
│ ├── test_errored_state.py
│ ├── test_get_graph_template.py
│ ├── test_get_secrets.py
│ ├── test_manual_retry_state.py
│ ├── test_register_nodes.py
│ └── test_upsert_graph_template.py
└── README.md
@@ -80,7 +81,21 @@ The unit tests cover all controller functions in the state-manager:
- ✅ Complex schema handling
- ✅ Database error handling

-### 8. `upsert_graph_template.py`
+### 8. `manual_retry_state.py`
- ✅ Successful manual retry state creation
- ✅ State not found scenarios
- ✅ Duplicate retry state detection (DuplicateKeyError)
- ✅ Different fanout_id handling
- ✅ Complex inputs and multiple parents preservation
- ✅ Database errors during state lookup
- ✅ Database errors during state save
- ✅ Database errors during retry state insert
- ✅ Empty inputs and parents handling
- ✅ Namespace mismatch scenarios
- ✅ Field preservation and reset logic
- ✅ Logging verification

Comment on lines +84 to +97

🧹 Nitpick

Fix markdownlint errors around the new section.
Add blank lines around the heading and the list to satisfy MD022/MD032.

Apply:

-### 8. `manual_retry_state.py`
+ 
+### 8. `manual_retry_state.py`
+
 - ✅ Successful manual retry state creation
 - ✅ State not found scenarios
 - ✅ Duplicate retry state detection (DuplicateKeyError)
 - ✅ Different fanout_id handling
 - ✅ Complex inputs and multiple parents preservation
 - ✅ Database errors during state lookup
 - ✅ Database errors during state save
 - ✅ Database errors during retry state insert
 - ✅ Empty inputs and parents handling
 - ✅ Namespace mismatch scenarios
 - ✅ Field preservation and reset logic
 - ✅ Logging verification
+
🧰 Tools
🪛 markdownlint-cli2 (0.17.2)

84-84: Headings should be surrounded by blank lines
Expected: 1; Actual: 0; Below

(MD022, blanks-around-headings)


85-85: Lists should be surrounded by blank lines

(MD032, blanks-around-lists)

🤖 Prompt for AI Agents
In state-manager/tests/README.md around lines 84 to 97, the new "### 8.
`manual_retry_state.py`" section is triggering markdownlint errors MD022/MD032
because there are no blank lines around the heading and the following list; add
a single blank line immediately before the "### 8. `manual_retry_state.py`"
heading, add one blank line between the heading and the start of the list, and
ensure there is a blank line after the end of the list so the heading and list
are each separated by one empty line from surrounding content.
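
As a rough illustration of the fix, the sketch below inserts the missing blank lines around headings. `add_blank_lines_around_headings` is a hypothetical helper, not part of markdownlint. It treats any line starting with `#` as a heading, so it would misfire on `#` comments inside code fences. It also only handles lists that sit directly next to a heading, which is the case flagged here.

```python
def add_blank_lines_around_headings(lines: list[str]) -> list[str]:
    """Ensure each heading has a blank line before and after it.

    Simplification: a heading is any line that starts with '#'.
    """
    out: list[str] = []
    for line in lines:
        is_heading = line.startswith("#")
        prev_is_heading = bool(out) and out[-1].startswith("#")
        # Add a separator only between two non-blank lines where
        # either side of the boundary is a heading.
        if line.strip() and out and out[-1].strip() and (is_heading or prev_is_heading):
            out.append("")
        out.append(line)
    return out


before = [
    "- Database error handling",
    "### 8. `manual_retry_state.py`",
    "- Successful manual retry state creation",
    "",
    "### 9. `upsert_graph_template.py`",
]
after = add_blank_lines_around_headings(before)
```

Blank lines that already exist are left alone, so running the helper twice gives the same result as running it once.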

### 9. `upsert_graph_template.py`

🧹 Nitpick

Add a blank line before this heading.
Prevents MD022 violation.

Apply:

-### 9. `upsert_graph_template.py`
+
+### 9. `upsert_graph_template.py`
🧰 Tools
🪛 markdownlint-cli2 (0.17.2)

98-98: Headings should be surrounded by blank lines
Expected: 1; Actual: 0; Below

(MD022, blanks-around-headings)

🤖 Prompt for AI Agents
In state-manager/tests/README.md around line 98, there is a missing blank line
immediately before the "### 9. `upsert_graph_template.py`" heading which
triggers an MD022 lint violation; insert a single empty line above that heading
so it is separated from the previous content.

- ✅ Existing template updates
- ✅ New template creation
- ✅ Empty nodes handling