Source code for console.spcm_control.rx_device

"""Implementation of receive card."""
import logging
import threading
from ctypes import POINTER, addressof, byref, c_short, cast, sizeof
from dataclasses import dataclass
from decimal import Decimal, getcontext
from itertools import compress

import numpy as np

import console.spcm_control.spcm.pyspcm as sp
from console.spcm_control.abstract_device import SpectrumDevice
from console.spcm_control.spcm.tools import create_dma_buffer, translate_status, type_to_name

# Define registers lists
CH_SELECT = [
    sp.CHANNEL0,
    sp.CHANNEL1,
    sp.CHANNEL2,
    sp.CHANNEL3,
    sp.CHANNEL4,
    sp.CHANNEL5,
    sp.CHANNEL6,
    sp.CHANNEL7,
]
AMP_SELECT = [
    sp.SPC_AMP0,
    sp.SPC_AMP1,
    sp.SPC_AMP2,
    sp.SPC_AMP3,
    sp.SPC_AMP4,
    sp.SPC_AMP5,
    sp.SPC_AMP6,
    sp.SPC_AMP7,
]
IMP_SELECT = [
    sp.SPC_50OHM0,
    sp.SPC_50OHM1,
    sp.SPC_50OHM2,
    sp.SPC_50OHM3,
    sp.SPC_50OHM4,
    sp.SPC_50OHM5,
    sp.SPC_50OHM6,
    sp.SPC_50OHM7,
]


# Set precision for precise gate samples calculation
getcontext().prec = 28


