Source code for itsfm.batch_processing

# -*- coding: utf-8 -*-
"""Runs the batch processing option. The main outputs are the call measurements
and the visualisations. (See __main__.py)

.. code-block:: bash

    $ python -m itsfm -batchfile template_batchfile.csv

Also allows the user to run only one specific row of the whole batch file 

.. code-block:: bash

    $ python -m itsfm -batchfile template_batchfile.csv -one_row 10

The line above loads the 11th row (0-based indexing!!) of the template_batchfile


"""
from copy import copy
from glob import glob 
import os
import pdb
import matplotlib.pyplot as plt
import numpy as np 
import pandas as pd
try:
	import soundfile as sf
except:
	print('Cannot import SoundFile!!') # a hack for rtd build to pass.
from tqdm import tqdm
import itsfm
from itsfm.user_interface import segment_and_measure_call
from itsfm.user_interface import save_overview_graphs
from itsfm.view import itsFMInspector
from itsfm.sanity_checks import check_preexisting_file, make_sure_its_positive

def run_from_batchfile(batchfile_path, **kwargs):
    '''Runs segmentation, measurement and plotting for the rows of a batchfile.

    Each row that is not marked with ``skip`` has its audio loaded,
    segmented and measured. The diagnostic plots of that row are saved and
    its measurements are appended to 'measurements_<batchfile name>.csv'.

    Parameters
    ----------
    batchfile_path : str/path
        Path to a batchfile

    Keyword Arguments
    -----------------
    one_row : int, optional
        A specific row to be loaded from the whole batchfile.
        The first row starts with 0. Defaults to None
    _from : int, optional
        Row to start the batchfile processing from.
        Defaults to None
    _till : int, optional
        Row to end the batchfile processing.
        Defaults to None
    del_measurement : boolean, optional
        Forwarded to measurement_file_action before any processing starts.
    '''
    measurement_file_action(**kwargs)
    rows_to_process = subset_batch_data(load_batchfile(batchfile_path), **kwargs)

    batchfile_name = get_only_filename(batchfile_path)
    measurements_output_file = '_'.join(['measurements', batchfile_name]) + '.csv'

    all_measurements = []
    progress = tqdm(rows_to_process.iterrows(), total=rows_to_process.shape[0])
    for row_number, one_batchfile_row in progress:
        input_arguments = parse_batchfile_row(one_batchfile_row)
        if input_arguments.get('skip', False):
            continue

        main_audio, fs = load_raw_audio(input_arguments)
        audio_file_name = get_only_filename(input_arguments['audio_path'])
        print('Processing '+audio_file_name+' ...')

        segment_and_measure = segment_and_measure_call(main_audio, fs,
                                                       **input_arguments)
        out_inspect = itsFMInspector(segment_and_measure, main_audio, fs,
                                     **input_arguments)
        # only the measurements are needed here, the segmentation outputs
        # are consumed through the inspector
        (_, _, _), _, measurements = segment_and_measure

        # diagnostic plots, in the order they are saved
        level_plot, _ = out_inspect.visualise_geq_signallevel()
        segmentation_plot, _ = out_inspect.visualise_cffm_segmentation()
        profile_plot, _ = out_inspect.visualise_frequency_profiles()
        fmrate_plot, _, _ = out_inspect.visualise_fmrate()
        acceleration_plot, _, _ = out_inspect.visualise_accelaration()
        save_overview_graphs([level_plot, segmentation_plot, profile_plot,
                              fmrate_plot, acceleration_plot],
                             batchfile_name, audio_file_name, row_number,
                             **input_arguments)

        measurements['audio_file'] = audio_file_name
        all_measurements = save_measurements_to_file(measurements_output_file,
                                                     audio_file_name,
                                                     all_measurements,
                                                     measurements)
        # free the figures of this row before moving on to the next one
        plt.close('all')
