Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update py examples #851

Merged
merged 10 commits into from
Jul 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions examples_linux/acknowledgement_payloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,22 @@ def master():
end_timer = time.monotonic_ns() # stop timer
if result:
# print timer results upon transmission success
decoded = buffer[:6].decode("utf-8")
print(
"Transmission successful! Time to transmit: "
"{} us. Sent: {}{}".format(
int((end_timer - start_timer) / 1000),
buffer[:6].decode("utf-8"),
counter[0],
),
"Transmission successful! Time to transmit:",
f"{int((end_timer - start_timer) / 1000)} us.",
f"Sent: {decoded}{counter[0]}",
end=" ",
)
has_payload, pipe_number = radio.available_pipe()
if has_payload:
# print the received ACK that was automatically sent
length = radio.getDynamicPayloadSize()
response = radio.read(length)
decoded = bytes(response[:6]).decode("utf-8")
print(
"Received {} on pipe {}: {}{}".format(
length,
pipe_number,
bytes(response[:6]).decode("utf-8"),
response[7:8][0],
)
f"Received {length} on pipe {pipe_number}:",
f"{decoded}{response[7:8][0]}",
)
# increment counter from received payload
if response[7:8][0] < 255:
Expand All @@ -99,7 +94,7 @@ def master():
print(failures, "failures detected. Leaving TX role.")


def slave(timeout=6):
def slave(timeout: int = 6):
"""Listen for any payloads and print the transaction

:param int timeout: The number of seconds to wait (with no transmission)
Expand All @@ -121,15 +116,12 @@ def slave(timeout=6):
received = radio.read(length) # fetch 1 payload from RX FIFO
# increment counter from received payload
counter[0] = received[7:8][0] + 1 if received[7:8][0] < 255 else 0
decoded = [bytes(received[:6]).decode("utf-8")]
decoded.append(buffer[:6].decode("utf-8"))
print(
"Received {} bytes on pipe {}: {}{} Sent: {}{}".format(
length,
pipe_number,
bytes(received[:6]).decode("utf-8"),
received[7:8][0],
buffer[:6].decode("utf-8"),
buffer[7:8][0],
)
f"Received {length} bytes on pipe {pipe_number}:",
f"{decoded[0]}{received[7:8][0]}",
f"Sent: {decoded[1]}{buffer[7:8][0]}",
)
buffer = b"World \x00" + bytes(counter) # build a new ACK payload
radio.writeAckPayload(1, buffer) # load ACK for next response
Expand All @@ -140,7 +132,7 @@ def slave(timeout=6):
radio.stopListening() # put radio in TX mode & flush unused ACK payloads


def set_role():
def set_role() -> bool:
"""Set the role using stdin stream. Timeout arg for slave() can be
specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)

Expand All @@ -163,10 +155,10 @@ def set_role():
else:
slave()
return True
elif user_input[0].upper().startswith("T"):
if user_input[0].upper().startswith("T"):
master()
return True
elif user_input[0].upper().startswith("Q"):
if user_input[0].upper().startswith("Q"):
radio.powerDown()
return False
print(user_input[0], "is an unrecognized input. Please try again.")
Expand Down Expand Up @@ -226,7 +218,10 @@ def set_role():
pass # continue example until 'Q' is entered
else: # if role was set using CLI args
# run role once and exit
master() if bool(args.role) else slave()
if bool(args.role):
master()
else:
slave()
except KeyboardInterrupt:
print(" Keyboard Interrupt detected. Exiting...")
radio.powerDown()
Expand Down
27 changes: 14 additions & 13 deletions examples_linux/getting_started.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def master():
failures += 1
else:
print(
"Transmission successful! Time to Transmit: "
"{} us. Sent: {}".format((end_timer - start_timer) / 1000, payload[0])
"Transmission successful! Time to Transmit:",
f"{(end_timer - start_timer) / 1000} us. Sent: {payload[0]}",
)
payload[0] += 0.01
time.sleep(1)
Expand All @@ -93,9 +93,8 @@ def slave(timeout=6):
payload[0] = struct.unpack("<f", buffer[:4])[0]
# print details about the received packet
print(
"Received {} bytes on pipe {}: {}".format(
radio.payloadSize, pipe_number, payload[0]
)
f"Received {radio.payloadSize} bytes",
f"on pipe {pipe_number}: {payload[0]}",
)
start_timer = time.monotonic() # reset the timeout timer

Expand All @@ -104,7 +103,7 @@ def slave(timeout=6):
radio.stopListening() # put the radio in TX mode


