Skip to content

Commit

Permalink
Improve ray tests (#3846)
Browse files Browse the repository at this point in the history
* Make ray tests actually test something and show that actors are not supported
  • Loading branch information
antonpirker authored Dec 4, 2024
1 parent dfb84cc commit 3e43a91
Showing 1 changed file with 92 additions and 75 deletions.
167 changes: 92 additions & 75 deletions tests/integrations/ray/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,27 @@ def setup_sentry(transport=None):
)


def read_error_from_log(job_id):
log_dir = "/tmp/ray/session_latest/logs/"
log_file = [
f
for f in os.listdir(log_dir)
if "worker" in f and job_id in f and f.endswith(".out")
][0]
with open(os.path.join(log_dir, log_file), "r") as file:
lines = file.readlines()

try:
# parse error object from log line
error = json.loads(lines[4][:-1])
except IndexError:
error = None

return error


@pytest.mark.forked
def test_ray_tracing():
def test_tracing_in_ray_tasks():
setup_sentry()

ray.init(
Expand All @@ -50,6 +69,7 @@ def test_ray_tracing():
}
)

# Setup ray task
@ray.remote
def example_task():
with sentry_sdk.start_span(op="task", name="example task step"):
Expand All @@ -62,63 +82,42 @@ def example_task():

client_envelope = sentry_sdk.get_client().transport.envelopes[0]
client_transaction = client_envelope.get_transaction_event()
assert client_transaction["transaction"] == "ray test transaction"
assert client_transaction["transaction_info"] == {"source": "custom"}

worker_envelope = worker_envelopes[0]
worker_transaction = worker_envelope.get_transaction_event()

assert (
client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
worker_transaction["transaction"]
== "tests.integrations.ray.test_ray.test_tracing_in_ray_tasks.<locals>.example_task"
)
assert worker_transaction["transaction_info"] == {"source": "task"}

for span in client_transaction["spans"]:
assert (
span["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)

for span in worker_transaction["spans"]:
assert (
span["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)


@pytest.mark.forked
def test_ray_spans():
setup_sentry()

ray.init(
runtime_env={
"worker_process_setup_hook": setup_sentry,
"working_dir": "./",
}
(span,) = client_transaction["spans"]
assert span["op"] == "queue.submit.ray"
assert span["origin"] == "auto.queue.ray"
assert (
span["description"]
== "tests.integrations.ray.test_ray.test_tracing_in_ray_tasks.<locals>.example_task"
)
assert span["parent_span_id"] == client_transaction["contexts"]["trace"]["span_id"]
assert span["trace_id"] == client_transaction["contexts"]["trace"]["trace_id"]

@ray.remote
def example_task():
return sentry_sdk.get_client().transport.envelopes
(span,) = worker_transaction["spans"]
assert span["op"] == "task"
assert span["origin"] == "manual"
assert span["description"] == "example task step"
assert span["parent_span_id"] == worker_transaction["contexts"]["trace"]["span_id"]
assert span["trace_id"] == worker_transaction["contexts"]["trace"]["trace_id"]

with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
worker_envelopes = ray.get(example_task.remote())

client_envelope = sentry_sdk.get_client().transport.envelopes[0]
client_transaction = client_envelope.get_transaction_event()
worker_envelope = worker_envelopes[0]
worker_transaction = worker_envelope.get_transaction_event()

for span in client_transaction["spans"]:
assert span["op"] == "queue.submit.ray"
assert span["origin"] == "auto.queue.ray"

for span in worker_transaction["spans"]:
assert span["op"] == "queue.task.ray"
assert span["origin"] == "auto.queue.ray"
assert (
client_transaction["contexts"]["trace"]["trace_id"]
== worker_transaction["contexts"]["trace"]["trace_id"]
)


@pytest.mark.forked
def test_ray_errors():
def test_errors_in_ray_tasks():
setup_sentry_with_logging_transport()

ray.init(
Expand All @@ -128,6 +127,7 @@ def test_ray_errors():
}
)

# Setup ray task
@ray.remote
def example_task():
1 / 0
Expand All @@ -138,30 +138,19 @@ def example_task():
ray.get(future)

job_id = future.job_id().hex()

# Read the worker log output containing the error
log_dir = "/tmp/ray/session_latest/logs/"
log_file = [
f
for f in os.listdir(log_dir)
if "worker" in f and job_id in f and f.endswith(".out")
][0]
with open(os.path.join(log_dir, log_file), "r") as file:
lines = file.readlines()
# parse error object from log line
error = json.loads(lines[4][:-1])
error = read_error_from_log(job_id)

assert error["level"] == "error"
assert (
error["transaction"]
== "tests.integrations.ray.test_ray.test_ray_errors.<locals>.example_task"
) # its in the worker, not the client thus not "ray test transaction"
== "tests.integrations.ray.test_ray.test_errors_in_ray_tasks.<locals>.example_task"
)
assert error["exception"]["values"][0]["mechanism"]["type"] == "ray"
assert not error["exception"]["values"][0]["mechanism"]["handled"]


@pytest.mark.forked
def test_ray_actor():
def test_tracing_in_ray_actors():
setup_sentry()

ray.init(
Expand All @@ -171,13 +160,14 @@ def test_ray_actor():
}
)

# Setup ray actor
@ray.remote
class Counter:
def __init__(self):
self.n = 0

def increment(self):
with sentry_sdk.start_span(op="task", name="example task step"):
with sentry_sdk.start_span(op="task", name="example actor execution"):
self.n += 1

return sentry_sdk.get_client().transport.envelopes
Expand All @@ -186,20 +176,47 @@ def increment(self):
counter = Counter.remote()
worker_envelopes = ray.get(counter.increment.remote())

# Currently no transactions/spans are captured in actors
assert worker_envelopes == []

client_envelope = sentry_sdk.get_client().transport.envelopes[0]
client_transaction = client_envelope.get_transaction_event()

assert (
client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
# Spans for submitting the actor task are not created (actors are not supported yet)
assert client_transaction["spans"] == []

# Transaction are not yet created when executing ray actors (actors are not supported yet)
assert worker_envelopes == []


@pytest.mark.forked
def test_errors_in_ray_actors():
setup_sentry_with_logging_transport()

ray.init(
runtime_env={
"worker_process_setup_hook": setup_sentry_with_logging_transport,
"working_dir": "./",
}
)

for span in client_transaction["spans"]:
assert (
span["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
== client_transaction["contexts"]["trace"]["trace_id"]
)
# Setup ray actor
@ray.remote
class Counter:
def __init__(self):
self.n = 0

def increment(self):
with sentry_sdk.start_span(op="task", name="example actor execution"):
1 / 0

return sentry_sdk.get_client().transport.envelopes

with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
with pytest.raises(ZeroDivisionError):
counter = Counter.remote()
future = counter.increment.remote()
ray.get(future)

job_id = future.job_id().hex()
error = read_error_from_log(job_id)

# We do not capture errors in ray actors yet
assert error is None

0 comments on commit 3e43a91

Please sign in to comment.