Source code for console.spcm_control.rx_device

"""Implementation of receive card."""
import logging
import threading
from ctypes import POINTER, addressof, byref, c_short, cast, sizeof
from dataclasses import dataclass
from decimal import Decimal, getcontext
from itertools import compress

import numpy as np

import console.spcm_control.spcm.pyspcm as sp
from console.spcm_control.abstract_device import SpectrumDevice
from console.spcm_control.spcm.tools import create_dma_buffer, translate_status, type_to_name

# Define registers lists
CH_SELECT = [
    sp.CHANNEL0,
    sp.CHANNEL1,
    sp.CHANNEL2,
    sp.CHANNEL3,
    sp.CHANNEL4,
    sp.CHANNEL5,
    sp.CHANNEL6,
    sp.CHANNEL7,
]
AMP_SELECT = [
    sp.SPC_AMP0,
    sp.SPC_AMP1,
    sp.SPC_AMP2,
    sp.SPC_AMP3,
    sp.SPC_AMP4,
    sp.SPC_AMP5,
    sp.SPC_AMP6,
    sp.SPC_AMP7,
]
IMP_SELECT = [
    sp.SPC_50OHM0,
    sp.SPC_50OHM1,
    sp.SPC_50OHM2,
    sp.SPC_50OHM3,
    sp.SPC_50OHM4,
    sp.SPC_50OHM5,
    sp.SPC_50OHM6,
    sp.SPC_50OHM7,
]


# Set precision for precise gate samples calculation
getcontext().prec = 28


