Skip to content

Commit

Permalink
update logging in the orchestrator to help future debugging
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 606869097
  • Loading branch information
tfx-copybara committed Feb 14, 2024
1 parent 7bb809d commit 31a4d00
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 15 deletions.
23 changes: 14 additions & 9 deletions tfx/orchestration/experimental/core/async_pipeline_task_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,21 @@ def __call__(self) -> List[task_lib.Task]:
task_lib.exec_node_task_id_from_node(self._pipeline, node)):
continue

result.extend(
self._generate_tasks_for_node(
self._mlmd_handle, node, node_state.backfill_token
)
logging.info(
'[AsyncPipelineTaskGenerator._generate_tasks_for_node] generating'
' tasks for node %s',
node.node_info.id,
)
tasks = self._generate_tasks_for_node(
self._mlmd_handle, node, node_state.backfill_token
)
logging.info(
'[AsyncPipelineTaskGenerator._generate_tasks_for_node] generated'
' tasks for node %s: %s',
node.node_info.id,
[t.task_id for t in tasks],
)
result.extend(tasks)
return result

def _generate_tasks_for_node(
Expand All @@ -216,11 +226,6 @@ def _generate_tasks_for_node(
Returns:
Returns a `Task` or `None` if task generation is deemed infeasible.
"""
logging.info(
'[AsyncPipelineTaskGenerator._generate_tasks_for_node] invoked for'
' node %s',
node.node_info.id,
)
result = []
node_uid = task_lib.NodeUid.from_node(self._pipeline, node)

Expand Down
7 changes: 6 additions & 1 deletion tfx/orchestration/experimental/core/pipeline_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -1845,8 +1845,13 @@ def _filter_by_node_id(
),
)

logging.info('Generating tasks for pipeline %s', pipeline_state.pipeline_uid)
tasks = generator.generate(pipeline_state)
logging.info('Generated tasks: %s', [t.task_id for t in tasks])
logging.info(
'Generated tasks for pipeline %s: %s',
pipeline_state.pipeline_uid,
[t.task_id for t in tasks],
)

# If nodes reach a terminal state, call stop_node_services for pure/mixed
# service nodes, and cancel active executions.
Expand Down
16 changes: 11 additions & 5 deletions tfx/orchestration/experimental/core/sync_pipeline_task_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,18 @@ def __call__(self) -> List[task_lib.Task]:
unrunnable_node_ids
):
continue
logging.info(
'[SyncPipelineTaskGenerator._generate_tasks_for_node] generating'
' tasks for node %s',
node.node_info.id,
)
tasks = self._generate_tasks_for_node(node)
logging.info(
'[SyncPipelineTaskGenerator._generate_tasks_for_node] generated'
' tasks for node %s: %s',
node.node_info.id,
[t.task_id for t in tasks],
)
for task in tasks:
if isinstance(task, task_lib.UpdateNodeStateTask):
if pstate.is_node_state_success(
Expand Down Expand Up @@ -275,11 +286,6 @@ def __call__(self) -> List[task_lib.Task]:
def _generate_tasks_for_node(
self, node: node_proto_view.NodeProtoView) -> List[task_lib.Task]:
"""Generates list of tasks for the given node."""
logging.info(
'[SyncPipelineTaskGenerator._generate_tasks_for_node] invoked for'
' node %s',
node.node_info.id,
)
node_uid = task_lib.NodeUid.from_node(self._pipeline, node)
node_id = node.node_info.id
result = []
Expand Down

0 comments on commit 31a4d00

Please sign in to comment.