def set_role():
def set_role() -> bool:
"""Set the role using stdin stream. Timeout arg for slave() can be
specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)

Expand All @@ -127,10 +126,10 @@ def set_role():
else:
slave()
return True
elif user_input[0].upper().startswith("T"):
if user_input[0].upper().startswith("T"):
master()
return True
elif user_input[0].upper().startswith("Q"):
if user_input[0].upper().startswith("Q"):
radio.powerDown()
return False
print(user_input[0], "is an unrecognized input. Please try again.")
Expand Down Expand Up @@ -174,8 +173,8 @@ def set_role():

# To save time during transmission, we'll set the payload size to be only
# what we need. A float value occupies 4 bytes in memory using
# struct.pack(); "<f" means a little endian unsigned float
radio.payloadSize = len(struct.pack("<f", payload[0]))
# struct.pack(); "f" means an unsigned float
radio.payloadSize = struct.calcsize("f")

# for debugging, we have 2 options that print a large block of details
# (smaller) function that prints raw register values
Expand All @@ -189,8 +188,10 @@ def set_role():
pass # continue example until 'Q' is entered
else: # if role was set using CLI args
# run role once and exit
master() if bool(args.role) else slave()
if bool(args.role):
master()
else:
slave()
except KeyboardInterrupt:
print(" Keyboard Interrupt detected. Exiting...")
print(" Keyboard Interrupt detected. Powering down radio.")
radio.powerDown()
sys.exit()
37 changes: 19 additions & 18 deletions examples_linux/interrupt_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ def interrupt_handler(channel):
tx_ds, tx_df, rx_dr = radio.whatHappened() # get IRQ status flags
if tx_df:
radio.flush_tx()
print("\ttx_ds: {}, tx_df: {}, rx_dr: {}".format(tx_ds, tx_df, rx_dr))
print(f"\ttx_ds: {tx_ds}, tx_df: {tx_df}, rx_dr: {rx_dr}")
if pl_iterator[0] == 0:
print(" 'data ready' event test {}".format("passed" if rx_dr else "failed"))
print(" 'data ready' event test", ("passed" if rx_dr else "failed"))
elif pl_iterator[0] == 1:
print(" 'data sent' event test {}".format("passed" if tx_ds else "failed"))
print(" 'data sent' event test", ("passed" if tx_ds else "failed"))
elif pl_iterator[0] == 3:
print(" 'data fail' event test {}".format("passed" if tx_df else "failed"))
print(" 'data fail' event test", ("passed" if tx_df else "failed"))


# setup IRQ GPIO pin
Expand Down Expand Up @@ -98,8 +98,8 @@ def _ping_n_wait(pl_iter):
time.sleep(0.1) # wait 100 ms for interrupt_handler() to complete


def print_rx_fifo(pl_size):
"""fush RX FIFO by printing all available payloads with 1 buffer
def print_rx_fifo(pl_size: int):
"""Flush RX FIFO by printing all available payloads with 1 buffer

:param int pl_size: the expected size of each payload
"""
Expand Down Expand Up @@ -147,7 +147,7 @@ def master():
print("RX node's FIFO is full; it is not listening any more")
else:
print(
"Transmission successful, but the RX node might still be " "listening."
"Transmission successful, but the RX node might still be listening."
)
else:
radio.flush_tx()
Expand All @@ -166,14 +166,14 @@ def master():
print_rx_fifo(len(ack_payloads[0])) # empty RX FIFO


def slave(timeout=6): # will listen for 6 seconds before timing out
def slave(timeout: int = 6):
"""Only listen for 3 payload from the master node

:param int timeout: The number of seconds to wait (with no transmission)
until exiting function.
"""
pl_iterator[0] = 0 # reset this to indicate event is a 'data_ready' event
# setup radio to recieve pings, fill TX FIFO with ACK payloads
# setup radio to receive pings, fill TX FIFO with ACK payloads
radio.writeAckPayload(1, ack_payloads[0])
radio.writeAckPayload(1, ack_payloads[1])
radio.writeAckPayload(1, ack_payloads[2])
Expand All @@ -187,7 +187,7 @@ def slave(timeout=6): # will listen for 6 seconds before timing out
print_rx_fifo(len(tx_payloads[0]))


