Examples

This section provides runnable examples for using the XiBIF Connection package. These examples demonstrate how to use the core API and are intended to be executed in a Python environment with access to the XiBIF hardware.

All examples can be found in the examples directory of the repository.

basic.py

This file provides an easy example of basic usage of the XibifConnection API.

It shows how to:
  • establish a connection to a board

  • read/write registers

View file in repository

"""
This file provides an easy example of basic usage of the
XibifConnection API.

It shows how to:
    - establish a connection to a board
    - read/write registers
"""

from xibif_connection.api import XibifConnection

# you can obtain the IP and UUID of your board by
# observing the serial output during startup
IP = "192.168.1.10"  # multiple boards can connect to one host
UUID = 0x83DB3005  # the UUID verifies that you connect to the right board

board = XibifConnection(ip=IP, uuid=UUID)

# connect to the board using a socket
board.connect()

# Everything after a successful connect() runs inside try/finally so that
# the connection is released even when one of the accesses below raises.
try:
    print(board.read_board_status())

    # The following registers are available by default
    # when you set up a XiBIF project.
    # They are visited in steps of 4: 0x00, 0x04, 0x08 and 0x0C.
    for address in range(0x00, 0x10, 4):
        print(f"Reading register at address 0x{address:02X}: {board.read(address=address)}")

    board.write(address=0x0C, value=100)
    print(f"Register 0x0C={board.read(address=0x0C)}")

    # the register at address 0x08 has its input connected
    # to the output of register 0x0C
    print(f"Register 0x08={board.read(address=0x08)}")

    # reset the programmable logic on the FPGA
    board.reset()
finally:
    # end the connection gracefully
    board.disconnect()

benchmark.py

This file provides a more in-depth example of evaluating the network and hardware performance of a XiBIF board using the XibifConnection API.

Being a more advanced example, many explanations are omitted and it is assumed that the reader has a basic level of understanding of the API.

View file in repository

#!/usr/bin/env python3
"""
This file provides a more in-depth example
of evaluating the network and hardware performance
of a XiBIF board using the XibifConnection API.

Being a more advanced example, many explanations are
omitted and it is assumed that the reader has a basic
level of understanding of the API.
"""

from collections.abc import Iterable

import matplotlib.pyplot as plt
import numpy as np
import sys

from xibif_connection import api
from xibif_connection.exceptions import XibifError
from xibif_connection.protocol.data import Command

# -------------------------
# Board
# -------------------------
# Both values are handed to api.XibifSession when the benchmark session
# is opened further down in this script.
IP: str = "192.168.1.10"
UUID: int = 0x83DB3005

# -------------------------
# Configuration flags
# -------------------------
# Each flag switches one benchmark section of this script on or off:
# the stream benchmark, the PS / memory benchmark and the command
# micro-benchmarks. If all three are False the script exits immediately.
TEST_STREAM: bool = True
TEST_MEMORY: bool = True
TEST_COMMANDS: bool = True

# Enabling this makes the tests run very slowly and
# produces lots of output on the console.
# The flag is passed to the session as `verbose` and to set_uart_debug().
ENABLE_DEBUG: bool = False

