Speech to Text with OpenVINO™

This tutorial is also available as a Jupyter notebook that can be cloned directly from GitHub. See the installation guide for instructions to run this tutorial locally on Windows, Linux or macOS. To run without installing anything, click the launch binder button.


This tutorial demonstrates speech-to-text recognition with OpenVINO.

This tutorial uses the QuartzNet 15x5 model. QuartzNet performs automatic speech recognition. Its design is based on the Jasper architecture, which is a convolutional model trained with Connectionist Temporal Classification (CTC) loss. The model is available from Open Model Zoo.

Imports

!pip install -q "librosa>=0.8.1"
from pathlib import Path
import sys

import torch
import torch.nn as nn
import IPython.display as ipd
import matplotlib.pyplot as plt
import librosa
import librosa.display
import numpy as np
import scipy
from openvino.runtime import Core, serialize, Tensor
from openvino.tools import mo

Settings

In this part, all variables used in the notebook are set.

model_folder = "model"
download_folder = "output"
data_folder = "../data"

precision = "FP16"
model_name = "quartznet-15x5-en"

Download and Convert Public Model

If this is your first run, the model will be downloaded and converted here. It may take a few minutes. Use omz_downloader and omz_converter, which are command-line tools from the openvino-dev package.

Download Model

The omz_downloader tool automatically creates a directory structure and downloads the selected model. This step is skipped if the model is already downloaded. The selected model comes from the public directory, which means it must be converted into OpenVINO Intermediate Representation (OpenVINO IR).

# Check if a model is already downloaded (to the download directory).
path_to_model_weights = Path(f'{download_folder}/public/{model_name}/models')
downloaded_model_file = list(path_to_model_weights.glob('*.pth'))

if not path_to_model_weights.is_dir() or len(downloaded_model_file) == 0:
    download_command = f"omz_downloader --name {model_name} --output_dir {download_folder} --precision {precision}"
    ! $download_command

Convert Model

In the previous step, the model was downloaded in PyTorch format. Currently, PyTorch models are supported in OpenVINO via ONNX export: the torch.onnx.export function traces the PyTorch model to ONNX and saves it to disk. It is also recommended to convert the model to the OpenVINO Intermediate Representation format to apply optimizations.

def convert_model(model_path:Path, converted_model_path:Path):
    """
    helper function for converting QuartzNet model to IR
    The function accepts the path to the directory with packages, weights and configs downloaded via the OMZ downloader,
    initializes the model, exports it to ONNX, converts it to an OpenVINO model and serializes it to IR.
    Params:
      model_path: path to model modules, weights and configs downloaded via omz_downloader
      converted_model_path: path for saving converted model
    Returns:
      None
    """
    # add model path to PYTHONPATH for access to downloaded modules
    sys.path.append(str(model_path))

    # import necessary classes
    from ruamel.yaml import YAML

    from nemo.collections.asr import JasperEncoder, JasperDecoderForCTC
    from nemo.core import NeuralModuleFactory, DeviceType

    YAML = YAML(typ='safe')

    # utility function for replacing 1d convolutions with 2d ones for better efficiency
    def convert_to_2d(model):
        for name, l in model.named_children():
            layer_type = l.__class__.__name__
            if layer_type == 'Conv1d':
                new_layer = nn.Conv2d(l.in_channels, l.out_channels,
                                      (1, l.kernel_size[0]), (1, l.stride[0]),
                                      (0, l.padding[0]), (1, l.dilation[0]),
                                      l.groups, False if l.bias is None else True, l.padding_mode)
                params = l.state_dict()
                params['weight'] = params['weight'].unsqueeze(2)
                new_layer.load_state_dict(params)
                setattr(model, name, new_layer)
            elif layer_type == 'BatchNorm1d':
                new_layer = nn.BatchNorm2d(l.num_features, l.eps)
                new_layer.load_state_dict(l.state_dict())
                new_layer.eval()
                setattr(model, name, new_layer)
            else:
                convert_to_2d(l)

    # model class
    class QuartzNet(torch.nn.Module):
        def __init__(self, model_config, encoder_weights, decoder_weights):
            super().__init__()
            with open(model_config, 'r') as config:
                model_args = YAML.load(config)
            _ = NeuralModuleFactory(placement=DeviceType.CPU)

            encoder_params = model_args['init_params']['encoder_params']['init_params']
            self.encoder = JasperEncoder(**encoder_params)
            self.encoder.load_state_dict(torch.load(encoder_weights, map_location='cpu'))

            decoder_params = model_args['init_params']['decoder_params']['init_params']
            self.decoder = JasperDecoderForCTC(**decoder_params)
            self.decoder.load_state_dict(torch.load(decoder_weights, map_location='cpu'))

            self.encoder._prepare_for_deployment()
            self.decoder._prepare_for_deployment()
            convert_to_2d(self.encoder)
            convert_to_2d(self.decoder)

        def forward(self, input_signal):
            input_signal = input_signal.unsqueeze(axis=2)
            i_encoded = self.encoder(input_signal)
            i_log_probs = self.decoder(i_encoded)

            shape = i_log_probs.shape
            return i_log_probs.reshape(shape[0], shape[1], shape[3])

    # path to configs and weights for creating model instance
    model_config = model_path / ".nemo_tmp/module.yaml"
    encoder_weights = model_path / ".nemo_tmp/JasperEncoder.pt"
    decoder_weights = model_path / ".nemo_tmp/JasperDecoderForCTC.pt"
    # create model instance
    model = QuartzNet(model_config, encoder_weights, decoder_weights)
    # turn model to inference mode
    model.eval()
    # export model to ONNX with preserving dynamic shapes
    onnx_model_path = model_path / "quartznet.onnx"
    torch.onnx.export(
        model,
        torch.zeros([1, 64, 128]),
        onnx_model_path,
        opset_version=11,
        input_names=["audio_signal"],
        output_names=['output'],
        dynamic_axes={"audio_signal": {0: "batch_size", 2: "wave_len"}, "output": {0: "batch_size", 2: "wave_len"}}
    )
    # convert model to OpenVINO Model using OpenVINO Model Optimizer
    ov_model = mo.convert_model(str(onnx_model_path))
    # serialize model to IR for next usage
    serialize(ov_model, str(converted_model_path))
# Check if a model is already converted (in the model directory).
path_to_converted_weights = Path(f'{model_folder}/public/{model_name}/{precision}/{model_name}.bin')
path_to_converted_model = Path(f'{model_folder}/public/{model_name}/{precision}/{model_name}.xml')

if not path_to_converted_weights.is_file():
    downloaded_model_path = Path("output/public/quartznet-15x5-en/models")
    convert_model(downloaded_model_path, path_to_converted_model)

Audio Processing

Now that the model is converted, load an audio file.

Define constants

First, locate an audio file and define the alphabet used by the model. This tutorial uses the Latin alphabet beginning with a space symbol and ending with a blank symbol. In this case, the blank symbol is ~, but it could be any other character.

audio_file_name = "edge_to_cloud.ogg"
alphabet = " abcdefghijklmnopqrstuvwxyz'~"
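The alphabet defines 29 symbols in total (26 letters, a space, an apostrophe, and the ~ blank), which matches the size of the model output described later. A quick check (a small sketch, assuming only the variable defined above):

# Quick check (a sketch): the alphabet has 29 symbols, matching the model output size.
print(len(alphabet))  # 29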

Available Audio Formats

There are multiple supported audio formats that can be used with the model:

AIFF, AU, AVR, CAF, FLAC, HTK, SVX, MAT4, MAT5, MPC2K, OGG, PAF, PVF, RAW, RF64, SD2, SDS, IRCAM, VOC, W64, WAV, NIST, WAVEX, WVE, XI

Load Audio File

Load the file after checking its extension. Pass sr (the sampling rate) as an additional parameter. The model supports files with a sampling rate of 16 kHz.

audio, sampling_rate = librosa.load(path=f'{data_folder}/audio/{audio_file_name}', sr=16000)
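If you want to check what was loaded, the following sketch (assuming only the variables from the cell above) prints the sampling rate and duration:

# Quick check (a sketch): librosa returns a float waveform at the requested sampling rate.
print(f"Sampling rate: {sampling_rate} Hz, duration: {len(audio) / sampling_rate:.2f} s")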

Now, you can play your audio file.

ipd.Audio(audio, rate=sampling_rate)

Visualise Audio File

You can visualize your audio file as a wave plot and a spectrogram.

plt.figure()
librosa.display.waveshow(y=audio, sr=sampling_rate, max_points=50000, x_axis='time', offset=0.0);
plt.show()
specto_audio = librosa.stft(audio)
specto_audio = librosa.amplitude_to_db(np.abs(specto_audio), ref=np.max)
print(specto_audio.shape)
librosa.display.specshow(specto_audio, sr=sampling_rate, x_axis='time', y_axis='hz');
../_images/211-speech-to-text-with-output_21_1.png
(1025, 51)
../_images/211-speech-to-text-with-output_21_3.png

Change Type of Data

The file loaded in the previous step may contain data in float type with values in the range from -1 to 1. To generate viable input, multiply each value by the maximum value of int16 and convert the result to the int16 type.

if max(np.abs(audio)) <= 1:
    audio = (audio * (2**15 - 1))
audio = audio.astype(np.int16)
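As an optional check (a sketch using only the variables above), confirm that the converted signal is int16 and within the expected range:

# Quick check (a sketch): values should now fit the int16 range [-32768, 32767].
print(audio.dtype, audio.min(), audio.max())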

Convert Audio to Mel Spectrum

Next, convert the pre-processed audio to a Mel spectrum. For more information on why this needs to be done, refer to this article.

def audio_to_mel(audio, sampling_rate):
    assert sampling_rate == 16000, "Only 16 KHz audio supported"
    preemph = 0.97
    preemphased = np.concatenate([audio[:1], audio[1:] - preemph * audio[:-1].astype(np.float32)])

    # Calculate the window length.
    win_length = round(sampling_rate * 0.02)

    # Based on the previously calculated window length, run short-time Fourier transform.
    spec = np.abs(librosa.core.spectrum.stft(preemphased, n_fft=512, hop_length=round(sampling_rate * 0.01),
                  win_length=win_length, center=True, window=scipy.signal.windows.hann(win_length), pad_mode='reflect'))

    # Create mel filter-bank, produce transformation matrix to project current values onto Mel-frequency bins.
    mel_basis = librosa.filters.mel(sr=sampling_rate, n_fft=512, n_mels=64, fmin=0.0, fmax=8000.0, htk=False)
    return mel_basis, spec


def mel_to_input(mel_basis, spec, padding=16):
    # Convert to a logarithmic scale.
    log_melspectrum = np.log(np.dot(mel_basis, np.power(spec, 2)) + 2 ** -24)

    # Normalize the output.
    normalized = (log_melspectrum - log_melspectrum.mean(1)[:, None]) / (log_melspectrum.std(1)[:, None] + 1e-5)

    # Calculate padding.
    remainder = normalized.shape[1] % padding
    if remainder != 0:
        return np.pad(normalized, ((0, 0), (0, padding - remainder)))[None]
    return normalized[None]

Run Conversion from Audio to Mel Format

In this step, convert the current audio file to the Mel scale.

mel_basis, spec = audio_to_mel(audio=audio.flatten(), sampling_rate=sampling_rate)
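Optionally, inspect the shapes these helpers produce (a sketch): with n_fft=512 the STFT has 257 frequency bins, so the filter bank projects (64, 257) x (257, frames) onto 64 Mel bins.

# Quick check (a sketch): mel_basis is (64, 257) and spec is (257, frames).
print(mel_basis.shape, spec.shape)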

Visualise Mel Spectrogram

For more information about the Mel spectrogram, refer to this article. The first image visualizes the Mel frequency spectrogram, the second one presents the filter bank for converting Hz to Mels.

librosa.display.specshow(data=spec, sr=sampling_rate, x_axis='time', y_axis='log');
plt.show();
librosa.display.specshow(data=mel_basis, sr=sampling_rate, x_axis='linear');
plt.ylabel('Mel filter');
../_images/211-speech-to-text-with-output_29_0.png ../_images/211-speech-to-text-with-output_29_1.png

Adjust Mel scale to Input

Before reading the network, make sure that the input is ready.

audio = mel_to_input(mel_basis=mel_basis, spec=spec)
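Optionally, verify the shape of the prepared input (a sketch): mel_to_input adds a batch dimension and pads the frame axis to a multiple of 16.

# Quick check (a sketch): the network expects a (batch, mel_bins, frames) tensor.
print(audio.shape)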

Load the Model

Now, you can read and load the network.

ie = Core()

You can run the network on multiple devices. By default, it will load the model on CPU (you can choose CPU, GPU, MYRIAD, etc. manually) or let the engine choose the best available device (AUTO).

To list all available devices, run the print(ie.available_devices) command.

print(ie.available_devices)
['CPU', 'GPU']

To change the device used for your network, change the value of the device_name variable to one of the values listed by print() in the cell above.

model = ie.read_model(
    model=f"{model_folder}/public/{model_name}/{precision}/{model_name}.xml"
)
model_input_layer = model.input(0)
shape = model_input_layer.partial_shape
shape[2] = -1
model.reshape({model_input_layer: shape})
compiled_model = ie.compile_model(model=model, device_name="CPU")
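If you prefer to let OpenVINO pick the device for you, an alternative (a sketch) is to compile the same model with the AUTO plugin instead of CPU:

# Alternative (a sketch): let OpenVINO choose the best available device automatically.
compiled_model = ie.compile_model(model=model, device_name="AUTO")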

Do Inference

Everything is set up. Now, the only thing that remains is passing input to the previously loaded network and running inference.

output_layer_ir = compiled_model.output(0)

character_probabilities = compiled_model([Tensor(audio)])[output_layer_ir]

Read Output

After inference, you need to retrieve the output. The default output format for QuartzNet 15x5 is per-frame probabilities (after LogSoftmax) for every symbol in the alphabet. The output is named output, has shape 1x64x29, and uses the BxNxC data layout, where:

  • B - batch size

  • N - number of audio frames

  • C - alphabet size, including the Connectionist Temporal Classification (CTC) blank symbol
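As a quick sanity check (a sketch, assuming only the variables already defined in the inference cell), confirm that the last dimension of the result matches the alphabet size:

# Quick check (a sketch): the last axis should equal the alphabet size (29).
print(character_probabilities.shape)
assert character_probabilities.shape[-1] == len(alphabet)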

You need to convert this output to a more human-readable format. To do this, take the symbol with the highest probability at each frame. Given the resulting list of highest-probability indexes, Connectionist Temporal Classification (CTC) decoding removes repeated consecutive symbols and then removes all the blanks.

The last step is getting symbols from corresponding indexes in charlist.

# Remove the unnecessary batch dimension
character_probabilities = np.squeeze(character_probabilities)

# Run argmax to pick the most probable symbol at each frame
character_probabilities = np.argmax(character_probabilities, axis=1)

Implementation of Decoding

To decode the output explained above, you need the Connectionist Temporal Classification (CTC) decode function. This solution removes repeated consecutive letters and the blanks from the output.

def ctc_greedy_decode(predictions):
    previous_letter_id = blank_id = len(alphabet) - 1
    transcription = list()
    for letter_index in predictions:
        if previous_letter_id != letter_index != blank_id:
            transcription.append(alphabet[letter_index])
        previous_letter_id = letter_index
    return ''.join(transcription)
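To see how the function behaves, here is a toy example with a hypothetical index sequence (not real model output): repeated indexes collapse and the blank index (28, the ~ symbol) is dropped.

# Toy example (a sketch): repeated indexes collapse and blanks (index 28) are dropped.
print(ctc_greedy_decode([3, 3, 28, 1, 1, 28, 20]))  # -> "cat"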

Run Decoding and Print Output

transcription = ctc_greedy_decode(character_probabilities)
print(transcription)
from the edge to the cloud