Speech to Text with OpenVINO™

This Jupyter notebook can be launched on-line, opening an interactive environment in a browser window. You can also make a local installation. Choose one of the following options:

  • Binder

  • Google Colab

  • GitHub

This tutorial demonstrates speech-to-text recognition with OpenVINO.

This tutorial uses the QuartzNet 15x5 model. QuartzNet performs automatic speech recognition. Its design is based on the Jasper architecture, which is a convolutional model trained with Connectionist Temporal Classification (CTC) loss. The model is available from Open Model Zoo.

Imports

%pip install -q "librosa>=0.8.1" "matplotlib<3.8" "openvino-dev>=2023.1.0" "numpy<1.24"
from pathlib import Path
import sys

import torch
import torch.nn as nn
import IPython.display as ipd
import matplotlib.pyplot as plt
import librosa
import librosa.display
import numpy as np
import scipy
import openvino as ov
# Fetch `notebook_utils` module
import urllib.request
urllib.request.urlretrieve(
    url='https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/main/notebooks/utils/notebook_utils.py',
    filename='notebook_utils.py'
)
from notebook_utils import download_file

Settings

In this part, all variables used in the notebook are set.

model_folder = "model"
download_folder = "output"
data_folder = "data"

precision = "FP16"
model_name = "quartznet-15x5-en"

Download and Convert Public Model

If this is your first run, the model will be downloaded and converted here. It may take a few minutes. Use omz_downloader, a command-line tool from the openvino-dev package, to download the model; the conversion to OpenVINO IR is then performed below with the model conversion API (ov.convert_model).

Download Model

The omz_downloader tool automatically creates a directory structure and downloads the selected model. This step is skipped if the model is already downloaded. The selected model comes from the public directory, which means it must be converted into OpenVINO Intermediate Representation (OpenVINO IR).

# Check if a model is already downloaded (to the download directory).
path_to_model_weights = Path(f'{download_folder}/public/{model_name}/models')
downloaded_model_file = list(path_to_model_weights.glob('*.pth'))

if not path_to_model_weights.is_dir() or len(downloaded_model_file) == 0:
    download_command = f"omz_downloader --name {model_name} --output_dir {download_folder} --precision {precision}"
    ! $download_command

Convert Model

The model downloaded in the previous step is a PyTorch checkpoint. To use it with OpenVINO, rebuild the model from the downloaded NeMo modules and weights, convert it with the model conversion API (ov.convert_model), and save it in the OpenVINO IR format.

sys.path.insert(0, str(path_to_model_weights))
def convert_model(model_path:Path, converted_model_path:Path):
    """
    Helper function for converting the QuartzNet model to OpenVINO IR.
    The function accepts a path to the directory with the modules, weights and configs downloaded by omz_downloader,
    initializes the model, converts it to an OpenVINO model and serializes it to IR.
    Params:
      model_path: path to model modules, weights and configs downloaded via omz_downloader
      converted_model_path: path for saving converted model
    Returns:
      None
    """
    # add model path to PYTHONPATH for access to downloaded modules
    sys.path.append(str(model_path))

    # import necessary classes
    from ruamel.yaml import YAML

    from nemo.collections.asr import JasperEncoder, JasperDecoderForCTC
    from nemo.core import NeuralModuleFactory, DeviceType

    YAML = YAML(typ='safe')

    # utility function for replacing 1D convolutions with 2D ones for better efficiency
    def convert_to_2d(model):
        for name, l in model.named_children():
            layer_type = l.__class__.__name__
            if layer_type == 'Conv1d':
                new_layer = nn.Conv2d(l.in_channels, l.out_channels,
                                      (1, l.kernel_size[0]), (1, l.stride[0]),
                                      (0, l.padding[0]), (1, l.dilation[0]),
                                      l.groups, False if l.bias is None else True, l.padding_mode)
                params = l.state_dict()
                params['weight'] = params['weight'].unsqueeze(2)
                new_layer.load_state_dict(params)
                setattr(model, name, new_layer)
            elif layer_type == 'BatchNorm1d':
                new_layer = nn.BatchNorm2d(l.num_features, l.eps)
                new_layer.load_state_dict(l.state_dict())
                new_layer.eval()
                setattr(model, name, new_layer)
            else:
                convert_to_2d(l)

    # model class
    class QuartzNet(torch.nn.Module):
        def __init__(self, model_config, encoder_weights, decoder_weights):
            super().__init__()
            with open(model_config, 'r') as config:
                model_args = YAML.load(config)
            _ = NeuralModuleFactory(placement=DeviceType.CPU)

            encoder_params = model_args['init_params']['encoder_params']['init_params']
            self.encoder = JasperEncoder(**encoder_params)
            self.encoder.load_state_dict(torch.load(encoder_weights, map_location='cpu'))

            decoder_params = model_args['init_params']['decoder_params']['init_params']
            self.decoder = JasperDecoderForCTC(**decoder_params)
            self.decoder.load_state_dict(torch.load(decoder_weights, map_location='cpu'))

            self.encoder._prepare_for_deployment()
            self.decoder._prepare_for_deployment()
            convert_to_2d(self.encoder)
            convert_to_2d(self.decoder)

        def forward(self, input_signal):
            input_signal = input_signal.unsqueeze(axis=2)
            i_encoded = self.encoder(input_signal)
            i_log_probs = self.decoder(i_encoded)

            shape = i_log_probs.shape
            return i_log_probs.reshape(shape[0], shape[1], shape[3])

    # paths to configs and weights for creating a model instance
    model_config = model_path / ".nemo_tmp/module.yaml"
    encoder_weights = model_path / ".nemo_tmp/JasperEncoder.pt"
    decoder_weights = model_path / ".nemo_tmp/JasperDecoderForCTC.pt"
    # create model instance
    model = QuartzNet(model_config, encoder_weights, decoder_weights)
    # switch the model to inference mode
    model.eval()
    # convert model to OpenVINO Model using model conversion API
    ov_model = ov.convert_model(model, example_input=torch.zeros([1, 64, 128]))
    # save model in IR format for next usage
    ov.save_model(ov_model, converted_model_path)

# Check if a model is already converted (in the model directory).
path_to_converted_weights = Path(f'{model_folder}/public/{model_name}/{precision}/{model_name}.bin')
path_to_converted_model = Path(f'{model_folder}/public/{model_name}/{precision}/{model_name}.xml')

if not path_to_converted_weights.is_file():
    downloaded_model_path = Path("output/public/quartznet-15x5-en/models")
    convert_model(downloaded_model_path, path_to_converted_model)
[NeMo W 2023-09-11 15:01:17 jasper:148] Turned off 170 masked convolutions
INFO:nncf:NNCF initialized successfully. Supported frameworks detected: torch, tensorflow, onnx, openvino
[NeMo W 2023-09-11 15:01:18 deprecated:66] Function local_parameters is deprecated. It is going to be removed in the 0.11 version.

Audio Processing

Now that the model is converted, load an audio file.

Define constants

First, locate an audio file and define the alphabet used by the model. This tutorial uses the Latin alphabet, beginning with a space symbol and ending with a blank symbol. In this case, the blank symbol is ~, but it could be any other character.

audio_file_name = "edge_to_cloud.ogg"
alphabet = " abcdefghijklmnopqrstuvwxyz'~"

Available Audio Formats

There are multiple supported audio formats that can be used with the model:

AIFF, AU, AVR, CAF, FLAC, HTK, SVX, MAT4, MAT5, MPC2K, OGG, PAF, PVF, RAW, RF64, SD2, SDS, IRCAM, VOC, W64, WAV, NIST, WAVEX, WVE, XI
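
The next section loads the file "after checking a file extension." A minimal sketch of such a check, assuming a small subset of the formats above and a hypothetical check_audio_extension helper (the notebook itself downloads a known .ogg file, so this is illustration only):

from pathlib import Path

# Hypothetical helper: only a few common extensions from the list above are included here.
SUPPORTED_EXTENSIONS = {".wav", ".flac", ".ogg", ".aiff", ".au"}

def check_audio_extension(path: str) -> None:
    extension = Path(path).suffix.lower()
    if extension not in SUPPORTED_EXTENSIONS:
        raise ValueError(f"Unsupported audio format: {extension}")

check_audio_extension(audio_file_name)  # ".ogg" is in the supported list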

Load Audio File

Load the file after checking its extension. Pass sr (the sampling rate) as an additional parameter. The model supports files with a sampling rate of 16 kHz.

# Download the audio from the openvino_notebooks storage
file_name = download_file(
    "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/audio/" + audio_file_name,
    directory=data_folder
)

audio, sampling_rate = librosa.load(path=str(file_name), sr=16000)

Now, you can play your audio file.

ipd.Audio(audio, rate=sampling_rate)

Visualize Audio File

You can visualize how your audio file presents on a wave plot and spectrogram.

plt.figure()
librosa.display.waveshow(y=audio, sr=sampling_rate, max_points=50000, x_axis='time', offset=0.0);
plt.show()
specto_audio = librosa.stft(audio)
specto_audio = librosa.amplitude_to_db(np.abs(specto_audio), ref=np.max)
print(specto_audio.shape)
librosa.display.specshow(specto_audio, sr=sampling_rate, x_axis='time', y_axis='hz');
[Waveform plot of the audio file]
(1025, 51)
[Spectrogram of the audio file]

Change Type of Data

The file loaded in the previous step may contain float data with values in the range between -1 and 1. To generate a viable model input, multiply each value by the maximum value of int16 and convert the result to the int16 type.

if max(np.abs(audio)) <= 1:
    audio = (audio * (2**15 - 1))
audio = audio.astype(np.int16)

Convert Audio to Mel Spectrum

Next, convert the pre-processed audio to a mel spectrum. For more information on why this needs to be done, refer to this article.

def audio_to_mel(audio, sampling_rate):
    assert sampling_rate == 16000, "Only 16 KHz audio supported"
    preemph = 0.97
    preemphased = np.concatenate([audio[:1], audio[1:] - preemph * audio[:-1].astype(np.float32)])

    # Calculate the window length.
    win_length = round(sampling_rate * 0.02)

    # Based on the previously calculated window length, run short-time Fourier transform.
    spec = np.abs(librosa.core.spectrum.stft(preemphased, n_fft=512, hop_length=round(sampling_rate * 0.01),
                  win_length=win_length, center=True, window=scipy.signal.windows.hann(win_length), pad_mode='reflect'))

    # Create mel filter-bank, produce transformation matrix to project current values onto Mel-frequency bins.
    mel_basis = librosa.filters.mel(sr=sampling_rate, n_fft=512, n_mels=64, fmin=0.0, fmax=8000.0, htk=False)
    return mel_basis, spec


def mel_to_input(mel_basis, spec, padding=16):
    # Convert to a logarithmic scale.
    log_melspectrum = np.log(np.dot(mel_basis, np.power(spec, 2)) + 2 ** -24)

    # Normalize the output.
    normalized = (log_melspectrum - log_melspectrum.mean(1)[:, None]) / (log_melspectrum.std(1)[:, None] + 1e-5)

    # Calculate padding.
    remainder = normalized.shape[1] % padding
    if remainder != 0:
        return np.pad(normalized, ((0, 0), (0, padding - remainder)))[None]
    return normalized[None]
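
A quick sanity check of the shapes these helpers produce, using one second of silence instead of the real recording (an illustration only; the dummy_* variables exist just for this check and the real conversion runs in the next step):

# Illustration: feed one second of silence at 16 kHz through the helpers above.
dummy_audio = np.zeros(16000, dtype=np.int16)
dummy_mel_basis, dummy_spec = audio_to_mel(dummy_audio, 16000)
print(dummy_mel_basis.shape)  # (64, 257): 64 mel bins x (512 // 2 + 1) frequency bins
print(dummy_spec.shape)       # (257, T): frequency bins x time frames
print(mel_to_input(dummy_mel_basis, dummy_spec).shape)  # (1, 64, T rounded up to a multiple of 16)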

Run Conversion from Audio to Mel Format

In this step, convert a current audio file into Mel scale.

mel_basis, spec = audio_to_mel(audio=audio.flatten(), sampling_rate=sampling_rate)

Visualize Mel Spectrogram

For more information about the mel spectrogram, refer to this article. The first image visualizes the mel frequency spectrogram, the second one presents the filter bank for converting Hz to mels.

librosa.display.specshow(data=spec, sr=sampling_rate, x_axis='time', y_axis='log');
plt.show();
librosa.display.specshow(data=mel_basis, sr=sampling_rate, x_axis='linear');
plt.ylabel('Mel filter');
[Mel frequency spectrogram]
[Mel filter bank]

Adjust Mel scale to Input

Before reading the network, make sure that the input is ready.

audio = mel_to_input(mel_basis=mel_basis, spec=spec)

Load the Model

Now, you can read and load the network.

core = ov.Core()

You can run the model on multiple devices. Either load the model on a specific device (CPU, GPU, etc.) or let the engine choose the best available device (AUTO).

To list all available devices, run the print(core.available_devices) command.

print(core.available_devices)
['CPU', 'GPU']

Select a device from the dropdown list

import ipywidgets as widgets

device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value='AUTO',
    description='Device:',
    disabled=False,
)

device
Dropdown(description='Device:', index=2, options=('CPU', 'GPU', 'AUTO'), value='AUTO')

# Read the network from the converted IR file.
model = core.read_model(
    model=f"{model_folder}/public/{model_name}/{precision}/{model_name}.xml"
)

# Make the time dimension of the input dynamic so clips of any length can be processed.
model_input_layer = model.input(0)
shape = model_input_layer.partial_shape
shape[2] = -1
model.reshape({model_input_layer: shape})

# Compile the model for the selected device.
compiled_model = core.compile_model(model=model, device_name=device.value)

Do Inference

Everything is set up. Now, the only thing that remains is passing input to the previously loaded network and running inference.

character_probabilities = compiled_model([ov.Tensor(audio)])[0]

Read Output

After inference, you need to extract the output. The default output of QuartzNet 15x5 is per-frame probabilities (after LogSoftmax) for every symbol in the alphabet (name: output, shape: 1x64x29). The output data format is BxNxC, where:

  • B - batch size

  • N - number of audio frames

  • C - alphabet size, including the Connectionist Temporal Classification (CTC) blank symbol

You need to convert this output into a more human-readable format. To do this, take the symbol with the highest probability at each frame. Once you have the list of highest-probability indexes, Connectionist Temporal Classification (CTC) decoding requires removing consecutive duplicate symbols and then removing all blanks.

The last step is getting the symbols from the corresponding indexes in the alphabet.

# Remove the unnecessary batch dimension
character_probabilities = np.squeeze(character_probabilities)

# Run argmax to pick the most probable symbol for each frame
character_probabilities = np.argmax(character_probabilities, axis=1)

Implementation of Decoding

To decode the output explained above, you need the Connectionist Temporal Classification (CTC) decode function. This solution removes consecutive duplicate letters and blank symbols from the output.

def ctc_greedy_decode(predictions):
    previous_letter_id = blank_id = len(alphabet) - 1
    transcription = list()
    for letter_index in predictions:
        if previous_letter_id != letter_index != blank_id:
            transcription.append(alphabet[letter_index])
        previous_letter_id = letter_index
    return ''.join(transcription)
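
As a quick sanity check, you can run the decoder on a hand-made index sequence (not taken from the model output): repeated indexes collapse to a single letter and the blank index is dropped.

# Indexes 8 and 9 correspond to 'h' and 'i' in the alphabet defined earlier;
# 28 is the CTC blank symbol ('~'). Repeats and blanks are removed.
assert ctc_greedy_decode([8, 8, 28, 9, 9, 28]) == "hi"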

Run Decoding and Print Output

transcription = ctc_greedy_decode(character_probabilities)
print(transcription)
from the edge to the cloud