"""Implementation of transmit card."""
import ctypes
import logging
import threading
from dataclasses import dataclass
import numpy as np
import console.spcm_control.spcm.pyspcm as spcm
from console.interfaces.acquisition_parameter import Dimensions
from console.interfaces.unrolled_sequence import UnrolledSequence
from console.spcm_control.abstract_device import SpectrumDevice
from console.spcm_control.spcm.tools import create_dma_buffer, translate_status, type_to_name
[docs]
@dataclass
class TxCard(SpectrumDevice):
"""
Implementation of TX device.
Implements abstract base class SpectrumDevice, which requires the abstract methods get_status(),
setup_card() and operate(). The TxCard is automatically instantiated by a yaml-loader when
loading the configuration file.
The implementation was done and tested with card M2p6546-x4, which has an onboard
memory size of 512 MSample, 2 Bytes/sample => 1024 MB.
Overview:
---------
The TX card operates with a ring buffer on the spectrum card, defined by ring_buffer_size.
The ring buffer is filled in fractions of notify_size.
"""
path: str
max_amplitude: list[int]
filter_type: list[int]
sample_rate: int
notify_rate: int = 16
__name__: str = "TxCard"
def __post_init__(self):
"""Post init function which is required to use dataclass arguments."""
self.log = logging.getLogger(self.__name__)
super().__init__(self.path, log=self.log)
# Number of output channels is fixed
self.num_ch = 4
# Size of the current sequence
self.data_buffer_size = int(0)
# Define ring buffer and notify size, 512 MSamples * 2 Bytes = 1024 MB
self.ring_buffer_size: spcm.uint64 = spcm.uint64(1024**3)
self.card_type = spcm.int32(0)
try:
# Check if ring buffer size is multiple of 2*num_ch (2 bytes per sample per channel)
if self.ring_buffer_size.value % (self.num_ch * 2) != 0:
raise MemoryError(
"Ring buffer size is not a multiple of channel sample product \
(number of enables channels times 2 byte per sample)"
)
except MemoryError as err:
self.log.exception(err, exc_info=True)
raise err
if self.ring_buffer_size.value % self.notify_rate == 0:
self.notify_size = spcm.int32(int(self.ring_buffer_size.value / self.notify_rate))
else:
# Set default fraktion to 16, notify size equals 1/16 of ring buffer size
self.notify_size = spcm.int32(int(self.ring_buffer_size.value / 16))
self.log.debug(
"Ring buffer size: %s; Notify size: %s",
self.ring_buffer_size.value,
self.notify_size.value,
)
# Threading class attributes
self.worker: threading.Thread | None = None
self.is_running = threading.Event()
[docs]
def dict(self) -> dict:
"""Returnt class variables which are json serializable as dictionary.
Returns
-------
Dictionary containing class variables.
"""
return super().dict()
[docs]
def setup_card(self) -> None:
"""Set up spectrum card in transmit (TX) mode.
At the very beginning, a card reset is performed. The clock mode is set according to the sample rate,
defined by the class attribute.
All 4 channels are enables and configured by max. amplitude and filter values from class attributes.
Digital outputs X0, X1 and X2 are defined which are controlled by the 15th bit of analog outputs 1, 2 and 3.
Raises
------
Warning
The actual set sample rate deviates from the corresponding class attribute to be set,
class attribute is overwritten.
"""
# Reset card
spcm.spcm_dwSetParam_i64(self.card, spcm.SPC_M2CMD, spcm.M2CMD_CARD_RESET)
spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_PCITYP, ctypes.byref(self.card_type))
try:
if "M2p.65" not in (device_type := type_to_name(self.card_type.value)):
raise ConnectionError(
"Device with path %s is of type %s, no transmit card..." % (self.path, device_type)
)
except ConnectionError as err:
self.log.exception(err, exc_info=True)
raise err
# >> TODO: At this point, card alread has M2STAT_CARD_PRETRIGGER and M2STAT_CARD_TRIGGER set, correct?
# Set trigger
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_TRIG_ORMASK, spcm.SPC_TMASK_SOFTWARE)
# Set clock mode, internal PLL and clock output enable
# spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_CLOCKMODE, spcm.SPC_CM_INTPLL)
# spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_CLOCKOUT, 1)
# Configure external clock, TX master clock
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_CLOCKMODE, spcm.SPC_CM_EXTERNAL)
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_CLOCK50OHM, 1)
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_CLOCK_THRESHOLD, 1500)
# set card sampling rate in MHz
spcm.spcm_dwSetParam_i64(self.card, spcm.SPC_SAMPLERATE, spcm.MEGA(self.sample_rate))
# Check actual sampling rate
sample_rate = spcm.int64(0)
spcm.spcm_dwGetParam_i64(self.card, spcm.SPC_SAMPLERATE, ctypes.byref(sample_rate))
self.log.info("Device sampling rate: %s MHz", sample_rate.value * 1e-6)
if sample_rate.value != spcm.MEGA(self.sample_rate):
self.log.warning(
"Tx device sample rate %s MHz does not match set sample rate of %s MHz",
sample_rate.value * 1e-6,
self.sample_rate,
)
self.sample_rate = int(sample_rate.value * 1e-6)
# Enable and setup channels
spcm.spcm_dwSetParam_i32(
self.card,
spcm.SPC_CHENABLE,
spcm.CHANNEL0 | spcm.CHANNEL1 | spcm.CHANNEL2 | spcm.CHANNEL3,
)
# Use loop to enable and setup active channels
# Channel 0: RF
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_ENABLEOUT0, 1)
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_AMP0, self.max_amplitude[0])
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_FILTER0, self.filter_type[0])
# Channel 1: Gradient x, synchronus digital output: gate trigger
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_ENABLEOUT1, 1)
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_AMP1, self.max_amplitude[1])
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_FILTER1, self.filter_type[1])
# Channel 2: Gradient y, synchronus digital output: un-blanking
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_ENABLEOUT2, 1)
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_AMP2, self.max_amplitude[2])
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_FILTER2, self.filter_type[2])
# Channel 3: Gradient z
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_ENABLEOUT3, 1)
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_AMP3, self.max_amplitude[3])
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_FILTER3, self.filter_type[3])
# Setup the card in FIFO mode
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_CARDMODE, spcm.SPC_REP_FIFO_SINGLE)
# >> Setup digital output channels
# Channel X1: dig. ADC gate (15th bit from analog channel 1)
spcm.spcm_dwSetParam_i32(
self.card,
spcm.SPCM_X1_MODE,
(spcm.SPCM_XMODE_DIGOUT | spcm.SPCM_XMODE_DIGOUTSRC_CH1 | spcm.SPCM_XMODE_DIGOUTSRC_BIT15),
)
# Replicate ADC gate on extension port X12
spcm.spcm_dwSetParam_i32(
self.card,
spcm.SPCM_X12_MODE,
(spcm.SPCM_XMODE_DIGOUT | spcm.SPCM_XMODE_DIGOUTSRC_CH1 | spcm.SPCM_XMODE_DIGOUTSRC_BIT15),
)
# Channel X2: dig. reference signal (15th bit from analog channel 2)
spcm.spcm_dwSetParam_i32(
self.card,
spcm.SPCM_X2_MODE,
(spcm.SPCM_XMODE_DIGOUT | spcm.SPCM_XMODE_DIGOUTSRC_CH2 | spcm.SPCM_XMODE_DIGOUTSRC_BIT15),
)
# Channel X3, X12: dig. unblanking signal (15th bit of analog channel 3)
spcm.spcm_dwSetParam_i32(
self.card,
spcm.SPCM_X3_MODE,
(spcm.SPCM_XMODE_DIGOUT | spcm.SPCM_XMODE_DIGOUTSRC_CH3 | spcm.SPCM_XMODE_DIGOUTSRC_BIT15),
)
# Replicate unblanking signal at extension port X13
spcm.spcm_dwSetParam_i32(
self.card,
spcm.SPCM_X13_MODE,
(spcm.SPCM_XMODE_DIGOUT | spcm.SPCM_XMODE_DIGOUTSRC_CH3 | spcm.SPCM_XMODE_DIGOUTSRC_BIT15),
)
self.log.debug("Device setup completed")
self.log_card_status()
[docs]
def set_gradient_offsets(self, offsets: Dimensions, high_impedance: list[bool] = [True, True, True]) -> None:
"""Set offset values of the gradient output channels.
Parameters
----------
offsets
Offset values as Dimension datatype as defined in acquisition parameter
Returns
-------
List of integer values read from card for x, y and z offset values
"""
# Check card connection
try:
if not self.card:
raise ConnectionError("No connection to TX card.")
except ConnectionError as err:
self.log.exception(err, exc_info=True)
raise err
# Check offset values
if abs(offsets.x) > self.max_amplitude[1]:
self.log.error("Gradient offset of channel x exceeds maximum amplitude.")
if abs(offsets.y) > self.max_amplitude[2]:
self.log.error("Gradient offset of channel y exceeds maximum amplitude.")
if abs(offsets.z) > self.max_amplitude[3]:
self.log.error("Gradient offset of channel z exceeds maximum amplitude.")
# Extract per channel flag for high impedance termination
try:
x_high_imp, y_high_imp, z_high_imp = high_impedance
except ValueError as err:
# Raise error if list does not have 3 values to unpack
self.log.exception(err, exc_info=True)
raise err
# Set offset values, scale offset by 0.5 if channel is terminated into high impedance
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS1, int(offsets.x / 2) if x_high_imp else int(offsets.x))
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS2, int(offsets.y / 2) if y_high_imp else int(offsets.y))
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_OFFS3, int(offsets.z / 2) if z_high_imp else int(offsets.z))
# Write setup
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_CARD_WRITESETUP)
# Define variables to read offset values
offset_x = spcm.int32(0)
offset_y = spcm.int32(0)
offset_z = spcm.int32(0)
# Read offset values from card
spcm.spcm_dwGetParam_i64(self.card, spcm.SPC_OFFS1, ctypes.byref(offset_x))
spcm.spcm_dwGetParam_i64(self.card, spcm.SPC_OFFS2, ctypes.byref(offset_y))
spcm.spcm_dwGetParam_i64(self.card, spcm.SPC_OFFS3, ctypes.byref(offset_z))
set_offsets = [offset_x.value, offset_y.value, offset_z.value]
self.log.info("Set gradient values %s mV for x, y and z.", set_offsets)
# Check values read from card and log error if they are not correct
if not (set_offsets[0] == offsets.x and set_offsets[1] == offsets.y and set_offsets[2] == offsets.z):
self.log.error(
"Actual gradient offsets do not correspond to the values to be set (x=%s, y=%s, z=%s mV)",
offsets.x, offsets.y, offsets.z
)
[docs]
def start_operation(self, data: UnrolledSequence | None = None) -> None:
"""Start transmit (TX) card operation.
Steps:
(1) Setup the transmit card
(2) Clear emergency stop flag, reset to False
(3) Start worker thread (card streaming mode), with provided replay data
Parameters
----------
data
Sequence replay data as int16 numpy array in correct order.
Checkout `prepare_sequence` function for reference of correct replay data format.
This value is None per default
Raises
------
ValueError
Raised if replay data is not provided as numpy int16 values
"""
try:
# Data must have a default value as start_operation is an abstract method and data is optional
if not data:
raise ValueError("No unrolled sequence data provided.")
sqnc = np.concatenate(data.seq)
# Check if sequence datatype is valid
if sqnc.dtype != np.int16:
raise ValueError("Sequence replay data is not int16, please unroll sequence to int16.")
# Check if card connection is established
if not self.card:
raise ConnectionError("No connection to card established...")
# Extend the provided data array with zeros to obtain a multiple of ring buffer size in memory
if (rest := sqnc.nbytes % self.ring_buffer_size.value) != 0:
rest = self.ring_buffer_size.value - rest
if rest % 2 != 0:
raise MemoryError("Providet data array size is not a multiple of 2 bytes (size of one sample)")
fill_size = int((rest) / 2)
sqnc = np.append(sqnc, np.zeros(fill_size, dtype=np.int16))
self.log.debug("Appended %s zeros to data array", fill_size)
except Exception as exc:
self.log.exception(exc, exc_info=True)
raise exc
# Check if sequence dwell time is valid
if sqnc_sample_rate := 1 / (data.dwell_time * 1e6) != self.sample_rate:
self.log.warning(
"Sequence sample rate (%s MHz) differs from device sample rate (%s MHz).",
sqnc_sample_rate,
self.sample_rate,
)
# Setup card, clear emergency stop thread event and start thread
self.is_running.clear()
self.worker = threading.Thread(target=self._fifo_stream_worker, args=(sqnc,))
self.worker.start()
[docs]
def stop_operation(self) -> None:
"""Stop card operation by thread event and stop card."""
if self.worker is not None:
self.is_running.set()
self.worker.join()
error = spcm.spcm_dwSetParam_i32(
self.card,
spcm.SPC_M2CMD,
spcm.M2CMD_CARD_STOP | spcm.M2CMD_DATA_STOPDMA,
)
self.handle_error(error)
self.worker = None
else:
print("No active replay thread found...")
def _fifo_stream_worker(self, data: np.ndarray) -> None:
"""Continuous FIFO mode examples.
Parameters
----------
data
Numpy array of data to be replayed by card.
Replay data should be in the format:
>>> [c0_0, c1_0, c2_0, c3_0, c0_1, c1_1, c2_1, c3_1, ..., cX_N]
Here, X denotes the channel and the subsequent index N the sample index.
"""
try:
# Get total size of data buffer to be played out
self.data_buffer_size = int(data.nbytes)
if self.data_buffer_size % (self.num_ch * 2) != 0:
raise MemoryError(
"Replay data size is not a multiple of enabled channels times 2 (bytes per sample)..."
)
except MemoryError as err:
self.log.exception(err, exc_info=True)
raise err
self.log.debug("Replay data buffer: %s bytes", self.data_buffer_size)
# >> Define software buffer
# Setup replay data buffer
data_buffer = data.ctypes.data_as(ctypes.POINTER(ctypes.c_int16))
# Allocate continuous ring buffer as defined by class attribute
ring_buffer = create_dma_buffer(self.ring_buffer_size.value)
try:
# Perform initial memory transfer: Fill the whole ring buffer
if _ring_buffer_pos := ctypes.cast(ring_buffer, ctypes.c_void_p).value:
if _data_buffer_pos := ctypes.cast(data_buffer, ctypes.c_void_p).value:
ctypes.memmove(_ring_buffer_pos, _data_buffer_pos, self.ring_buffer_size.value)
transferred_bytes = self.ring_buffer_size.value
else:
raise RuntimeError("Could not get data buffer position")
else:
raise RuntimeError("Could not get ring buffer position")
except RuntimeError as err:
self.log.exception(err, exc_info=True)
raise err
# Perform initial data transfer to completely fill continuous buffer
spcm.spcm_dwDefTransfer_i64(
self.card,
spcm.SPCM_BUF_DATA,
spcm.SPCM_DIR_PCTOCARD,
self.notify_size,
ring_buffer,
spcm.uint64(0),
self.ring_buffer_size,
)
spcm.spcm_dwSetParam_i64(self.card, spcm.SPC_DATA_AVAIL_CARD_LEN, self.ring_buffer_size)
self.log.debug("Starting card memory transfer")
error = spcm.spcm_dwSetParam_i32(
self.card,
spcm.SPC_M2CMD,
spcm.M2CMD_DATA_STARTDMA | spcm.M2CMD_DATA_WAITDMA,
)
self.handle_error(error)
# Start card
self.log.debug("Starting card operation")
error = spcm.spcm_dwSetParam_i32(
self.card,
spcm.SPC_M2CMD,
spcm.M2CMD_CARD_START | spcm.M2CMD_CARD_ENABLETRIGGER,
)
self.handle_error(error)
avail_bytes = spcm.int32(0)
usr_position = spcm.int32(0)
transfer_count = 0
while (transferred_bytes < self.data_buffer_size) and not self.is_running.is_set():
# Read available bytes and user position
spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_DATA_AVAIL_USER_LEN, ctypes.byref(avail_bytes))
spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_DATA_AVAIL_USER_POS, ctypes.byref(usr_position))
# Calculate new data for the transfer, when notify_size is available on continous buffer
if avail_bytes.value >= self.notify_size.value:
transfer_count += 1
# Get new buffer positions
if ring_buffer_position := ctypes.cast(
(ctypes.c_char * (self.ring_buffer_size.value - usr_position.value)).from_buffer(
ring_buffer, usr_position.value
),
ctypes.c_void_p,
).value:
if current_data_buffer := ctypes.cast(data_buffer, ctypes.c_void_p).value:
data_buffer_position = current_data_buffer + transferred_bytes
# Move memory: Current ring buffer position,
# position in sequence data and amount to transfer (=> notify size)
ctypes.memmove(
ring_buffer_position,
data_buffer_position,
self.notify_size.value,
)
spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_DATA_AVAIL_CARD_LEN, self.notify_size)
transferred_bytes += self.notify_size.value
error = spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_DATA_WAITDMA)
self.handle_error(error)
self.log.debug("Card operation stopped")
[docs]
def get_status(self) -> int:
"""Get the current card status.
Returns
-------
String with status description.
"""
try:
if not self.card:
raise ConnectionError("No device found")
except ConnectionError as err:
self.log.exception(err, exc_info=True)
raise err
status = spcm.int32(0)
spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_M2STATUS, ctypes.byref(status))
return status.value
[docs]
def log_card_status(self, include_desc: bool = False) -> None:
"""Log current card status.
The status is represented by a list. Each entry represents a possible card status in form
of a (sub-)list. It contains the status code, name and (optional) description of the spectrum
instrumentation manual.
Parameters
----------
include_desc, optional
Flag which indicates if description string should be contained in status entry, by default False
"""
msg, _ = translate_status(self.get_status(), include_desc=include_desc)
status = {key: val for val, key in msg.values()}
self.log.debug("Card status:\n%s", status)