# -------------------------
# Plot helpers
# -------------------------
def plot_histogram(
    pings: Iterable[float],
    *,
    units: str = "s",
    title: str | None = None,
    figure_title: str | None = None,
    savepath: str | None = None,
) -> None:
    """Draw a histogram of timing samples in a new figure, annotated with stats.

    The figure is created but neither shown nor closed here; the caller
    decides when to call ``plt.show()``.

    Args:
        pings: Timing samples; consumed once and converted to a float array.
        units: Unit label used on the x axis, in the legend and in the
            statistics box.
        title: Axes title. An empty data set falls back to "No title".
        figure_title: Window title of the figure; left untouched when falsy.
        savepath: If given, the figure is also written to this path.
    """
    samples = np.asarray(list(pings), dtype=float)

    figure, axes = plt.subplots(figsize=(7, 4.5))
    if figure_title:
        figure.canvas.manager.set_window_title(figure_title)

    if samples.size == 0:
        # Nothing to bin: show a placeholder instead of an empty histogram.
        axes.text(
            0.5, 0.5, "No data", ha="center", va="center", transform=axes.transAxes
        )
        axes.set_title(title or "No title")
    else:
        average = float(np.mean(samples))
        middle = float(np.median(samples))
        lowest = float(np.min(samples))
        highest = float(np.max(samples))
        # population standard deviation (ddof=0)
        spread = float(np.std(samples, ddof=0))

        axes.hist(
            samples, bins="auto", color="#9fbfdf", edgecolor="#5b8fb6", alpha=0.95
        )
        axes.set_xlabel(f"Time ({units})")
        axes.set_ylabel("Count")
        if title:
            axes.set_title(title)

        # Vertical marker lines: (x position, colour, legend label, line style,
        # line width). The order here is the order of the legend entries.
        markers = (
            (average, "C1", f"mean = {average:.3g} {units}", "--", 1.6),
            (middle, "C2", f"median = {middle:.3g} {units}", "--", 1.6),
            (lowest, "C3", f"min = {lowest:.3g} {units}", ":", 1.2),
            (highest, "C3", f"max = {highest:.3g} {units}", ":", 1.2),
        )
        for position, colour, legend, dashes, width in markers:
            axes.axvline(
                position,
                color=colour,
                label=legend,
                linestyle=dashes,
                linewidth=width,
            )

        # Shaded band covering one standard deviation around the mean.
        axes.axvspan(
            average - spread,
            average + spread,
            color="C1",
            alpha=0.12,
            label=f"±1σ = {spread:.3g} {units}",
        )

        # Text box in the upper right corner with the full-precision numbers.
        summary = "\n".join(
            (
                f"mean  = {average:.6g} {units}",
                f"median= {middle:.6g} {units}",
                f"min   = {lowest:.6g} {units}",
                f"max   = {highest:.6g} {units}",
                f"std   = {spread:.6g} {units}",
                f"n     = {samples.size}",
            )
        )
        axes.text(
            0.97,
            0.95,
            summary,
            transform=axes.transAxes,
            ha="right",
            va="top",
            fontsize=9,
            bbox=dict(
                boxstyle="round", facecolor="white", alpha=0.9, edgecolor="#888888"
            ),
        )

        axes.legend(loc="upper left", fontsize=8)
        figure.tight_layout()

    if savepath:
        figure.savefig(savepath)


def plot_stream(
    amounts: np.ndarray,
    speeds_tx: np.ndarray,
    speeds_rx: np.ndarray,
    savepath: str | None = None,
) -> None:
    """Plot stream TX and RX throughput over the transfer size in a new figure.

    Both speed arrays are divided by 1e6 and labelled "Mb/s", i.e. they are
    expected in bits/s. The figure is not shown here.

    Args:
        amounts: Transfer sizes for the x axis (labelled "Bytes").
        speeds_tx: Transmit speeds, one per entry of ``amounts``.
        speeds_rx: Receive speeds, one per entry of ``amounts``.
        savepath: If given, the figure is also written to this path.
    """
    figure, axes = plt.subplots(figsize=(8, 4.5))
    figure.canvas.manager.set_window_title("Stream Benchmark")

    # TX first, then RX, so the legend keeps that order.
    for speeds, legend in ((speeds_tx, "TX (Mb/s)"), (speeds_rx, "RX (Mb/s)")):
        axes.plot(amounts, speeds / 1e6, label=legend, marker="o", linewidth=1.2)

    axes.set_xlabel("Bytes")
    axes.set_ylabel("Throughput (Mb/s)")
    axes.set_title("Stream throughput")
    axes.grid(True, linestyle=":", alpha=0.5)
    axes.legend()
    figure.tight_layout()

    if savepath:
        figure.savefig(savepath)


