Classification on EmuCore
Device: EmuCore
Introduction
We have used QCi’s EmuCore reservoir technology to classify emotional speech files. What follows is a brief discussion of the dataset, our approach, and some results.
Reservoir Computing
The idea is to pass data through a randomly initialized neural network and then use the processed data to train a simple linear model downstream, instead of training a full neural network. Reservoir-based models are easier to set up and less expensive to train, and they require less domain expertise to adapt to different applications.
Recurrent Neural Networks (RNN)
Here we use a recurrent neural network (RNN) architecture as our reservoir.
https://en.wikipedia.org/wiki/Recurrent_neural_network
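To make this concrete, below is a minimal NumPy sketch of a reservoir in the echo-state-network style. The dimensions, weight scales, and the least-squares readout are illustrative choices only, not EmuCore's actual parameters or implementation: a fixed random RNN maps an input sequence to a sequence of reservoir states, and only a linear readout is trained.

import numpy as np

rng = np.random.default_rng(seed=42)

num_inputs = 77   # e.g. number of cochleagram channels (illustrative)
num_nodes = 100   # reservoir size (illustrative)

# Fixed, randomly initialized weights -- these are never trained.
W_in = rng.uniform(-0.5, 0.5, size=(num_nodes, num_inputs))
W = rng.uniform(-0.5, 0.5, size=(num_nodes, num_nodes))
W *= 0.9 / np.max(np.abs(np.linalg.eigvals(W)))  # keep the dynamics stable


def reservoir_states(inputs):
    """Run a sequence through the reservoir; return one state per time step."""
    x = np.zeros(num_nodes)
    states = []
    for u in inputs:
        x = np.tanh(W_in @ u + W @ x)
        states.append(x)
    return np.array(states)


# Toy usage: a random "cochleagram" with 200 time steps.
sequence = rng.normal(size=(200, num_inputs))
X = reservoir_states(sequence)  # shape (200, num_nodes)

# Only the linear readout is trained, e.g. by least squares.
targets = rng.normal(size=(200, 8))  # toy targets for 8 classes
W_out, *_ = np.linalg.lstsq(X, targets, rcond=None)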
QCi's EmuCore
QCi's EmuCore technology is based on a time-delay scheme.
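EmuCore's internal design is not detailed here, but the general time-delay reservoir idea can be sketched in a few lines: a single nonlinear node with delayed feedback emulates a network of "virtual" nodes, one per time slot of the delay line. The sketch below is a simplified illustration with made-up parameters, not a description of EmuCore.

import numpy as np

rng = np.random.default_rng(seed=0)

num_virtual = 50  # virtual nodes per delay period (illustrative)
mask = rng.uniform(-1.0, 1.0, size=num_virtual)  # fixed random input mask


def delay_reservoir(inputs, gain=0.8, feedback=0.5):
    """A single nonlinear node plus a delay line acting as virtual nodes."""
    delay_line = np.zeros(num_virtual)
    states = []
    for u in inputs:  # one scalar input per delay period
        for k in range(num_virtual):
            # Each virtual node responds to the masked input plus the
            # delayed feedback from one delay period earlier.
            delay_line[k] = np.tanh(gain * mask[k] * u + feedback * delay_line[k])
        states.append(delay_line.copy())
    return np.array(states)  # shape (num_steps, num_virtual)


states = delay_reservoir(rng.normal(size=100))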
Dataset
In this study, we used the RAVDESS Emotional Speech Dataset. The RAVDESS dataset consists of 1,440 audio files recorded by 24 speakers, each labeled with one of 8 different emotions. The aim is to build a model that can detect which emotion an audio file corresponds to.
https://www.kaggle.com/datasets/uwrfkaggler/ravdess-emotional-speech-audio
Copy the files to a directory called "ravdess_files" under the working directory.
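Each RAVDESS file name encodes the file's metadata as seven dash-separated two-digit fields (modality, vocal channel, emotion, emotional intensity, statement, repetition, actor); the dataset-building code below relies on this convention. For example:

# RAVDESS file names encode seven two-digit fields:
# modality - vocal channel - emotion - intensity - statement - repetition - actor
fields = "03-01-05-01-01-02-20.wav".split(".")[0].split("-")

emotion = int(fields[2])  # 5 -> "angry" in the RAVDESS emotion coding
actor = int(fields[6])    # speaker number 20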
Method
Our approach consists of the following steps,
- Convert each audio file to a time series with an appropriate sampling rate.
In [81]:
import numpy as np
from scipy.io import wavfile
import matplotlib.pyplot as plt
import warnings

warnings.filterwarnings("ignore")

# Read the audio file and plot a segment of the resulting time series.
samp_freq, y = wavfile.read("ravdess_files/03-01-05-01-01-02-20.wav")
y = np.array(y, dtype=np.float64)
y = y[50001:125000]

plt.plot(y)
plt.show()
Out [ ]:
[Figure: time-series plot of the selected audio segment]
- Convert each time series to a cochleagram. A cochleagram is a time-frequency representation of the audio signal and as such is a two-dimensional array.
https://en.wikipedia.org/wiki/Computational_auditory_scene_analysis#Cochleagram
In [68]:
import numpy as np
from lyon.calc import LyonCalc

calc = LyonCalc()

vec = calc.lyon_passive_ear(y, 1.0e6, decimation_factor=1000, ear_q=8)
vec = np.array(vec)

plt.imshow(vec)
Out [68]:
[Figure: cochleagram of the audio signal]
- Initialize the reservoir (QCi's EmuCore) once, then run each cochleagram through the reservoir and store its transient response.
- Use the transient responses to build a linear model.
The training and testing data were split such that they contain different sets of speakers.
Implementation
Utility Functions
We define a few utility functions.
In [69]:
import os
from os import listdir
from os.path import isfile, join
import sys

import numpy as np
import librosa
from lyon.calc import LyonCalc
from scipy.io import wavfile


def to_categorical(y, num_classes=None, dtype="float32"):
    """Convert integer class labels to one-hot encoded vectors."""
    y = np.array(y, dtype="int")
    input_shape = y.shape

    # Shrink the last dimension if the shape is (..., 1).
    if input_shape and input_shape[-1] == 1 and len(input_shape) > 1:
        input_shape = tuple(input_shape[:-1])

    y = y.reshape(-1)
    if not num_classes:
        num_classes = np.max(y) + 1

    n = y.shape[0]
    categorical = np.zeros((n, num_classes), dtype=dtype)
    categorical[np.arange(n), y] = 1
    output_shape = input_shape + (num_classes,)
    categorical = np.reshape(categorical, output_shape)

    return categorical


def _parse_rescale_arg(rescale):
    """Parse the rescaling argument to a standard form.

    Args:
        rescale ({'normalize', 'standardize', None}): Determines how rescaling
            will be performed.

    Returns:
        (str or None): A valid rescaling argument, for use with wav_to_array
            or similar.

    Raises:
        ValueError: Throws an error if the rescale value is unrecognized.
    """
    if rescale is not None:
        rescale = rescale.lower()

    if rescale == 'normalize':
        out_rescale = 'normalize'
    elif rescale == 'standardize':
        out_rescale = 'standardize'
    elif rescale is None:
        out_rescale = None
    else:
        raise ValueError('Unrecognized rescale value: %s' % rescale)

    return out_rescale


def rescale_sound(snd_array, rescale):
    """Rescale the sound with the provided rescaling method (if supported).

    Args:
        snd_array (array): The array containing the sound data.
        rescale ({'standardize', 'normalize', None}): Determines the type of
            rescaling to perform. 'standardize' will divide by the max value
            allowed by the numerical precision of the input. 'normalize' will
            rescale to the interval [-1, 1]. None will not perform rescaling
            (NOTE: be careful with this, as the result can be *very* loud if
            played back!).

    Returns:
        array: **rescaled_snd**: The sound array after rescaling.
    """
    rescale = _parse_rescale_arg(rescale)

    if rescale == 'standardize':
        # Rescale so the max value allowed by the precision has value 1.
        if issubclass(snd_array.dtype.type, np.integer):
            snd_array = snd_array / float(np.iinfo(snd_array.dtype).max)
        elif issubclass(snd_array.dtype.type, np.floating):
            snd_array = snd_array / float(np.finfo(snd_array.dtype).max)
        else:
            raise ValueError(
                'rescale is undefined for input type: %s' % snd_array.dtype
            )
    elif rescale == 'normalize':
        snd_array = snd_array / float(snd_array.max())  # rescale to [-1, 1]
    # Do nothing if rescale is None.

    return snd_array


def wav_to_array(fn, rescale='standardize'):
    """Reads wav file data into a numpy array.

    Args:
        fn (str): The file path to the .wav file.
        rescale ({'standardize', 'normalize', None}): Determines the type of
            rescaling to perform, as in rescale_sound.

    Returns:
        tuple: **snd** (array): The sound in the .wav file as a numpy array.
        **samp_freq** (int): Sampling frequency of the input sound.
    """
    samp_freq, snd = wavfile.read(fn)
    snd = rescale_sound(snd, rescale)

    return snd, samp_freq


def search_audio_files(folder_path):
    """Parse the folder for audio files.

    Args:
        folder_path (str): The path to parse for .wav files.

    Returns:
        list: List of audio file names.
    """
    file_names = [
        f for f in listdir(folder_path)
        if isfile(join(folder_path, f)) and '.wav' in f
    ]

    return file_names


def convert_to_cochleagram(
    audio_path,
    decimation_factor=77,
    n=None,
    nonlinearity=None,
    maxLength=None,
):
    """Creates a cochleagram of a wav file.

    Cochleagram dimension = Nf (channels) x Nt (time steps).

    :param audio_path: Path of the wav file.
    :param n: (int) Number of filters to use in the filterbank.
    :param nonlinearity: None applies no nonlinearity. 'db' will convert the
        output to decibels (truncated at -60). 'power' will apply 3/10 power
        compression.
    :return: Tuple of (cochleagram, status).
    """
    signal, sample_rate = wav_to_array(audio_path)
    signal = signal[50001:125000]

    fs = 12e3  # resample frequency
    data = librosa.core.resample(
        y=signal.astype(np.float64),
        orig_sr=sample_rate,
        target_sr=fs,
        res_type="scipy",
    )

    # Zero padding
    if maxLength is not None and len(data) > maxLength:
        err_msg = (
            f"data length={len(data)}, max length={maxLength}; "
            "data length cannot exceed padding length."
        )
        print(err_msg)
        # raise ValueError(err_msg)
        return (None, -1)
    elif maxLength is not None and len(data) < maxLength:
        # Embed the signal at a random offset within the padded array.
        embedded_data = np.zeros(maxLength)
        offset = np.random.randint(low=0, high=maxLength - len(data))
        embedded_data[offset:offset + len(data)] = data
    elif maxLength is not None and len(data) == maxLength:
        # Nothing to do here.
        embedded_data = data

    if maxLength is not None:
        data = embedded_data

    calc = LyonCalc()

    # Compute the cochleagram from the resampled data.
    coch = calc.lyon_passive_ear(
        data,
        fs,
        decimation_factor=decimation_factor,
        ear_q=8,
    )
    coch = np.array(coch)

    return (coch, 0)


def WSR_MSE(target_lst, est_lst):
    """Winner-take-all success rate.

    :param target_lst: List of target vectors.
    :param est_lst: List of estimate vectors.
    """
    success = 0
    count = 0
    for estimate, target in zip(est_lst, target_lst):
        # Mark the predicted (max-scoring) class with +1 and all others with -1.
        estimate_winner = (estimate == np.amax(estimate)) * 2 - 1
        count = count + 1
        if (estimate_winner == target).all():
            success = success + 1

    wsr = success / count

    return wsr
Build training and testing datasets
This step includes converting each file to a time series and subsequently to a cochleagram. As mentioned, the data are split into training and testing sets that correspond to different speakers.
In [70]:
import os
import sys
import random
import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

AUDIO_DIR = "ravdess_files/"
NUM_LABELS = 8
NUM_ACTORS = 24
NUM_TRAIN_ACTORS = 19
DECIMATION_FACTOR = 77
MAX_LENGTH = None

ACTORS = [i for i in range(1, NUM_ACTORS + 1)]
We choose a subset of the actors (speakers) as the training set. This way we use files from different speakers in training and testing.
In [65]:
random.shuffle(ACTORS)

train_actors = ACTORS[:NUM_TRAIN_ACTORS]
We then loop through all audio files and convert them to cochleagrams. We also extract the label corresponding to each audio file from the file name, for both the training and testing sets. The results are stored in two binary files.
In [ ]:
audio_files = search_audio_files(AUDIO_DIR)

train_set = {}
test_set = {}

for audio_file in audio_files:
    print(audio_file)

    base_name = audio_file.split(".")[0]
    tmp_list = base_name.split("-")

    emotion = int(tmp_list[2])
    emotion_intensity = tmp_list[3]
    statement = tmp_list[4]
    repetition = tmp_list[5]
    actor = int(tmp_list[6])

    cochleagram, status = convert_to_cochleagram(
        os.path.join(AUDIO_DIR, audio_file),
        decimation_factor=DECIMATION_FACTOR,
        maxLength=MAX_LENGTH,
    )

    if cochleagram is None:
        continue

    label = to_categorical(emotion - 1, num_classes=NUM_LABELS)

    if actor in train_actors:
        train_set[audio_file] = {"coch": cochleagram, "label": label}
    else:
        test_set[audio_file] = {"coch": cochleagram, "label": label}

np.save("train_set_ravdess.npy", train_set)
np.save("test_set_ravdess.npy", test_set)
Run data through the reservoir
We should now run the features through QCi's reservoir computer. We start by importing some libraries and setting some parameters.
In [72]:
import os

import numpy as np
from bumblebee_client.bumblebee_client import BumblebeeClient

IP_ADDR = "172.22.19.49"
VBIAS = 0.3
GAIN = 0.65
NUM_NODES = 1000
NUM_TAPS = NUM_NODES
FEATURE_SCALING = 0.5
DENSITY = 1
NUM_F = 77
We should now instantiate a reservoir object,
In [ ]:
client = BumblebeeClient(ip_addr=IP_ADDR)

lock_id, start, end = client.wait_for_lock()

client.reservoir_reset(lock_id=lock_id)
client.rc_config(
    lock_id=lock_id,
    vbias=VBIAS,
    gain=GAIN,
    num_nodes=NUM_NODES,
    num_taps=NUM_TAPS,
)
and load the training and testing features and labels,
In [ ]:
train_set = np.load("train_set_ravdess.npy", allow_pickle=True).item()
test_set = np.load("test_set_ravdess.npy", allow_pickle=True).item()
We should now loop through training and testing data and run the features through the reservoir,
In [ ]:
for item in train_set.keys():
    print("Train", item)

    assert train_set[item]["coch"].shape[1] == NUM_F

    X_trans_response, _, _ = client.process_all_data(
        input_data=train_set[item]["coch"],
        num_nodes=NUM_NODES,
        density=DENSITY,
        feature_scaling=FEATURE_SCALING,
        lock_id=lock_id,
    )

    train_set[item]["coch"] = X_trans_response

    assert train_set[item]["coch"].shape[1] == NUM_NODES

for item in test_set.keys():
    print("Test", item)

    assert test_set[item]["coch"].shape[1] == NUM_F

    X_trans_response, _, _ = client.process_all_data(
        input_data=test_set[item]["coch"],
        num_nodes=NUM_NODES,
        density=DENSITY,
        feature_scaling=FEATURE_SCALING,
        lock_id=lock_id,
    )

    test_set[item]["coch"] = X_trans_response

    assert test_set[item]["coch"].shape[1] == NUM_NODES
And finally store the reservoir output,
In [31]:
np.save("train_set_ravdess_reservoir2.npy", train_set)
np.save("test_set_ravdess_reservoir2.npy", test_set)
Train a linear regressor
We now build a linear regression model from the reservoir output. We start by importing libraries, setting parameters, and loading the output of the reservoir,
In [74]:
import os
import sys

import numpy as np
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC

STATEMENTS = ["01", "02"]
INTENSITIES = ["01", "02"]
REPETITIONS = ["01", "02"]

train_set = np.load(
    "train_set_ravdess_reservoir2.npy",
    allow_pickle=True,
).item()
test_set = np.load(
    "test_set_ravdess_reservoir2.npy",
    allow_pickle=True,
).item()
We now loop through the training and testing data to assemble the feature and label arrays,
In [75]:
X_train = None
y_train = None

for file_name in train_set.keys():
    base_name = file_name.split(".")[0]

    if base_name.split("-")[5] not in REPETITIONS:
        continue
    if base_name.split("-")[4] not in STATEMENTS:
        continue
    if base_name.split("-")[3] not in INTENSITIES:
        continue

    coch = train_set[file_name]["coch"]

    if X_train is None:
        X_train = coch
    else:
        X_train = np.concatenate([X_train, coch])

    # Map the one-hot label to {-1, +1} and repeat it for every time step
    # of the reservoir response.
    label = train_set[file_name]["label"] * 2 - 1
    labels = np.repeat(label.reshape(-1, 1), coch.shape[0], axis=1).T

    if y_train is None:
        y_train = labels
    else:
        y_train = np.concatenate([y_train, labels])
We can now train a linear model,
In [76]:
clf = LinearRegression(fit_intercept=True)
clf.fit(X_train, y_train)
Out [76]:
LinearRegression()
And we can calculate the success rate of the classifier on both training and testing data,
In [78]:
y_train = []
y_train_prd = []

for file_name in train_set.keys():
    base_name = file_name.split(".")[0]

    if base_name.split("-")[5] not in REPETITIONS:
        continue
    if base_name.split("-")[4] not in STATEMENTS:
        continue
    if base_name.split("-")[3] not in INTENSITIES:
        continue

    X_train_tmp = train_set[file_name]["coch"]

    y_train.append(train_set[file_name]["label"] * 2 - 1)

    # Average the per-time-step predictions over each file before scoring.
    y_train_prd.append(clf.predict(X_train_tmp).mean(axis=0))

print("Success rate on train data: %0.3f" % (WSR_MSE(y_train, y_train_prd)))

y_test = []
y_test_prd = []

for file_name in test_set.keys():
    base_name = file_name.split(".")[0]

    if base_name.split("-")[5] not in REPETITIONS:
        continue
    if base_name.split("-")[4] not in STATEMENTS:
        continue
    if base_name.split("-")[3] not in INTENSITIES:
        continue

    X_test_tmp = test_set[file_name]["coch"]

    y_test.append(test_set[file_name]["label"] * 2 - 1)
    y_test_prd.append(clf.predict(X_test_tmp).mean(axis=0))

print("Success rate on test data: %0.3f" % (WSR_MSE(y_test, y_test_prd)))
Out [ ]:
Success rate on train data: 0.650
Success rate on test data: 0.373
Several tests were run using different numbers of reservoir nodes. Some of the results are presented here.