Source code for console.spcm_control.abstract_device

"""Device interface class."""
import json
from abc import ABC, abstractmethod
from ctypes import _SimpleCData, byref, c_char_p, create_string_buffer
from logging import Logger

import console.spcm_control.spcm.pyspcm as sp
from console.spcm_control.spcm.tools import translate_error, type_to_name


[docs] class SpectrumDevice(ABC): """Spectrum device abstract base class.""" def __init__(self, path: str, log: Logger): """Init function of spectrum device. Parameters ---------- path Path of the spectrum card device, e.g. /dev/spcm1 """ super().__init__() self.card: c_char_p | None = None self.name: str | None = None self.path = path self.log = log
[docs] @abstractmethod def dict(self) -> dict: """Abstract method which returns variables for logging in dictionary.""" attributes = {} for key, var in vars(self).items(): # Check if var exists, is not None and its variable name # does not have a leading or ending double underscore if not key.startswith("__") and not key.endswith("__"): if not var or var is None: continue if isinstance(var, _SimpleCData): # Is a ctypes type attributes[key] = var.value try: # Check if variable can be json serialized json.dumps(var) except TypeError: continue attributes[key] = var return attributes
[docs] def disconnect(self): """Disconnect card.""" # Closing the card if self.card: self.log.info(f"Stopping and closing card {self.name}...") sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_STOP) sp.spcm_vClose(self.card) # Reset card information self.card = None self.name = None
[docs] def connect(self) -> bool: """Establish card connection. Raises ------ ConnectionError Connection to card already exists ConnectionError Connection could not be established """ self.log.debug("Connecting to card") if self.card: # Raise connection error if card object already exists self.log.error("Already connected to card") # Only connect, if card is not already defined self.card = sp.spcm_hOpen(create_string_buffer(str.encode(self.path))) if self.card: # Read card information card_type = sp.int32(0) sp.spcm_dwGetParam_i32(self.card, sp.SPC_PCITYP, byref(card_type)) self.name = type_to_name(card_type.value) self.log.debug(f"Connection to card {self.name} established!") self.setup_card() else: self.log.critical("Could not connect to card") raise ConnectionError("Could not connect to card") return True
[docs] def handle_error(self, error: int): """General error handling function.""" if error: # Read error message from card err_msg = create_string_buffer(sp.ERRORTEXTLEN) sp.spcm_dwGetErrorInfo_i32(self.card, None, None, err_msg) # Disconnect and raise error self.log.critical(f"Catched error: {err_msg}, {translate_error(error)}; stopping card {self.name}") sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_STOP)
[docs] @abstractmethod def get_status(self) -> int: """Abstract method to obtain card status."""
[docs] @abstractmethod def setup_card(self): """Abstract method to setup the card."""
[docs] @abstractmethod def start_operation(self): """Abstract method to start card operation. Parameters ---------- data, optional Replay data in correct spcm format as numpy array, by default None """
[docs] @abstractmethod def stop_operation(self): """Abstract method to stop card operation."""