Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds "head" flag to flow-run logs #8003

Merged
merged 34 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d212a38
feat: Adds "head" flag to `flow-run logs`
ohad-lynx Dec 29, 2022
8c4f2c1
fix: Provides test with correct head value
ohad-lynx Dec 29, 2022
85e94bd
refactor: renames remaining_logs to user_specified_num_logs
ohad-lynx Dec 29, 2022
988a342
Merge branch 'main' into flow-run-logs-head
ohadch Dec 29, 2022
70c7e0e
feat: Uses a flow_run_factory fixture
ohad-lynx Dec 29, 2022
a668fed
Merge remote-tracking branch 'origin/flow-run-logs-head' into flow-ru…
ohad-lynx Dec 29, 2022
3761095
Merge branch 'main' into flow-run-logs-head
ohadch Dec 30, 2022
bef6e08
refactor: uses constants
ohad-lynx Dec 30, 2022
d59851d
feat: Adds --num-lines flag
ohad-lynx Dec 30, 2022
acdc5db
fix: Default head value is 20 lines
ohad-lynx Dec 30, 2022
a2d2af6
Merge branch 'main' into flow-run-logs-head
zanieb Jan 3, 2023
2d87101
Merge branch 'main' into flow-run-logs-head
serinamarie Jan 3, 2023
c0e0bf2
fix: Limits num lines if -n is passed without --head
ohad-lynx Jan 4, 2023
18f8187
Merge branch 'main' into flow-run-logs-head
ohadch Jan 4, 2023
78d7228
Merge branch 'main' into flow-run-logs-head
ohadch Jan 5, 2023
b871e1b
Update src/prefect/cli/flow_run.py
ohadch Jan 5, 2023
b37e90b
Update src/prefect/cli/flow_run.py
ohadch Jan 5, 2023
2edf922
refactor: renames constants to follow the same convention
ohad-lynx Jan 5, 2023
6f66e0c
Update tests/cli/test_flow_run.py
ohadch Jan 5, 2023
6109acf
Update tests/cli/test_flow_run.py
ohadch Jan 5, 2023
045a8ab
refactor: renames test constant to be clearer
ohad-lynx Jan 5, 2023
eb45465
refactor: renames num-lines to num-logs
ohad-lynx Jan 5, 2023
c57e7c8
refactor: better naming of constant
ohad-lynx Jan 6, 2023
89be621
Merge branch 'main' into flow-run-logs-head
ohadch Jan 6, 2023
5b6bef9
Merge branch 'main' into flow-run-logs-head
serinamarie Jan 6, 2023
ca18c51
fix: renames occurrences of 'lines' to 'logs'
ohad-lynx Jan 6, 2023
25f0ada
Merge branch 'main' into flow-run-logs-head
serinamarie Jan 6, 2023
57e4523
Merge branch 'main' into flow-run-logs-head
ohadch Jan 6, 2023
6a23a5d
Merge branch 'main' into flow-run-logs-head
ohadch Jan 8, 2023
98995f2
Merge branch 'main' into flow-run-logs-head
ohadch Jan 10, 2023
3f77df6
Merge branch 'main' into flow-run-logs-head
ohadch Jan 11, 2023
f31f393
Merge branch 'main' into flow-run-logs-head
ohadch Jan 17, 2023
cfdbc04
Update tests/cli/test_flow_run.py
ohadch Jan 17, 2023
b32e1db
Merge branch 'main' into flow-run-logs-head
ohadch Jan 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions src/prefect/cli/flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
)
app.add_typer(flow_run_app, aliases=["flow-runs"])

LOGS_DEFAULT_PAGE_SIZE = 200
LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS = 20


@flow_run_app.command()
async def inspect(id: UUID):
Expand Down Expand Up @@ -139,13 +142,36 @@ async def cancel(id: UUID):


@flow_run_app.command()
async def logs(id: UUID):
async def logs(
id: UUID,
head: bool = typer.Option(
False,
"--head",
"-h",
help=f"Show the first {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of all logs.",
),
num_logs: int = typer.Option(
None,
"--num-logs",
"-n",
help=f"Number of logs to show when using the --head flag. If None, defaults to {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS}.",
min=1,
),
):
"""
View logs for a flow run.
"""
page_size = 200
# Pagination - API returns max 200 (LOGS_DEFAULT_PAGE_SIZE) logs at a time
offset = 0
more_logs = True
num_logs_returned = 0
ohadch marked this conversation as resolved.
Show resolved Hide resolved

# If head is specified, we need to stop after we've retrieved enough logs
if head or num_logs:
user_specified_num_logs = num_logs or LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS
else:
user_specified_num_logs = None

log_filter = LogFilter(flow_run_id={"any_": [id]})

async with get_client() as client:
Expand All @@ -156,9 +182,15 @@ async def logs(id: UUID):
exit_with_error(f"Flow run {str(id)!r} not found!")

while more_logs:
num_logs_to_return_from_page = (
LOGS_DEFAULT_PAGE_SIZE
if user_specified_num_logs is None
else min(LOGS_DEFAULT_PAGE_SIZE, user_specified_num_logs)
)

# Get the next page of logs
page_logs = await client.read_logs(
log_filter=log_filter, limit=page_size, offset=offset
log_filter=log_filter, limit=num_logs_to_return_from_page, offset=offset
)

# Print the logs
Expand All @@ -169,8 +201,11 @@ async def logs(id: UUID):
soft_wrap=True,
)

if len(page_logs) == page_size:
offset += page_size
# Update the number of logs retrieved
num_logs_returned += num_logs_to_return_from_page