def plot_memory(
    cycles: np.ndarray,
    user: Iterable[float],
    stream: Iterable[float],
    ddrWrite: Iterable[float],
    ddrRead: Iterable[float],
    savepath: str | None = None,
) -> None:
    """Plot the four memory/PS benchmark timing series in one new figure.

    Every series is drawn against ``cycles``; the y axis is labelled as
    wallclock time in seconds. The figure is not shown here.

    Args:
        cycles: Values for the x axis (labelled "Cycles").
        user: Timings plotted as "User Reg".
        stream: Timings plotted as "Stream Reg".
        ddrWrite: Timings plotted as "DDR Write".
        ddrRead: Timings plotted as "DDR Read".
        savepath: If given, the figure is also written to this path.
    """
    figure, axes = plt.subplots(figsize=(8, 4.5))
    figure.canvas.manager.set_window_title("PS / Memory Benchmark")

    # The tuple order determines the order of the legend entries.
    curves = (
        (user, "User Reg"),
        (stream, "Stream Reg"),
        (ddrWrite, "DDR Write"),
        (ddrRead, "DDR Read"),
    )
    for timings, legend in curves:
        axes.plot(cycles, timings, label=legend, marker=".", linewidth=0.8)

    axes.set_xlabel("Cycles")
    axes.set_ylabel("Wallclock time (s)")
    axes.set_title("PS Benchmark results")
    axes.legend()
    axes.grid(True, linestyle=":", alpha=0.5)
    figure.tight_layout()

    if savepath:
        figure.savefig(savepath)

# -------------------------
# Benchmark
# -------------------------
# Leave early, before a session to the board is opened, when every
# benchmark has been switched off.
any_test = TEST_STREAM or TEST_MEMORY or TEST_COMMANDS
if not any_test:
    print(
        "No tests enabled. Flip TEST_STREAM / TEST_MEMORY / TEST_COMMANDS to run benchmarks."
    )
    sys.exit(0)

# Errors raised while opening or closing the session are intentionally not
# caught here: they propagate unchanged and end the script with a traceback.
# Each benchmark section below catches its own errors, so one failing
# benchmark does not prevent the remaining ones from running.
with api.XibifSession(IP, UUID, verbose=ENABLE_DEBUG) as s:
    # A failure in this preamble is only reported; the benchmarks are still
    # attempted. The returned status is not needed by the benchmarks.
    try:
        s.read_board_status()
        s.set_uart_debug(ENABLE_DEBUG)
    except XibifError as e:
        print("Warning: failed to read board status or set UART debug:", e)

    # --- STREAM benchmark ---
    if TEST_STREAM:
        try:
            # 40 evenly spaced transfer sizes between 100 kB and 2 MB
            n_bytes = np.linspace(100000, 2_000_000, 40).astype(int)
            print("Running stream benchmark (this may take several minutes)...")
            amounts, speeds_tx, speeds_rx = s.benchmark_stream(
                n_bytes=n_bytes, repeats=50
            )
            plot_stream(amounts, speeds_tx, speeds_rx)
        except (XibifError, RuntimeError, ValueError) as e:
            print("Stream benchmark failed:", e)

    # --- MEMORY / PS benchmark ---
    if TEST_MEMORY:
        try:
            # 100 integer cycle counts between 1 and 200
            cycles = np.linspace(1, 200, num=100).astype(int)
            print("Running PS / memory benchmark (this will take time)...")
            cycles_out, user, stream, ddrWrite, ddrRead = (
                s.benchmark_memory_access(cycles, repeat=100)
            )
            plot_memory(cycles_out, user, stream, ddrWrite, ddrRead)
        except (XibifError, RuntimeError, ValueError) as e:
            print("Memory benchmark failed:", e)

    # --- COMMAND micro-benchmarks & histograms ---
    if TEST_COMMANDS:
        try:
            repeats = 100000
            print(
                f"Running command micro-benchmarks (x{repeats}) for READ/WRITE/STATUS..."
            )
            # (command to benchmark, label used for the plot and window title)
            commands = (
                (Command.READ_REGISTER, "Read Register"),
                (Command.WRITE_REGISTER, "Write Register"),
                (Command.STATUS, "Get Status"),
            )
            # Take all measurements first and plot afterwards, so a failing
            # measurement does not leave a partial set of histograms behind.
            timings = [
                s.benchmark_command(command, repeats=repeats)
                for command, _ in commands
            ]
            for (_, label), times in zip(commands, timings):
                # The results are scaled by 1e6 and labelled "us",
                # i.e. they are presumably returned in seconds.
                plot_histogram(
                    times * 1e6,
                    units="us",
                    title=f"{label} (exec time)",
                    figure_title=label,
                )
        except (XibifError, RuntimeError, ValueError) as e:
            print("Command benchmarks failed:", e)