def subset_batch_data(batch_data, **kwargs):
    '''Selects the rows of the batch data which should be processed.

    Parameters
    ----------
    batch_data : pd.DataFrame

    Keyword Arguments
    -----------------
    one_row : int, optional
        Row label of the single row to be returned. Cannot be combined
        with _from or _till.
        Defaults to None
    _from : int, optional
        The row number the analysis should start from.
        Defaults to None, which means the first row (0).
    _till : int, optional
        The row number the analysis should run till, including the end point.
        Remember the row numbering starts from 0!
        Defaults to None, which means till the last row.

    Returns
    -------
    subset_batch_data : pd.DataFrame
        Either a copy of batch_data or a part of batch_data

    Raises
    ------
    ValueError
        If _till is before _from, or if either of them is negative.

    Example
    -------
    # let's get only one row from the fake batch data file
    >>> batch = pd.DataFrame(data={'a':range(10), 'b':range(10)})
    >>> onerow = subset_batch_data(batch, one_row=5)
    >>> print(onerow)
    # get a limited range of the dataframe
    >>> part = subset_batch_data(batch, _from=3, _till=8)
    >>> print(part)
    '''
    one_row = kwargs.get('one_row')
    if one_row is not None:
        # check that one_row is not being used in conjunction with
        # from or till
        onerow_used_properly(**kwargs)
        try:
            return make_to_oned_dataframe(batch_data.loc[one_row])
        except Exception:
            # NOTE(review): after this message the function falls through and
            # returns the _from/_till range (the whole batch by default).
            # Kept as is to preserve behaviour - confirm this is intended.
            print(f"Unable to subset batch file with row number: {one_row}")

    start_row = kwargs.get('_from')
    if start_row is None:
        start_row = 0
    end_row = kwargs.get('_till')
    if end_row is None:
        end_row = batch_data.shape[0]

    if end_row < start_row:
        raise ValueError(f'end row : {end_row} is before start row: {start_row}')
    if end_row < 0 or start_row < 0:
        raise ValueError(f'One of either end row : {end_row} or start row: {start_row} are <0!')

    # .loc slices by label, which is why the end row is included
    return batch_data.loc[start_row:end_row, :]
def measurement_file_action(**kwargs):
    '''Either lets the measurement files remain, or deletes them if present.

    Only the current working directory is searched.

    Keyword Arguments
    -----------------
    del_measurement : boolean
        True means all files starting with 'measurement' are deleted.
        Anything falsy (or a missing keyword) leaves the files untouched.
    '''
    if not kwargs.get('del_measurement'):
        return

    for candidate in glob('measurement*'):
        # the pattern also matches directories, which os.remove cannot
        # delete - only plain files are removed
        if os.path.isfile(candidate):
            os.remove(candidate)
def onerow_used_properly(**kwargs):
    '''Checks that the -one_row argument is not used in conjunction
    with -from or -till

    Keyword Arguments
    -----------------
    one_row : int, optional
    _from : int, optional
    _till : int, optional

    Raises
    ------
    ImproperArguments
        If one_row is given together with _from and/or _till.
    '''
    if kwargs.get('one_row') is None:
        return

    # compare against None instead of relying on truthiness: row 0 is a
    # valid value for _from and _till
    range_given = any(kwargs.get(key) is not None for key in ('_from', '_till'))
    if range_given:
        raise ImproperArguments('one_row is being used with either -from or -till. This is not allowed!')
def save_measurements_to_file(output_filepath, audio_file_name,
                              previous_rows, measurements):
    '''
    Continously saves a row to a csv file and updates it.

    Thanks to tmss @ https://stackoverflow.com/a/46775108

    Parameters
    ----------
    output_filepath :str/path
    audio_file_name : str.
        Not used inside this function.
    previous_rows : pd.DataFrame or empty list
        All the previous measurements. Can also just have a single row.
        An empty container means nothing has been written so far.
    measurements : pd.DataFrame
        Current measurements to be incorporated

    Returns
    -------
    previous_rows : pd.DataFrame
        The previous measurements with the current ones added at the end.

    Notes
    -----
    Main side effect is to write an updated version of the output file.
    The header is only written along with the very first set of measurements,
    every later call appends just the new rows.
    '''
    newest = measurements.copy()
    csv_options = dict(mode='a', index=True, sep=',', encoding='utf-8')

    if len(previous_rows) == 0:
        # first measurements: columns are sorted and written with a header
        previous_rows = newest.copy()
        previous_rows.sort_index(axis=1, inplace=True)
        check_preexisting_file(output_filepath)
        previous_rows.to_csv(output_filepath, **csv_options)
        return previous_rows

    rows_so_far = previous_rows.shape[0]
    previous_rows = pd.concat((previous_rows, newest))
    # write only the rows which were added by this call
    just_added = previous_rows.iloc[rows_so_far:rows_so_far + newest.shape[0], :]
    just_added.to_csv(output_filepath, header=False, **csv_options)
    return previous_rows
