Skip to content

Commit 6cc3490

Browse files
authored
Merge branch 'main' into fix/1188-converter-py313-isinstance
2 parents fe5069b + 1b70f07 commit 6cc3490

File tree

19 files changed

+785
-56
lines changed

19 files changed

+785
-56
lines changed

.github/workflows/nightly-throughput-stress.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ on:
2525
default: 360
2626
type: number
2727

28+
permissions:
29+
contents: read
30+
2831
env:
2932
# Workflow configuration
3033
TEST_DURATION: ${{ inputs.duration || vars.NIGHTLY_TEST_DURATION || '5h' }}
@@ -164,7 +167,7 @@ jobs:
164167
"type": "section",
165168
"text": {
166169
"type": "mrkdwn",
167-
"text": "*Nightly Throughput Stress Failed* :x:\n\n*Duration:* ${{ env.TEST_DURATION }}\n*Run:* <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|View Logs>\n*Triggered by:* ${{ github.event_name == 'schedule' && 'Scheduled' || github.actor }}"
170+
"text": "*Nightly Throughput Stress Failed* :x:\n\n*Repository:* ${{ github.repository }}\n*Duration:* ${{ env.TEST_DURATION }}\n*Run:* <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|View Logs>\n*Triggered by:* ${{ github.event_name == 'schedule' && 'Scheduled' || github.actor }}"
168171
}
169172
}
170173
]

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ temporalio/bridge/temporal_sdk_bridge*
1010
/sdk-python.iml
1111
/.zed
1212
*.DS_Store
13+
tags

scripts/gen_payload_visitor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ def walk(self, desc: Descriptor) -> bool:
172172
if key in self.generated:
173173
return self.generated[key]
174174
if key in self.in_progress:
175-
# Break cycles; if another path proves this node needed, we'll revisit
176-
return False
175+
# Break cycles; Assume the child will be needed (Used by Failure -> Cause)
176+
return True
177177

178178
has_payload = False
179179
self.in_progress.add(key)

temporalio/bridge/_visitor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ async def _visit_temporal_api_failure_v1_ResetWorkflowFailureInfo(self, fs, o):
7474
async def _visit_temporal_api_failure_v1_Failure(self, fs, o):
7575
if o.HasField("encoded_attributes"):
7676
await self._visit_temporal_api_common_v1_Payload(fs, o.encoded_attributes)
77+
if o.HasField("cause"):
78+
await self._visit_temporal_api_failure_v1_Failure(fs, o.cause)
7779
if o.HasField("application_failure_info"):
7880
await self._visit_temporal_api_failure_v1_ApplicationFailureInfo(
7981
fs, o.application_failure_info

temporalio/contrib/openai_agents/_invoke_model_activity.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,15 +248,18 @@ def make_tool(tool: ToolInput) -> Tool:
248248
) from e
249249

250250
# Specifically retryable status codes
251-
if e.response.status_code in [408, 409, 429, 500]:
251+
if (
252+
e.response.status_code in [408, 409, 429]
253+
or e.response.status_code >= 500
254+
):
252255
raise ApplicationError(
253-
"Retryable OpenAI status code",
256+
f"Retryable OpenAI status code: {e.response.status_code}",
254257
non_retryable=False,
255258
next_retry_delay=retry_after,
256259
) from e
257260

258261
raise ApplicationError(
259-
"Non retryable OpenAI status code",
262+
f"Non retryable OpenAI status code: {e.response.status_code}",
260263
non_retryable=True,
261264
next_retry_delay=retry_after,
262265
) from e

temporalio/contrib/openai_agents/_model_parameters.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,6 @@ class ModelActivityParameters:
7070

7171
priority: Priority = Priority.default
7272
"""Priority for the activity execution."""
73+
74+
use_local_activity: bool = False
75+
"""Whether to use a local activity. If changed during a workflow execution, that would break determinism."""

temporalio/contrib/openai_agents/_temporal_model_stub.py

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -154,20 +154,32 @@ def make_tool_info(tool: Tool) -> ToolInput:
154154
else:
155155
summary = None
156156

157-
return await workflow.execute_activity_method(
158-
ModelActivity.invoke_model_activity,
159-
activity_input,
160-
summary=summary,
161-
task_queue=self.model_params.task_queue,
162-
schedule_to_close_timeout=self.model_params.schedule_to_close_timeout,
163-
schedule_to_start_timeout=self.model_params.schedule_to_start_timeout,
164-
start_to_close_timeout=self.model_params.start_to_close_timeout,
165-
heartbeat_timeout=self.model_params.heartbeat_timeout,
166-
retry_policy=self.model_params.retry_policy,
167-
cancellation_type=self.model_params.cancellation_type,
168-
versioning_intent=self.model_params.versioning_intent,
169-
priority=self.model_params.priority,
170-
)
157+
if self.model_params.use_local_activity:
158+
return await workflow.execute_local_activity_method(
159+
ModelActivity.invoke_model_activity,
160+
activity_input,
161+
summary=summary,
162+
schedule_to_close_timeout=self.model_params.schedule_to_close_timeout,
163+
schedule_to_start_timeout=self.model_params.schedule_to_start_timeout,
164+
start_to_close_timeout=self.model_params.start_to_close_timeout,
165+
retry_policy=self.model_params.retry_policy,
166+
cancellation_type=self.model_params.cancellation_type,
167+
)
168+
else:
169+
return await workflow.execute_activity_method(
170+
ModelActivity.invoke_model_activity,
171+
activity_input,
172+
summary=summary,
173+
task_queue=self.model_params.task_queue,
174+
schedule_to_close_timeout=self.model_params.schedule_to_close_timeout,
175+
schedule_to_start_timeout=self.model_params.schedule_to_start_timeout,
176+
start_to_close_timeout=self.model_params.start_to_close_timeout,
177+
heartbeat_timeout=self.model_params.heartbeat_timeout,
178+
retry_policy=self.model_params.retry_policy,
179+
cancellation_type=self.model_params.cancellation_type,
180+
versioning_intent=self.model_params.versioning_intent,
181+
priority=self.model_params.priority,
182+
)
171183

