API
- class XibifConnection(ip: str, uuid: int, timeout: float | None = None, verbose: bool | None = None)
The base class to establish connection to a XiBIF board.
- benchmark_command(command: Command, repeats: int = 100) ndarray | None
Evaluate the performance of a Command.
When a command is dispatched in the PS of the FPGA, the number of clock cycles until its completion is measured and returned in the header. This benchmark function executes the command repeatedly to estimate the overall performance of a given command implementation on the board.
The number of cycles is measured from the point at which a command has been fully received and decoded to the point at which it is considered complete, before any data is sent back from the board.
The durations that this command returns do not include network latency.
- Parameters:
command (Command) – The command to evaluate
repeats (int = 100) – Number of times to execute the command
- Returns:
times (NDArray[float]) – Elapsed wall-clock time for each execution of the command.
- Raises:
XibifError: – The requested command is not implemented for evaluation yet.
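The relationship between the measured clock cycles and the returned durations can be sketched as follows. This is only an illustration: the helper name cycles_to_seconds and the clock_hz parameter are hypothetical, and the actual PS clock frequency is not stated here, so the 100 MHz value is an assumption.

```python
def cycles_to_seconds(cycles, clock_hz):
    """Convert clock-cycle counts to seconds.

    clock_hz is an assumed parameter; the real PS clock frequency
    must be taken from the board configuration.
    """
    if clock_hz <= 0:
        raise ValueError("clock_hz must be positive")
    return [c / clock_hz for c in cycles]


# Hypothetical cycle counts for three executions at an assumed 100 MHz clock.
durations = cycles_to_seconds([1200, 1150, 1300], clock_hz=100e6)
print(durations)
```

Because the cycle counter runs on the board, durations derived this way exclude network latency, which matches the note above.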
- benchmark_memory_access(cycles: list[int] | ndarray, repeat: int = 5)
Benchmark the XiBIF Software
The software is benchmarked by executing a set of commands in the FPGA PS for cycles number of times. The FPGA responds to this command with the amount of so-called “Global Time” that has passed during execution of said commands.
The elapsed wallclock time is calculated from the “Global Time” returned by the FPGA.
- The commands executed are:
access to the user register area
access to the stream register area
read access to RAM
write access to RAM
- Parameters:
cycles (list[int] | NDArray[np.uint32]) – A list of the amount of cycles for which to evaluate the benchmark commands.
repeat (int = 5) – Number of times to repeat the memory access test
- Returns:
cycles, userTime, streamTimes, ddrWriteTimes, ddrReadTimes (list[int], list[float], list[float], list[float], list[float]) – A tuple of lists where each list has the same length as the cycles input parameter containing the wallclock execution times for the benchmark commands.
- benchmark_ping(repeats: int = 100, wait_time: float = 0.001) ndarray
Benchmark the network connection
The XiBIF board is pinged repeats times, waiting wait_time seconds after each ping. After the test, the individual round-trip times (RTTs) are returned as an array.
- Parameters:
repeats (int = 100) – Number of times to repeat the test
wait_time (float = 0.001) – Wait time in seconds to wait after each ping test
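The returned RTT array is usually reduced to a few summary statistics. A minimal sketch, assuming the RTTs are in seconds; summarize_rtts is a hypothetical helper and the sample values are made up for illustration, not measured on a board.

```python
import statistics


def summarize_rtts(rtts):
    """Return min, mean, max and sample standard deviation of RTT samples."""
    rtts = list(rtts)
    if not rtts:
        raise ValueError("no RTT samples")
    return {
        "min": min(rtts),
        "mean": statistics.fmean(rtts),
        "max": max(rtts),
        # stdev needs at least two samples
        "stdev": statistics.stdev(rtts) if len(rtts) > 1 else 0.0,
    }


# Illustrative values only; in practice this would be the array
# returned by benchmark_ping().
sample = [0.0010, 0.0012, 0.0011, 0.0030]
summary = summarize_rtts(sample)
print(summary)
```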
- benchmark_stream(n_bytes: list[int] | ndarray, repeats: int = 5) tuple[ndarray, ndarray, ndarray]
Benchmark the XiBIF Stream
The stream is benchmarked by writing and then reading data to a XiBIF board and measuring the time it takes to do so.
For each element in n_bytes, a number of bytes is written to and subsequently read from the FPGA. For each number of bytes, the test is repeated repeats times and the results are averaged. This helps to compensate for possible network fluctuations in non-directly connected boards.
Attention: for the benchmark to be successful, the write stream needs to be connected to the read stream, which is the case for the demo project just after xibif setup.
- Parameters:
n_bytes (list[int]) – A list of the amount of bytes to use for the benchmark. For each element in the list, a separate read and write test pair is executed.
repeats (int) – The number of times to repeat each of the read-write pair tests.
- Returns:
n_bytes, speeds_rx, speeds_tx (list[int], list[float], list[float]) – A tuple of lists. The first element contains the list of number of bytes used for each test. The second and third element contain the read and write data rates for the stream actions as viewed from the computer running the benchmark. The data rates are in bits/s.
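A data rate in bits/s follows from the number of bytes transferred and the measured duration. The sketch below shows that arithmetic only; data_rate_bits_per_s is a hypothetical helper, and whether the library computes its rates in exactly this way is an assumption.

```python
def data_rate_bits_per_s(n_bytes, duration_s):
    """Data rate in bits per second for n_bytes moved in duration_s seconds."""
    if duration_s <= 0:
        raise ValueError("duration_s must be positive")
    return n_bytes * 8 / duration_s


# Illustrative numbers: 1 MiB transferred in half a second.
rate = data_rate_bits_per_s(1_048_576, 0.5)
print(rate)
```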
- benchmark_stream_read(n_bytes: int | None, repeats: int = 1, skip_write: bool = False, quiet: bool = False) float
Read a random array of n_bytes from the board and measure the time required.
Before reading, n_bytes of random data are written to the board unless skip_write = True. The actual data being sent is divided into words (4 bytes), i.e. there are a total number of n_bytes / 4 words being sent using write_stream.
The amount of data written to the board is then read and the time required for this action is measured.
The skip_write option is provided when performing complete benchmarks (i.e. read and write). It allows for previously written data from another benchmark to be read instead of explicitly writing data for the purpose of measuring the read performance.
- Parameters:
n_bytes (int) – number of bytes to send
repeats (int) – the number of times to repeat the test
skip_write (bool) – whether to skip writing data to the board before reading
quiet (bool) – whether to suppress printed information during the test
- Returns:
duration (float) – the average duration of the read transactions
- benchmark_stream_write(n_bytes: int, repeats: int = 1, skip_flush: bool = False, quiet: bool = False) float
Write a random array of n_bytes to the board and measure the time required.
To measure performance, n_bytes are written to the board. The actual data being sent is divided into words (4 bytes), i.e. there are a total number of n_bytes / 4 words being sent using write_stream.
If desired, the write operation can be repeated for repeats number of times. For each iteration, the time is measured for the write portion only. The returned value is the average of the durations.
The skip_flush option is provided when performing complete benchmarks (i.e. read and write). It allows for the written data to persist on the board, meaning that the read test does not need to write data to the board before testing read speeds.
- Parameters:
n_bytes (int) – number of bytes to send
repeats (int) – the number of times to repeat the test
skip_flush (bool) – whether to skip flushing after the write test, useful for consecutive read tests
quiet (bool) – whether to suppress printed information during the test
- Returns:
duration (float) – the average duration of the write transactions
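The division of n_bytes into 4-byte words described above can be sketched like this. The helper random_words is hypothetical, and rejecting byte counts that are not a multiple of 4 is a choice made for this sketch; how the library treats such values is not documented here.

```python
import random

WORD_BYTES = 4  # one stream word is 4 bytes


def random_words(n_bytes, seed=None):
    """Generate n_bytes / 4 random 32-bit words."""
    if n_bytes < 0 or n_bytes % WORD_BYTES != 0:
        raise ValueError("n_bytes must be a non-negative multiple of 4")
    rng = random.Random(seed)
    return [rng.getrandbits(32) for _ in range(n_bytes // WORD_BYTES)]


# 64 bytes correspond to 16 words.
words = random_words(64, seed=0)
print(len(words))
```

A list like this would still need converting to the array type that write_stream expects before being sent.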
- connect() None
Connect to the board
- Raises:
XibifError: – Could not connect to board
- disconnect() None
Closes the connection to the board
- flush_stream() None
Flush the XiBIF Stream
Flush all data in the FPGA stream by resetting the head and tail pointers of the associated FIFOs. Any data that is flushed cannot be recovered!
- ping(payload: bytes | bytearray | list[int] | ndarray[tuple[Any, ...], dtype[uint32]] = b'PING', timeout: float | None = None, retries: int = 1, transport: TcpTransport | None = None) float
Ping a XiBIF board
The XiBIF board is pinged using the echo port (80) and the round-trip-time (RTT) is measured.
- Parameters:
payload (bytes | bytearray | List[int] | NDArray[np.uint32] = b"PING") – The payload to send to the XiBIF board
timeout (float | None = None) – Time in seconds until the connection is considered timed out
retries (int = 1) – Number of times the ping is retried until a connection is considered dead.
transport (TcpTransport | None = None) – Protocol to use
- Raises:
XibifError: – Connection could not be opened
XibifError: – Unsupported payload
XibifError: – Received payload does not match sent payload
XibifError: – Ping timed out after retries
- print_status_uart() None
Print the status of the XiBIF board over UART.
Contains information such as versions, UUIDs, stream FIFO stats and build times.
- read(address: int | list[int] | ndarray[tuple[Any, ...], dtype[uint32]]) int | list[int] | ndarray[tuple[Any, ...], dtype[integer]]
Read register values
This function reads a single or multiple register values from a board.
- Parameters:
address (int | List[int]) – A single address or an array of register addresses to read.
- Returns:
values (int | List[int]) – A single integer or a numpy array of integers corresponding to the addresses given as parameters.
- Raises:
ValueError: – Some addresses are not word (4-byte) aligned.
XibifError: – Failed to read register(s)
XibifError: – The value read from the register(s) is not valid.
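The word-alignment requirement can be checked before calling read. A minimal sketch of such a check; misaligned_addresses is a hypothetical helper and not the library's own validation code.

```python
def misaligned_addresses(addresses):
    """Return the addresses that are not aligned to a 4-byte word boundary."""
    if isinstance(addresses, int):
        addresses = [addresses]
    return [a for a in addresses if a % 4 != 0]


# 0x00, 0x04 and 0x08 are aligned; 0x06 is not.
bad = misaligned_addresses([0x00, 0x04, 0x06, 0x08])
print([hex(a) for a in bad])
```

An empty result means every address is safe to pass on with respect to alignment.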
- read_axi(address: int | list[int], length: int = 1)
Read data from an AXI address
This function reads data from one or more AXI addresses. When only one address is given, the length parameter allows multiple consecutive words to be read from an AXI endpoint.
- Parameters:
address (int | List[int]) – One or multiple addresses to read from
length (int = 1) – Number of words to read, starting at the given address and ending at address + 4 * (length - 1)
- Returns:
data (int | List[int]) – One or multiple words read from the given AXI address
- Raises:
XibifError: – No payload received from read command
XibifError: – Multiple addresses and length > 1 specified
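The address range covered by a single address plus a length can be written out explicitly. This sketch assumes 4-byte words, as in the rest of this reference; consecutive_word_addresses is a hypothetical helper and the base address is chosen only for illustration.

```python
def consecutive_word_addresses(address, length=1):
    """List the word addresses covered by a read of `length` words."""
    if length < 1:
        raise ValueError("length must be at least 1")
    return [address + 4 * i for i in range(length)]


# Four consecutive words starting at an illustrative base address.
addrs = consecutive_word_addresses(0x4000_0000, length=4)
print([hex(a) for a in addrs])
```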
- read_board_status() XibifStatus
Read the current board status
The board status contains static and runtime information of the board. Once received, the status is stored in the XibifSession.status property for later reference.
The status contains information about the register width, which is used to update the protocol so that registers wider than 32 bits can be read and written.
This property does not update automatically.
- Returns:
status (XibifStatus) – The status of the board
- Raises:
XibifError: – The status cannot be decoded from the payload
- read_registermap()
Read the registermap from the XiBIF board.
Reads the embedded registermap from the XiBIF board as a JSON formatted string and returns the JSON object.
The JSON object contains information on the user registers and their fields that are accessible using the read and write methods.
- Returns:
map (json) – A JSON object containing the register map of the board
- Raises:
XibifError: – No registermap received
- read_stream(length: int | None = None) list[int] | None
Read stream data from the XiBIF board.
Attempts to read a number of stream data packets from the FPGA. If successful, the data read is returned as a numpy array with the length requested in length.
Attention: there is no guarantee that the number of entries read is the number that was requested.
If no data is obtained from the board, None is returned.
- Parameters:
length (int) – The number of stream entries to be read. For normal configurations, a stream entry is a word (4 bytes). If None is specified, all available data is read from the board.
- Returns:
rx (NDArray[int] | None) – An array of the stream data received from the board.
- reset() None
Perform a reset of the board.
- set_uart_debug(enable: bool) None
Set UART debug property
Sets the UART debug property in the PS software. When set to True, UART debugging is enabled, resulting in all socket transactions being printed over UART.
Be aware that enabling the UART debug feature drastically impacts software performance.
- write(address: int | list[int], value: int | list[int])
Write register values
Writes a single or multiple register values to a Xibif board by sending multiple pairs of (address, value) to the board.
When writing multiple registers, the address and value arrays must have the same length and dimensions.
- Parameters:
address (int | List[int]) – A single address or an array of register addresses to write.
value (int | List[int]) – A single value or an array of values to write to the registers.
- Raises:
ValueError: – The length of the address and value arrays does not match
ValueError: – Some addresses are not word (4-byte) aligned.
XibifError: – Failed to write register(s)
- write_axi(address: int | list[int], data: int | list[int], mask: int | list[int] = 4294967295)
Write data to an AXI address
This function writes data to one or more AXI addresses. When it is invoked with only one address and multiple data words, an array of increasing addresses with the same length as the data is created. When multiple addresses are given, the number of entries in the address and data lists must match for a one-to-one mapping. The data mask allows individual bits to be excluded from the write. 0xFFFF_FFFF corresponds to all 32 bits of data being written.
- Parameters:
address (int | List[int]) – One or multiple addresses to write to
data (int | List[int]) – One or multiple data words to write
mask (int | List[int]) – One or multiple data masks to mask data to write
- Raises:
XibifError: – The returned AXI status is not AxiAccessStatus.AXI_ACCESS_OK
XibifError: – The number of addresses is greater than one and does not match the number of data words
XibifError: – The number of masks is greater than one and does not match the number of data words
XibifError: – No payload was received after the AXI write
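The net effect of the data mask on a single 32-bit word can be modelled as a masked merge. This sketch assumes that masked-out bits keep their previous value; how the board implements this internally is not described here, and apply_write_mask is a hypothetical helper.

```python
def apply_write_mask(old, data, mask=0xFFFF_FFFF):
    """Model of a masked write: bits set in mask come from data, the rest from old."""
    return ((old & ~mask) | (data & mask)) & 0xFFFF_FFFF


# Only bits 8..15 are written; all other bits keep their old value.
merged = apply_write_mask(0x1234_5678, 0xFFFF_FFFF, mask=0x0000_FF00)
print(hex(merged))
```

With the default mask of 0xFFFF_FFFF the result is simply the new data word.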
- write_bitstream(path: str) None
Write a bitstream to the XiBIF board.
- Parameters:
path (str) – path to the bitstream file to be flashed
- Raises:
XibifError: – Flashing the bitstream is not available for the connected board
XibifError from ValueError –
- write_stream(data: ndarray[tuple[Any, ...], dtype[uint32]]) None
Write stream data to the XiBIF board.
Writes the given data to the FPGA stream.
- Parameters:
data (list[int] | int) – A list of the data to write to the FPGA
- class XibifSession(ip: str, uuid: int, timeout: float | None = None, verbose: bool | None = None)
A XibifSession allows using a XibifConnection object with context management.
The XibifSession is a wrapper class for XibifConnection, inheriting all functions and extending it by providing __enter__ and __exit__ functions to use it in a context.
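The context-management pattern described here can be illustrated with stand-in classes. FakeConnection and FakeSession are hypothetical and exist only to keep the example runnable without a board; in particular, connecting in __enter__ and disconnecting in __exit__ is an assumption about how XibifSession behaves.

```python
class FakeConnection:
    """Hypothetical stand-in for a connection class with connect/disconnect."""

    def __init__(self):
        self.connected = False

    def connect(self):
        self.connected = True

    def disconnect(self):
        self.connected = False


class FakeSession(FakeConnection):
    """Adds __enter__/__exit__ so the connection can be used in a with block."""

    def __enter__(self):
        self.connect()  # assumed behaviour: open the connection on entry
        return self

    def __exit__(self, exc_type, exc, tb):
        self.disconnect()  # always close, even if the block raised
        return False  # do not suppress exceptions


with FakeSession() as session:
    inside = session.connected
after = session.connected
print(inside, after)
```

The benefit of this pattern is that the connection is closed even when the code inside the with block raises an exception.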