Skip to content

Commit

Permalink
examples: Refactor stream error check
Browse files Browse the repository at this point in the history
- Extract a check_for_stream_error function.
- For non-aio, use stream.exception() to check for errors
- For aio, use stream.code() to check for errors because
  the aio interfaces don't support stream.exception()

Signed-off-by: Brad Keryan <brad.keryan@ni.com>
  • Loading branch information
bkeryan committed May 22, 2023
1 parent cee5c93 commit 5fc9c53
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
15 changes: 8 additions & 7 deletions examples/nidaqmx/analog-input-every-n-samples-aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ def check_for_warning(response):
f"{warning_message.error_string}\nWarning status: {response.status}\n"
)

async def check_for_stream_error(stream: grpc.aio.UnaryStreamCall) -> None:
"""Raise an exception if the stream was closed with an error."""
if stream.done() and await stream.code() != grpc.StatusCode.OK:
_ = await stream.read()

try:
create_task_response = await client.CreateTask(nidaqmx_types.CreateTaskRequest())
task = create_task_response.task
Expand Down Expand Up @@ -98,20 +103,16 @@ def check_for_warning(response):
)

# Wait for initial_metadata and check for stream errors to ensure that the callback is
# registered before starting the task.
# registered successfully before starting the task.
await every_n_samples_stream.initial_metadata()
if every_n_samples_stream.done():
every_n_samples_response = await every_n_samples_stream.read()
check_for_warning(every_n_samples_response)
await check_for_stream_error(every_n_samples_stream)

done_event_stream = client.RegisterDoneEvent(
nidaqmx_types.RegisterDoneEventRequest(task=task)
)

await done_event_stream.initial_metadata()
if done_event_stream.done():
done_response = await done_event_stream.read()
check_for_warning(done_response)
await check_for_stream_error(done_event_stream)

start_task_response = await client.StartTask(nidaqmx_types.StartTaskRequest(task=task))
check_for_warning(start_task_response)
Expand Down
15 changes: 8 additions & 7 deletions examples/nidaqmx/analog-input-every-n-samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ def check_for_warning(response):
f"{warning_message.error_string}\nWarning status: {response.status}\n"
)

def check_for_stream_error(stream: grpc.Future) -> None:
"""Raise an exception if the stream was closed with an error."""
if stream.done() and stream.exception() is not None:
raise stream.exception()

try:
create_task_response = client.CreateTask(nidaqmx_types.CreateTaskRequest())
task = create_task_response.task
Expand Down Expand Up @@ -100,20 +105,16 @@ def check_for_warning(response):
)

# Wait for initial_metadata and check for stream errors to ensure that the callback is
# registered before starting the task.
# registered successfully before starting the task.
every_n_samples_stream.initial_metadata()
if every_n_samples_stream.done():
every_n_samples_response = next(every_n_samples_stream)
check_for_warning(every_n_samples_response)
check_for_stream_error(every_n_samples_stream)

done_event_stream = client.RegisterDoneEvent(
nidaqmx_types.RegisterDoneEventRequest(task=task)
)

done_event_stream.initial_metadata()
if done_event_stream.done():
done_response = next(done_event_stream)
check_for_warning(done_response)
check_for_stream_error(done_event_stream)

start_task_response = client.StartTask(nidaqmx_types.StartTaskRequest(task=task))
check_for_warning(start_task_response)
Expand Down

0 comments on commit 5fc9c53

Please sign in to comment.