[docs] @dataclass class RxCard(SpectrumDevice): """Implementation of RX device.""" path: str sample_rate: int channel_enable: list[int] max_amplitude: list[int] impedance_50_ohms: list[int] __name__: str = "RxCard" def __post_init__(self): """Execute after init function to do further class setup.""" self.log = logging.getLogger(self.__name__) super().__init__(self.path, log=self.log) self.num_channels = sp.int32(0) self.card_type = sp.int32(0) self.worker: threading.Thread | None = None self.is_running = threading.Event() # Define pre and post trigger time. # Pre trigger is set to minimum and post trigger size is at least one notify size to avoid data loss. self.pre_trigger = 8 self.post_trigger = 4096 self.post_trigger_size = 0 # TODO: only use one variable for post trigger self.rx_data = [] self.rx_scaling = [amp / (2**15) for amp in self.max_amplitude]
[docs] def dict(self) -> dict: """Returnt class variables which are json serializable as dictionary. Returns ------- Dictionary containing class variables. """ return super().dict()
[docs] def setup_card(self): """Set up spectrum card in transmit (TX) mode. At the very beginning, a card reset is performed. The clock mode is set according to the sample rate, defined by the class attribute. Two receive channels are enables and configured by max. amplitude according to class variables and impedance. Raises ------ Warning The actual set sample rate deviates from the corresponding class attribute to be set, class attribute is overwritten. """ # Get the card type and reset card sp.spcm_dwGetParam_i32(self.card, sp.SPC_PCITYP, byref(self.card_type)) sp.spcm_dwSetParam_i64(self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_RESET) # Needed? try: if "M2p.59" not in (device_type := type_to_name(self.card_type.value)): raise ConnectionError("Device with path %s is of type %s, no receive card" % (self.path, device_type)) except ConnectionError as err: self.log.exception(err, exc_info=True) raise err # Setup the internal clockmode, clock output enable (use RX clock output to enable anti-alias filter) sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCKMODE, sp.SPC_CM_INTPLL) sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCKOUT, 1) # Use external clock: Terminate to 50 Ohms, set threshold to 1.5V, suitable for 3.3V clock # sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCKMODE, sp.SPC_CM_EXTERNAL) # sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCK50OHM, 1) # sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCK_THRESHOLD, 1500) # Set card sampling rate in MHz and read the actual sampling rate sp.spcm_dwSetParam_i64(self.card, sp.SPC_SAMPLERATE, sp.MEGA(self.sample_rate)) sample_rate = sp.int64(0) sp.spcm_dwGetParam_i64(self.card, sp.SPC_SAMPLERATE, byref(sample_rate)) self.log.info("Device sampling rate: %s MHz", sample_rate.value * 1e-6) if sample_rate.value != sp.MEGA(self.sample_rate): self.log.warning( "Actual device sample rate %s MHz does not match set sample rate of %s MHz; Updating class attribute", sample_rate.value * 1e-6, self.sample_rate, ) self.sample_rate = int(sample_rate.value * 1e-6) # Check channel enable, max. amplitude per channel and impedance values try: # if (num_enable := len(self.channel_enable)) < 1 or num_enable > 8: if (num_enable := len(self.channel_enable)) != 8: raise ValueError("Channel enable list is incomplete: %s/8" % num_enable) # Impedance and amplitude configuration lists must match the channel enable list len if (num_imp := len(self.impedance_50_ohms)) != num_enable: raise ValueError("Channel impedance list is incomplete: %s/8" % num_imp) if (num_amp := len(self.max_amplitude)) != num_enable: raise ValueError("channel max. amplitude list is incomplete: %s/8" % num_amp) if not np.log2(num_enable).is_integer(): raise ValueError("Invalid number of enabled channels, must be power of 2.") except ValueError as err: self.log.exception(err, exc_info=True) raise err # Enable receive channels, compress list of channel select registers to obtain list of channels to be enabled # Sum of the compressed list equals logical or operator # e.g. sp.CHANNEL0 | sp.CHANNEL1 | sp.CHANNEL5 = sum([sp.CHANNEL0, sp.CHANNEL1, sp.CHANNEL5]) = 35 channel_selection = sum(list(compress(CH_SELECT, map(bool, self.channel_enable)))) sp.spcm_dwSetParam_i32(self.card, sp.SPC_CHENABLE, channel_selection) # Set impedance and amplitude limits for each channel according to device configuration for k, enable in enumerate(map(bool, self.channel_enable)): if enable: self.log.info( "Channel %s enabled; 50 ohms impedance: %s; Max. amplitude: %s mV", k, self.impedance_50_ohms[k], self.max_amplitude[k], ) sp.spcm_dwSetParam_i32(self.card, IMP_SELECT[k], self.impedance_50_ohms[k]) sp.spcm_dwSetParam_i32(self.card, AMP_SELECT[k], self.max_amplitude[k]) # Get the number of actual active channels and compare against provided channel enable list sp.spcm_dwGetParam_i32(self.card, sp.SPC_CHCOUNT, byref(self.num_channels)) try: self.log.info( "Number of enabled receive channels (read from card): %s", self.num_channels.value, ) if not self.num_channels.value == sum(self.channel_enable): raise ValueError("Actual number of enabled channels does not match the provided channel enable list") except ValueError as err: self.log.exception(err, exc_info=True) raise err # Digital filter setting for receiver, 0 = disable digital bandwidth filter sp.spcm_dwSetParam_i32(self.card, sp.SPC_DIGITALBWFILTER, 0) # Setup digital input channels for reference signal sp.spcm_dwSetParam_i32(self.card, sp.SPCM_X2_MODE, sp.SPCM_XMODE_DIGIN) sp.spcm_dwSetParam_i32(self.card, sp.SPC_DIGMODE0, (sp.DIGMODEMASK_BIT15 & sp.SPCM_DIGMODE_X2)) # TODO: Double check, why is the post trigger divided by number of channels and multiplied by 2? self.post_trigger = 4096 // self.num_channels.value self.post_trigger_size = self.post_trigger * 2 # Set the memory size, pre and post trigger and loop paramaters, SPC_LOOPS = 0 => runs infinitely long sp.spcm_dwSetParam_i32(self.card, sp.SPC_POSTTRIGGER, self.post_trigger) sp.spcm_dwSetParam_i32(self.card, sp.SPC_PRETRIGGER, self.pre_trigger) sp.spcm_dwSetParam_i32(self.card, sp.SPC_LOOPS, 0) # Setup timestamp mode to read number of samples per gate if available sp.spcm_dwSetParam_i32( self.card, sp.SPC_TIMESTAMP_CMD, sp.SPC_TSMODE_STARTRESET | sp.SPC_TSCNT_INTERNAL, ) sp.spcm_dwSetParam_i32(self.card, sp.SPC_TRIG_EXT1_MODE, sp.SPC_TM_POS) sp.spcm_dwSetParam_i32(self.card, sp.SPC_TRIG_ORMASK, sp.SPC_TMASK_EXT1) # Setup gated fifo mode sp.spcm_dwSetParam_i32(self.card, sp.SPC_CARDMODE, sp.SPC_REC_FIFO_GATE) # Set timeout to 10ms (used for DMA wait) sp.spcm_dwSetParam_i32(self.card, sp.SPC_TIMEOUT, 10) self.log.debug("Device setup completed") self.log_card_status()
[docs] def start_operation(self): """Start card operation.""" # Clear the emergency stop flag self.is_running.clear() self.rx_data = [] # Start card thread. if time stamp mode is not available use the example function. self.worker = threading.Thread(target=self._gated_timestamps_stream) self.worker.start()
[docs] def stop_operation(self): """Stop card thread.""" # Check if thread is running if self.worker is not None: self.is_running.set() self.worker.join() # Stop the card. We will stop the card in two steps. # First we will stop the data transfer and then we will stop the card. # If time stamp mode is enabled, we need to stop the extra data transfer as well. error = sp.spcm_dwSetParam_i32( self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_STOP | sp.M2CMD_DATA_STOPDMA | sp.M2CMD_EXTRA_STOPDMA, ) self.handle_error(error) self.worker = None else: # No thread is running self.log.error("No active process found")
def _gated_timestamps_stream(self): # >> Define RX data buffer # RX buffer size must be a multiple of notify size. Min. notify size is 4096 bytes/4 kBytes. rx_notify = sp.int32(sp.KILO_B(4)) # Buffer size set to maximum. Todo check one ADC window is not exceeding the limit rx_size = 1024**3 rx_buffer_size = sp.uint64(rx_size) rx_buffer = create_dma_buffer(rx_buffer_size.value) sp.spcm_dwDefTransfer_i64( self.card, sp.SPCM_BUF_DATA, sp.SPCM_DIR_CARDTOPC, rx_notify, rx_buffer, sp.uint64(0), rx_buffer_size, ) # >> Define TS buffer # Define the timestamps notify size. Min. notify size is 4096 bytes. ts_notify = sp.int32(sp.KILO_B(4)) # Define timestamp buffer, must be multiple of timestamps notify size ts_buffer_size = sp.uint64(2 * 4096) ts_buffer = create_dma_buffer(ts_buffer_size.value) sp.spcm_dwDefTransfer_i64( self.card, sp.SPCM_BUF_TIMESTAMP, sp.SPCM_DIR_CARDTOPC, ts_notify, ts_buffer, sp.uint64(0), ts_buffer_size, ) pll_data = cast(ts_buffer, sp.ptr64) # cast to pointer to 64bit integer rx_data = cast(rx_buffer, sp.ptr16) # cast to pointer to 16bit integer # Setup polling mode sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_EXTRA_POLL) # TODO: Move all the stuff up to here to setup function? # >> Start everything err = sp.spcm_dwSetParam_i32( self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_START | sp.M2CMD_CARD_ENABLETRIGGER | sp.M2CMD_DATA_STARTDMA, ) self.handle_error(err) available_timestamp_bytes = sp.int32(0) available_timestamp_postion = sp.int32(0) available_user_databytes = sp.int32(0) data_user_position = sp.int32(0) total_gates = 0 bytes_leftover = 0 total_leftover = 0 # Start receiver self.log.debug("Starting receive") while not self.is_running.is_set(): sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_DATA_WAITDMA) sp.spcm_dwGetParam_i64(self.card, sp.SPC_TS_AVAIL_USER_LEN, byref(available_timestamp_bytes)) if available_timestamp_bytes.value >= 32: # read position sp.spcm_dwGetParam_i64( self.card, sp.SPC_TS_AVAIL_USER_POS, byref(available_timestamp_postion), ) # Read two timestamps timestamp_0 = pll_data[int(available_timestamp_postion.value / 8)] / (self.sample_rate * 1e6) timestamp_1 = pll_data[int(available_timestamp_postion.value / 8) + 2] / (self.sample_rate * 1e6) # Calculate gate duration gate_length = Decimal(str(timestamp_1)) - Decimal(str(timestamp_0)) # Calculate the number of adc gate sample points (per channel) gate_sample = int(round(gate_length * (Decimal(str(self.sample_rate)) * Decimal("1e6")))) self.log.info( "Gate: (%s s, %s s); ADC duration: %s ms ; Samples/gate/channel: % s", timestamp_0, timestamp_1, float(gate_length) * 1e3, # Can be trimmed. gate_sample, ) sp.spcm_dwSetParam_i32(self.card, sp.SPC_TS_AVAIL_CARD_LEN, 32) sp.spcm_dwGetParam_i64( self.card, sp.SPC_TS_AVAIL_USER_LEN, byref(available_timestamp_bytes), ) # Check for rounding errors total_bytes = (gate_sample + self.pre_trigger) * 2 * self.num_channels.value bytes_sequence = (gate_sample + self.pre_trigger + self.post_trigger) * 2 * self.num_channels.value # Read/update available user bytes sp.spcm_dwGetParam_i32( self.card, sp.SPC_DATA_AVAIL_USER_LEN, byref(available_user_databytes), ) sp.spcm_dwGetParam_i32(self.card, sp.SPC_DATA_AVAIL_USER_POS, byref(data_user_position)) # Debug log statements # self.log.debug("Available timestamp buffer size: %s", available_timestamp_bytes.value) self.log.debug("Expected adc data in bytes: %s", total_bytes) self.log.debug("User position (adc buffer): %s", data_user_position.value) self.log.debug("Number of segments in notify size: %s", total_bytes // rx_notify.value) self.log.debug("Left over in bytes: %s", bytes_leftover) while not self.is_running.is_set(): # Read/update available user bytes sp.spcm_dwGetParam_i32( self.card, sp.SPC_DATA_AVAIL_USER_LEN, byref(available_user_databytes), ) self.log.debug("Available user length in bytes (adc buffer): %s", available_user_databytes.value) if available_user_databytes.value >= total_bytes: total_gates += 1 sp.spcm_dwGetParam_i32( self.card, sp.SPC_DATA_AVAIL_USER_POS, byref(data_user_position), ) byte_position = data_user_position.value // 2 total_bytes_to_read = available_user_databytes.value index_0 = byte_position + total_leftover // 2 if total_bytes_to_read + data_user_position.value >= rx_size: # >> We need two indices in case of memory position overflows the total memory length # Get the last position available and subtract it from current byte position index_1 = rx_size // 2 - index_0 # Get the remaining length after overflow. Then subtract it from the total bytes. index_2 = total_bytes // 2 - index_1 # Numpy array conversation. Get the first part of the slice offset_bytes_1 = index_1 * sizeof(c_short) ptr_to_slice_1 = cast(addressof(rx_data.contents) + offset_bytes_1, POINTER(c_short)) slice_1 = np.ctypeslib.as_array(ptr_to_slice_1, ((index_1),)) # Get the second part of the numpy slice offset_bytes_2 = index_2 * sizeof(c_short) ptr_to_slice_2 = cast(addressof(rx_data.contents) + offset_bytes_2, POINTER(c_short)) slice_2 = np.ctypeslib.as_array(ptr_to_slice_2, ((index_2),)) # Combine the slices gate_data = np.concatenate((slice_1, slice_2)) else: # If there is no memory position overflow, just get the data. offset_bytes = index_0 * sizeof(c_short) ptr_to_slice = cast(addressof(rx_data.contents) + offset_bytes, POINTER(c_short)) gate_data = np.ctypeslib.as_array(ptr_to_slice, ((total_bytes // 2),)) # Cut the pretrigger, we do not need it. pre_trigger_cut = (self.pre_trigger) * self.num_channels.value gate_data = gate_data[pre_trigger_cut:] self.rx_data.append(gate_data.reshape((self.num_channels.value, gate_sample), order="F")) # Most probably we have not filled the whole page. # There should be some bytes in the buffer, which are not readable yet. bytes_leftover = (total_bytes + self.post_trigger_size * self.num_channels.value) \ % rx_notify.value # Calculate the accumulation of the leftover bytes. # If it is bigger than the notify value read the page. total_leftover += bytes_leftover if total_leftover >= rx_notify.value: total_leftover = total_leftover - rx_notify.value available_card_len = bytes_sequence - (bytes_leftover) + rx_notify.value else: available_card_len = bytes_sequence - (bytes_leftover) # Tell the card that we have read the data. # It is better for tracking if the card length is in the order of notify (page) size. sp.spcm_dwSetParam_i32(self.card, sp.SPC_DATA_AVAIL_CARD_LEN, available_card_len) break self.log.debug("Card operation stopped")
[docs] def get_status(self) -> int: """Get the current card status. Returns ------- String with status description. """ try: if not self.card: raise ConnectionError("No device found") except ConnectionError as err: self.log.exception(err, exc_info=True) raise err status = sp.int32(0) sp.spcm_dwGetParam_i32(self.card, sp.SPC_M2STATUS, byref(status)) return status.value
[docs] def log_card_status(self, include_desc: bool = False) -> None: """Log current card status. The status is represented by a list. Each entry represents a possible card status in form of a (sub-)list. It contains the status code, name and (optional) description of the spectrum instrumentation manual. Parameters ---------- include_desc, optional Flag which indicates if description string should be contained in status entry, by default False """ msg, _ = translate_status(self.get_status(), include_desc=include_desc) status = {key: val for val, key in msg.values()} self.log.debug("Card status:\n%s", status)