Source code for console.spcm_control.tx_device

"""Implementation of transmit card."""
import ctypes
import logging
import threading

import numpy as np

import console.spcm_control.spcm.pyspcm as spcm
from console.interfaces.acquisition_parameter import Dimensions
from console.interfaces.unrolled_sequence import UnrolledSequence
from console.spcm_control.abstract_device import SpectrumDevice
from console.spcm_control.spcm.tools import create_dma_buffer, type_to_name


class TxCard(SpectrumDevice):
    """Implementation of the transmit (TX) device.

    Derives from the abstract base class SpectrumDevice and provides the card setup
    (`setup_card`), the gradient offset configuration (`set_gradient_offsets`) and the
    replay control (`start_operation` / `stop_operation`).

    The TxCard is automatically instantiated by a yaml-loader when loading the configuration file.
    The implementation was done and tested with card M2p6546-x4, which has an onboard memory size
    of 512 MSample, 2 Bytes/sample => 1024 MB.

    Overview
    --------
    The TX card operates with a ring buffer which is handed to the spectrum card for DMA transfer.
    The ring buffer is filled once completely before the replay starts and is afterwards refilled
    in fractions of notify_size by a worker thread (see `_fifo_stream_worker`).
    """

    __name__: str = "TxCard"

    def __init__(
        self,
        path: str,
        max_amplitude: list[int],
        filter_type: list[int],
        sample_rate: int,
        notify_rate: int = 16,
    ) -> None:
        """Store the device configuration and initialize the base class.

        Parameters
        ----------
        path
            Device path of the card, passed on to SpectrumDevice
        max_amplitude
            Maximum output amplitude per channel, indexed by channel number 0...3
        filter_type
            Output filter setting per channel, indexed by channel number 0...3
        sample_rate
            Sample rate in MHz (converted with `spcm.MEGA` when written to the card)
        notify_rate
            Divider which determines the notify size as fraction of the replay data size
            (respectively of the maximum ring buffer size), by default 16
        """
        self.log = logging.getLogger(self.__name__)
        super().__init__(path=path, log=self.log)

        self.max_amplitude = max_amplitude
        self.filter_type = filter_type
        self.sample_rate = sample_rate
        self.notify_rate = notify_rate

        # Number of output channels is fixed
        self.num_ch = 4

        # Size of the current sequence in bytes
        self.data_buffer_size: int = 0

        # Define maximum ring buffer size, 512 MSamples * 2 Bytes = 1024 MB
        self.max_ring_buffer_size: spcm.uint64 = spcm.uint64(1024**3)

        # Threading class attributes
        # NOTE: Despite its name, `is_running` acts as stop flag: the worker thread
        # runs while the event is cleared and terminates as soon as the event is set.
        self.worker: threading.Thread | None = None
        self.is_running = threading.Event()

    def setup_card(self) -> None:
        """Set up spectrum card in transmit (TX) mode.

        At the very beginning, a card reset is performed and the card type is verified.
        The card is configured to use an external clock and the sample rate defined by the
        class attribute is set. All 4 channels are enabled and configured by max. amplitude
        and filter values from class attributes. The card is operated in FIFO single replay mode.
        Digital outputs X1, X2 and X3 are configured to output the 15th bit of the analog
        channels 1, 2 and 3. If an IO expansion is reported by the card, the digital signals
        of channel 1 and 3 are replicated on the ports X12 and X13.

        If the sample rate reported by the card deviates from the class attribute, a warning is
        logged and the class attribute is overwritten by the reported value (rounded to MHz).

        Raises
        ------
        ConnectionError
            The device type is not a M2p.65xx card, i.e. the device is no transmit card.
        """
        # Reset card
        self.handle_error(spcm.spcm_dwSetParam_i64(self.card, spcm.SPC_M2CMD, spcm.M2CMD_CARD_RESET))

        # Verify that the connected device is a transmit card
        spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_PCITYP, ctypes.byref(self.card_type))
        try:
            if "M2p.65" not in (device_type := type_to_name(self.card_type.value)):
                raise ConnectionError(
                    "Device with path %s is of type %s, no transmit card..." % (self.path, device_type)
                )
        except ConnectionError as err:
            self.log.exception(err, exc_info=True)
            raise err

        # Check for presence of IO expansion cards
        card_features = spcm.int32(0)
        spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_PCIFEATURES, ctypes.byref(card_features))

        # Set trigger
        spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_TRIG_ORMASK, spcm.SPC_TMASK_SOFTWARE)

        # Configure external clock, TX master clock
        self.handle_error(spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_CLOCKMODE, spcm.SPC_CM_EXTERNAL))
        self.handle_error(spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_CLOCK50OHM, 1))
        self.handle_error(spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_CLOCK_THRESHOLD, 1500))

        # Set card sampling rate, class attribute is given in MHz
        self.handle_error(
            spcm.spcm_dwSetParam_i64(self.card, spcm.SPC_SAMPLERATE, spcm.MEGA(self.sample_rate))
        )

        # Check actual sampling rate
        sample_rate = spcm.int64(0)
        spcm.spcm_dwGetParam_i64(self.card, spcm.SPC_SAMPLERATE, ctypes.byref(sample_rate))
        self.log.info("Device sampling rate: %s MHz", sample_rate.value * 1e-6)
        if sample_rate.value != spcm.MEGA(self.sample_rate):
            self.log.warning(
                "Tx device sample rate %s MHz does not match set sample rate of %s MHz",
                sample_rate.value * 1e-6,
                self.sample_rate,
            )
            # Round instead of truncating: the float product may end up marginally below
            # the integer value, in which case int() would yield a rate which is 1 MHz too low.
            self.sample_rate = round(sample_rate.value / 1e6)

        # Enable and setup channels
        self.handle_error(spcm.spcm_dwSetParam_i32(
            self.card,
            spcm.SPC_CHENABLE,
            spcm.CHANNEL0 | spcm.CHANNEL1 | spcm.CHANNEL2 | spcm.CHANNEL3,
        ))
        self.log.info("Setup max. output amplitude: %s.", self.max_amplitude)

        # Registers per analog output channel: (enable output, amplitude, filter)
        # NOTE(review): The original comments on the synchronous digital outputs of the
        # channels 2 and 3 contradict each other, the digital output assignment below
        # (X1: ADC gate, X2: reference, X3: un-blanking) is taken from the X-mode setup.
        channel_registers = (
            (spcm.SPC_ENABLEOUT0, spcm.SPC_AMP0, spcm.SPC_FILTER0),  # Channel 0: RF
            (spcm.SPC_ENABLEOUT1, spcm.SPC_AMP1, spcm.SPC_FILTER1),  # Channel 1: Gradient x
            (spcm.SPC_ENABLEOUT2, spcm.SPC_AMP2, spcm.SPC_FILTER2),  # Channel 2: Gradient y
            (spcm.SPC_ENABLEOUT3, spcm.SPC_AMP3, spcm.SPC_FILTER3),  # Channel 3: Gradient z
        )
        for channel, (enable_reg, amplitude_reg, filter_reg) in enumerate(channel_registers):
            self.handle_error(spcm.spcm_dwSetParam_i32(self.card, enable_reg, 1))
            self.handle_error(spcm.spcm_dwSetParam_i32(self.card, amplitude_reg, self.max_amplitude[channel]))
            self.handle_error(spcm.spcm_dwSetParam_i32(self.card, filter_reg, self.filter_type[channel]))

        # Setup the card in FIFO mode
        self.handle_error(spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_CARDMODE, spcm.SPC_REP_FIFO_SINGLE))

        # >> Setup digital output channels
        # All digital outputs replicate the 15th bit of an analog channel
        digital_out_bit15 = spcm.SPCM_XMODE_DIGOUT | spcm.SPCM_XMODE_DIGOUTSRC_BIT15
        digital_outputs = (
            # Channel X1: dig. ADC gate (15th bit from analog channel 1)
            (spcm.SPCM_X1_MODE, spcm.SPCM_XMODE_DIGOUTSRC_CH1),
            # Channel X2: dig. reference signal (15th bit from analog channel 2)
            (spcm.SPCM_X2_MODE, spcm.SPCM_XMODE_DIGOUTSRC_CH2),
            # Channel X3: dig. unblanking signal (15th bit of analog channel 3)
            (spcm.SPCM_X3_MODE, spcm.SPCM_XMODE_DIGOUTSRC_CH3),
        )
        for mode_reg, source_channel in digital_outputs:
            self.handle_error(spcm.spcm_dwSetParam_i32(self.card, mode_reg, digital_out_bit15 | source_channel))

        # >> Setup additional GPIO ports, if extender is available
        if card_features.value & (spcm.SPCM_FEAT_DIG16_FX2 | spcm.SPCM_FEAT_DIG16_SMB):
            self.log.info("IO expansion card with FX2 connector detected, performing additional setup...")
            extension_outputs = (
                # Replicate ADC gate on extension port X12
                (spcm.SPCM_X12_MODE, spcm.SPCM_XMODE_DIGOUTSRC_CH1),
                # Replicate unblanking signal at extension port X13
                (spcm.SPCM_X13_MODE, spcm.SPCM_XMODE_DIGOUTSRC_CH3),
            )
            for mode_reg, source_channel in extension_outputs:
                self.handle_error(
                    spcm.spcm_dwSetParam_i32(self.card, mode_reg, digital_out_bit15 | source_channel)
                )

        self.log.debug("Device setup completed")

    def set_gradient_offsets(self, offsets: Dimensions, high_impedance: list[bool] | None = None) -> None:
        """Set offset values of the gradient output channels.

        The offsets are written to the card, the setup is committed and the offsets are
        read back from the card. An error is logged if the values read from the card do not
        match the values which were written.

        Parameters
        ----------
        offsets
            Offset values as Dimension datatype as defined in acquisition parameter.
            The attributes x, y and z are applied to the output channels 1, 2 and 3.
        high_impedance
            Per channel flags (x, y, z) which indicate a termination into high impedance.
            The offset written to the card is scaled by 0.5 for flagged channels.
            Defaults to high impedance termination of all three gradient channels.

        Raises
        ------
        ConnectionError
            No connection to the TX card
        ValueError
            `high_impedance` does not contain exactly 3 values
        """
        if high_impedance is None:
            high_impedance = [True, True, True]

        # Check card connection
        try:
            if not self.card:
                raise ConnectionError("No connection to TX card.")
        except ConnectionError as err:
            self.log.exception(err, exc_info=True)
            raise err

        # Check offset values, an exceeded limit is only reported and does not abort the setup
        requested = (offsets.x, offsets.y, offsets.z)
        for axis, value, limit in zip("xyz", requested, self.max_amplitude[1:4]):
            if abs(value) > limit:
                self.log.error("Gradient offset of channel %s exceeds maximum amplitude.", axis)

        # Extract per channel flag for high impedance termination
        try:
            x_high_imp, y_high_imp, z_high_imp = high_impedance
        except ValueError as err:
            # Raise error if list does not have 3 values to unpack
            self.log.exception(err, exc_info=True)
            raise err

        # Values to be written, scale offset by 0.5 if channel is terminated into high impedance
        target_offsets = [
            int(value / 2) if high_imp else int(value)
            for value, high_imp in zip(requested, (x_high_imp, y_high_imp, z_high_imp))
        ]

        # Set offset values and write setup
        offset_registers = (spcm.SPC_OFFS1, spcm.SPC_OFFS2, spcm.SPC_OFFS3)
        for register, value in zip(offset_registers, target_offsets):
            spcm.spcm_dwSetParam_i32(self.card, register, value)
        spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_CARD_WRITESETUP)

        # Read offset values from card. The 32 bit getter is required here, as the values are
        # received by int32 buffers: the 64 bit getter would write beyond the 4 byte storage.
        set_offsets = []
        for register in offset_registers:
            readback = spcm.int32(0)
            spcm.spcm_dwGetParam_i32(self.card, register, ctypes.byref(readback))
            set_offsets.append(readback.value)
        self.log.info("Set gradient values %s mV for x, y and z.", set_offsets)

        # Check values read from card against the values which were actually written
        # (i.e. including the high impedance scaling) and log error if they are not correct
        if set_offsets != target_offsets:
            self.log.error(
                "Actual gradient offsets do not correspond to the values to be set (x=%s, y=%s, z=%s mV)",
                *target_offsets,
            )

    def start_operation(self, data: UnrolledSequence | None = None) -> None:
        """Start transmit (TX) card operation.

        Steps:
        (1) Validate the replay data and the card connection
        (2) Clear emergency stop flag
        (3) Start worker thread (card streaming mode), with provided replay data

        The card itself is not configured by this method, see `setup_card`.

        Parameters
        ----------
        data
            Unrolled sequence which provides the replay data (`seq`) as int16 numpy array in
            correct order and the dwell time (`dwell_time`) of the samples.
            See `_fifo_stream_worker` for the expected replay data format.
            This value is None per default, but a sequence is required to start the operation.

        Raises
        ------
        ValueError
            Raised if no replay data is provided or if the replay data is not int16
        ConnectionError
            Raised if no connection to the card is established
        """
        try:
            # Data must have a default value as start_operation is an abstract method and data is optional
            if data is None:
                raise ValueError("No unrolled sequence data provided.")
            sqnc = data.seq

            # Check if sequence datatype is valid
            if sqnc.dtype != np.int16:
                raise ValueError("Sequence replay data is not int16, please unroll sequence to int16.")

            # Check if card connection is established
            if not self.card:
                raise ConnectionError("No connection to card established...")
        except Exception as exc:
            self.log.exception(exc, exc_info=True)
            raise

        # Check if sequence dwell time is valid: the sample rate in MHz is calculated first and
        # compared afterwards. A tolerant comparison is used, because the reciprocal of the
        # dwell time is in general not exactly representable as floating point number.
        sqnc_sample_rate = 1 / (data.dwell_time * 1e6)
        if not np.isclose(sqnc_sample_rate, self.sample_rate):
            self.log.warning(
                "Sequence sample rate (%s MHz) differs from device sample rate (%s MHz).",
                sqnc_sample_rate,
                self.sample_rate,
            )

        # Clear emergency stop thread event and start thread
        self.is_running.clear()
        self.worker = threading.Thread(target=self._fifo_stream_worker, args=(sqnc,))
        self.worker.start()

    def stop_operation(self) -> None:
        """Stop card operation by thread event and stop card.

        Requests the worker thread to terminate, waits for its termination and stops
        the card as well as the DMA transfer afterwards.
        """
        if self.worker is None:
            self.log.warning("No active replay thread found...")
            return

        # Setting the event requests the worker to leave its streaming loop
        self.is_running.set()
        self.worker.join()

        self.handle_error(spcm.spcm_dwSetParam_i32(
            self.card,
            spcm.SPC_M2CMD,
            spcm.M2CMD_CARD_STOP | spcm.M2CMD_DATA_STOPDMA,
        ))
        self.worker = None

    def _fifo_stream_worker(self, data: np.ndarray) -> None:
        """Replay data in continuous FIFO mode, target of the worker thread.

        The ring buffer is filled with the first part of the replay data and transferred
        to the card before the card is started. While the card is running, the ring buffer is
        refilled in chunks of the notify size until all data was handed over or the stop
        event (`is_running`) is set. The last chunk is padded with zeros.

        Parameters
        ----------
        data
            Numpy array of data to be replayed by card.
            Replay data should be in the format:

            >>> [c0_0, c1_0, c2_0, c3_0, c0_1, c1_1, c2_1, c3_1, ..., cX_N]

            Here, X denotes the channel and the subsequent index N the sample index.

        Raises
        ------
        MemoryError
            Ring buffer size or replay data size is not a multiple of the number of
            channels times 2 bytes per sample
        RuntimeError
            Address of ring buffer or replay data buffer could not be determined
        """
        # The raw data pointer used below is only valid for a C-contiguous memory layout,
        # no copy is made if the array is contiguous already
        data = np.ascontiguousarray(data)

        # Get total size of data buffer to be played out
        self.data_buffer_size = data.nbytes
        self.log.debug("Replay data buffer: %s bytes", self.data_buffer_size)

        # The notify size is a fraction (1 / notify_rate) of the replay data size, limited by the
        # same fraction of the maximum ring buffer size and rounded down to a multiple of
        # 4096 bytes. Ensure that minimum notify size is 4096 bytes.
        page_size = 4096
        notify_size = spcm.int32(max(page_size, min(
            int((self.data_buffer_size / self.notify_rate) // page_size) * page_size,
            int((self.max_ring_buffer_size.value / self.notify_rate) // page_size) * page_size,
        )))

        # >> Define software buffer
        # Setup replay data buffer
        data_buffer = data.ctypes.data_as(ctypes.POINTER(ctypes.c_int16))

        # Allocate continuous ring buffer with minimum necessary amount of memory, ensure multiple of
        # notify size. The upper limit is rounded down to a multiple of the notify size as well,
        # otherwise a chunk written at the end of the ring buffer could exceed the buffer.
        min_ring_buffer_size = int(np.ceil(self.data_buffer_size / notify_size.value) * notify_size.value)
        max_ring_buffer_size = (self.max_ring_buffer_size.value // notify_size.value) * notify_size.value

        # Create page-aligned ring buffer
        ring_buffer = create_dma_buffer(min(max_ring_buffer_size, min_ring_buffer_size))
        ring_buffer_size = spcm.uint64(len(ring_buffer))

        try:
            # Check if ring buffer size is multiple of 2*num_ch (2 bytes per sample per channel)
            if ring_buffer_size.value % (self.num_ch * 2) != 0:
                raise MemoryError(
                    "Ring buffer size is not a multiple of channel sample product "
                    "(number of enables channels times 2 byte per sample)"
                )
            # Check size of data buffer
            if self.data_buffer_size % (self.num_ch * 2) != 0:
                raise MemoryError(
                    "Replay data size is not a multiple of enabled channels times 2 (bytes per sample)..."
                )
        except MemoryError as err:
            self.log.exception(err, exc_info=True)
            raise err

        self.log.debug(
            "Ring buffer size: %s; Notify size: %s",
            ring_buffer_size.value,
            notify_size.value,
        )

        # Determine the start addresses of both buffers, they are valid for the whole transfer
        try:
            ring_buffer_address = ctypes.cast(ring_buffer, ctypes.c_void_p).value
            data_buffer_address = ctypes.cast(data_buffer, ctypes.c_void_p).value
            if not (ring_buffer_address and data_buffer_address):
                raise RuntimeError("Could not get ring or data buffer position.")
        except RuntimeError as err:
            self.log.exception(err, exc_info=True)
            raise err

        # Perform initial memory transfer: Fill the whole ring buffer
        transferred_bytes = min(self.data_buffer_size, ring_buffer_size.value)
        ctypes.memmove(ring_buffer_address, data_buffer_address, transferred_bytes)
        if transferred_bytes < ring_buffer_size.value:
            # The complete ring buffer is handed over to the card, explicitly zero the padding
            # (same as for the last chunk in the streaming loop below)
            ctypes.memset(
                ring_buffer_address + transferred_bytes,
                0,
                ring_buffer_size.value - transferred_bytes,
            )

        # Perform initial data transfer to completely fill continuous buffer
        spcm.spcm_dwDefTransfer_i64(
            self.card,
            spcm.SPCM_BUF_DATA,
            spcm.SPCM_DIR_PCTOCARD,
            notify_size,
            ring_buffer,
            spcm.uint64(0),
            ring_buffer_size,
        )
        self.handle_error(spcm.spcm_dwSetParam_i64(self.card, spcm.SPC_DATA_AVAIL_CARD_LEN, ring_buffer_size))

        self.log.debug("Starting card memory transfer")
        self.handle_error(spcm.spcm_dwSetParam_i32(
            self.card,
            spcm.SPC_M2CMD,
            spcm.M2CMD_DATA_STARTDMA | spcm.M2CMD_DATA_WAITDMA,
        ))

        # Start card
        self.log.debug("Starting card operation")
        self.handle_error(spcm.spcm_dwSetParam_i32(
            self.card,
            spcm.SPC_M2CMD,
            spcm.M2CMD_CARD_START | spcm.M2CMD_CARD_ENABLETRIGGER,
        ))

        avail_bytes = spcm.int32(0)
        usr_position = spcm.int32(0)

        # Stream remaining data until everything is transferred or stop is requested
        while (transferred_bytes < self.data_buffer_size) and not self.is_running.is_set():
            # Read available bytes and user position
            spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_DATA_AVAIL_USER_LEN, ctypes.byref(avail_bytes))
            spcm.spcm_dwGetParam_i32(self.card, spcm.SPC_DATA_AVAIL_USER_POS, ctypes.byref(usr_position))

            # Calculate new data for the transfer, when notify_size is available on continuous buffer
            if avail_bytes.value >= notify_size.value:
                # Get new buffer positions, the user position is a byte offset into the ring buffer
                ring_buffer_position = ring_buffer_address + usr_position.value
                data_buffer_position = data_buffer_address + transferred_bytes

                bytes_remaining = self.data_buffer_size - transferred_bytes
                if bytes_remaining >= notify_size.value:
                    # Enough data available -> copy notify size
                    ctypes.memmove(ring_buffer_position, data_buffer_position, notify_size.value)
                else:
                    # Not enough data available -> set remaining bytes to zero
                    ctypes.memmove(ring_buffer_position, data_buffer_position, bytes_remaining)
                    ctypes.memset(
                        ring_buffer_position + bytes_remaining,
                        0,
                        notify_size.value - bytes_remaining,
                    )

                # Hand the refilled chunk over to the card
                self.handle_error(
                    spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_DATA_AVAIL_CARD_LEN, notify_size)
                )
                transferred_bytes += notify_size.value

            self.handle_error(spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_DATA_WAITDMA))

        # Wait for the last DMA transfer
        self.handle_error(spcm.spcm_dwSetParam_i32(self.card, spcm.SPC_M2CMD, spcm.M2CMD_DATA_WAITDMA))
        self.log.debug("Card operation stopped")