# Display every figure that was created above; blocks until they are closed.
plt.show()

session.py

This file provides an easy example of basic usage of the XibifSession API.

It shows how to:
  • establish a connection to a board

  • read/write registers

View file in repository

"""
This file provides an easy example of basic usage of the
XibifSession API

It shows how to:
    - establish a connection to a board
    - read/write registers
"""

from xibif_connection.api import XibifSession

# The IP address and UUID of your board are printed on its
# serial output while it starts up.
IP = "192.168.1.10"  # multiple boards can connect to one host
UUID = 0x83DB3005  # the UUID verifies that you connect to the right board


# Entering the context establishes the connection to the board
# automatically; no explicit connect() call is needed.
with XibifSession(ip=IP, uuid=UUID) as board:
    # walk over the byte offsets 0x00, 0x04, 0x08 and 0x0C
    for offset in range(0x00, 0x10, 4):
        print(f"Reading register at address 0x{offset:02X}: {board.read(address=offset)}")

    # write one register, then read it and register 0x08 back
    board.write(address=0x0C, value=100)
    print(f"Register 0x0C={board.read(address=0x0C)}")
    print(f"Register 0x08={board.read(address=0x08)}")

    # the session is always disconnected automatically when
    # leaving the context

stream.py

This file provides an easy example of data streaming using the XibifConnection API.

View file in repository


"""
This file provides an easy example of data streaming
using the XibifConnection API.
"""

from xibif_connection.api import XibifConnection
import numpy as np

# you can obtain the IP and UUID of your board by
# observing the serial output during startup
IP = "192.168.1.10"  # multiple boards can connect to one host
UUID = 0x83DB3005  # the UUID verifies that you connect to the right board

# number of 32-bit words sent through the loopback
N_WORDS = 100_000

board = XibifConnection(ip=IP, uuid=UUID)

# connect to the board using a socket
board.connect()

# Everything after a successful connect() runs inside try/finally so that
# the connection is released even when one of the stream calls raises.
try:
    # ensure that there is no data in the stream FIFOs
    board.flush_stream()

    # generate a NumPy vector of N_WORDS random 32-bit integers
    # the stream API operates on the uint32 datatype
    tx = np.random.randint(0, 2**32, N_WORDS, dtype=np.uint32)
    print(f"Writing {N_WORDS*4} bytes to the FPGA")
    board.write_stream(tx)

    # the default XiBIF project connects the read and write streams
    # to form a loopback interface
    print(f"Reading {N_WORDS*4} bytes from the FPGA")
    rx = np.array(board.read_stream(length=N_WORDS))

    # compare TX and RX data
    if rx.shape != tx.shape:
        # An element-wise comparison is only meaningful for equally sized
        # arrays, so a short or long read is reported separately.
        print("Mismatch in transmitted and received data")
        print(f"Expected {tx.size} words but received {rx.size}")
    elif np.any(rx != tx):
        print("Mismatch in transmitted and received data")
        # show the received words that differ from what was sent
        print(rx[rx != tx])
    else:
        print("Transmitted and received data match!")
finally:
    # end the connection gracefully
    board.disconnect()