Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example for NI FPGA #1085

Merged
merged 8 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions examples/nifpga/read_write_array_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
r""" Read and Write to indicators, controls, Arrays and FIFO.
amehra-ni marked this conversation as resolved.
Show resolved Hide resolved

The gRPC API is built from the C API. NI-FPGA documentation is installed with the driver at:
C:\Program Files (x86)\National Instruments\FPGA Interface C API\capi.chm

Getting Started:

To run this example with your VI, you need to configure the values below.
Additionally, ensure you have a VI that includes arrays and update the references for them below.
Make sure the R-Series drivers are installed on the system where the gRPC-device server is running.
You will also need LABVIEW and the LABVIEW FPGA module to configure the VIs.

For instructions on how to use protoc to generate gRPC client interfaces, see our "Creating a gRPC
Client" wiki page:
https://github.com/ni/grpc-device/wiki/Creating-a-gRPC-Client

Server machine's IP address, port number, and resource name can be passed as separate command line
arguments.
> python read_write_example.py <server_address> <port_number> <resource_name>
If they are not passed in as command line arguments, then by default the server address will be
"localhost:31763", with "FPGA" as the resource name.

This example attempts to read and write values to an array.
"""

import sys

import grpc
import nifpga_pb2 as nifpga_types
import nifpga_pb2_grpc as grpc_nifpga

SERVER_ADDRESS = "localhost"
SERVER_PORT = "31763"
SESSION_NAME = "NI-FPGA-Session"
RESOURCE = "FPGA"

# Configure these values.
NI_FPGA_EXAMPLE_BITFILE_PATH = "C:\\DemoExample.lvbitx"
NI_FPGA_EXAMPLE_SIGNATURE = "11F1FF1D11F1FF1F1F111FF11F11F111"
EXAMPLE_ARRAY_CONTROL = 0x10004
EXAMPLE_ARRAY_CONTROL_SIZE = 4

# Read in cmd args
if len(sys.argv) >= 2:
SERVER_ADDRESS = sys.argv[1]
if len(sys.argv) >= 3:
SERVER_PORT = sys.argv[2]
if len(sys.argv) >= 4:
RESOURCE = sys.argv[3]

channel = grpc.insecure_channel(f"{SERVER_ADDRESS}:{SERVER_PORT}")
client = grpc_nifpga.NiFpgaStub(channel)


def check_for_warning(response):
"""Print to console if the status indicates a warning."""
if response.status > 0:
sys.stderr.write(f"Warning status: {response.status}\n")


new_session = None
try:
open_session_response = client.Open(
nifpga_types.OpenRequest(
session_name=SESSION_NAME,
bitfile=NI_FPGA_EXAMPLE_BITFILE_PATH,
signature=NI_FPGA_EXAMPLE_SIGNATURE,
resource=RESOURCE,
attribute_mapped=nifpga_types.OpenAttribute.OPEN_ATTRIBUTE_NO_RUN,
)
)
new_session = open_session_response.session
check_for_warning(open_session_response)

run_response = client.Run(
nifpga_types.RunRequest(
session=new_session, attribute=nifpga_types.RunAttribute.RUN_ATTRIBUTE_UNSPECIFIED
)
)
check_for_warning(run_response)

# Read and Write to an Array
updated_array = [1, -221, -1111, -21]
writei32_array_response = client.WriteArrayI32(
nifpga_types.WriteArrayI32Request(
session=new_session, control=EXAMPLE_ARRAY_CONTROL, array=updated_array
)
)
check_for_warning(writei32_array_response)

readi32_array_response = client.ReadArrayI32(
nifpga_types.ReadArrayI32Request(
session=new_session,
indicator=EXAMPLE_ARRAY_CONTROL,
size=EXAMPLE_ARRAY_CONTROL_SIZE,
)
)
print(f"Update array value: {readi32_array_response.array}\n")
amehra-ni marked this conversation as resolved.
Show resolved Hide resolved

abort_response = client.Abort(nifpga_types.AbortRequest(session=new_session))
check_for_warning(abort_response)

close_response = client.Close(
nifpga_types.CloseRequest(
session=new_session,
attribute=nifpga_types.CloseAttribute.CLOSE_ATTRIBUTE_UNSPECIFIED,
)
)
check_for_warning(close_response)

except grpc.RpcError as rpc_error:
error_message = str(rpc_error.details() or "")
for entry in rpc_error.trailing_metadata() or []:
if entry.key == "ni-error":
value = entry.value if isinstance(entry.value, str) else entry.value.decode("utf-8")
error_message += f"\nError status: {value}"
if rpc_error.code() == grpc.StatusCode.UNAVAILABLE:
error_message = f"Failed to connect to server on {SERVER_ADDRESS}:{SERVER_PORT}"
elif rpc_error.code() == grpc.StatusCode.UNIMPLEMENTED:
error_message = (
"The operation is not implemented or is not supported/enabled in this service"
)
print(f"{error_message}")
159 changes: 159 additions & 0 deletions examples/nifpga/read_write_fifo_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
r""" Read and Write to indicators, controls, Arrays and FIFO.
amehra-ni marked this conversation as resolved.
Show resolved Hide resolved

The gRPC API is built from the C API. NI-FPGA documentation is installed with the driver at:
C:\Program Files (x86)\National Instruments\FPGA Interface C API\capi.chm

Getting Started:

To run this example with your VI, you need to configure the values below.
Additionally, you should have a VI that includes controls and FIFOs, and update the
references for them below. Ensure that the R-Series drivers are installed on the system
where the gRPC-device server is running. You will also need LABVIEW and the LABVIEW FPGA
module to configure the VIs.

For instructions on how to use protoc to generate gRPC client interfaces, see our "Creating a gRPC
Client" wiki page:
https://github.com/ni/grpc-device/wiki/Creating-a-gRPC-Client

Server machine's IP address, port number, and resource name can be passed as separate command line
arguments.
> python read_write_example.py <server_address> <port_number> <resource_name>
If they are not passed in as command line arguments, then by default the server address will be
"localhost:31763", with "FPGA" as the resource name.

In this example, we specify the number of elements to write to the FIFO and provide
a multiplier as input. We then read elements from another FIFO, which contains the multiplied
values of the elements from the first FIFO.
"""