def load_batchfile(batchfile):
    '''Reads the batchfile into a DataFrame.

    Parameters
    ----------
    batchfile : str/path
        Path to the csv batchfile.

    Returns
    -------
    pd.DataFrame

    Raises
    ------
    ValueError
        If the file could not be read. The original error is attached as
        the cause.
    '''
    try:
        return pd.read_csv(batchfile)
    except Exception as err:
        # f-string formatting also works when a pathlib.Path is given
        error_msg = f'Could not read batchfile:{batchfile}. Please check file path again'
        raise ValueError(error_msg) from err
def load_raw_audio(kwargs):
    '''Takes a dictionary input. All the parameter names need to be keys
    in the input dictionary.

    Parameters
    -----------
    audio_path : str/path
        Path to audio file
    channel : int, optional
        Channel number to be loaded - starting from 1!
        Defaults to 1.
    start,stop : float, optional
        Start and stop times of the part to be loaded. They are multiplied
        with the sampling rate to get sample indices.
        Defaults to None, which loads from the beginning / till the end.

    Returns
    --------
    raw_audio : np.array
        The audio corresponding to the start and stop
        times and the required channel.
    fs : int
        Sampling rate as reported by soundfile.

    Raises
    ------
    ValueError
        If the audio file could not be accessed.
    '''
    audio_path = kwargs.get('audio_path', None)
    try:
        fs = sf.info(audio_path).samplerate
    except Exception as err:
        # f-string so that a missing (None) or pathlib path still gives the
        # intended ValueError instead of a TypeError while building the message
        raise ValueError(f'Could not access: {audio_path}') from err

    # user facing channel numbers start at 1, array columns at 0
    channel_to_load = int(kwargs.get('channel', 1)) - 1

    start_sample = convert_time_to_samples(kwargs.get('start', None), fs)
    stop_sample = convert_time_to_samples(kwargs.get('stop', None), fs)
    audio, fs = sf.read(audio_path, start=start_sample, stop=stop_sample)

    if get_number_channels(audio) > 1:
        return audio[:, channel_to_load], fs
    return audio, fs
def get_only_filename(file_path):
    '''Returns the file name without its folder and without its extension.

    Parameters
    ----------
    file_path : str/path

    Returns
    -------
    str
        Only the last extension is removed, eg. 'a/b.tar.gz' gives 'b.tar'.
    '''
    name_with_extension = os.path.basename(file_path)
    stem, _ = os.path.splitext(name_with_extension)
    return stem
def to_separate_from_background(arguments):
    '''Reads the 'segment_call_background' entry as a boolean.

    Parameters
    ----------
    arguments : dictionary
        May contain the key 'segment_call_background' with one of
        True, False, 'True' or 'False'. A missing key counts as True.

    Returns
    -------
    boolean_user_input : bool

    Raises
    ------
    ValueError
        If the entry is not one of the accepted values.
    '''
    user_input = arguments.get('segment_call_background', True)
    try:
        return get_boolean_from_string[user_input]
    except (KeyError, TypeError) as err:
        # KeyError: unknown value, TypeError: unhashable value.
        # The f-string keeps the message building safe for non-string input.
        error = f'user input {user_input} for segment_call_background is not True or False or DEFAULT - please check'
        raise ValueError(error) from err
# accepted spellings of a boolean user input and the boolean they stand for
get_boolean_from_string = {'True':True, 'False':False, True:True, False:False}


def get_number_channels(audio):
    '''Returns the number of columns of a two dimensional ``audio``.

    Anything whose ``shape`` cannot be unpacked into exactly two values
    (no shape attribute, one dimensional, more than two dimensions) is
    treated as having a single channel.
    '''
    try:
        _, cols = audio.shape
    except (AttributeError, TypeError, ValueError):
        return 1
    return cols


def convert_time_to_samples(time, fs):
    '''Converts a time into a sample index by multiplying with ``fs`` and
    truncating with int. None is passed through as None.
    '''
    if time is None:
        return None
    return int(time*fs)


def to_string(X):
    '''Converts X with str.'''
    return str(X)


def to_float(X):
    '''Converts X with float.'''
    return float(X)


def to_integer(X):
    '''Converts X with int.'''
    return int(X)


