Skip to content

Commit

Permalink
feat: Adds "head" flag to flow-run logs (#8003)
Browse files Browse the repository at this point in the history
Co-authored-by: ohadchaet <ohad.chaet@lynx.md>
Co-authored-by: Michael Adkins <michael@prefect.io>
Co-authored-by: Serina Grill <42048900+serinamarie@users.noreply.github.com>
  • Loading branch information
4 people authored and github-actions[bot] committed Jan 19, 2023
1 parent ae10a0c commit ddc6347
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 38 deletions.
45 changes: 40 additions & 5 deletions src/prefect/cli/flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
)
app.add_typer(flow_run_app, aliases=["flow-runs"])

LOGS_DEFAULT_PAGE_SIZE = 200
LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS = 20


@flow_run_app.command()
async def inspect(id: UUID):
Expand Down Expand Up @@ -139,13 +142,36 @@ async def cancel(id: UUID):


@flow_run_app.command()
async def logs(id: UUID):
async def logs(
id: UUID,
head: bool = typer.Option(
False,
"--head",
"-h",
help=f"Show the first {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of all logs.",
),
num_logs: int = typer.Option(
None,
"--num-logs",
"-n",
help=f"Number of logs to show when using the --head flag. If None, defaults to {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS}.",
min=1,
),
):
"""
View logs for a flow run.
"""
page_size = 200
# Pagination - API returns max 200 (LOGS_DEFAULT_PAGE_SIZE) logs at a time
offset = 0
more_logs = True
num_logs_returned = 0

# If head is specified, we need to stop after we've retrieved enough logs
if head or num_logs:
user_specified_num_logs = num_logs or LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS
else:
user_specified_num_logs = None

log_filter = LogFilter(flow_run_id={"any_": [id]})

async with get_client() as client:
Expand All @@ -156,9 +182,15 @@ async def logs(id: UUID):
exit_with_error(f"Flow run {str(id)!r} not found!")

while more_logs:
num_logs_to_return_from_page = (
LOGS_DEFAULT_PAGE_SIZE
if user_specified_num_logs is None
else min(LOGS_DEFAULT_PAGE_SIZE, user_specified_num_logs)
)

# Get the next page of logs
page_logs = await client.read_logs(
log_filter=log_filter, limit=page_size, offset=offset
log_filter=log_filter, limit=num_logs_to_return_from_page, offset=offset
)

# Print the logs
Expand All @@ -169,8 +201,11 @@ async def logs(id: UUID):
soft_wrap=True,
)

if len(page_logs) == page_size:
offset += page_size
# Update the number of logs retrieved
num_logs_returned += num_logs_to_return_from_page

if len(page_logs) == LOGS_DEFAULT_PAGE_SIZE:
offset += LOGS_DEFAULT_PAGE_SIZE
else:
# No more logs to show, exit
more_logs = False
195 changes: 162 additions & 33 deletions tests/cli/test_flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import prefect.exceptions
from prefect import flow
from prefect.cli.flow_run import LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS
from prefect.orion.schemas.actions import LogCreate
from prefect.states import (
AwaitingRetry,
Expand Down Expand Up @@ -279,19 +280,13 @@ def test_wrong_id_exits_with_error(self):
)


class TestFlowRunLogs:
PAGE_SIZE = 200

@pytest.mark.parametrize("state", [Completed, Failed, Crashed, Cancelled])
async def test_when_num_logs_smaller_than_page_size_then_no_pagination(
self, orion_client, state
):
# Given
@pytest.fixture()
def flow_run_factory(orion_client):
async def create_flow_run(num_logs: int):
flow_run = await orion_client.create_flow_run(
name="scheduled_flow_run", flow=hello_flow, state=state()
name="scheduled_flow_run", flow=hello_flow
)