import random
import sys

import grpc
import nifpga_pb2 as nifpga_types
import nifpga_pb2_grpc as grpc_nifpga

SERVER_ADDRESS = "localhost"
SERVER_PORT = "31763"
SESSION_NAME = "NI-FPGA-Session"
RESOURCE = "FPGA"

# Configure these values.
EXAMPLE_BITFILE_PATH = "C:\\DemoExample.lvbitx"
EXAMPLE_SIGNATURE = "11F1FF1D11F1FF1F1F111FF11F11F111"
EXAMPLE_NUMERIC_CONTROL = 0x1000A
EXAMPLE_HOST_TO_TARGET_FIFO = 0
amehra-ni marked this conversation as resolved.
Show resolved Hide resolved
EXAMPLE_HOST_TO_TARGET = 1

# Read in cmd args
if len(sys.argv) >= 2:
SERVER_ADDRESS = sys.argv[1]
if len(sys.argv) >= 3:
SERVER_PORT = sys.argv[2]
if len(sys.argv) >= 4:
RESOURCE = sys.argv[3]

channel = grpc.insecure_channel(f"{SERVER_ADDRESS}:{SERVER_PORT}")
client = grpc_nifpga.NiFpgaStub(channel)


def check_for_warning(response):
"""Print to console if the status indicates a warning."""
if response.status > 0:
sys.stderr.write(f"Warning status: {response.status}\n")


new_session = None
try:
open_session_response = client.Open(
nifpga_types.OpenRequest(
session_name=SESSION_NAME,
bitfile=EXAMPLE_BITFILE_PATH,
signature=EXAMPLE_SIGNATURE,
resource=RESOURCE,
attribute_mapped=nifpga_types.OpenAttribute.OPEN_ATTRIBUTE_NO_RUN,
)
)
new_session = open_session_response.session
check_for_warning(open_session_response)

run_response = client.Run(
nifpga_types.RunRequest(
session=new_session, attribute=nifpga_types.RunAttribute.RUN_ATTRIBUTE_UNSPECIFIED
)
)
check_for_warning(run_response)

# Read and Write to FIFO
# These two FIFOs are configured such that we specify the number of elements
# to add to the FIFO and the multiplier.
# The other FIFO will contain the multiplied value of the elements of the firsts FIFO.
user_input = int(
input("Enter number of elements you want to add in the FIFO (between 1 and 20): ")
)
random_numbers = []
if 1 <= user_input <= 20:
for _ in range(user_input):
random_number = random.randint(1, 100)
random_numbers.append(random_number)
print(f"Random numbers: {random_numbers}\n")
else:
print("Invalid input. Please enter a number between 1 and 20.")

multiplier_input = int(
input("Enter the multiplier you want the FIFO elements to get multplied with: ")
)

i16_write_response = client.WriteI16(
nifpga_types.WriteI16Request(
session=new_session,
control=EXAMPLE_NUMERIC_CONTROL,
value=multiplier_input,
)
)
InfiniteTimeout = 0xFFFFFFFF
writefifoi16_response = client.WriteFifoI16(
nifpga_types.WriteFifoI16Request(
session=new_session,
fifo=EXAMPLE_HOST_TO_TARGET,
data=random_numbers,
timeout=InfiniteTimeout,
)
)
check_for_warning(writefifoi16_response)

readfifoi16_response = client.ReadFifoI16(
nifpga_types.ReadFifoI16Request(
session=new_session,
fifo=EXAMPLE_HOST_TO_TARGET_FIFO,
number_of_elements=user_input,
timeout=InfiniteTimeout,
)
)
print(f"Elements in FIFO: {readfifoi16_response.data}\n")
print(f"Elements remaining to read from FIFO: {readfifoi16_response.elements_remaining}\n")

abort_response = client.Abort(nifpga_types.AbortRequest(session=new_session))
check_for_warning(abort_response)

close_response = client.Close(
nifpga_types.CloseRequest(
session=new_session,
attribute=nifpga_types.CloseAttribute.CLOSE_ATTRIBUTE_UNSPECIFIED,
)
)
check_for_warning(close_response)

except grpc.RpcError as rpc_error:
error_message = str(rpc_error.details() or "")
for entry in rpc_error.trailing_metadata() or []:
if entry.key == "ni-error":
value = entry.value if isinstance(entry.value, str) else entry.value.decode("utf-8")
error_message += f"\nError status: {value}"
if rpc_error.code() == grpc.StatusCode.UNAVAILABLE:
error_message = f"Failed to connect to server on {SERVER_ADDRESS}:{SERVER_PORT}"
elif rpc_error.code() == grpc.StatusCode.UNIMPLEMENTED:
error_message = (
"The operation is not implemented or is not supported/enabled in this service"
)
print(f"{error_message}")
Loading