"""Digital down converter (DDC) function."""
import numpy as np
from scipy.signal import decimate
[docs]
def filter_moving_average(signal, decimation: int = 100, overlap: int = 8):
r"""Decimate data using a moving average filter.
$kernel = e^{\frac{-1}{1 - x^2}} * \sin{2.073 * \pi \ x} / x, \; x = -1, ..., 1$
Parameters
----------
signal
Unprocessed raw signal to be decimated and filtered.
Input signal shape is supposed to be [averages, coils, phase encoding, readout].
Downsampling is applied to the readout dimension.
decimation, optional
Decimation factor, by default 100
overlap, optional
Overlap factor of the kernel.
If 2, kernel is applied every kernel_size/2, if 4 every kerne_size/4 and so on.
By default 4
Returns
-------
Downsampled signal
"""
# Calculate kernel size
kernel_size = int(overlap * decimation)
# Exponential function for resampling
# To prevent division by zero for [-1, 1, 0], noise at the scale of 1e-20 is added
# kernel_noise = np.random.choice(list(range(-9, 0)) + list(range(1, 10)), kernel_size) * 1e-3
kernel_space = np.linspace(-1 + 1e-10, 1 - 1e-10, kernel_size)
kernel_space[kernel_space == 0] = 1e-10
# Define kernel
kernel = np.exp(-1 / (1 - kernel_space**2)) * np.sin(kernel_space * 2.073 * np.pi) / kernel_space
# Integral for normalization
norm = np.sum(kernel)
# Calculate size of down-sampled signal
# num_ddc_samples = signal.shape[-1] // decimation
num_ddc_samples = round(signal.shape[-1] / decimation)
signal_filtered = np.zeros(signal.shape[:-1] + (num_ddc_samples,), dtype=complex)
# Zero-padding of signal to center down-sampled signal
n_pad = [(0, 0)] * signal.ndim
n_pad[-1] = (int(overlap * decimation / 2),) * 2
signal_pad = np.pad(signal, pad_width=n_pad, mode="constant", constant_values=[0])
# 1D strided convolution
for k in range(num_ddc_samples):
# _tmp = np.sum(signal_pad[..., k * decimation : k * decimation + kernel_size] * kernel)
_tmp = signal_pad[..., k * decimation : k * decimation + kernel_size] @ kernel
signal_filtered[..., k] = 2 * _tmp / norm
return signal_filtered
[docs]
def filter_cic_fir_comp(signal, decimation, number_of_stages):
"""Decimate data using CIC filter and FIR compensation.
Two stage decimation:
(1) CIC decimation by decimation/2 rate
(2) FIR decimation by factor 2
The signal size after decimation can be determined by
>>> signal.shape[-1] // decimation
The number of decimated samples after the first stage depends output sample size.
Any left over samples in CIC decimation stage are truncated.
Parameters
----------
signal
Unprocessed raw signal to be decimated and filtered.
Input signal shape is supposed to be [averages, coils, phase encoding, readout].
Downsampling is applied to the readout dimension.
decimation
Decimation factor, by default 100.
Must be even, since decimation is performed in two steps.
number_of_stages
Number of CIC stages.
Returns
-------
Downsampled signal
Raises
------
ValueError
Uneven decimation factor.
"""
cic_samples = 2 * (signal.shape[-1] // decimation)
cic_decimation = signal.shape[-1] // cic_samples
# CIC integrator Stages
for _ in range(number_of_stages):
signal = np.cumsum(signal, axis=-1)
# CIC decimation, truncate decimated signal to cic_samples (throw last sample if present)
decimated_signal = signal[..., :: int(cic_decimation)][..., :cic_samples]
# Comb Stages
for _ in range(number_of_stages):
delayed_signal = np.zeros_like(decimated_signal)
delayed_signal[..., 1:] = decimated_signal[..., :-1]
decimated_signal = decimated_signal - delayed_signal
# Normalization
gain = np.power(cic_decimation, number_of_stages)
decimated_signal = decimated_signal / gain
# Apply FIR decimation with decimation factor of 2 along readout axis
return decimate(x=decimated_signal, q=2, ftype="fir", axis=-1)