[docs] @dataclass class RxCard(SpectrumDevice): """Implementation of RX device.""" path: str sample_rate: int channel_enable: list[int] max_amplitude: list[int] impedance_50_ohms: list[int] __name__: str = "RxCard" def __post_init__(self): """Execute after init function to do further class setup.""" self.log = logging.getLogger(self.__name__) super().__init__(self.path, log=self.log) self.num_channels = sp.int32(0) self.card_type = sp.int32(0) self.worker: threading.Thread | None = None self.is_running = threading.Event() # Pre trigger is set to minimum and post trigger size is at least one notify size to avoid data loss. self.pre_trigger = 8 self.post_trigger = 4096 self.rx_data = [] self.rx_scaling = [amp / (2**15) for amp in self.max_amplitude]
[docs] def dict(self) -> dict: """Returnt class variables which are json serializable as dictionary. Returns ------- Dictionary containing class variables. """ return super().dict()
[docs] def setup_card(self): """Set up spectrum card in transmit (TX) mode. At the very beginning, a card reset is performed. The clock mode is set according to the sample rate, defined by the class attribute. Two receive channels are enables and configured by max. amplitude according to class variables and impedance. Raises ------ Warning The actual set sample rate deviates from the corresponding class attribute to be set, class attribute is overwritten. """ # Get the card type and reset card sp.spcm_dwGetParam_i32(self.card, sp.SPC_PCITYP, byref(self.card_type)) sp.spcm_dwSetParam_i64(self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_RESET) # Needed? try: if "M2p.59" not in (device_type := type_to_name(self.card_type.value)): raise ConnectionError("Device with path %s is of type %s, no receive card" % (self.path, device_type)) except ConnectionError as err: self.log.exception(err, exc_info=True) raise err # Setup the internal clockmode, clock output enable (use RX clock output to enable anti-alias filter) sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCKMODE, sp.SPC_CM_INTPLL) sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCKOUT, 1) # Use external clock: Terminate to 50 Ohms, set threshold to 1.5V, suitable for 3.3V clock # sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCKMODE, sp.SPC_CM_EXTERNAL) # sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCK50OHM, 1) # sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCK_THRESHOLD, 1500) # Set card sampling rate in MHz and read the actual sampling rate sp.spcm_dwSetParam_i64(self.card, sp.SPC_SAMPLERATE, sp.MEGA(self.sample_rate)) sample_rate = sp.int64(0) sp.spcm_dwGetParam_i64(self.card, sp.SPC_SAMPLERATE, byref(sample_rate)) self.log.info("Device sampling rate: %s MHz", sample_rate.value * 1e-6) if sample_rate.value != sp.MEGA(self.sample_rate): self.log.warning( "Actual device sample rate %s MHz does not match set sample rate of %s MHz; Updating class attribute", sample_rate.value * 1e-6, self.sample_rate, ) self.sample_rate = int(sample_rate.value * 1e-6) # Check channel enable, max. amplitude per channel and impedance values try: # if (num_enable := len(self.channel_enable)) < 1 or num_enable > 8: if (num_enable := len(self.channel_enable)) != 8: raise ValueError("Channel enable list is incomplete: %s/8" % num_enable) # Impedance and amplitude configuration lists must match the channel enable list len if (num_imp := len(self.impedance_50_ohms)) != num_enable: raise ValueError("Channel impedance list is incomplete: %s/8" % num_imp) if (num_amp := len(self.max_amplitude)) != num_enable: raise ValueError("channel max. amplitude list is incomplete: %s/8" % num_amp) if not np.log2(num_enable).is_integer(): raise ValueError("Invalid number of enabled channels, must be power of 2.") except ValueError as err: self.log.exception(err, exc_info=True) raise err # Enable receive channels, compress list of channel select registers to obtain list of channels to be enabled # Sum of the compressed list equals logical or operator # e.g. sp.CHANNEL0 | sp.CHANNEL1 | sp.CHANNEL5 = sum([sp.CHANNEL0, sp.CHANNEL1, sp.CHANNEL5]) = 35 channel_selection = sum(list(compress(CH_SELECT, map(bool, self.channel_enable)))) sp.spcm_dwSetParam_i32(self.card, sp.SPC_CHENABLE, channel_selection) # Set impedance and amplitude limits for each channel according to device configuration for k, enable in enumerate(map(bool, self.channel_enable)): if enable: self.log.info( "Channel %s enabled; 50 ohms impedance: %s; Max. amplitude: %s mV", k, self.impedance_50_ohms[k], self.max_amplitude[k], ) sp.spcm_dwSetParam_i32(self.card, IMP_SELECT[k], self.impedance_50_ohms[k]) sp.spcm_dwSetParam_i32(self.card, AMP_SELECT[k], self.max_amplitude[k]) # Get the number of actual active channels and compare against provided channel enable list sp.spcm_dwGetParam_i32(self.card, sp.SPC_CHCOUNT, byref(self.num_channels)) try: self.log.info( "Number of enabled receive channels (read from card): %s", self.num_channels.value, ) if not self.num_channels.value == sum(self.channel_enable): raise ValueError("Actual number of enabled channels does not match the provided channel enable list") except ValueError as err: self.log.exception(err, exc_info=True) raise err # Digital filter setting for receiver, 0 = disable digital bandwidth filter sp.spcm_dwSetParam_i32(self.card, sp.SPC_DIGITALBWFILTER, 0) # Setup digital input channels for reference signal sp.spcm_dwSetParam_i32(self.card, sp.SPCM_X2_MODE, sp.SPCM_XMODE_DIGIN) sp.spcm_dwSetParam_i32(self.card, sp.SPC_DIGMODE0, (sp.DIGMODEMASK_BIT15 & sp.SPCM_DIGMODE_X2)) # Calculate actual post trigger size depending on the number of active channels self.post_trigger = 4096 // self.num_channels.value # Set the memory size, pre and post trigger and loop paramaters, SPC_LOOPS = 0 => runs infinitely long sp.spcm_dwSetParam_i32(self.card, sp.SPC_POSTTRIGGER, self.post_trigger) sp.spcm_dwSetParam_i32(self.card, sp.SPC_PRETRIGGER, self.pre_trigger) sp.spcm_dwSetParam_i32(self.card, sp.SPC_LOOPS, 0) # Setup timestamp mode to read number of samples per gate if available sp.spcm_dwSetParam_i32( self.card, sp.SPC_TIMESTAMP_CMD, sp.SPC_TSMODE_STARTRESET | sp.SPC_TSCNT_INTERNAL, ) sp.spcm_dwSetParam_i32(self.card, sp.SPC_TRIG_EXT1_MODE, sp.SPC_TM_POS) sp.spcm_dwSetParam_i32(self.card, sp.SPC_TRIG_ORMASK, sp.SPC_TMASK_EXT1) # Setup gated fifo mode sp.spcm_dwSetParam_i32(self.card, sp.SPC_CARDMODE, sp.SPC_REC_FIFO_GATE) # Set timeout used for DMA wait to 10 ms sp.spcm_dwSetParam_i32(self.card, sp.SPC_TIMEOUT, 10) self.log.debug("Device setup completed") self.log_card_status()
[docs] def start_operation(self): """Start card operation.""" # Clear the emergency stop flag self.is_running.clear() self.rx_data = [] # Start card thread. if time stamp mode is not available use the example function. self.worker = threading.Thread(target=self._gated_timestamps_stream) self.worker.start()
[docs] def stop_operation(self): """Stop card thread.""" # Check if thread is running if self.worker is not None: self.is_running.set() self.worker.join() # Stop the card. We will stop the card in two steps. # First we will stop the data transfer and then we will stop the card. # If time stamp mode is enabled, we need to stop the extra data transfer as well. error = sp.spcm_dwSetParam_i32( self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_STOP | sp.M2CMD_DATA_STOPDMA | sp.M2CMD_EXTRA_STOPDMA, ) self.handle_error(error) self.worker = None else: # No thread is running self.log.error("No active process found")
def _gated_timestamps_stream(self): # >> Define RX data buffer # RX buffer size must be a multiple of notify size. Min. notify size is 4096 bytes/4 kBytes. rx_notify = sp.int32(sp.KILO_B(4)) # Buffer size set to maximum. Todo check one ADC window is not exceeding the limit rx_size = 1024**3 rx_buffer_size = sp.uint64(rx_size) rx_buffer = create_dma_buffer(rx_buffer_size.value) sp.spcm_dwDefTransfer_i64( self.card, sp.SPCM_BUF_DATA, sp.SPCM_DIR_CARDTOPC, rx_notify, rx_buffer, sp.uint64(0), rx_buffer_size, ) # >> Define TS buffer # Define the timestamps notify size. Min. notify size is 4096 bytes. ts_notify = sp.int32(sp.KILO_B(4)) # Define timestamp buffer, must be multiple of timestamps notify size ts_buffer_size = sp.uint64(2 * 4096) ts_buffer = create_dma_buffer(ts_buffer_size.value) sp.spcm_dwDefTransfer_i64( self.card, sp.SPCM_BUF_TIMESTAMP, sp.SPCM_DIR_CARDTOPC, ts_notify, ts_buffer, sp.uint64(0), ts_buffer_size, ) pll_data = cast(ts_buffer, sp.ptr64) # cast to pointer to 64bit integer rx_data = cast(rx_buffer, sp.ptr16) # cast to pointer to 16bit integer # Setup polling mode self.handle_error(sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_EXTRA_POLL)) # Start DMA self.handle_error(sp.spcm_dwSetParam_i32( self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_START | sp.M2CMD_CARD_ENABLETRIGGER | sp.M2CMD_DATA_STARTDMA, )) # Define helpers/buffer to read card parameter available_timestamp_bytes = sp.int32(0) available_timestamp_postion = sp.int32(0) available_data_bytes = sp.int32(0) available_data_position = sp.int32(0) total_gates = 0 total_leftover = 0 # Start receiver self.log.debug("Starting receive") while not self.is_running.is_set(): try: self.handle_error(sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_DATA_WAITDMA)) except RuntimeError: # Reraise error for traceability raise RuntimeError # Read the available timestamp buffer size available_timestamp_bytes = sp.int32(0) sp.spcm_dwGetParam_i64(self.card, sp.SPC_TS_AVAIL_USER_LEN, byref(available_timestamp_bytes)) # Process, if buffer size is greater or equal 32 (corresponds to 2 timestamps) if available_timestamp_bytes.value >= 32: # Read timestamp position sp.spcm_dwGetParam_i32( self.card, sp.SPC_TS_AVAIL_USER_POS, byref(available_timestamp_postion), ) # self.log.info("Timestamp buffer position: %s", available_timestamp_postion.value) # Read exactly two timestamps timestamp_0 = pll_data[int(available_timestamp_postion.value / 8)] / (self.sample_rate * 1e6) timestamp_1 = pll_data[int(available_timestamp_postion.value / 8) + 2] / (self.sample_rate * 1e6) # Calculate gate duration and the number of adc gate sample points (per channel) gate_length = Decimal(str(timestamp_1)) - Decimal(str(timestamp_0)) gate_sample = int(round(gate_length * (Decimal(str(self.sample_rate)) * Decimal("1e6")))) self.log.info( "Gate: (%s s, %s s); ADC duration: %s ms ; Samples/gate/channel: %s", timestamp_0, timestamp_1, float(gate_length) * 1e3, # Can be trimmed. gate_sample, ) # Free timestamp buffer by writing available timestamp card length try: self.handle_error(sp.spcm_dwSetParam_i32(self.card, sp.SPC_TS_AVAIL_CARD_LEN, 32)) except RuntimeError: # Reraise error for traceability raise RuntimeError sp.spcm_dwGetParam_i64(self.card, sp.SPC_TS_AVAIL_USER_LEN, byref(available_timestamp_bytes)) # self.log.info("Available timestamp user length: %s", available_timestamp_bytes.value) # Check for rounding errors total_bytes_gate = (gate_sample + self.pre_trigger) * 2 * self.num_channels.value bytes_sequence = (gate_sample + self.pre_trigger + self.post_trigger) * 2 * self.num_channels.value # Read available data length and position # TODO: Double-check, why is this required? Values are read again after wait dma command. sp.spcm_dwGetParam_i32(self.card, sp.SPC_DATA_AVAIL_USER_LEN, byref(available_data_bytes)) sp.spcm_dwGetParam_i32(self.card, sp.SPC_DATA_AVAIL_USER_POS, byref(available_data_position)) # # Debug log statements # self.log.debug("Available timestamp buffer size: %s", available_timestamp_bytes.value) # self.log.debug("Expected adc data in bytes: %s", total_bytes) # self.log.debug("User position (adc buffer): %s", data_user_position.value) # self.log.debug("Number of segments in notify size: %s", total_bytes // rx_notify.value) while not self.is_running.is_set(): # sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_DATA_WAITDMA) try: self.handle_error(sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_DATA_WAITDMA)) except RuntimeError: # Reraise error for traceability raise RuntimeError # Read/update available user bytes sp.spcm_dwGetParam_i32(self.card, sp.SPC_DATA_AVAIL_USER_LEN, byref(available_data_bytes)) sp.spcm_dwGetParam_i32(self.card, sp.SPC_DATA_AVAIL_USER_POS, byref(available_data_position)) if available_data_bytes.value >= total_bytes_gate: # self.log.info("Available data length: %s", available_data_bytes.value) # self.log.info("Available data position: %s", available_data_position.value) total_gates += 1 byte_position = available_data_position.value // 2 # total_bytes_to_read = available_data_bytes.value index_0 = byte_position + (total_leftover // 2) if available_data_bytes.value + available_data_position.value >= rx_size: # >> We need two indices in case of memory position overflows the total memory length # Get the last position available and subtract it from current byte position index_1 = rx_size // 2 - index_0 # Get the remaining length after overflow. Then subtract it from the total bytes. index_2 = total_bytes_gate // 2 - index_1 # Get the first part of the slice offset_bytes_1 = index_1 * sizeof(c_short) ptr_to_slice_1 = cast(addressof(rx_data.contents) + offset_bytes_1, POINTER(c_short)) slice_1 = np.ctypeslib.as_array(ptr_to_slice_1, ((index_1),)) # Get the second part of the numpy slice offset_bytes_2 = index_2 * sizeof(c_short) ptr_to_slice_2 = cast(addressof(rx_data.contents) + offset_bytes_2, POINTER(c_short)) slice_2 = np.ctypeslib.as_array(ptr_to_slice_2, ((index_2),)) # Combine the slices gate_data = np.concatenate((slice_1, slice_2)) else: # If there is no memory position overflow, just get the data. offset_bytes = index_0 * sizeof(c_short) ptr_to_slice = cast(addressof(rx_data.contents) + offset_bytes, POINTER(c_short)) gate_data = np.ctypeslib.as_array(ptr_to_slice, ((total_bytes_gate // 2),)) # Cut the pretrigger, we do not need it. pre_trigger_cut = (self.pre_trigger) * self.num_channels.value gate_data = gate_data[pre_trigger_cut:] self.rx_data.append(gate_data.reshape((self.num_channels.value, gate_sample), order="F")) # The accumulation of the leftover bytes is positive, # if if the post-trigger event was not fully captured (accumulated sum increases), # or negative if more then the expected data could be read due to lefter bytes # from a previous acquisition (accumulated sum decreases). total_leftover += (bytes_sequence - available_data_bytes.value) # Tell the card that data has been read and the buffer can be reused. # Using the size of available data bytes prevents invalid values. try: self.handle_error( sp.spcm_dwSetParam_i32(self.card, sp.SPC_DATA_AVAIL_CARD_LEN, available_data_bytes) ) except RuntimeError: # Reraise error for traceability raise RuntimeError break self.log.debug("Card operation stopped")
[docs] def get_status(self) -> int: """Get the current card status. Returns ------- String with status description. """ try: if not self.card: raise ConnectionError("No device found") except ConnectionError as err: self.log.exception(err, exc_info=True) raise err status = sp.int32(0) sp.spcm_dwGetParam_i32(self.card, sp.SPC_M2STATUS, byref(status)) return status.value
[docs] def log_card_status(self, include_desc: bool = False) -> None: """Log current card status. The status is represented by a list. Each entry represents a possible card status in form of a (sub-)list. It contains the status code, name and (optional) description of the spectrum instrumentation manual. Parameters ---------- include_desc, optional Flag which indicates if description string should be contained in status entry, by default False """ msg, _ = translate_status(self.get_status(), include_desc=include_desc) status = {key: val for val, key in msg.values()} self.log.debug("Card status:\n%s", status)