if len(page_logs) == LOGS_DEFAULT_PAGE_SIZE:
offset += LOGS_DEFAULT_PAGE_SIZE
else:
# No more logs to show, exit
more_logs = False
195 changes: 162 additions & 33 deletions tests/cli/test_flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import prefect.exceptions
from prefect import flow
from prefect.cli.flow_run import LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS
from prefect.orion.schemas.actions import LogCreate
from prefect.states import (
AwaitingRetry,
Expand Down Expand Up @@ -279,19 +280,13 @@ def test_wrong_id_exits_with_error(self):
)


class TestFlowRunLogs:
PAGE_SIZE = 200

@pytest.mark.parametrize("state", [Completed, Failed, Crashed, Cancelled])
async def test_when_num_logs_smaller_than_page_size_then_no_pagination(
self, orion_client, state
):
# Given
@pytest.fixture()
def flow_run_factory(orion_client):
async def create_flow_run(num_logs: int):
flow_run = await orion_client.create_flow_run(
name="scheduled_flow_run", flow=hello_flow, state=state()
name="scheduled_flow_run", flow=hello_flow
)

# Create not enough flow run logs to result in pagination (page_size <= 200)
logs = [
LogCreate(
name="prefect.flow_runs",
Expand All @@ -300,10 +295,24 @@ async def test_when_num_logs_smaller_than_page_size_then_no_pagination(
timestamp=datetime.now(tz=timezone.utc),
flow_run_id=flow_run.id,
)
for i in range(self.PAGE_SIZE - 1)
for i in range(num_logs)
]
await orion_client.create_logs(logs)

return flow_run

return create_flow_run


class TestFlowRunLogs:
LOGS_DEFAULT_PAGE_SIZE = 200

async def test_when_num_logs_smaller_than_page_size_then_no_pagination(
self, flow_run_factory
):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE - 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
Expand All @@ -315,31 +324,15 @@ async def test_when_num_logs_smaller_than_page_size_then_no_pagination(
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(self.PAGE_SIZE - 1)
for i in range(self.LOGS_DEFAULT_PAGE_SIZE - 1)
],
)

@pytest.mark.parametrize("state", [Completed, Failed, Crashed, Cancelled])
async def test_when_num_logs_greater_than_page_size_then_pagination(
self, orion_client, state
self, flow_run_factory
):
# Given
flow_run = await orion_client.create_flow_run(
name="scheduled_flow_run", flow=hello_flow, state=state()
)

# Create enough flow run logs to result in pagination (page_size > 200)
logs = [
LogCreate(
name="prefect.flow_runs",
level=20,
message=f"Log {i} from flow_run {flow_run.id}.",
timestamp=datetime.now(tz=timezone.utc),
flow_run_id=flow_run.id,
)
for i in range(self.PAGE_SIZE + 1)
]
await orion_client.create_logs(logs)
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
Expand All @@ -352,12 +345,11 @@ async def test_when_num_logs_greater_than_page_size_then_pagination(
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(self.PAGE_SIZE + 1)
for i in range(self.LOGS_DEFAULT_PAGE_SIZE + 1)
],
)

@pytest.mark.parametrize("state", [Completed, Failed, Crashed, Cancelled])
async def test_when_flow_run_not_found_then_exit_with_error(self, state):
async def test_when_flow_run_not_found_then_exit_with_error(self, flow_run_factory):
# Given
bad_id = str(uuid4())

Expand All @@ -372,3 +364,140 @@ async def test_when_flow_run_not_found_then_exit_with_error(self, state):
expected_code=1,
expected_output_contains=f"Flow run '{bad_id}' not found!\n",
)

async def test_when_num_logs_smaller_than_page_size_with_head_then_no_pagination(
self, flow_run_factory
):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"--head",
"--num-logs",
"10",
],
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(10)
],
expected_line_count=10,
)

async def test_when_num_logs_greater_than_page_size_with_head_then_pagination(
self, flow_run_factory
):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"--head",
"--num-logs",
self.LOGS_DEFAULT_PAGE_SIZE + 1,
],
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(self.LOGS_DEFAULT_PAGE_SIZE + 1)
],
expected_line_count=self.LOGS_DEFAULT_PAGE_SIZE + 1,
)

async def test_default_head_returns_default_num_logs(self, flow_run_factory):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"--head",
],
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS)
],
expected_line_count=LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS,
)

async def test_h_and_n_shortcuts_for_head_and_num_logs(self, flow_run_factory):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"-h",
"-n",
"10",
],
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(10)
],
expected_line_count=10,
)

async def test_num_logs_passed_standalone_returns_num_logs(self, flow_run_factory):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"--num-logs",
"10",
],
expected_code=0,
expected_output_contains=[
f"Flow run '{flow_run.name}' - Log {i} from flow_run {flow_run.id}."
for i in range(10)
],
expected_line_count=10,
)

@pytest.mark.skip(reason="we need to disable colors for this test to pass")
async def test_when_num_logs_is_smaller_than_one_then_exit_with_error(
ohadch marked this conversation as resolved.
Show resolved Hide resolved
self, flow_run_factory
):
# Given
flow_run = await flow_run_factory(num_logs=self.LOGS_DEFAULT_PAGE_SIZE + 1)

# When/Then
await run_sync_in_worker_thread(
invoke_and_assert,
command=[
"flow-run",
"logs",
str(flow_run.id),
"--num-logs",
"0",
],
expected_code=2,
expected_output_contains="Invalid value for '--num-logs' / '-n': 0 is not in the range x>=1.",
)