"""Interface class for acquisition data."""
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from importlib.metadata import version
from pathlib import Path
from typing import Any
import h5py
import ismrmrd
import numpy as np
from console.interfaces.acquisition_parameter import AcquisitionParameter
from console.interfaces.rx_data import RxData
from console.pulseq_interpreter.sequence_provider import Sequence, SequenceProvider
from console.utilities.data import get_nexus_acquisition_system, write_acquisition_to_mrd
from console.utilities.json_encoder import JSONEncoder
# Module-level logger used by AcquisitionData for all warnings and errors.
# (A stray ``[docs]`` documentation-extraction artifact followed this line;
# it was a bare expression on an undefined name and failed at import time.)
log = logging.getLogger("AcqData")
@dataclass(slots=True, frozen=True)
class AcquisitionData:
"""Parameters which define an acquisition."""
receive_data: list[RxData]
""" A list containing a list of RxData objects which contain all of the receive data for the acquisition. The outer
list contains the list of RxData for each average."""
acquisition_parameters: AcquisitionParameter
"""Acquisition parameters."""
sequence: SequenceProvider | Sequence
"""Sequence object used for the acquisition acquisition."""
session_path: str
"""Directory the acquisition data will be stored in.
Within the given `storage_path` a new directory with time stamp and sequence name will be created."""
meta: dict[str, Any] = field(default_factory=dict)
"""Meta data dictionary for additional acquisition info.
Dictionary is updated (extended) by post-init method with some general information."""
_additional_numpy_data: dict = field(default_factory=dict)
"""Dictionarz containing addition (numpy) data.
Use the function add_data to update this dictionarz before saving.
They key of each entry is used as filename."""
def __post_init__(self) -> None:
"""Post init method to update meta data object."""
datetime_now = datetime.now()
seq_name = self.sequence.definitions["Name"].replace(" ", "_")
acquisition_id = datetime_now.strftime("%Y-%m-%d-%H%M%S-") + seq_name
self.meta.update(
{
"version": version("nexus-console"),
"date": datetime_now.strftime("%Y-%m-%d"),
"time": datetime_now.strftime("%H:%M:%S"),
"acquisition_id": acquisition_id,
"folder_name": acquisition_id,
"acquisition_parameter": self.acquisition_parameters.dict(),
"sequence": {
"name": seq_name,
"duration": self.sequence.duration()[0],
"definitions": {
# Write all sequence definitions, turn numpy arrays into lists
k: v.tolist() if isinstance(v, np.ndarray) else v for k, v in self.sequence.definitions.items()
},
},
"info": {},
}
)
[docs]
def save(self, user_path: str | None = None, overwrite: bool = False) -> None:
"""Save all the acquisition data to a given data path.
Parameters
----------
user_path
Optional user path, default is None.
If provided, it is taken to store the acquisition data.
Other wise a datetime-based folder is created.
overwrite
Flag which indicates whether the acquisition data should be overwritten
in case it already exists from a previous call to this function, default is False.
"""
# Add trailing slash and make dir
base_path = Path(user_path) if user_path is not None else Path(self.session_path)
base_path.mkdir(parents=True, exist_ok=True)
acq_folder_path = base_path / self.meta["folder_name"]
acq_folder_path.mkdir(parents=True, exist_ok=True)
try:
self._write_acquisition_data(acq_folder_path / "rx_data.h5")
except TypeError as exc:
log.warning("Type error when saving acquisition data to h5 format.", exc_info=exc)
except Exception as exc:
log.warning("Unexpected error when saving acquisition data to h5 format.", exc_info=exc)
# Save meta data
if not (meta_file := acq_folder_path / "meta.json").exists() or overwrite:
with open(meta_file, "w", encoding="utf-8") as outfile:
json.dump(self.meta, outfile, indent=4, cls=JSONEncoder)
if not (sequence_file := acq_folder_path / "sequence.seq").exists() or overwrite:
try:
# Write sequence .seq file
self.sequence.write(sequence_file)
except Exception as exc:
log.warning("Could not save sequence: %s", exc)
if len(self._additional_numpy_data) > 0:
for key, value in self._additional_numpy_data.items():
np.save(acq_folder_path / f"{key}.npy", value)
log.info("Saved acquisition data to: %s", acq_folder_path)
[docs]
def add_info(self, info: dict[str, Any]) -> None:
"""Add entries to meta data dictionary.
Parameters
----------
info
Information as dictionary to be added.
"""
try:
json.dumps(info, cls=JSONEncoder)
except TypeError as exc:
log.error("Could not append info to meta data.", exc)
self.meta["info"].update(info)
[docs]
def add_data(self, data: dict[str, np.ndarray]) -> None:
"""Add data to additional_data dictionary.
Parameters
----------
data
Data which is to be added to acquisition data.
"""
for key, val in data.items():
if isinstance(key, str) and isinstance(val, np.ndarray) and hasattr(val, "shape"):
self._additional_numpy_data.update(data)
else:
detail = f"Could not add `{key}` to acquisition data...\n\
Key-value pairs of str: np.ndarray are required."
log.error(detail)
continue
[docs]
def save_ismrmrd(
self,
header: ismrmrd.xsd.ismrmrdHeader | str | Path | None = None,
user_path: str | None = None,
) -> Path | None:
"""Store acquisition data in (ISMR)MRD format."""
# Ensure that receive data is available
if not self.receive_data or self.receive_data[0].processed_data is None:
detail = "Processed data not found in receive data. Cannot export ISMRMRD."
raise AttributeError(detail)
# Get MRD data path
base_path = Path(user_path) if user_path else Path(self.session_path)
base_path = base_path / self.meta["folder_name"]
base_path.mkdir(parents=True, exist_ok=True)
dataset_path = base_path / "data.mrd"
# Create measurement info from meta
info = ismrmrd.xsd.measurementInformationType(
measurementID=self.meta["acquisition_id"],
seriesDate=self.meta["date"],
seriesTime=self.meta["time"],
)
# Get number of coils per receive event and create acquisition system info
coils_per_rx = [
rx_data.processed_data.shape[0] for rx_data in self.receive_data if rx_data.processed_data is not None
]
system_info = get_nexus_acquisition_system(
num_coils=max(coils_per_rx),
larmor_frequency=int(self.acquisition_parameters.larmor_frequency),
)
# Define experimental conditions with true larmor frequency
conditions = ismrmrd.xsd.experimentalConditionsType(
H1resonanceFrequency_Hz=int(self.acquisition_parameters.larmor_frequency),
)
# Create header if not given
if header is None:
log.info("ISMRMRD header not given, creating header without encoding/reconstruction info.")
header = ismrmrd.xsd.ismrmrdHeader()
# Load header from xml file
if isinstance(header, (str, Path)):
header_path = Path(header)
log.info("Loading ISMRMRD header from file: %s", header_path.name)
# Open the dataset
dataset = ismrmrd.Dataset(header_path)
# Read the XML file and create header
xml_header = dataset.read_xml_header()
header = ismrmrd.xsd.CreateFromDocument(xml_header)
# Extend header if given
if isinstance(header, ismrmrd.xsd.ismrmrdHeader):
# Update existing ismrmrd header with measurement info, conditions and system info
header.measurementInformation = info
header.experimentalConditions = conditions
header.acquisitionSystemInformation = system_info
return write_acquisition_to_mrd(
data=self.receive_data,
header=header,
sequence=self.sequence,
dataset_path=dataset_path,
)
log.warning("Invalid MRD header, could not write MRD file.")
return None
def _write_acquisition_data(self, file_path: str) -> None:
"""Save AcquisitionData and all RxData entries to an HDF5 file."""
def _write_dict(group: h5py.Group, _dict: dict) -> None:
"""Write dictionary to h5py group."""
for key, value in _dict.items():
if isinstance(value, dict):
_write_dict(group.create_group(key), value)
elif isinstance(value, np.generic):
group.attrs[key] = value.item()
elif isinstance(value, (int, float, bool)):
group.attrs[key] = value
elif value is None:
group.attrs[key] = "None"
else:
group.attrs[key] = str(value)
with h5py.File(file_path, "w") as fh:
# --- Metadata
meta_group = fh.create_group("meta")
_write_dict(meta_group, self.meta)
# --- RxData per average
receive_data_group = fh.create_group("receive_data")
receive_data_group.attrs["length"] = len(self.receive_data)
for idx, rx_data in enumerate(self.receive_data):
rx_group = receive_data_group.create_group(str(idx))
_write_dict(rx_group, rx_data.dict())
if rx_data.processed_data is not None:
rx_group.create_dataset("processed_data", data=rx_data.processed_data)
if rx_data.raw_data is not None:
rx_group.create_dataset("raw_data", data=rx_data.raw_data)