diff --git a/examples_linux/acknowledgement_payloads.py b/examples_linux/acknowledgement_payloads.py index eb32e6ded..a5b28ebbc 100644 --- a/examples_linux/acknowledgement_payloads.py +++ b/examples_linux/acknowledgement_payloads.py @@ -63,13 +63,11 @@ def master(): end_timer = time.monotonic_ns() # stop timer if result: # print timer results upon transmission success + decoded = buffer[:6].decode("utf-8") print( - "Transmission successful! Time to transmit: " - "{} us. Sent: {}{}".format( - int((end_timer - start_timer) / 1000), - buffer[:6].decode("utf-8"), - counter[0], - ), + "Transmission successful! Time to transmit:", + f"{int((end_timer - start_timer) / 1000)} us.", + f"Sent: {decoded}{counter[0]}", end=" ", ) has_payload, pipe_number = radio.available_pipe() @@ -77,13 +75,10 @@ def master(): # print the received ACK that was automatically sent length = radio.getDynamicPayloadSize() response = radio.read(length) + decoded = bytes(response[:6]).decode("utf-8") print( - "Received {} on pipe {}: {}{}".format( - length, - pipe_number, - bytes(response[:6]).decode("utf-8"), - response[7:8][0], - ) + f"Received {length} on pipe {pipe_number}:", + f"{decoded}{response[7:8][0]}", ) # increment counter from received payload if response[7:8][0] < 255: @@ -99,7 +94,7 @@ def master(): print(failures, "failures detected. Leaving TX role.") -def slave(timeout=6): +def slave(timeout: int = 6): """Listen for any payloads and print the transaction :param int timeout: The number of seconds to wait (with no transmission) @@ -121,15 +116,12 @@ def slave(timeout=6): received = radio.read(length) # fetch 1 payload from RX FIFO # increment counter from received payload counter[0] = received[7:8][0] + 1 if received[7:8][0] < 255 else 0 + decoded = [bytes(received[:6]).decode("utf-8")] + decoded.append(buffer[:6].decode("utf-8")) print( - "Received {} bytes on pipe {}: {}{} Sent: {}{}".format( - length, - pipe_number, - bytes(received[:6]).decode("utf-8"), - received[7:8][0], - buffer[:6].decode("utf-8"), - buffer[7:8][0], - ) + f"Received {length} bytes on pipe {pipe_number}:", + f"{decoded[0]}{received[7:8][0]}", + f"Sent: {decoded[1]}{buffer[7:8][0]}", ) buffer = b"World \x00" + bytes(counter) # build a new ACK payload radio.writeAckPayload(1, buffer) # load ACK for next response @@ -140,7 +132,7 @@ def slave(timeout=6): radio.stopListening() # put radio in TX mode & flush unused ACK payloads -def set_role(): +def set_role() -> bool: """Set the role using stdin stream. Timeout arg for slave() can be specified using a space delimiter (e.g. 'R 10' calls `slave(10)`) @@ -163,10 +155,10 @@ def set_role(): else: slave() return True - elif user_input[0].upper().startswith("T"): + if user_input[0].upper().startswith("T"): master() return True - elif user_input[0].upper().startswith("Q"): + if user_input[0].upper().startswith("Q"): radio.powerDown() return False print(user_input[0], "is an unrecognized input. Please try again.") @@ -226,7 +218,10 @@ def set_role(): pass # continue example until 'Q' is entered else: # if role was set using CLI args # run role once and exit - master() if bool(args.role) else slave() + if bool(args.role): + master() + else: + slave() except KeyboardInterrupt: print(" Keyboard Interrupt detected. Exiting...") radio.powerDown() diff --git a/examples_linux/getting_started.py b/examples_linux/getting_started.py index 90d993052..c9695e7fd 100644 --- a/examples_linux/getting_started.py +++ b/examples_linux/getting_started.py @@ -65,8 +65,8 @@ def master(): failures += 1 else: print( - "Transmission successful! Time to Transmit: " - "{} us. Sent: {}".format((end_timer - start_timer) / 1000, payload[0]) + "Transmission successful! Time to Transmit:", + f"{(end_timer - start_timer) / 1000} us. Sent: {payload[0]}", ) payload[0] += 0.01 time.sleep(1) @@ -93,9 +93,8 @@ def slave(timeout=6): payload[0] = struct.unpack(" bool: """Set the role using stdin stream. Timeout arg for slave() can be specified using a space delimiter (e.g. 'R 10' calls `slave(10)`) @@ -127,10 +126,10 @@ def set_role(): else: slave() return True - elif user_input[0].upper().startswith("T"): + if user_input[0].upper().startswith("T"): master() return True - elif user_input[0].upper().startswith("Q"): + if user_input[0].upper().startswith("Q"): radio.powerDown() return False print(user_input[0], "is an unrecognized input. Please try again.") @@ -174,8 +173,8 @@ def set_role(): # To save time during transmission, we'll set the payload size to be only # what we need. A float value occupies 4 bytes in memory using - # struct.pack(); " bool: """Set the role using stdin stream. Timeout arg for slave() can be specified using a space delimiter (e.g. 'R 10' calls `slave(10)`) @@ -210,15 +210,14 @@ def set_role(): else: slave() return True - elif user_input[0].upper().startswith("T"): + if user_input[0].upper().startswith("T"): master() return True - elif user_input[0].upper().startswith("Q"): + if user_input[0].upper().startswith("Q"): radio.powerDown() return False - else: - print(user_input[0], "is an unrecognized input. Please try again.") - return set_role() + print(user_input[0], "is an unrecognized input. Please try again.") + return set_role() if __name__ == "__main__": @@ -275,8 +274,10 @@ def set_role(): pass # continue example until 'Q' is entered else: # if role was set using CLI args # run role once and exit - master() if bool(args.role) else slave() + if bool(args.role): + master() + else: + slave() except KeyboardInterrupt: - print(" Keyboard Interrupt detected. Exiting...") + print(" Keyboard Interrupt detected. Powering down radio.") radio.powerDown() - sys.exit() diff --git a/examples_linux/manual_acknowledgements.py b/examples_linux/manual_acknowledgements.py index c2ec930c2..e5539df47 100644 --- a/examples_linux/manual_acknowledgements.py +++ b/examples_linux/manual_acknowledgements.py @@ -77,10 +77,9 @@ def master(): pass # wait for incoming payload or timeout radio.stopListening() # put radio in TX mode end_timer = time.monotonic_ns() # end timer + decoded = buffer[:6].decode("utf-8") print( - "Transmission successful. Sent: {}{}.".format( - buffer[:6].decode("utf-8"), counter[0] - ), + f"Transmission successful. Sent: {decoded}{counter[0]}.", end=" ", ) has_payload, pipe_number = radio.available_pipe() @@ -89,15 +88,11 @@ def master(): received = radio.read(radio.payloadSize) # NOTE received[7:8] discards NULL terminating 0 counter[0] = received[7:8][0] # save the counter + decoded = bytes(received[:6]).decode("utf-8") print( - "Received {} bytes on pipe {}: {}{}. " - "Round-trip delay: {} us.".format( - radio.payloadSize, - pipe_number, - bytes(received[:6]).decode("utf-8"), - counter[0], - (end_timer - start_timer) / 1000, - ) + f"Received {radio.payloadSize} bytes", + f"on pipe {pipe_number}: {decoded}{counter[0]}.", + f"Round-trip delay: {(end_timer - start_timer) / 1000} us.", ) else: print("No response received.") @@ -105,7 +100,7 @@ def master(): print(failures, "failures detected. Leaving TX role.") -def slave(timeout=6): +def slave(timeout: int = 6): """Listen for any payloads and print the transaction :param int timeout: The number of seconds to wait (with no transmission) @@ -132,18 +127,16 @@ def slave(timeout=6): # NOTE txStandBy() flushes TX FIFO on transmission failure radio.startListening() # put radio back in RX mode # print the payload received payload + decoded = bytes(received[:6]).decode("utf-8") print( - "Received {} bytes on pipe {}: {}{}.".format( - radio.payloadSize, - pipe_number, - bytes(received[:6]).decode("utf-8"), - received[7:8][0], - ), + f"Received {radio.payloadSize} bytes" + f"on pipe {pipe_number}: {decoded}{received[7:8][0]}.", end=" ", ) if result: # did response succeed? # print response's payload - print("Sent: {}{}".format(buffer[:6].decode("utf-8"), counter[0])) + decoded = buffer[:6].decode("utf-8") + print(f"Sent: {decoded}{counter[0]}") else: print("Response failed or timed out") start_timer = time.monotonic() # reset the timeout timer @@ -153,7 +146,7 @@ def slave(timeout=6): radio.stopListening() # put the radio in TX mode -def set_role(): +def set_role() -> bool: """Set the role using stdin stream. Timeout arg for slave() can be specified using a space delimiter (e.g. 'R 10' calls `slave(10)`) @@ -176,10 +169,10 @@ def set_role(): else: slave() return True - elif user_input[0].upper().startswith("T"): + if user_input[0].upper().startswith("T"): master() return True - elif user_input[0].upper().startswith("Q"): + if user_input[0].upper().startswith("Q"): radio.powerDown() return False print(user_input[0], "is an unrecognized input. Please try again.") @@ -238,8 +231,10 @@ def set_role(): pass # continue example until 'Q' is entered else: # if role was set using CLI args # run role once and exit - master() if bool(args.role) else slave() + if bool(args.role): + master() + else: + slave() except KeyboardInterrupt: - print(" Keyboard Interrupt detected. Exiting...") + print(" Keyboard Interrupt detected. Powering down radio.") radio.powerDown() - sys.exit() diff --git a/examples_linux/multiceiver_demo.py b/examples_linux/multiceiver_demo.py index 9dc732bdb..fe7902e26 100644 --- a/examples_linux/multiceiver_demo.py +++ b/examples_linux/multiceiver_demo.py @@ -80,14 +80,12 @@ def master(node_number): end_timer = time.monotonic_ns() # show something to see it isn't frozen print( - "Transmission of payloadID {} as node {}".format(counter, node_number), + f"Transmission of payloadID {counter} as node {node_number}", end=" ", ) if report: print( - "successfull! Time to transmit = {} us".format( - (end_timer - start_timer) / 1000 - ) + f"successful! Time to transmit = {(end_timer - start_timer) / 1000} us" ) else: failures += 1 @@ -96,8 +94,8 @@ def master(node_number): print(failures, "failures detected. Leaving TX role.") -def slave(timeout=10): - """Use the radio as a base station for lisening to all nodes +def slave(timeout: int = 10): + """Use the radio as a base station for listening to all nodes :param int timeout: The number of seconds to wait (with no transmission) until exiting function. @@ -111,11 +109,12 @@ def slave(timeout=10): has_payload, pipe_number = radio.available_pipe() if has_payload: # unpack payload - nodeID, payloadID = struct.unpack(" bool: """Set the role using stdin stream. Timeout arg for slave() can be specified using a space delimiter (e.g. 'R 10' calls `slave(10)`) @@ -147,10 +146,10 @@ def set_role(): else: slave() return True - elif user_input[0].isdigit() and 0 <= int(user_input[0]) <= 5: + if user_input[0].isdigit() and 0 <= int(user_input[0]) <= 5: master(int(user_input[0])) return True - elif user_input[0].upper().startswith("Q"): + if user_input[0].upper().startswith("Q"): radio.powerDown() return False print(user_input[0], "is an unrecognized input. Please try again.") @@ -173,9 +172,9 @@ def set_role(): # To save time during transmission, we'll set the payload size to be only what # we need. - # 2 int occupy 8 bytes in memory using len(struct.pack()) - # " bool: + """Scan a specified channel and report if a signal was detected.""" + radio.channel = channel + radio.startListening() + time.sleep(0.00013) + result = radio.testRPD() + radio.stopListening() + return result + + +def scan(duration: int = DURATION): + """Perform scan.""" + timeout = time.monotonic() + duration + console.print( + f"Scanning all channels using {SELECTED_RATE} for", + f"{duration} seconds. Channel labels are in MHz.", + ) + with Live(table, refresh_per_second=1000): + try: + while time.monotonic() < timeout: + for chl, p_bar in enumerate(progress_bars): + # save the latest in history (FIFO ordering) + history[chl] = history[chl][1:] + [signals[chl]] + + # refresh the latest + signals[chl] = scan_channel(chl) + + # update total signal count for the channel + totals[chl] += int(signals[chl]) + + p_bar.update( + p_bar.task_ids[0], + completed=history[chl].count(True), + signals="-" if not totals[chl] else totals[chl], + ) + except KeyboardInterrupt: + console.print(" Keyboard interrupt detected. Powering down radio.") + radio.powerDown() + + +if __name__ == "__main__": + scan() + radio.powerDown() +else: + console.print("Enter `scan()` to run a scan.") + console.print("Change data rate using `radio.setDataRate(RF24_**BPS)`") diff --git a/examples_linux/streaming_data.py b/examples_linux/streaming_data.py index 6bbe4dbc4..065f8fceb 100644 --- a/examples_linux/streaming_data.py +++ b/examples_linux/streaming_data.py @@ -48,7 +48,7 @@ SIZE = 32 # this is the default maximum payload size -def make_buffer(buf_iter): +def make_buffer(buf_iter: int) -> bytes: """Returns a dynamically created payloads :param int buf_iter: The position of the payload in the data stream @@ -65,7 +65,7 @@ def make_buffer(buf_iter): return buff -def master(count=1): +def master(count: int = 1): """Uses all 3 levels of the TX FIFO to send a stream of data :param int count: how many times to transmit the stream of data. @@ -91,13 +91,12 @@ def master(count=1): buf_iter += 1 end_timer = time.monotonic_ns() # end timer print( - "Time to transmit data = {} us. Detected {} failures.".format( - (end_timer - start_timer) / 1000, failures - ) + f"Time to transmit data = {(end_timer - start_timer) / 1000} us.", + f"Detected {failures} failures." ) -def slave(timeout=6): +def slave(timeout: int = 6): """Listen for any payloads and print them out (suffixed with received counter) @@ -110,19 +109,18 @@ def slave(timeout=6): while (time.monotonic() - start_timer) < timeout: if radio.available(): count += 1 - # retreive the received packet's payload + # retrieve the received packet's payload receive_payload = radio.read(radio.payloadSize) - print("Received: {} - {}".format(receive_payload, count)) + print("Received:", receive_payload, "-", count) start_timer = time.monotonic() # reset timer on every RX payload print("Nothing received in", timeout, "seconds. Leaving RX role") # recommended behavior is to keep in TX mode while idle radio.stopListening() # put the radio in TX mode - print("Nothing received in ", timeout, " seconds. Leaving RX role") -def set_role(): +def set_role() -> bool: """Set the role using stdin stream. Role args can be specified using space delimiters (e.g. 'R 10' calls `slave(10)` & 'T 3' calls `master(3)`) @@ -145,13 +143,13 @@ def set_role(): else: slave() return True - elif user_input[0].upper().startswith("T"): + if user_input[0].upper().startswith("T"): if len(user_input) > 1: master(int(user_input[1])) else: master() return True - elif user_input[0].upper().startswith("Q"): + if user_input[0].upper().startswith("Q"): radio.powerDown() return False print(user_input[0], "is an unrecognized input. Please try again.") @@ -209,8 +207,10 @@ def set_role(): pass # continue example until 'Q' is entered else: # if role was set using CLI args # run role once and exit - master() if bool(args.role) else slave() + if bool(args.role): + master() + else: + slave() except KeyboardInterrupt: - print(" Keyboard Interrupt detected. Exiting...") + print(" Keyboard Interrupt detected. Powering down radio.") radio.powerDown() - sys.exit()