172184
def stream_response(
173185
self,

temporalio/contrib/opentelemetry.py

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -172,29 +172,34 @@ def _start_as_current_span(
172172
kind: opentelemetry.trace.SpanKind,
173173
context: Optional[Context] = None,
174174
) -> Iterator[None]:
175-
with self.tracer.start_as_current_span(
176-
name,
177-
attributes=attributes,
178-
kind=kind,
179-
context=context,
180-
set_status_on_exception=False,
181-
) as span:
182-
if input:
183-
input.headers = self._context_to_headers(input.headers)
184-
try:
185-
yield None
186-
except Exception as exc:
187-
if (
188-
not isinstance(exc, ApplicationError)
189-
or exc.category != ApplicationErrorCategory.BENIGN
190-
):
191-
span.set_status(
192-
Status(
193-
status_code=StatusCode.ERROR,
194-
description=f"{type(exc).__name__}: {exc}",
175+
token = opentelemetry.context.attach(context) if context else None
176+
try:
177+
with self.tracer.start_as_current_span(
178+
name,
179+
attributes=attributes,
180+
kind=kind,
181+
context=context,
182+
set_status_on_exception=False,
183+
) as span:
184+
if input:
185+
input.headers = self._context_to_headers(input.headers)
186+
try:
187+
yield None
188+
except Exception as exc:
189+
if (
190+
not isinstance(exc, ApplicationError)
191+
or exc.category != ApplicationErrorCategory.BENIGN
192+
):
193+
span.set_status(
194+
Status(
195+
status_code=StatusCode.ERROR,
196+
description=f"{type(exc).__name__}: {exc}",
197+
)
195198
)
196-
)
197-
raise
199+
raise
200+
finally:
201+
if token and context is opentelemetry.context.get_current():
202+
opentelemetry.context.detach(token)
198203

199204
def _completed_workflow_span(
200205
self, params: _CompletedWorkflowSpanParams

temporalio/worker/workflow_sandbox/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,13 @@
5858
RestrictedWorkflowAccessError,
5959
SandboxMatcher,
6060
SandboxRestrictions,
61+
UnintentionalPassthroughError,
6162
)
6263
from ._runner import SandboxedWorkflowRunner
6364

6465
__all__ = [
6566
"RestrictedWorkflowAccessError",
67+
"UnintentionalPassthroughError",
6668
"SandboxedWorkflowRunner",
6769
"SandboxMatcher",
6870
"SandboxRestrictions",

temporalio/worker/workflow_sandbox/_importer.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
RestrictedWorkflowAccessError,
4343
RestrictionContext,
4444
SandboxRestrictions,
45+
UnintentionalPassthroughError,
4546
)
4647

4748
logger = logging.getLogger(__name__)
@@ -200,6 +201,17 @@ def _import(
200201

201202
# Check module restrictions and passthrough modules
202203
if full_name not in sys.modules:
204+
# Issue a warning if appropriate
205+
if (
206+
self.restriction_context.in_activation
207+
and self._is_import_notification_policy_applied(
208+
temporalio.workflow.SandboxImportNotificationPolicy.WARN_ON_DYNAMIC_IMPORT
209+
)
210+
):
211+
warnings.warn(
212+
f"Module {full_name} was imported after initial workflow load."
213+
)
214+
203215
# Make sure not an entirely invalid module
204216
self._assert_valid_module(full_name)
205217

@@ -282,13 +294,36 @@ def module_configured_passthrough(self, name: str) -> bool:
282294
break
283295
return True
284296

297+
def _is_import_notification_policy_applied(
298+
self, policy: temporalio.workflow.SandboxImportNotificationPolicy
299+
) -> bool:
300+
override_policy = (
301+
temporalio.workflow.unsafe.current_import_notification_policy_override()
302+
)
303+
if override_policy:
304+
return policy in override_policy
305+
306+
return policy in self.restrictions.import_notification_policy
307+
285308
def _maybe_passthrough_module(self, name: str) -> Optional[types.ModuleType]:
286309
# If imports not passed through and all modules are not passed through
287310
# and name not in passthrough modules, check parents
288311
if (
289312
not temporalio.workflow.unsafe.is_imports_passed_through()
290313
and not self.module_configured_passthrough(name)
291314
):
315+
if self._is_import_notification_policy_applied(
316+
temporalio.workflow.SandboxImportNotificationPolicy.RAISE_ON_UNINTENTIONAL_PASSTHROUGH
317+
):
318+
raise UnintentionalPassthroughError(name)
319+
320+
if self._is_import_notification_policy_applied(
321+
temporalio.workflow.SandboxImportNotificationPolicy.WARN_ON_UNINTENTIONAL_PASSTHROUGH
322+
):
323+
warnings.warn(
324+
f"Module {name} was not intentionally passed through to the sandbox."
325+
)
326+
292327
return None
293328
# Do the pass through
294329
with self._unapplied():

0 commit comments

Comments
 (0)