# Create not enough flow run logs to result in pagination (page_size <= 200)
logs = [
LogCreate(
name="prefect.flow_runs",
Expand All @@ -300,10 +295,24 @@ async def test_when_num_logs_smaller_than_page_size_then_no_pagination(
timestamp=datetime.now(tz=timezone.utc),
flow_run_id=flow_run.id,
)
for i in range(self.PAGE_SIZE - 1)
for i in range(num_logs)
]
await orion_client.create_logs(logs)

return flow_run

return create_flow_run


class TestFlowRunLogs:
LOGS_DEFAULT_PAGE_SIZE = 200

async def test_when_num_logs_smaller_than_page_size_then_no_pagination(
self, flow_run_factory
):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE - 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
Expand All @@ -315,31 +324,15 @@ async def test_when_num_logs_smaller_than_page_size_then_no_pagination(
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(self.PAGE_SIZE - 1)
for i in range(self.LOGS_DEFAULT_PAGE_SIZE - 1)
],
)

@pytest.mark.parametrize("state", [Completed, Failed, Crashed, Cancelled])
async def test_when_num_logs_greater_than_page_size_then_pagination(
self, orion_client, state
self, flow_run_factory
):
# Given
flow_run = await orion_client.create_flow_run(
name="scheduled_flow_run", flow=hello_flow, state=state()
)

# Create enough flow run logs to result in pagination (page_size > 200)
logs = [
LogCreate(
name="prefect.flow_runs",
level=20,
message=f"Log {i} from flow_run {flow_run.id}.",
timestamp=datetime.now(tz=timezone.utc),
flow_run_id=flow_run.id,
)
for i in range(self.PAGE_SIZE + 1)
]
await orion_client.create_logs(logs)
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
Expand All @@ -352,12 +345,11 @@ async def test_when_num_logs_greater_than_page_size_then_pagination(
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(self.PAGE_SIZE + 1)
for i in range(self.LOGS_DEFAULT_PAGE_SIZE + 1)
],
)

@pytest.mark.parametrize("state", [Completed, Failed, Crashed, Cancelled])
async def test_when_flow_run_not_found_then_exit_with_error(self, state):
async def test_when_flow_run_not_found_then_exit_with_error(self, flow_run_factory):
# Given
bad_id = str(uuid4())

Expand All @@ -372,3 +364,140 @@ async def test_when_flow_run_not_found_then_exit_with_error(self, state):
expected_code=1,
expected_output_contains=f"Flow run '{bad_id}' not found!\n",
)

async def test_when_num_logs_smaller_than_page_size_with_head_then_no_pagination(
self, flow_run_factory
):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"--head",
"--num-logs",
"10",
],
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(10)
],
expected_line_count=10,
)

async def test_when_num_logs_greater_than_page_size_with_head_then_pagination(
self, flow_run_factory
):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"--head",
"--num-logs",
self.LOGS_DEFAULT_PAGE_SIZE + 1,
],
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(self.LOGS_DEFAULT_PAGE_SIZE + 1)
],
expected_line_count=self.LOGS_DEFAULT_PAGE_SIZE + 1,
)

async def test_default_head_returns_default_num_logs(self, flow_run_factory):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"--head",
],
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS)
],
expected_line_count=LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS,
)

async def test_h_and_n_shortcuts_for_head_and_num_logs(self, flow_run_factory):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"-h",
"-n",
"10",
],
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(10)
],
expected_line_count=10,
)

async def test_num_logs_passed_standalone_returns_num_logs(self, flow_run_factory):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"--num-logs",
"10",
],
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(10)
],
expected_line_count=10,
)

@pytest.mark.skip(reason="we need to disable colors for this test to pass")
async def test_when_num_logs_is_smaller_than_one_then_exit_with_error(
self, flow_run_factory
):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"--num-logs",
"0",
],
expected_code=2,
expected_output_contains="Invalid value for '--num-logs' / '-n': 0 is not in the range x>=1.",
)

0 comments on commit ddc6347

Please sign in to comment.