def to_bool(X):
    '''Converts the strings 'True' and 'False' to the matching boolean.
    Any other input raises a KeyError.
    '''
    return {'True':True, 'False':False}[X]
def to_list_w_funcs(X, source_module=itsfm.measurement_functions,
                    **kwargs):
    """Converts a string with comma separated function names into a list
    with the functions themselves.

    Parameters
    ----------
    X : str
        String defining a list with commas as separators
        eg. "[func_name1, func_name2] "
    source_module : module, optional
        The module (or any other object) the functions are looked up on.
        Defaults to itsfm.measurement_functions
    signs_to_remove : list w str
        Any special signs to remove from each str in the list of comma
        separated strings. Defaults to None.

    Returns
    -------
    list_w_funcs
        list with functions belonging to the source module

    Raises
    ------
    ValueError
        If one of the names is not an attribute of the source module.

    Example
    -------
    >>> x = "[measure_rms, measure_peak_amplitude]"
    >>> list_w_funcs = to_list_w_funcs(x)
    """
    list_w_funcs = []
    for each in X.split(','):
        # remove unnecessary punctuations
        cleaned = remove_punctuations(each, **kwargs)
        try:
            list_w_funcs.append(getattr(source_module, cleaned))
        except AttributeError as err:
            raise ValueError(f"Unable to find function {cleaned} in module {source_module}") from err
    return list_w_funcs
def remove_punctuations(full_str, **kwargs):
    """
    Removes spaces, ], and [ in a string. Additional signs can be removed too

    Parameters
    ----------
    full_str : str
        A long string with multiple punctuation marks to be removed
        (space, [, ])
    signs_to_remove : list w str, optional
        Additional specific punctuation/s to be removed. They are removed
        after the default ones, in the order given.
        Defaults to None

    Returns
    -------
    clean_str : str
    """
    unwanted = [" ", "]", "["]
    additional = kwargs.get('signs_to_remove')
    if additional is not None:
        unwanted.extend(additional)

    clean_str = full_str
    for sign in unwanted:
        clean_str = clean_str.replace(sign, "")
    return clean_str
# dictionary which converts the entries in a column to # their appropriate types convert_column_to_proper_type = { 'audio_path': to_string, 'start': to_float, 'stop' : to_float, 'channel' : to_integer, 'peak_percentage' : to_float, 'window_size' : to_integer, 'signal_level' : to_float, 'terminal_frequency_threshold' : to_float, 'fft_size' : to_integer, 'segment_method' : to_string, 'tfr_cliprange' : to_float, 'pwvd_window' : to_integer, 'pwvd_filter' : to_bool, 'measurements' : to_list_w_funcs, 'sample_every' : to_float, 'skip': to_bool }
def parse_batchfile_row(one_row):
    '''checks for all user-given arguments and removes any columns
    with DEFAULT in them.

    Parameters
    ---------
    one_row : pd.Series
        A single batchfile row (as given by DataFrame.iterrows) with
        multiple column names, corresponding to compulsory required
        arguments and the optional ones

    Returns
    -------
    arguments : dictionary
        Simple dictionary with one entry for each column that was not set
        to 'DEFAULT'. The original column order is kept.

    Notes
    -----
    The type conversion is best-effort: when a column has no converter, or
    its converter fails on the value, the value is kept as it was read in.
    '''
    arguments = {}
    for column, value in one_row.to_dict().items():
        # columns set to 'DEFAULT' are dropped so that the default value of
        # the downstream function is used
        if value == 'DEFAULT':
            continue
        try:
            # convert to relevant type
            arguments[column] = convert_column_to_proper_type[column](value)
        except Exception:
            arguments[column] = value
    return arguments
def make_to_oned_dataframe(oned_series):
    """Turns a Series into a DataFrame with a single row.

    Parameters
    ----------
    oned_series : pd.Series
        One dimensional pd.Series with columns and values

    Returns
    -------
    oned_df : pd.DataFrame
        One row with the index 0. The index labels of the series
        are the column names.
    """
    labels = oned_series.index.to_list()
    entries = dict(zip(labels, oned_series.values))
    return pd.DataFrame(data=entries, index=[0])
class ImproperArguments(ValueError):
    """Raised when batch processing arguments are combined in a way that
    is not allowed, eg. one_row along with -from or -till."""