-
Notifications
You must be signed in to change notification settings - Fork 55
/
DurableOrchestrationContext.py
707 lines (601 loc) · 27.2 KB
/
DurableOrchestrationContext.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
from collections import defaultdict
from azure.durable_functions.models.actions.SignalEntityAction import SignalEntityAction
from azure.durable_functions.models.actions.CallEntityAction import CallEntityAction
from azure.durable_functions.models.Task import TaskBase, TimerTask
from azure.durable_functions.models.actions.CallHttpAction import CallHttpAction
from azure.durable_functions.models.DurableHttpRequest import DurableHttpRequest
from azure.durable_functions.models.actions.CallSubOrchestratorWithRetryAction import \
CallSubOrchestratorWithRetryAction
from azure.durable_functions.models.actions.CallActivityWithRetryAction import \
CallActivityWithRetryAction
from azure.durable_functions.models.actions.ContinueAsNewAction import \
ContinueAsNewAction
from azure.durable_functions.models.actions.WaitForExternalEventAction import \
WaitForExternalEventAction
from azure.durable_functions.models.actions.CallSubOrchestratorAction import \
CallSubOrchestratorAction
from azure.durable_functions.models.actions.CreateTimerAction import CreateTimerAction
from azure.durable_functions.models.Task import WhenAllTask, WhenAnyTask, AtomicTask, \
RetryAbleTask
from azure.durable_functions.models.actions.CallActivityAction import CallActivityAction
from azure.durable_functions.models.ReplaySchema import ReplaySchema
import json
import datetime
import inspect
from typing import DefaultDict, List, Any, Dict, Optional, Tuple, Union, Callable
from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID
from datetime import timezone
from .RetryOptions import RetryOptions
from .FunctionContext import FunctionContext
from .history import HistoryEvent, HistoryEventType
from .actions import Action
from ..models.TokenSource import TokenSource
from .utils.entity_utils import EntityId
from azure.functions._durable_functions import _deserialize_custom_object
from azure.durable_functions.constants import DATETIME_STRING_FORMAT
from azure.durable_functions.decorators.metadata import OrchestrationTrigger, ActivityTrigger
from azure.functions.decorators.function_app import FunctionBuilder
class DurableOrchestrationContext:
    """Context of the durable orchestration execution.

    Parameter data for orchestration bindings that can be used to schedule
    function-based activities.
    """

    # parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions
    def __init__(self,
                 history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool,
                 parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0,
                 **kwargs):
        """Build the context from the fields of the orchestration trigger payload.

        Parameters
        ----------
        history: List[Dict[Any, Any]]
            Raw history records; each one is expanded into a `HistoryEvent`.
        instanceId: str
            The ID of this orchestration instance.
        isReplaying: bool
            Initial value of the `is_replaying` flag.
        parentInstanceId: str
            The ID of the parent orchestration.
        input: Any
            The orchestration input. A `dict` is serialized to a JSON string.
        upperSchemaVersion: int
            Value used to select the `ReplaySchema`.
        **kwargs
            Remaining payload fields, forwarded to `FunctionContext`.

        Raises
        ------
        IndexError
            If `history` contains no ORCHESTRATOR_STARTED event.
        """
        self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history]
        self._instance_id: str = instanceId
        self._is_replaying: bool = isReplaying
        self._parent_instance_id: str = parentInstanceId
        self._custom_status: Any = None
        # Counter shared by `new_uuid` and `new_guid`.
        self._new_uuid_counter: int = 0
        self._sub_orchestrator_counter: int = 0
        self._continue_as_new_flag: bool = False

        # The first ORCHESTRATOR_STARTED event provides the initial "current" time.
        started_event = next(
            (event for event in self.histories
             if event.event_type == HistoryEventType.ORCHESTRATOR_STARTED),
            None)
        if started_event is None:
            # Same exception type as indexing an empty list, but with an explanation.
            raise IndexError(
                "The orchestration history contains no ORCHESTRATOR_STARTED event.")
        self.decision_started_event: HistoryEvent = started_event
        self._current_utc_datetime: datetime.datetime = \
            self.decision_started_event.timestamp

        self._function_context: FunctionContext = FunctionContext(**kwargs)
        self._sequence_number = 0
        self._replay_schema = ReplaySchema(upperSchemaVersion)

        self._action_payload_v1: List[List[Action]] = []
        self._action_payload_v2: List[Action] = []

        # make _input always a string
        # (consistent with Python Functions generic trigger/input bindings)
        if isinstance(input, dict):
            input = json.dumps(input)

        self._input: Any = input
        self.open_tasks: DefaultDict[Union[int, str], Union[List[TaskBase], TaskBase]] = \
            defaultdict(list)
        # Values are invoked with no arguments by `_add_to_open_tasks`.
        self.deferred_tasks: Dict[Union[int, str], Callable[[], Any]] = {}

    @classmethod
    def from_json(cls, json_string: str):
        """Convert the value passed into a new instance of the class.

        Parameters
        ----------
        json_string: str
            Context passed a JSON serializable value to be converted into an instance of the class

        Returns
        -------
        DurableOrchestrationContext
            New instance of the durable orchestration context class
        """
        # We should consider parsing the `Input` field here as well,
        # instead of doing so lazily when `get_input` is called.
        json_dict = json.loads(json_string)
        return cls(**json_dict)

    def _generate_task(self, action: Action,
                       retry_options: Optional[RetryOptions] = None,
                       id_: Optional[Union[int, str]] = None,
                       parent: Optional[TaskBase] = None,
                       task_constructor=AtomicTask) -> Union[AtomicTask, RetryAbleTask, TimerTask]:
        """Generate an atomic or retryable Task based on an input.

        Parameters
        ----------
        action : Action
            The action backing the Task.
        retry_options : Optional[RetryOptions]
            RetryOptions for a with-retry task, by default None
        id_ : Optional[Union[int, str]]
            The ID to give the task, by default None
        parent : Optional[TaskBase]
            The task to record as the new task's parent, by default None
        task_constructor
            Callable used to build the task from `(id_, action_payload)`,
            by default `AtomicTask`

        Returns
        -------
        Union[AtomicTask, RetryAbleTask, TimerTask]
            The constructed task, wrapped in a `RetryAbleTask` when
            `retry_options` is provided
        """
        task: Union[AtomicTask, RetryAbleTask]
        action_payload: Union[Action, List[Action]]

        # TODO: find cleaner way to do this
        # Schema V1 wraps each action in a list; other schemas pass it bare.
        if self._replay_schema is ReplaySchema.V1:
            action_payload = [action]
        else:
            action_payload = action
        task = task_constructor(id_, action_payload)
        task.parent = parent

        # if task is retryable, provide the retryable wrapper class
        if retry_options is not None:
            task = RetryAbleTask(task, retry_options, self)
        return task

    def _set_is_replaying(self, is_replaying: bool):
        """Set the internal `is_replaying` flag.

        Parameters
        ----------
        is_replaying : bool
            New value of the `is_replaying` flag
        """
        self._is_replaying = is_replaying

    def _resolve_function_name(self, name: Union[str, Callable], trigger_type,
                               api_name: str, function_kind: str,
                               decorator_name: str) -> str:
        """Turn the `name` argument of a `call_*` API into a function name.

        Parameters
        ----------
        name: str | Callable
            A function name, or a `FunctionBuilder` from the Python V2 programming model.
        trigger_type
            The trigger class the `FunctionBuilder` is expected to carry.
        api_name: str
            Name of the public API being called; used in the error message.
        function_kind: str
            Human-readable kind of function ("activity" or "orchestrator").
        decorator_name: str
            Name of the decorator the user is expected to apply.

        Returns
        -------
        str
            The resolved function name; non-callable values are returned unchanged.

        Raises
        ------
        ValueError
            If `name` is callable but is not a `FunctionBuilder`, or if the
            `FunctionBuilder` carries a different trigger type.
        """
        if isinstance(name, FunctionBuilder):
            return self._get_function_name(name, trigger_type)
        if callable(name):
            error_message = (
                f"The `{api_name}` API received a `Callable` without an "
                "associated Azure Functions trigger-type. "
                "Please ensure you're using the Python programming model V2 "
                f"and that your {function_kind} function is annotated with the "
                f"`{decorator_name}` decorator. Otherwise, provide in the name of "
                f"the {function_kind} as a string.")
            raise ValueError(error_message)
        return name

    def call_activity(self, name: Union[str, Callable], input_: Optional[Any] = None) -> TaskBase:
        """Schedule an activity for execution.

        Parameters
        ----------
        name: str | Callable
            Either the name of the activity function to call, as a string or,
            in the Python V2 programming model, the activity function itself.
        input_: Optional[Any]
            The JSON-serializable input to pass to the activity function.

        Returns
        -------
        Task
            A Durable Task that completes when the called activity function completes or fails.

        Raises
        ------
        ValueError
            If `name` is a callable that is not an activity-trigger function.
        """
        name = self._resolve_function_name(
            name, ActivityTrigger, "call_activity", "activity", "activity_trigger")
        action = CallActivityAction(name, input_)
        task = self._generate_task(action)
        return task

    def call_activity_with_retry(self,
                                 name: Union[str, Callable], retry_options: RetryOptions,
                                 input_: Optional[Any] = None) -> TaskBase:
        """Schedule an activity for execution with retry options.

        Parameters
        ----------
        name: str | Callable
            Either the name of the activity function to call, as a string or,
            in the Python V2 programming model, the activity function itself.
        retry_options: RetryOptions
            The retry options for the activity function.
        input_: Optional[Any]
            The JSON-serializable input to pass to the activity function.

        Returns
        -------
        Task
            A Durable Task that completes when the called activity function completes or
            fails completely.

        Raises
        ------
        ValueError
            If `name` is a callable that is not an activity-trigger function.
        """
        name = self._resolve_function_name(
            name, ActivityTrigger, "call_activity_with_retry", "activity", "activity_trigger")
        action = CallActivityWithRetryAction(name, retry_options, input_)
        task = self._generate_task(action, retry_options)
        return task

    def call_http(self, method: str, uri: str, content: Optional[str] = None,
                  headers: Optional[Dict[str, str]] = None,
                  token_source: Optional[TokenSource] = None,
                  is_raw_str: bool = False) -> TaskBase:
        """Schedule a durable HTTP call to the specified endpoint.

        Parameters
        ----------
        method: str
            The HTTP request method.
        uri: str
            The HTTP request uri.
        content: Optional[str]
            The HTTP request content. Unless `is_raw_str` is set, any truthy
            value (including a `str`) is serialized with `json.dumps`; falsy
            values are passed through unchanged.
        headers: Optional[Dict[str, str]]
            The HTTP request headers.
        token_source: TokenSource
            The source of OAuth token to add to the request.
        is_raw_str: bool
            When True, `content` must be a `str` and is used verbatim as the
            request payload instead of being JSON-serialized. Defaults to False.

        Returns
        -------
        Task
            The durable HTTP request to schedule.

        Raises
        ------
        TypeError
            If `is_raw_str` is True but `content` is not a `str`.
        """
        if is_raw_str and not isinstance(content, str):
            raise TypeError(
                "Invalid use of 'is_raw_str' parameter: 'is_raw_str' is "
                "set to 'True' but 'content' is not an instance of type 'str'. "
                "Either set 'is_raw_str' to 'False', or ensure your 'content' "
                "is of type 'str'.")

        json_content: Optional[str]
        # The original condition (`content is not isinstance(content, str)`) compared
        # `content` against a bool and so was true for every truthy payload. The
        # default path keeps that behaviour for existing callers; `is_raw_str`
        # is the explicit opt-out.
        if content and not is_raw_str:
            json_content = json.dumps(content)
        else:
            json_content = content

        request = DurableHttpRequest(method, uri, json_content, headers, token_source)
        action = CallHttpAction(request)
        task = self._generate_task(action)
        return task

    def call_sub_orchestrator(self,
                              name: Union[str, Callable], input_: Optional[Any] = None,
                              instance_id: Optional[str] = None) -> TaskBase:
        """Schedule sub-orchestration function named `name` for execution.

        Parameters
        ----------
        name: Union[str, Callable]
            Either the name of the orchestrator function to call, as a string or,
            in the Python V2 programming model, the orchestrator function itself.
        input_: Optional[Any]
            The JSON-serializable input to pass to the orchestrator function.
        instance_id: Optional[str]
            A unique ID to use for the sub-orchestration instance.

        Returns
        -------
        Task
            A Durable Task that completes when the called sub-orchestrator completes or fails.

        Raises
        ------
        ValueError
            If `name` is a callable that is not an orchestration-trigger function.
        """
        name = self._resolve_function_name(
            name, OrchestrationTrigger, "call_sub_orchestrator",
            "orchestrator", "orchestration_trigger")
        action = CallSubOrchestratorAction(name, input_, instance_id)
        task = self._generate_task(action)
        return task

    def call_sub_orchestrator_with_retry(self,
                                         name: Union[str, Callable],
                                         retry_options: RetryOptions,
                                         input_: Optional[Any] = None,
                                         instance_id: Optional[str] = None) -> TaskBase:
        """Schedule sub-orchestration function named `name` for execution, with retry-options.

        Parameters
        ----------
        name: Union[str, Callable]
            Either the name of the orchestrator function to call, as a string or,
            in the Python V2 programming model, the orchestrator function itself.
        retry_options: RetryOptions
            The settings for retrying this sub-orchestrator in case of a failure.
        input_: Optional[Any]
            The JSON-serializable input to pass to the orchestrator function.
            Defaults to None.
        instance_id: Optional[str]
            The instance ID of the sub-orchestrator to call.

        Returns
        -------
        Task
            A Durable Task that completes when the called sub-orchestrator completes or fails.

        Raises
        ------
        ValueError
            If `name` is a callable that is not an orchestration-trigger function.
        """
        name = self._resolve_function_name(
            name, OrchestrationTrigger, "call_sub_orchestrator_with_retry",
            "orchestrator", "orchestration_trigger")
        action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id)
        task = self._generate_task(action, retry_options)
        return task

    def get_input(self) -> Optional[Any]:
        """Get the orchestration input.

        Returns
        -------
        Optional[Any]
            None when no input was stored; otherwise the stored JSON string
            decoded with `_deserialize_custom_object` as the object hook.
        """
        if self._input is None:
            return None
        return json.loads(self._input, object_hook=_deserialize_custom_object)

    def new_uuid(self) -> str:
        """Create a new UUID that is safe for replay within an orchestration or operation.

        The default implementation of this method creates a name-based UUID
        using the algorithm from RFC 4122 §4.3. The name input used to generate
        this value is a combination of the orchestration instance ID, the
        current orchestration date/time and an internally managed sequence number.

        Returns
        -------
        str
            New UUID that is safe for replay within an orchestration or operation.
        """
        URL_NAMESPACE: str = "9e952958-5e33-4daf-827f-2fa12937b875"

        uuid_name_value = \
            f"{self._instance_id}" \
            f"_{self.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \
            f"_{self._new_uuid_counter}"
        self._new_uuid_counter += 1
        namespace_uuid = uuid5(NAMESPACE_OID, URL_NAMESPACE)
        return str(uuid5(namespace_uuid, uuid_name_value))

    def task_all(self, activities: List[TaskBase]) -> TaskBase:
        """Schedule the execution of all activities.

        Similar to Promise.all. When called with `yield` or `return`, returns an
        array containing the results of all [[Task]]s passed to it. It returns
        when all of the [[Task]] instances have completed.

        Throws an exception if any of the activities fails

        Parameters
        ----------
        activities: List[Task]
            List of activities to schedule

        Returns
        -------
        TaskSet
            The results of all activities.
        """
        return WhenAllTask(activities, replay_schema=self._replay_schema)

    def task_any(self, activities: List[TaskBase]) -> TaskBase:
        """Schedule the activities and complete when the first of them completes.

        Similar to Promise.race. When called with `yield` or `return`, returns
        the first [[Task]] instance to complete.

        Throws an exception if all of the activities fail

        Parameters
        ----------
        activities: List[Task]
            List of activities to schedule

        Returns
        -------
        TaskSet
            The first [[Task]] instance to complete.
        """
        return WhenAnyTask(activities, replay_schema=self._replay_schema)

    def set_custom_status(self, status: Any):
        """Set the customized orchestration status for your orchestrator function.

        This status is also returned by the orchestration client through the get_status API

        Parameters
        ----------
        status : Any
            Customized status provided by the orchestrator
        """
        self._custom_status = status

    @property
    def custom_status(self):
        """Get customized status of current orchestration."""
        return self._custom_status

    @property
    def histories(self):
        """Get running history of tasks that have been scheduled."""
        return self._histories

    @property
    def instance_id(self) -> str:
        """Get the ID of the current orchestration instance.

        The instance ID is generated and fixed when the orchestrator function
        is scheduled. It can be either auto-generated, in which case it is
        formatted as a GUID, or it can be user-specified with any format.

        Returns
        -------
        str
            The ID of the current orchestration instance.
        """
        return self._instance_id

    @property
    def is_replaying(self) -> bool:
        """Get the value indicating orchestration replaying itself.

        This property is useful when there is logic that needs to run only when
        the orchestrator function is _not_ replaying. For example, certain
        types of application logging may become too noisy when duplicated as
        part of orchestrator function replay. The orchestrator code could check
        to see whether the function is being replayed and then issue the log
        statements when this value is `false`.

        Returns
        -------
        bool
            Value indicating whether the orchestrator function is currently replaying.
        """
        return self._is_replaying

    @property
    def parent_instance_id(self) -> str:
        """Get the ID of the parent orchestration.

        The parent instance ID is generated and fixed when the parent
        orchestrator function is scheduled. It can be either auto-generated, in
        which case it is formatted as a GUID, or it can be user-specified with
        any format.

        Returns
        -------
        str
            ID of the parent orchestration of the current sub-orchestration instance
        """
        return self._parent_instance_id

    @property
    def current_utc_datetime(self) -> datetime.datetime:
        """Get the current date/time.

        This date/time value is derived from the orchestration history. It
        always returns the same value at specific points in the orchestrator
        function code, making it deterministic and safe for replay.

        Returns
        -------
        datetime
            The current date/time in a way that is safe for use by orchestrator functions
        """
        return self._current_utc_datetime

    @current_utc_datetime.setter
    def current_utc_datetime(self, value: datetime.datetime):
        self._current_utc_datetime = value

    @property
    def function_context(self) -> FunctionContext:
        """Get the function level attributes not used by durable orchestrator.

        Returns
        -------
        FunctionContext
            Object containing function level attributes not used by durable orchestrator.
        """
        return self._function_context

    def call_entity(self, entityId: EntityId,
                    operationName: str, operationInput: Optional[Any] = None):
        """Get the result of Durable Entity operation given some input.

        Parameters
        ----------
        entityId: EntityId
            The ID of the entity to call
        operationName: str
            The operation to execute
        operationInput: Optional[Any]
            The input for the operation, defaults to None.

        Returns
        -------
        Task
            A Task of the entity call
        """
        action = CallEntityAction(entityId, operationName, operationInput)
        task = self._generate_task(action)
        return task

    def _record_fire_and_forget_action(self, action: Action):
        """Append a responseless-API action object to the actions array.

        The action is wrapped in a list unless the replay schema is V2, and the
        sequence number is advanced afterwards.

        Parameters
        ----------
        action : Action
            The action to append
        """
        new_action: Union[List[Action], Action]
        if self._replay_schema is ReplaySchema.V2:
            new_action = action
        else:
            new_action = [action]
        self._add_to_actions(new_action)
        self._sequence_number += 1

    def signal_entity(self, entityId: EntityId,
                      operationName: str, operationInput: Optional[Any] = None):
        """Send a signal operation to Durable Entity given some input.

        Parameters
        ----------
        entityId: EntityId
            The ID of the entity to call
        operationName: str
            The operation to execute
        operationInput: Optional[Any]
            The input for the operation, defaults to None.

        Returns
        -------
        Task
            A Task of the entity signal
        """
        action = SignalEntityAction(entityId, operationName, operationInput)
        task = self._generate_task(action)
        # The signal is recorded immediately as a fire-and-forget action.
        self._record_fire_and_forget_action(action)
        return task

    @property
    def will_continue_as_new(self) -> bool:
        """Return true if continue_as_new was called."""
        return self._continue_as_new_flag

    def create_timer(self, fire_at: datetime.datetime) -> TaskBase:
        """Create a Timer Task to fire at the specified deadline.

        Parameters
        ----------
        fire_at : datetime.datetime
            The time for the timer to trigger

        Returns
        -------
        TaskBase
            A Durable Timer Task that schedules the timer to wake up the activity
        """
        action = CreateTimerAction(fire_at)
        task = self._generate_task(action, task_constructor=TimerTask)
        return task

    def wait_for_external_event(self, name: str) -> TaskBase:
        """Wait asynchronously for an event to be raised with the name `name`.

        Parameters
        ----------
        name : str
            The event name of the event that the task is waiting for.

        Returns
        -------
        Task
            Task to wait for the event
        """
        action = WaitForExternalEventAction(name)
        # The event name doubles as the task ID.
        task = self._generate_task(action, id_=name)
        return task

    def continue_as_new(self, input_: Any):
        """Schedule the orchestrator to continue as new.

        Parameters
        ----------
        input_ : Any
            The new starting input to the orchestrator.
        """
        continue_as_new_action: Action = ContinueAsNewAction(input_)
        # Record first: `_add_to_actions` ignores everything once the flag is set.
        self._record_fire_and_forget_action(continue_as_new_action)
        self._continue_as_new_flag = True

    def new_guid(self) -> UUID:
        """Generate a replay-safe GUID.

        The GUID is name-based, derived from the instance ID, the current
        orchestration date/time and an internal counter.

        Returns
        -------
        UUID
            A new globally-unique ID
        """
        guid_name = f"{self.instance_id}_{self.current_utc_datetime}"\
            f"_{self._new_uuid_counter}"
        self._new_uuid_counter += 1
        guid = uuid5(NAMESPACE_URL, guid_name)
        return guid

    @property
    def _actions(self) -> List[List[Action]]:
        """Get the actions payload of this context, for replay in the extension.

        Returns
        -------
        List[List[Action]]
            The actions of this context
        """
        if self._replay_schema is ReplaySchema.V1:
            return self._action_payload_v1
        else:
            # Non-V1 schemas keep a flat list; wrap it to match the return shape.
            return [self._action_payload_v2]

    def _add_to_actions(self, action_repr: Union[List[Action], Action]):
        """Add a Task's actions payload to the context's actions array.

        Parameters
        ----------
        action_repr : Union[List[Action], Action]
            The tasks to add

        Raises
        ------
        Exception
            If the shape of `action_repr` does not match the replay schema.
        """
        # Do not add further actions after `continue_as_new` has been
        # called
        if self.will_continue_as_new:
            return

        if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list):
            self._action_payload_v1.append(action_repr)
        elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action):
            self._action_payload_v2.append(action_repr)
        else:
            raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)} "
                            f"is not compatible on ReplaySchema {self._replay_schema.name}. ")

    def _pretty_print_history(self) -> str:
        """Get a pretty-printed version of the orchestration's internal history."""
        def history_to_string(event):
            # Collect the public, non-method members of the event.
            json_dict = {}
            for key, val in inspect.getmembers(event):
                if not key.startswith('_') and not inspect.ismethod(val):
                    if isinstance(val, datetime.date):
                        # Dates are converted to a UTC time tuple before dumping.
                        val = val.replace(tzinfo=timezone.utc).timetuple()
                    json_dict[key] = val
            return json.dumps(json_dict)
        return str(list(map(history_to_string, self._histories)))

    def _add_to_open_tasks(self, task: TaskBase):
        """Register a task, or recursively its children, in `open_tasks`.

        Parameters
        ----------
        task : TaskBase
            The task to register. Tasks already marked as scheduled are ignored.
        """
        if task._is_scheduled:
            return

        if isinstance(task, AtomicTask):
            if task.id is None:
                # Tasks without an ID take the next sequence number.
                task.id = self._sequence_number
                self._sequence_number += 1
                self.open_tasks[task.id] = task
            elif task.id != -1:
                # Tasks with a preset ID are grouped in a list under that ID.
                self.open_tasks[task.id].append(task)

            if task.id in self.deferred_tasks:
                # Run the update that was deferred for this task ID.
                task_update_action = self.deferred_tasks[task.id]
                task_update_action()
        else:
            for child in task.children:
                self._add_to_open_tasks(child)

    def _get_function_name(self, name: FunctionBuilder,
                           trigger_type: Union[OrchestrationTrigger, ActivityTrigger]):
        """Extract the function name from a `FunctionBuilder` of the expected trigger type.

        Parameters
        ----------
        name : FunctionBuilder
            The decorated function object from the Python V2 programming model.
        trigger_type : Union[OrchestrationTrigger, ActivityTrigger]
            The trigger class the function is expected to carry.

        Returns
        -------
        The `_name` of the function wrapped by the builder.

        Raises
        ------
        ValueError
            If the function's trigger is not an instance of `trigger_type`.
        AttributeError
            If the `FunctionBuilder` lacks one of the expected attributes.
        """
        try:
            function = name._function
            trigger = function._trigger
            if isinstance(trigger, trigger_type):
                return function._name
            actual_trigger = trigger.type
        except AttributeError as e:
            # `AttributeError` has no `.message` attribute, so build a new
            # exception of the same type and chain the original for context.
            error_message = "Durable Functions SDK internal error: an "\
                "expected attribute is missing from the `FunctionBuilder` "\
                "object in the Python V2 programming model. Please report "\
                "this bug in the Durable Functions Python SDK repo: "\
                "https://github.com/Azure/azure-functions-durable-python.\n"\
                "Error trace: " + str(e)
            raise AttributeError(error_message) from e

        if trigger_type == OrchestrationTrigger:
            expected_trigger = "OrchestrationTrigger"
        else:
            expected_trigger = "ActivityTrigger"
        error_message = f"Received function with Trigger-type `{actual_trigger}` "\
            f"but expected `{expected_trigger}`. Ensure your "\
            f"function is annotated with the `{expected_trigger}` "\
            "decorator or directly pass in the name of the "\
            "function as a string."
        raise ValueError(error_message)