# Source code for console.spcm_control.rx_device

"""Implementation of receive card."""
import logging
import threading
import time
from ctypes import POINTER, addressof, byref, c_short, cast
from dataclasses import dataclass
from itertools import compress

import numpy as np

import console.spcm_control.spcm.pyspcm as sp
from console.interfaces.rx_data import RxData
from console.spcm_control.abstract_device import SpectrumDevice
from console.spcm_control.spcm.tools import create_dma_buffer, type_to_name

# Register lookup tables, one entry per channel index 0..7.
# Each list is ordered by channel number, so ``TABLE[k]`` is the register (or mask) of channel ``k``.

# Channel selection masks: sp.CHANNEL0 ... sp.CHANNEL7
CH_SELECT = [getattr(sp, f"CHANNEL{index}") for index in range(8)]

# Input amplitude registers: sp.SPC_AMP0 ... sp.SPC_AMP7
AMP_SELECT = [getattr(sp, f"SPC_AMP{index}") for index in range(8)]

# 50 ohm termination registers: sp.SPC_50OHM0 ... sp.SPC_50OHM7
IMP_SELECT = [getattr(sp, f"SPC_50OHM{index}") for index in range(8)]


@dataclass
class RxCard(SpectrumDevice):
    """Implementation of RX device.

    The card is configured by ``setup_card`` for gated FIFO recording with timestamps.
    ``start_operation`` launches a worker thread which reads one gate per pair of timestamps
    and stores the samples in the ``RxData`` objects provided through ``rx_data``.
    """

    __name__: str = "RxCard"

    def __init__(
        self,
        path: str,
        sample_rate: int,
        channel_enable: list[int],
        max_amplitude: list[int],
        impedance_50_ohms: list[int],
    ) -> None:
        """Store the receive configuration and initialize the acquisition state.

        Parameters
        ----------
        path
            Device path, forwarded to the ``SpectrumDevice`` base class.
        sample_rate
            Sampling rate in MHz.
        channel_enable
            Enable flag per channel, must contain 8 entries (checked in ``setup_card``).
        max_amplitude
            Max. amplitude per channel in mV, must contain 8 entries (checked in ``setup_card``).
        impedance_50_ohms
            50 ohms impedance setting per channel, must contain 8 entries (checked in ``setup_card``).
        """
        self.log = logging.getLogger(self.__name__)
        super().__init__(path, log=self.log)

        self.sample_rate = sample_rate
        self.channel_enable = channel_enable
        self.max_amplitude = max_amplitude
        self.impedance_50_ohms = impedance_50_ohms

        # List of RxData objects the worker thread writes to, must be set before starting the operation
        self.rx_data: None | list[RxData] = None

        # Values read back from the card during setup
        self.num_channels = sp.int32(0)
        self.card_type = sp.int32(0)

        self.worker: threading.Thread | None = None
        # NOTE: despite its name, ``is_running`` acts as stop flag: the worker loop runs until it is set
        self.is_running = threading.Event()
        # Set by the worker thread as soon as the acquisition loop is entered
        self.is_receiving = threading.Event()
        self._total_gates: int = 0

        # Pre trigger is set to minimum, post trigger depends on active channel count and is defined later.
        self.pre_trigger: int = 8
        self.post_trigger: None | int = None
        self.rx_scaling = [amp / (2**16) for amp in self.max_amplitude]

    @property
    def total_gates(self) -> int:
        """Return the number of gates that have been collected by the Rx card."""
        return self._total_gates

    def setup_card(self) -> None:
        """Set up spectrum card in receive (Rx) mode.

        At the very beginning, a card reset is performed. The clock mode is set according to the sample rate,
        defined by the class attribute. The receive channels selected by the channel enable list are enabled
        and configured by max. amplitude and impedance according to the class attributes. Finally, the card is
        set to gated FIFO mode with timestamps, triggered by a positive edge on EXT1.

        If the sample rate reported by the card deviates from the class attribute, a warning is logged
        and the class attribute is overwritten.

        Raises
        ------
        ConnectionError
            The device type is not an M2p.59xx receive card.
        ValueError
            The channel configuration lists are invalid or the number of channels enabled on the card
            does not match the channel enable list.
        """
        # Get the card type and reset card
        sp.spcm_dwGetParam_i32(self.card, sp.SPC_PCITYP, byref(self.card_type))
        sp.spcm_dwSetParam_i64(self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_RESET)  # Needed?

        try:
            if "M2p.59" not in (device_type := type_to_name(self.card_type.value)):
                raise ConnectionError("Device with path %s is of type %s, no receive card" % (self.path, device_type))
        except ConnectionError as err:
            self.log.exception(err, exc_info=True)
            raise

        # Setup the internal clockmode, clock output enable (use RX clock output to enable anti-alias filter)
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCKMODE, sp.SPC_CM_INTPLL)
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCKOUT, 1)

        # Use external clock: Terminate to 50 Ohms, set threshold to 1.5V, suitable for 3.3V clock
        # sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCKMODE, sp.SPC_CM_EXTERNAL)
        # sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCK50OHM, 1)
        # sp.spcm_dwSetParam_i32(self.card, sp.SPC_CLOCK_THRESHOLD, 1500)

        # Set card sampling rate in MHz and read the actual sampling rate
        sp.spcm_dwSetParam_i64(self.card, sp.SPC_SAMPLERATE, sp.MEGA(self.sample_rate))
        sample_rate = sp.int64(0)
        sp.spcm_dwGetParam_i64(self.card, sp.SPC_SAMPLERATE, byref(sample_rate))
        self.log.info("Device sampling rate: %s MHz", sample_rate.value * 1e-6)
        if sample_rate.value != sp.MEGA(self.sample_rate):
            self.log.warning(
                "Actual device sample rate %s MHz does not match set sample rate of %s MHz; Updating class attribute",
                sample_rate.value * 1e-6,
                self.sample_rate,
            )
            # Integer division avoids floating point round-off, which could truncate to one MHz too low
            self.sample_rate = sample_rate.value // 1_000_000

        # Check channel enable, max. amplitude per channel and impedance values
        num_enabled = self._validate_channel_config()

        # Enable receive channels, compress list of channel select registers to obtain list of channels to be enabled
        # Sum of the compressed list equals logical or operator
        # e.g. sp.CHANNEL0 | sp.CHANNEL1 | sp.CHANNEL5 = sum([sp.CHANNEL0, sp.CHANNEL1, sp.CHANNEL5]) = 35
        channel_selection = sum(compress(CH_SELECT, map(bool, self.channel_enable)))
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_CHENABLE, channel_selection)

        # Set impedance and amplitude limits for each enabled channel according to device configuration
        for k, enable in enumerate(map(bool, self.channel_enable)):
            if enable:
                self.log.info(
                    "Channel %s enabled; 50 ohms impedance: %s; Max. amplitude: %s mV",
                    k,
                    self.impedance_50_ohms[k],
                    self.max_amplitude[k],
                )
                sp.spcm_dwSetParam_i32(self.card, IMP_SELECT[k], self.impedance_50_ohms[k])
                sp.spcm_dwSetParam_i32(self.card, AMP_SELECT[k], self.max_amplitude[k])

        # Get the number of actual active channels and compare against provided channel enable list
        sp.spcm_dwGetParam_i32(self.card, sp.SPC_CHCOUNT, byref(self.num_channels))
        try:
            self.log.info(
                "Number of enabled receive channels (read from card): %s",
                self.num_channels.value,
            )
            if self.num_channels.value != num_enabled:
                raise ValueError("Actual number of enabled channels does not match the provided channel enable list")
        except ValueError as err:
            self.log.exception(err, exc_info=True)
            raise

        # Digital filter setting for receiver, 0 = disable digital bandwidth filter
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_DIGITALBWFILTER, 0)

        # Calculate trigger size depending on the number of active channels
        # Since data can only be gathered in notify size chunks, post_trigger // channel_count should be at least one
        # notify size to ensure that we can always access the full gate data.
        self.post_trigger = 4096 // self.num_channels.value

        # Set the memory size, pre and post trigger and loop parameters, SPC_LOOPS = 0 => runs infinitely long
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_POSTTRIGGER, self.post_trigger)
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_PRETRIGGER, self.pre_trigger)
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_LOOPS, 0)

        # Setup timestamp mode to read number of samples per gate if available
        sp.spcm_dwSetParam_i32(
            self.card,
            sp.SPC_TIMESTAMP_CMD,
            sp.SPC_TSMODE_STARTRESET | sp.SPC_TSCNT_INTERNAL,
        )

        # Configure trigger on EXT1 channel and trigger on positive edge
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_TRIG_EXT1_MODE, sp.SPC_TM_POS)
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_TRIG_ORMASK, sp.SPC_TMASK_EXT1)

        # Setup gated FIFO mode
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_CARDMODE, sp.SPC_REC_FIFO_GATE)

        # Get gate length alignment, number of samples must be integer multiple of this
        gate_alignment = sp.int64(0)
        sp.spcm_dwGetParam_i64(self.card, sp.SPC_GATE_LEN_ALIGNMENT, byref(gate_alignment))
        self.gate_alignment = gate_alignment.value
        self.log.debug("Alignment samples: %d samples", self.gate_alignment)

        # Set timeout used for DMA wait to 10 ms
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_TIMEOUT, 10)

        self.log.debug("Device setup completed")

    def _validate_channel_config(self) -> int:
        """Check the per-channel configuration lists and return the number of enabled channels.

        Channels are counted by truthiness, which is the same rule used to build the channel selection mask.

        Raises
        ------
        ValueError
            One of the configuration lists does not contain 8 entries
            or the number of enabled channels is not 1, 2, 4 or 8.
        """
        try:
            # Check that the length of the channel enable list is 8
            # this has to be true for cards with fewer channels too
            if (num_enable := len(self.channel_enable)) != 8:
                raise ValueError("Channel enable list is incomplete: %s/8" % num_enable)
            # Impedance and amplitude configuration lists must also be of length 8
            if (num_imp := len(self.impedance_50_ohms)) != 8:
                raise ValueError("Channel impedance list is incomplete: %s/8" % num_imp)
            if (num_amp := len(self.max_amplitude)) != 8:
                raise ValueError("channel max. amplitude list is incomplete: %s/8" % num_amp)
            # Number of enabled channels must be either 1, 2, 4 or 8
            if (num_enabled := sum(map(bool, self.channel_enable))) not in (1, 2, 4, 8):
                raise ValueError("Invalid number of enabled channels, must be power of 2.")
        except ValueError as err:
            self.log.exception(err, exc_info=True)
            raise
        return num_enabled

    def start_operation(self) -> None:
        """Start card operation.

        Clears the stop and receiving flags and launches the worker thread running the gated acquisition loop.
        """
        # Clear the emergency stop flag
        self.is_running.clear()
        self.is_receiving.clear()

        # Start card thread. if time stamp mode is not available use the example function.
        self.worker = threading.Thread(target=self._gated_timestamps_stream)
        self.worker.start()

    def stop_operation(self) -> None:
        """Stop card thread.

        Signals the worker thread to stop, waits for it to finish and stops the card afterwards.
        If no worker thread was started, an error is logged.
        """
        if self.worker is None:
            # No thread is running
            self.log.error("No active process found")
            return

        # Signal thread to stop and wait for it to complete
        self.is_running.set()
        self.worker.join()

        # Stop card operation with the following steps:
        # 1. Stop card acquisition
        # 2. Stop data DMA transfer
        # 3. Stop timestamp DMA transfer
        self.handle_error(
            sp.spcm_dwSetParam_i32(
                self.card,
                sp.SPC_M2CMD,
                sp.M2CMD_CARD_STOP | sp.M2CMD_DATA_STOPDMA | sp.M2CMD_EXTRA_STOPDMA,
            )
        )

    def _wait_for_dma(self) -> bool:
        """Wait for the data DMA transfer, return False (and log the error) if the wait failed."""
        try:
            self.handle_error(sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_DATA_WAITDMA))
        except RuntimeError as err:
            self.log.error("DMA wait failed with error: %s", err)
            return False
        return True

    @staticmethod
    def _read_ring_buffer(base_address: int, byte_position: int, num_bytes: int, buffer_size: int) -> np.ndarray:
        """Return ``num_bytes`` of 16 bit samples starting at ``byte_position`` of a ring buffer.

        Parameters
        ----------
        base_address
            Memory address of the first byte of the buffer.
        byte_position
            Start offset in bytes, must be within ``[0, buffer_size)``.
        num_bytes
            Number of bytes to read (2 bytes per sample).
        buffer_size
            Total buffer size in bytes.

        Returns
        -------
            1D array of int16 samples. Without wraparound this is a view on the buffer memory,
            with wraparound the two parts are concatenated into a new array.
        """
        num_samples = num_bytes // 2
        if byte_position + num_bytes < buffer_size:
            # If there is no memory position overflow, just get the data.
            ptr_to_slice = cast(base_address + byte_position, POINTER(c_short))
            return np.ctypeslib.as_array(ptr_to_slice, (num_samples,))

        # Number of samples to the end of the buffer and remaining samples at the buffer start
        samples_to_end = (buffer_size - byte_position) // 2
        samples_leftover = num_samples - samples_to_end

        # Handle edge case when memory position is exactly at end
        if samples_to_end == 0:
            slice_1 = np.array([], dtype=np.int16)
        else:
            ptr_to_slice_1 = cast(base_address + byte_position, POINTER(c_short))
            slice_1 = np.ctypeslib.as_array(ptr_to_slice_1, (samples_to_end,))

        ptr_to_slice_2 = cast(base_address, POINTER(c_short))
        slice_2 = np.ctypeslib.as_array(ptr_to_slice_2, (samples_leftover,))
        return np.concatenate((slice_1, slice_2))

    def _gated_timestamps_stream(self) -> None:
        """Run the gated acquisition loop (target of the worker thread).

        Two DMA buffers are defined, one for the ADC data and one for the timestamps. The loop polls for
        timestamps and consumes them in pairs. The difference of a pair defines the number of samples per
        channel of one gate. The corresponding ADC data is read from the receive buffer (excluding the
        pre-trigger samples) and stored in ``self.rx_data[self.total_gates]``.
        The loop runs until the ``is_running`` event is set or a DMA wait fails.

        Raises
        ------
        RuntimeError
            No RxData objects were provided or a driver call reported an error.
        ValueError
            The data of a single gate exceeds the receive buffer size.
        MemoryError
            Remaining and newly available bytes exceed the receive buffer size.
        """
        # Track the amount of gate events recorded
        self._total_gates = 0

        # Check that the list of RxData objects has been passed, before buffers are allocated and the card is started
        if self.rx_data is None:
            self.log.critical("No RxData objects found for storing ADC data")
            raise RuntimeError("No RxData objects found for storing ADC data")

        # Rx buffer size must be a multiple of notify size. Min. notify size is 4096 bytes/4 kBytes.
        rx_notify = sp.int32(sp.KILO_B(4))
        # Buffer size set to maximum.
        rx_size = 1024**3
        rx_buffer_size = sp.uint64(rx_size)

        # Create DMA buffer for receive data and tell the card to use it
        rx_buffer = create_dma_buffer(rx_buffer_size.value)
        sp.spcm_dwDefTransfer_i64(
            self.card,
            sp.SPCM_BUF_DATA,
            sp.SPCM_DIR_CARDTOPC,
            rx_notify,
            rx_buffer,
            sp.uint64(0),
            rx_buffer_size,
        )

        # Define the timestamps notify size. Min. notify size is 4096 bytes.
        ts_notify = sp.int32(sp.KILO_B(4))
        # Define timestamp buffer size, must be multiple of timestamps notify size
        ts_buffer_size = sp.uint64(2 * 4096)

        # Create DMA buffer for timestamp data and tell the card to use it
        ts_buffer = create_dma_buffer(ts_buffer_size.value)
        sp.spcm_dwDefTransfer_i64(
            self.card,
            sp.SPCM_BUF_TIMESTAMP,
            sp.SPCM_DIR_CARDTOPC,
            ts_notify,
            ts_buffer,
            sp.uint64(0),
            ts_buffer_size,
        )

        pll_data = cast(ts_buffer, sp.ptr64)  # cast to pointer to 64bit integer
        adc_data = cast(rx_buffer, sp.ptr16)  # cast to pointer to 16bit integer
        rx_base_address = addressof(adc_data.contents)

        # Setup polling mode for timestamp data
        self.handle_error(sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_EXTRA_POLL))

        # Start card acquisition and DMA usage
        self.handle_error(
            sp.spcm_dwSetParam_i32(
                self.card,
                sp.SPC_M2CMD,
                sp.M2CMD_CARD_START | sp.M2CMD_CARD_ENABLETRIGGER | sp.M2CMD_DATA_STARTDMA,
            )
        )

        # Define helpers/buffer to read card parameter
        available_timestamp_bytes = sp.int32(0)
        available_timestamp_position = sp.int32(0)
        available_data_bytes = sp.int32(0)
        available_data_position = sp.int32(0)

        # Track bytes from incomplete gate reads for next iteration
        remaining_bytes = 0

        num_channels = self.num_channels.value
        # Timestamps are divided by this value (sample rate in Hz) to obtain seconds
        ticks_per_second = self.sample_rate * 1e6

        # Signal that acquisition has started
        self.log.debug("Starting receive")
        self.is_receiving.set()

        while not self.is_running.is_set():
            # Read the available timestamp buffer size; 32 bit read to match the 32 bit helper variable
            sp.spcm_dwGetParam_i32(self.card, sp.SPC_TS_AVAIL_USER_LEN, byref(available_timestamp_bytes))

            # Process, if buffer size is greater or equal 32 (corresponds to 2 timestamps)
            if available_timestamp_bytes.value < 32:
                continue

            # Read timestamp position
            sp.spcm_dwGetParam_i32(self.card, sp.SPC_TS_AVAIL_USER_POS, byref(available_timestamp_position))

            # Read exactly two timestamps, buffer is indexed in units of 8 bytes
            timestamp_index = available_timestamp_position.value // 8
            timestamp_0 = pll_data[timestamp_index]
            timestamp_1 = pll_data[timestamp_index + 2]

            # Calculate gate duration and the number of adc gate sample points (per channel)
            gate_sample = timestamp_1 - timestamp_0
            gate_length = gate_sample / ticks_per_second
            self.log.info(
                "Gate: (%s s, %s s); ADC duration: %s ms ; Samples/gate/channel: %s",
                timestamp_0 / ticks_per_second,
                timestamp_1 / ticks_per_second,
                float(gate_length) * 1e3,  # Can be trimmed.
                gate_sample,
            )

            # Tell buffer 32 bytes were read from timestamp buffer, driver errors propagate unchanged
            self.handle_error(sp.spcm_dwSetParam_i32(self.card, sp.SPC_TS_AVAIL_CARD_LEN, 32))

            # Calculate size of relevant data (pre_trigger needed to get position of start of gate)
            # This is the minimum amount of data must be available to get full gate data
            total_bytes_gate = (gate_sample + self.pre_trigger) * 2 * num_channels

            # Get the total data duration, including post trigger, to accurately track buffer position
            samples_sequence = gate_sample + self.pre_trigger + self.post_trigger
            # Ensure data alignment
            # NOTE(review): adding the remainder only yields a multiple of gate_alignment if the remainder is 0
            # or half of the alignment; confirm against hardware whether the padding should rather be
            # (-samples_sequence) % gate_alignment. Behavior is intentionally left unchanged here.
            alignment_samples = samples_sequence % self.gate_alignment
            samples_sequence += alignment_samples
            bytes_sequence = samples_sequence * 2 * num_channels

            # Check if total gate data does not exceed buffer size
            if bytes_sequence > rx_size:
                error_msg = (
                    f"ADC gate data ({bytes_sequence} bytes) exceeds "
                    f"available buffer ({rx_size} bytes). "
                    f"Reduce adc length, sample rate or channel count"
                )
                self.log.critical(error_msg)
                raise ValueError(error_msg)

            # Wait for ADC data to arrive in DMA buffer, leave the acquisition loop if the wait failed
            if not self._wait_for_dma():
                break

            # Read available data length and position
            sp.spcm_dwGetParam_i32(self.card, sp.SPC_DATA_AVAIL_USER_POS, byref(available_data_position))
            sp.spcm_dwGetParam_i32(self.card, sp.SPC_DATA_AVAIL_USER_LEN, byref(available_data_bytes))

            self.log.debug(
                "ADC event size: %d bytes, Available data length: %s bytes",
                total_bytes_gate,
                available_data_bytes.value,
            )

            # If insufficient data is in buffer wait for more to arrive.
            if available_data_bytes.value + remaining_bytes < total_bytes_gate:
                wait_start = time.time()
                while (
                    available_data_bytes.value + remaining_bytes < total_bytes_gate
                    and not self.is_running.is_set()
                ):
                    # Stop waiting if the DMA wait failed, availability is re-checked below
                    if not self._wait_for_dma():
                        break
                    sp.spcm_dwGetParam_i32(self.card, sp.SPC_DATA_AVAIL_USER_LEN, byref(available_data_bytes))
                self.log.debug(
                    "Waited %.3f ms for extra data to enter buffer",
                    (time.time() - wait_start) * 1e3,
                )

            if remaining_bytes + available_data_bytes.value > rx_size:
                error_msg = (
                    f"Memory overflow. Sum of remaining bytes ({remaining_bytes} bytes) "
                    f"and newly available bytes ({available_data_bytes.value} bytes) "
                    f"exceeds receive buffer size ({rx_size} bytes)"
                )
                self.log.critical(error_msg)
                raise MemoryError(error_msg)

            # Check if sufficient data is available (while loop doesn't guarantee it since it can be interrupted)
            if available_data_bytes.value + remaining_bytes < total_bytes_gate:
                self.log.error(
                    "Needed at least %d bytes but only %d bytes available",
                    total_bytes_gate,
                    available_data_bytes.value,
                )
                continue

            # Adjust memory position to account for bytes remaining after previous acquisition.
            # The modulo keeps the position inside the ring buffer if the adjustment crosses the buffer
            # start or end; otherwise memory outside of the DMA buffer would be read.
            byte_position = (available_data_position.value - remaining_bytes) % rx_size

            # Read the gate data, buffer wraparound is handled by the helper
            gate_data = self._read_ring_buffer(rx_base_address, byte_position, total_bytes_gate, rx_size)

            # Cut the pretrigger, we do not need it.
            pre_trigger_cut = self.pre_trigger * num_channels
            gate_data = gate_data[pre_trigger_cut:]

            # Store a copy of the raw data in the RxData object, samples of the channels are interleaved
            rx_entry = self.rx_data[self._total_gates]
            rx_entry.raw_data = gate_data.reshape((num_channels, gate_sample), order="F").copy()
            rx_entry.scaling_factor = self.rx_scaling[:num_channels]
            rx_entry.time_stamp = timestamp_0 / ticks_per_second

            # The accumulation of the leftover bytes is positive,
            # if the post-trigger event was not fully captured (accumulated sum increases),
            # or negative if more than the expected data could be read due to leftover bytes
            # from a previous acquisition (accumulated sum decreases).
            remaining_bytes += available_data_bytes.value - bytes_sequence
            self._total_gates += 1

            # Tell the card that data has been read and the buffer can be reused.
            # Using the size of available data bytes prevents invalid values.
            self.handle_error(sp.spcm_dwSetParam_i32(self.card, sp.SPC_DATA_AVAIL_CARD_LEN, available_data_bytes))

        self.log.debug("Card operation stopped")