def set_role():
def set_role() -> bool:
"""Set the role using stdin stream. Timeout arg for slave() can be
specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)

Expand All @@ -210,15 +210,14 @@ def set_role():
else:
slave()
return True
elif user_input[0].upper().startswith("T"):
if user_input[0].upper().startswith("T"):
master()
return True
elif user_input[0].upper().startswith("Q"):
if user_input[0].upper().startswith("Q"):
radio.powerDown()
return False
else:
print(user_input[0], "is an unrecognized input. Please try again.")
return set_role()
print(user_input[0], "is an unrecognized input. Please try again.")
return set_role()


if __name__ == "__main__":
Expand Down Expand Up @@ -275,8 +274,10 @@ def set_role():
pass # continue example until 'Q' is entered
else: # if role was set using CLI args
# run role once and exit
master() if bool(args.role) else slave()
if bool(args.role):
master()
else:
slave()
except KeyboardInterrupt:
print(" Keyboard Interrupt detected. Exiting...")
print(" Keyboard Interrupt detected. Powering down radio.")
radio.powerDown()
sys.exit()
45 changes: 20 additions & 25 deletions examples_linux/manual_acknowledgements.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ def master():
pass # wait for incoming payload or timeout
radio.stopListening() # put radio in TX mode
end_timer = time.monotonic_ns() # end timer
decoded = buffer[:6].decode("utf-8")
print(
"Transmission successful. Sent: {}{}.".format(
buffer[:6].decode("utf-8"), counter[0]
),
f"Transmission successful. Sent: {decoded}{counter[0]}.",
end=" ",
)
has_payload, pipe_number = radio.available_pipe()
Expand All @@ -89,23 +88,19 @@ def master():
received = radio.read(radio.payloadSize)
# NOTE received[7:8] discards NULL terminating 0
counter[0] = received[7:8][0] # save the counter
decoded = bytes(received[:6]).decode("utf-8")
print(
"Received {} bytes on pipe {}: {}{}. "
"Round-trip delay: {} us.".format(
radio.payloadSize,
pipe_number,
bytes(received[:6]).decode("utf-8"),
counter[0],
(end_timer - start_timer) / 1000,
)
f"Received {radio.payloadSize} bytes",
f"on pipe {pipe_number}: {decoded}{counter[0]}.",
f"Round-trip delay: {(end_timer - start_timer) / 1000} us.",
)
else:
print("No response received.")
time.sleep(1) # make example readable by slowing down transmissions
print(failures, "failures detected. Leaving TX role.")


def slave(timeout=6):
def slave(timeout: int = 6):
"""Listen for any payloads and print the transaction

:param int timeout: The number of seconds to wait (with no transmission)
Expand All @@ -132,18 +127,16 @@ def slave(timeout=6):
# NOTE txStandBy() flushes TX FIFO on transmission failure
radio.startListening() # put radio back in RX mode
# print the payload received payload
decoded = bytes(received[:6]).decode("utf-8")
print(
"Received {} bytes on pipe {}: {}{}.".format(
radio.payloadSize,
pipe_number,
bytes(received[:6]).decode("utf-8"),
received[7:8][0],
),
f"Received {radio.payloadSize} bytes"
f"on pipe {pipe_number}: {decoded}{received[7:8][0]}.",
end=" ",
)
if result: # did response succeed?
# print response's payload
print("Sent: {}{}".format(buffer[:6].decode("utf-8"), counter[0]))
decoded = buffer[:6].decode("utf-8")
print(f"Sent: {decoded}{counter[0]}")
else:
print("Response failed or timed out")
start_timer = time.monotonic() # reset the timeout timer
Expand All @@ -153,7 +146,7 @@ def slave(timeout=6):
radio.stopListening() # put the radio in TX mode


def set_role():
def set_role() -> bool:
"""Set the role using stdin stream. Timeout arg for slave() can be
specified using a space delimiter (e.g. 'R 10' calls `slave(10)`)

Expand All @@ -176,10 +169,10 @@ def set_role():
else:
slave()
return True
elif user_input[0].upper().startswith("T"):
if user_input[0].upper().startswith("T"):
master()
return True
elif user_input[0].upper().startswith("Q"):
if user_input[0].upper().startswith("Q"):
radio.powerDown()
return False
print(user_input[0], "is an unrecognized input. Please try again.")
Expand Down Expand Up @@ -238,8 +231,10 @@ def set_role():
pass # continue example until 'Q' is entered
else: # if role was set using CLI args
# run role once and exit
master() if bool(args.role) else slave()
if bool(args.role):
master()
else:
slave()
except KeyboardInterrupt:
print(" Keyboard Interrupt detected. Exiting...")
print(" Keyboard Interrupt detected. Powering down radio.")
radio.powerDown()
sys.exit()
Loading