Source code for console.spcm_control.abstract_device
"""Device interface class."""
import json
from abc import ABC, abstractmethod
from ctypes import _SimpleCData, byref, c_char_p, create_string_buffer
from logging import Logger
import console.spcm_control.spcm.pyspcm as sp
from console.spcm_control.spcm.tools import translate_error, type_to_name
[docs]
class SpectrumDevice(ABC):
    """Spectrum device abstract base class.

    Stores the device path, a logger and the driver handle of a spectrum
    card, and provides the shared connect / disconnect / error-handling
    logic. Concrete devices implement the abstract methods below.
    """

    def __init__(self, path: str, log: Logger):
        """Init function of spectrum device.

        Parameters
        ----------
        path
            Path of the spectrum card device, e.g. /dev/spcm1
        log
            Logger used for all messages emitted by this device
        """
        super().__init__()
        # Handle returned by the driver on connect; None while disconnected
        self.card: c_char_p | None = None
        # Card name resolved from the card type on connect
        self.name: str | None = None
        self.path = path
        self.log = log

    @abstractmethod
    def dict(self) -> dict:
        """Return the instance variables suitable for logging as dictionary.

        Abstract, but carries a default implementation that subclasses can
        reuse through ``super().dict()``.

        Returns
        -------
            Mapping of attribute name to value. Attributes are skipped if
            their name starts or ends with a double underscore, if their
            value is falsy (including ``None``), or if the value cannot be
            serialized to JSON. ctypes simple types are replaced by their
            plain Python ``value``.
        """
        attributes = {}
        for key, var in vars(self).items():
            if key.startswith("__") or key.endswith("__"):
                continue
            if not var:
                # Covers None as well as empty / zero values
                continue
            # Unwrap ctypes simple types explicitly instead of relying on
            # json.dumps failing for the wrapper object
            value = var.value if isinstance(var, _SimpleCData) else var
            try:
                # Check if the (unwrapped) value can be json serialized
                json.dumps(value)
            except (TypeError, ValueError):
                continue
            attributes[key] = value
        return attributes

    def disconnect(self):
        """Disconnect card.

        If a card handle exists, the card is stopped and closed. The stored
        handle and name are reset in any case.
        """
        if self.card:
            self.log.info(f"Stopping and closing card {self.name}...")
            sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_STOP)
            sp.spcm_vClose(self.card)
        # Reset card information
        self.card = None
        self.name = None

    def connect(self) -> bool:
        """Establish card connection.

        Opens the card at ``self.path``, reads the card type to resolve
        the card name and calls ``setup_card``.

        Returns
        -------
            True if the connection was established

        Raises
        ------
        ConnectionError
            Connection to card already exists
        ConnectionError
            Connection could not be established
        """
        self.log.debug("Connecting to card")
        if self.card:
            # Opening again would overwrite the existing handle without
            # closing it, so refuse as documented
            self.log.error("Already connected to card")
            raise ConnectionError("Already connected to card")

        self.card = sp.spcm_hOpen(create_string_buffer(str.encode(self.path)))
        if not self.card:
            self.log.critical("Could not connect to card")
            raise ConnectionError("Could not connect to card")

        # Read card information
        card_type = sp.int32(0)
        sp.spcm_dwGetParam_i32(self.card, sp.SPC_PCITYP, byref(card_type))
        self.name = type_to_name(card_type.value)
        self.log.debug(f"Connection to card {self.name} established!")
        self.setup_card()
        return True

    def handle_error(self, error: int):
        """General error handling function.

        For a non-zero error code, the error text is read from the card,
        logged as critical together with the translated error code, and the
        card is stopped. No exception is raised.

        Parameters
        ----------
        error
            Error code returned by a driver call, 0 means no error
        """
        if not error:
            return
        # Read error message from card
        err_msg = create_string_buffer(sp.ERRORTEXTLEN)
        sp.spcm_dwGetErrorInfo_i32(self.card, None, None, err_msg)
        # Decode the buffer content; formatting the buffer object itself
        # would only log its repr, not the message text
        err_text = err_msg.value.decode(errors="replace")
        # Log the error and stop the card
        self.log.critical(f"Catched error: {err_text}, {translate_error(error)}; stopping card {self.name}")
        sp.spcm_dwSetParam_i32(self.card, sp.SPC_M2CMD, sp.M2CMD_CARD_STOP)

    @abstractmethod
    def get_status(self) -> int:
        """Abstract method to obtain card status."""

    @abstractmethod
    def setup_card(self):
        """Abstract method to setup the card.

        Called by ``connect`` once the card has been opened.
        """

    @abstractmethod
    def start_operation(self):
        """Abstract method to start card operation.

        The base signature takes no arguments.
        NOTE(review): the previous docstring described an optional ``data``
        argument (replay data in spcm format as numpy array); presumably
        some subclasses extend the signature accordingly - confirm.
        """

    @abstractmethod
    def stop_operation(self):
        """Abstract method to stop card operation."""