# -*- coding: utf-8 -*-
"""Module with signal processing functions in it
used by both measure and segment modules.
"""
from itsfm.sanity_checks import make_sure_its_negative
import numpy as np
import scipy.signal as signal
[docs]def dB(X):
'''Calculates the 20log of X'''
return 20*np.log10(X)
[docs]def rms(X):
'''Root mean square of a signal '''
return np.sqrt(np.mean(X**2.0))
[docs]def calc_energy(X):
'''Sum of all squared samples '''
return np.sum(X**2.0)
[docs]def get_power_spectrum(audio, fs=250000.0):
'''Calculates an RFFT of the audio.
Parameters
------------
audio : np.array
fs : int
Frequency of sampling in Hz
Returns
-------
dB_power_spectrum : np.array
dB(power_spectrum)
freqs : np.array
Centre frequencies of the RFFT.
'''
spectrum = np.fft.rfft(audio)
freqs = np.fft.rfftfreq(audio.size, 1.0/fs)
dB_power_spectrum = dB(abs(spectrum))
return dB_power_spectrum, freqs
[docs]def calc_sound_borders(audio, percentile=99):
'''Gives the start and stop of a sound based on the percentile
cumulative energy values.
Parameters
----------
audio : np.array
percentile : float, optional
Value between 100 and 0. The sound border is calcualted
as the border which encapsulates the percentile of energy
Defaults to 99.
Returns
--------
start, end : int
'''
audio_sq = audio**2.0
cum_energy = np.cumsum(audio_sq)
outside_percentile = (100-percentile)*0.5
lower, higher = outside_percentile, 100-outside_percentile
start, end = np.percentile(cum_energy,[lower, higher])
start_ind = np.argmin(abs(cum_energy-start))
end_ind = np.argmin(abs(cum_energy-end))
return start_ind, end_ind
[docs]def get_robust_peak_frequency(audio, **kwargs):
'''Makes a spectrogram from the audio
and calcualtes the peak frequency by averaging
each slice of the spectrogram's FFT's.
This 'smooths' out the structure of the power
spectrum and allows a single and clear peak
detection.
Thanks to Holger Goerlitz for the suggestion.
Parameters
----------
audio : np.array
fs : float
Frequency of sampling in Hz
seg_length : int, optional
The size of the FFt window used to calculate the moving FFT slices.
DEfaults to 256
noverlap : int, optional
The number of samples overlapping between one FFT slice and the next.
Defaults to seg_length -1
Returns
--------
peak_frequency : float
Frequency with highest power in the audio in Hz.
'''
seg_length = kwargs.get('seg_length',256)
frequency,t,sxx = signal.spectrogram(audio, fs=int(kwargs['fs']), nperseg=seg_length, noverlap=seg_length-1)
averaged_spectrogram = np.apply_along_axis(np.sum, 1, sxx)
peak = np.argmax(averaged_spectrogram)
peak_frequency = frequency[peak]
return peak_frequency
[docs]def get_peak_frequency(audio, fs):
'''Gives peak frequency and frequency resolution
with which the measurement is made
Parameters
----------
audio : np.array
fs : float>0
sampling rate in Hz
Returns
-------
peak_freq, freq_resolution : float
The peak frequency and frequency resolution of this
peak frequency in Hz.
'''
spectrum = np.fft.rfft(audio)
freqs = np.fft.rfftfreq(audio.size, 1.0/fs)
freq_resolution = get_frequency_resolution(freqs, fs)
peak_freq = freqs[np.argmax(spectrum)]
return peak_freq, freq_resolution
[docs]def get_frequency_resolution(audio, fs):
'''
Parameters
----------
audio : np.array
fs : float>0
sampling rate in Hz
Returns
-------
resolution : float
The frequency resolution in Hz.
'''
resolution = float(fs/audio.size)
return resolution
[docs]def moving_rms(X, **kwargs):
'''Calculates moving rms of a signal with given window size.
Outputs np.array of *same* size as X. The rms of the
last few samples <= window_size away from the end are assigned
to last full-window rms calculated
Parameters
----------
X : np.array
Signal of interest.
window_size : int, optional
Defaults to 125 samples.
Returns
-------
all_rms : np.array
Moving rms of the signal.
'''
window_size = kwargs.get('window_size', 125)
starts = np.arange(0, X.size)
stops = starts+window_size
valid = stops<X.size
valid_starts = np.int32(starts[valid])
valid_stops = np.int32(stops[valid])
all_rms = np.ones(X.size).reshape(-1,1)*999
for i, (start, stop) in enumerate(zip(valid_starts, valid_stops)):
rms_value = rms(X[start:stop])
all_rms[i] = rms_value
# replace all un-assigned samples with the last rms value
all_rms[all_rms==999] = np.nan
return all_rms
[docs]def moving_rms_edge_robust(X, **kwargs):
'''Calculates moving rms of a signal with given window size.
Outputs np.array of *same* size as X. This version is robust
and doesn't suffer from edge effects as it calculates the
moving rms in both forward and backward directions
and calculates a consensus moving rms profile.
The consensus rms profile is basically achieved by
taking the left half of the forward rms profile
and concatenating it with the right hald of the
backward passed rms profile.
Parameters
----------
X : np.array
Signal of interest.
window_size : int, optional
Defaults to 125 samples.
Returns
-------
all_rms : np.array
Moving rms of the signal.
Notes
-----
moving_rms_edge_robust may not be too accurate when the rms
is expected to vary over short time scales in the centre of
the signal!!
'''
forward_run = moving_rms(X, **kwargs)
backward_run = moving_rms(np.flip(X), **kwargs)
consensus = form_consensus_moving_rms(forward_run, backward_run)
return consensus
[docs]def calc_proper_kernel_size(durn, fs):
'''scipy.signal.medfilt requires an odd number of samples as
kernel_size. This function calculates the number of samples
for a given duration which is odd and is close to the
required duration.
Parameters
----------
durn : float
Duration in seconds.
fs : float
Sampling rate in Hz
Returns
-------
samples : int
Number of odd samples that is equal to or little
less (by one sample) than the input duration.
'''
samples = int(durn*fs)
if np.remainder(samples,2)==0:
samples -= 1
return samples
[docs]def resize_by_adding_one_sample(input_signal, original_signal, **kwargs):
'''Resizes the input_signal to the same size as the original signal by repeating one
sample value. The sample value can either the last or the first sample of the input_signal.
'''
check_signal_sizes(input_signal, original_signal)
repeat_start = kwargs.get('repeat_start', True)
if repeat_start:
return np.concatenate((np.array([input_signal[0]]), input_signal))
else:
return np.concatenate((input_signal, np.array([input_signal[-1]])))
def check_signal_sizes(input_signal, original_signal):
if int(input_signal.size) >= int(original_signal.size):
msg1 = 'The input signal"s size %d'%int(input_signal.size)
msg2 = ' is greater or equal to the original signal"s size: %d'%(int(original_signal.size))
raise ValueError(msg1+msg2)
if int(original_signal.size) - int(input_signal.size) >= 2:
raise ValueError('The original signal is >= 2 samples longer than the input signal.')
[docs]def get_terminal_frequency(audio, fs, **kwargs):
'''Gives the -XdB frequency from the peak.
The power spectrum is calculated and smoothened over 3 frequency bands to remove
complex comb-like structures.
Then the lowest frequency below XdB from the peak is returned.
Parameters
----------
audio : np.array
fs : float>0
Sampling rate in Hz
terminal_frequency_threshold : float, optional
The terminal frequency is calculated based on finding the level of the peak frequency
and choosing the lowest frequency which is -10 dB (20log10) below the peak level.
Defaults to -10 dB
Returns
---------
terminal_frequency
threshold
Notes
-----
Careful about setting threshold too low - it might lead to output of terminal
frequencies that are actually in the noise, and not part of the signal itself.
'''
threshold = kwargs.get('terminal_frequency_threshold', -10)
make_sure_its_negative(threshold, variable='terminal frequency threshold')
power_spectrum, freqs = get_power_spectrum(audio, fs)
# smooth the power spectrum over 3 frequency bands to remove 'comb'-iness in the spectrum
smooth_spectrum = np.convolve(10**(power_spectrum/20.0), np.ones(3)/3,'same')
smooth_power_spectrum = dB(abs(smooth_spectrum))
peak = np.max(smooth_power_spectrum)
geq_threshold = smooth_power_spectrum >= peak + threshold
all_frequencies_above_threshold = freqs[geq_threshold]
terminal_frequency = np.min(all_frequencies_above_threshold)
return terminal_frequency, threshold