Video Subtitle Generation using Whisper and OpenVINO™#

Whisper is an automatic speech recognition (ASR) system trained on 680,000 hours of multilingual and multitask supervised data collected from the web. It is a multi-task model that can perform multilingual speech recognition as well as speech translation and language identification.

asr-training-data-desktop.svg

You can find more information about this model in the research paper, OpenAI blog, model card and GitHub repository.

In this notebook, we will use Whisper with OpenVINO to generate subtitles for a sample video. Additionally, we will use NNCF to improve model performance through INT8 quantization. The notebook contains the following steps:

  1. Download the model.

  2. Instantiate the PyTorch model pipeline.

  3. Convert the model to OpenVINO IR, using the model conversion API.

  4. Run the Whisper pipeline with OpenVINO models.

  5. Quantize the OpenVINO model with NNCF.

  6. Check the quantized model result for the demo video.

  7. Compare model size, performance and accuracy of the FP32 and quantized INT8 models.

  8. Launch an interactive demo for video subtitle generation.

Installation Instructions#

This is a self-contained example that relies solely on its own code.

We recommend running the notebook in a virtual environment. You only need a Jupyter server to start. For details, please refer to the Installation Guide.

Prerequisites#

Install dependencies.

%pip install -q "openvino>=2024.1.0" "nncf>=2.10.0"
%pip install -q "python-ffmpeg<=1.0.16" moviepy "onnx!=1.16.2" "git+https://github.com/huggingface/optimum-intel.git" "torch>=2.1" --extra-index-url https://download.pytorch.org/whl/cpu
%pip install -q "yt_dlp>=2024.8.6" soundfile librosa jiwer
%pip install -q  "gradio>=4.19"

Instantiate model#

Whisper is a Transformer based encoder-decoder model, also referred to as a sequence-to-sequence model. It maps a sequence of audio spectrogram features to a sequence of text tokens. First, the raw audio inputs are converted to a log-Mel spectrogram by action of the feature extractor. Then, the Transformer encoder encodes the spectrogram to form a sequence of encoder hidden states. Finally, the decoder autoregressively predicts text tokens, conditional on both the previous tokens and the encoder hidden states.
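
To make the tensor shapes concrete, the sketch below works through the arithmetic for one 30-second audio window. The constants (16 kHz sampling rate, hop length of 160 samples, 80 Mel bins, and a factor-2 downsampling in the encoder) are assumptions based on the commonly published Whisper configuration, not values read from this notebook; some checkpoints use a different number of Mel bins.

```python
# Assumed Whisper front-end constants (not read from the notebook).
SAMPLE_RATE = 16_000    # audio samples per second
HOP_LENGTH = 160        # samples between consecutive spectrogram frames
N_MELS = 80             # Mel bins per frame (assumed; some checkpoints differ)
CHUNK_SECONDS = 30      # length of one audio window
ENCODER_STRIDE = 2      # assumed downsampling factor inside the encoder


def spectrogram_shape(seconds=CHUNK_SECONDS):
    """Return (mel_bins, frames) of the log-Mel spectrogram for one window."""
    n_samples = seconds * SAMPLE_RATE
    n_frames = n_samples // HOP_LENGTH
    return N_MELS, n_frames


def encoder_positions(n_frames):
    """Return the number of encoder hidden states produced for n_frames."""
    return n_frames // ENCODER_STRIDE


n_mels, n_frames = spectrogram_shape()
print(f"spectrogram: {n_mels} x {n_frames}")
print(f"encoder hidden states: {encoder_positions(n_frames)}")
```

Under these assumptions, the decoder attends over the same fixed number of encoder hidden states for every window, regardless of how much speech the window contains.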

You can see the model architecture in the diagram below:

whisper_architecture.svg

There are several models of different sizes and capabilities trained by the authors of the model. In this tutorial, we will use the tiny model, but the same actions are also applicable to other models from Whisper family.

import ipywidgets as widgets

MODELS = [
    "openai/whisper-large-v3-turbo",
    "openai/whisper-large-v3",
    "openai/whisper-large-v2",
    "openai/whisper-large",
    "openai/whisper-medium",
    "openai/whisper-small",
    "openai/whisper-base",
    "openai/whisper-tiny",
]

model_id = widgets.Dropdown(
    options=list(MODELS),
    value="openai/whisper-tiny",
    description="Model:",
    disabled=False,
)

model_id
Dropdown(description='Model:', index=6, options=('openai/whisper-large-v3', 'openai/whisper-large-v2', 'openai…

Convert model to OpenVINO Intermediate Representation (IR) format using Optimum-Intel.#

The Hugging Face Optimum API is a high-level API that enables us to convert and quantize models from the Hugging Face Transformers library to the OpenVINO™ IR format. For more details, refer to the Hugging Face Optimum documentation.

Optimum Intel can be used to load optimized models from the Hugging Face Hub and create pipelines to run an inference with OpenVINO Runtime using Hugging Face APIs. The Optimum Inference models are API compatible with Hugging Face Transformers models. This means we just need to replace the AutoModelForXxx class with the corresponding OVModelForXxx class.

Below is an example of the whisper-tiny model

-from transformers import AutoModelForSpeechSeq2Seq
+from optimum.intel.openvino import OVModelForSpeechSeq2Seq
from transformers import AutoTokenizer, pipeline

model_id = "openai/whisper-tiny"
-model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id)
+model = OVModelForSpeechSeq2Seq.from_pretrained(model_id, export=True)

Model class initialization starts with calling the from_pretrained method. When downloading and converting the Transformers model, the parameter export=True should be added. We can save the converted model for later use with the save_pretrained method. Alternatively, model conversion can be performed using the Optimum-CLI interface. You can find more details about Optimum-Intel and Optimum-CLI usage in this tutorial. The command below illustrates how to convert Whisper using Optimum-CLI.

from pathlib import Path

model_dir = model_id.value.split("/")[-1]

if not Path(model_dir).exists():
    !optimum-cli export openvino -m {model_id.value} {model_dir} --weight-format fp16
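
Outside a notebook, the `!` shell escape is not available. A minimal sketch of the same export driven from plain Python follows; `build_export_command` is a hypothetical helper introduced here for illustration, and the flags are the ones used in the cell above.

```python
import shlex
from pathlib import Path


def build_export_command(model_name, output_dir, weight_format="fp16"):
    """Return the optimum-cli invocation as an argument list (hypothetical helper)."""
    return [
        "optimum-cli", "export", "openvino",
        "-m", model_name,
        str(output_dir),
        "--weight-format", weight_format,
    ]


model_name = "openai/whisper-tiny"
target_dir = Path(model_name.split("/")[-1])
command = build_export_command(model_name, target_dir)
print(shlex.join(command))

# To actually run the export (requires optimum-intel to be installed):
# import subprocess
# if not target_dir.exists():
#     subprocess.run(command, check=True)
```

Passing the command as a list avoids shell quoting problems if the model name or output path contains spaces.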

Prepare inference pipeline#

The image below illustrates the pipeline of video transcribing using the Whisper model.

whisper_pipeline.png

Preprocessing and post-processing are important parts of this pipeline. The transformers.AutoProcessor class, used here to initialize WhisperProcessor, is responsible for preparing audio input data for the model by converting it to a log-Mel spectrogram, and for decoding predicted output token_ids into a string using the tokenizer. The tokenizers and processors distributed with the models are also compatible with the OpenVINO model.

Like the original PyTorch model, the OpenVINO model is compatible with the Hugging Face pipeline interface for automatic-speech-recognition. The pipeline can be used for long audio transcription: it uses a chunked algorithm to transcribe long-form audio files. For Distil-Whisper, this chunked long-form algorithm is reported to be 9x faster than the sequential algorithm proposed by OpenAI in the Whisper paper, with an optimal chunk length of 15 seconds. To enable chunking, pass the chunk_length_s parameter to the pipeline; this notebook uses 30-second chunks. To activate batching, pass the batch_size argument.
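
The idea behind chunked long-form transcription can be sketched without any model: the audio is cut into fixed-length windows that overlap slightly, so that a word falling on a boundary appears whole in at least one window. The helper `chunk_bounds` and the overlap of one sixth of the chunk length are assumptions made for illustration; the pipeline's actual chunking and merging logic is more involved.

```python
def chunk_bounds(total_s, chunk_length_s=30.0, overlap_s=None):
    """Return (start, end) times in seconds of overlapping windows covering the audio."""
    if overlap_s is None:
        overlap_s = chunk_length_s / 6  # assumed overlap, for illustration only
    step = chunk_length_s - overlap_s
    if step <= 0:
        raise ValueError("overlap must be smaller than the chunk length")

    bounds = []
    start = 0.0
    while start < total_s:
        end = min(start + chunk_length_s, total_s)
        bounds.append((start, end))
        if end >= total_s:
            break  # the last window reached the end of the audio
        start += step
    return bounds


for start, end in chunk_bounds(70.0):
    print(f"{start:5.1f}s -> {end:5.1f}s")
```

Each window can then be transcribed independently, which is what makes batching possible.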

Select inference device#

Select a device from the dropdown list for running inference using OpenVINO.

import openvino as ov

core = ov.Core()
import requests

r = requests.get(
    url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/notebook_utils.py",
)
open("notebook_utils.py", "w").write(r.text)

from notebook_utils import device_widget

device = device_widget()

device
Dropdown(description='Device:', index=3, options=('CPU', 'GPU.0', 'GPU.1', 'AUTO'), value='AUTO')
from optimum.intel.openvino import OVModelForSpeechSeq2Seq
from transformers import AutoProcessor, pipeline

ov_model = OVModelForSpeechSeq2Seq.from_pretrained(model_dir, device=device.value)

processor = AutoProcessor.from_pretrained(model_dir)

pipe = pipeline(
    "automatic-speech-recognition",
    model=ov_model,
    chunk_length_s=30,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
)

Run video transcription pipeline#

Now, we are ready to start transcription. We select a video from YouTube that we want to transcribe. Be patient, as downloading the video may take some time.

import ipywidgets as widgets

VIDEO_LINK = "https://youtu.be/kgL5LBM-hFI"
link = widgets.Text(
    value=VIDEO_LINK,
    placeholder="Type link for video",
    description="Video:",
    disabled=False,
)

link
Text(value='https://youtu.be/kgL5LBM-hFI', description='Video:', placeholder='Type link for video')
from pathlib import Path
import yt_dlp

print(f"Downloading video {link.value} started")

output_file = Path("downloaded_video.mp4")
ydl_ops = {"format": "best[ext=mp4]", "outtmpl": output_file.as_posix()}
with yt_dlp.YoutubeDL(ydl_ops) as ydl:
    ydl.download([link.value])

print(f"Video saved to {output_file}")
Downloading video https://youtu.be/kgL5LBM-hFI started
Video saved to downloaded_video.mp4

Select the task for the model:

  • transcribe - generate audio transcription in the source language (automatically detected).

  • translate - generate audio transcription with translation to English language.

task = widgets.Select(
    options=["transcribe", "translate"],
    value="translate",
    description="Select task:",
    disabled=False,
)
task
Select(description='Select task:', index=1, options=('transcribe', 'translate'), value='translate')
from moviepy.editor import VideoFileClip
from transformers.pipelines.audio_utils import ffmpeg_read


def get_audio(video_file):
    """
    Extract the audio signal from a given video file, convert it to a
    mono-channel float signal and resample it to the expected sample rate

    Parameters:
        video_file: path to input video file
    Returns:
        inputs: dict with the mono-channel float audio signal ("raw") and its
                sample rate ("sampling_rate", 16000 Hz), extracted from the video
        duration: duration of the video fragment in seconds
    """
    input_video = VideoFileClip(str(video_file))
    duration = input_video.duration
    audio_file = video_file.stem + ".wav"
    input_video.audio.write_audiofile(audio_file, verbose=False, logger=None)
    with open(audio_file, "rb") as f:
        inputs = f.read()
    audio = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate)
    return {
        "raw": audio,
        "sampling_rate": pipe.feature_extractor.sampling_rate,
    }, duration
inputs, duration = get_audio(output_file)

transcription = pipe(inputs, generate_kwargs={"task": task.value}, return_timestamps=True)["chunks"]
import math


def format_timestamp(seconds: float):
    """
    format time in srt-file expected format
    """
    assert seconds >= 0, "non-negative timestamp expected"
    milliseconds = round(seconds * 1000.0)

    hours = milliseconds // 3_600_000
    milliseconds -= hours * 3_600_000

    minutes = milliseconds // 60_000
    milliseconds -= minutes * 60_000

    seconds = milliseconds // 1_000
    milliseconds -= seconds * 1_000

    # SRT timestamps use zero-padded HH:MM:SS,mmm
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"


def prepare_srt(transcription, filter_duration=None):
    """
    Format transcription into srt file format
    """
    segment_lines = []
    for idx, segment in enumerate(transcription):
        # for the case where the model could not predict an ending timestamp, which can happen if audio is cut off in the middle of a word.
        if segment["timestamp"][1] is None:
            segment["timestamp"] = (segment["timestamp"][0], filter_duration)

        if filter_duration is not None and (segment["timestamp"][0] >= math.floor(filter_duration) or segment["timestamp"][1] > math.ceil(filter_duration) + 1):
            break
        segment_lines.append(str(idx + 1) + "\n")
        time_start = format_timestamp(segment["timestamp"][0])
        time_end = format_timestamp(segment["timestamp"][1])
        time_str = f"{time_start} --> {time_end}\n"
        segment_lines.append(time_str)
        segment_lines.append(segment["text"] + "\n\n")
    return segment_lines

The results will be saved in the downloaded_video.srt file. SRT is one of the most popular formats for storing subtitles and is compatible with many modern video players. This file can be used to display the transcription during playback or to embed it directly into the video file using ffmpeg.
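
As one way to embed the subtitles into the video file, the sketch below assembles an ffmpeg command that muxes the SRT file into an MP4 container as a soft subtitle track. The helper name `build_mux_command` and the output file name are hypothetical, and the command assumes an ffmpeg binary is available on the PATH.

```python
from pathlib import Path


def build_mux_command(video, subtitles, output):
    """Return an ffmpeg argument list that adds an SRT track to an MP4 file (hypothetical helper)."""
    return [
        "ffmpeg", "-y",
        "-i", str(video),        # input video
        "-i", str(subtitles),    # input subtitles
        "-c", "copy",            # copy audio and video streams without re-encoding
        "-c:s", "mov_text",      # subtitle codec supported by the MP4 container
        str(output),
    ]


video_path = Path("downloaded_video.mp4")
mux_command = build_mux_command(
    video_path,
    video_path.with_suffix(".srt"),
    "downloaded_video_subtitled.mp4",  # hypothetical output name
)
print(" ".join(mux_command))

# To actually run it (requires ffmpeg and the two input files):
# import subprocess
# subprocess.run(mux_command, check=True)
```

A soft subtitle track can be toggled in the player, unlike subtitles burned into the frames.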

srt_lines = prepare_srt(transcription, filter_duration=duration)
# save transcription
with output_file.with_suffix(".srt").open("w") as f:
    f.writelines(srt_lines)

Now let us see the results.

widgets.Video.from_file(output_file, loop=False, width=800, height=800)
Video(value=b"x00x00x00x18ftypmp42x00x00x00x00isommp42x00x00:'moovx00x00x00lmvhd...", height='800…
print("".join(srt_lines))
1
00:00:00,000 --> 00:00:05,000
 Oh, what's that?

2
00:00:05,000 --> 00:00:08,000
 Oh, wow.

3
00:00:08,000 --> 00:00:10,000
 Hello, humans.

4
00:00:13,000 --> 00:00:15,000
 Focus on me.

5
00:00:15,000 --> 00:00:17,000
 Focus on the guard.

6
00:00:17,000 --> 00:00:20,000
 Don't tell anyone what you're seeing in here.

7
00:00:22,000 --> 00:00:24,000
 Have you seen what's in there?

8
00:00:24,000 --> 00:00:25,000
 They have intel.

9
00:00:25,000 --> 00:00:27,000
 This is where it all changes.

Quantization#

NNCF enables post-training quantization by adding the quantization layers into the model graph and then using a subset of the training dataset to initialize the parameters of these additional quantization layers. The framework is designed so that modifications to your original training code are minor.
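
To illustrate what a quantization layer does numerically, the sketch below applies symmetric per-tensor INT8 quantization to a handful of values and maps them back to floats. It is a simplified illustration that assumes a single scale derived from the largest absolute calibration value; it is not NNCF's implementation, and all function names are hypothetical.

```python
def compute_scale(calibration_values, num_levels=127):
    """Derive one scale so the largest absolute calibration value maps to num_levels."""
    max_abs = max(abs(v) for v in calibration_values)
    return max_abs / num_levels if max_abs > 0 else 1.0


def quantize_int8(values, scale):
    """Round each value to the nearest integer step and clamp to [-127, 127]."""
    return [max(-127, min(127, round(v / scale))) for v in values]


def dequantize(quantized, scale):
    """Map integer steps back to approximate float values."""
    return [q * scale for q in quantized]


calibration = [-1.27, -0.5, 0.0, 0.31, 1.0]
scale = compute_scale(calibration)
quantized = quantize_int8(calibration, scale)
restored = dequantize(quantized, scale)
max_error = max(abs(a - b) for a, b in zip(calibration, restored))

print("scale:", scale)
print("int8 values:", quantized)
print("max round-trip error:", max_error)
```

The calibration data matters because it determines the scale: values outside the calibrated range are clamped, while an overly wide range wastes integer levels.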

The optimization process contains the following steps:

  1. Create a calibration dataset for quantization.

  2. Run nncf.quantize to obtain quantized encoder and decoder models.

  3. Serialize the INT8 model using openvino.save_model function.

Note: Quantization is a time- and memory-consuming operation. Running the quantization code below may take some time.

Please select below whether you would like to run Whisper quantization.

to_quantize = widgets.Checkbox(
    value=True,
    description="Quantization",
    disabled=False,
)

to_quantize
Checkbox(value=True, description='Quantization')
# Fetch `skip_kernel_extension` module
import requests

r = requests.get(
    url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/skip_kernel_extension.py",
)
open("skip_kernel_extension.py", "w").write(r.text)

ov_quantized_model = None

%load_ext skip_kernel_extension

Prepare calibration datasets#

The first step is to prepare calibration datasets for quantization. Since we quantize the Whisper encoder and decoder separately, we need a calibration dataset for each model. We import the InferRequestWrapper class, which intercepts model inputs and collects them into a list. Then we run model inference on a small number of audio samples. Generally, increasing the calibration dataset size improves quantization quality.
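
The interception idea can be sketched with a plain Python wrapper: it records every input it receives and then delegates to the wrapped callable. `RecordingWrapper` and `fake_request` are hypothetical names used only for this illustration; the real InferRequestWrapper wraps an OpenVINO infer request and offers additional options such as caching.

```python
import copy


class RecordingWrapper:
    """Record each input, then forward the call to the wrapped request (illustrative only)."""

    def __init__(self, request, collected):
        self.request = request      # kept so the original can be restored later
        self.collected = collected  # list shared with the caller

    def __call__(self, inputs):
        # Store a copy so later in-place changes do not alter the recorded sample
        self.collected.append(copy.deepcopy(inputs))
        return self.request(inputs)


def fake_request(inputs):
    """Stand-in for model inference."""
    return {"sum": sum(inputs["features"])}


calibration_data = []
wrapped = RecordingWrapper(fake_request, calibration_data)
for features in ([1, 2, 3], [4, 5, 6]):
    wrapped({"features": features})

# Restore the original request once collection is finished
original = wrapped.request
print(f"collected {len(calibration_data)} samples")
```

Keeping a reference to the wrapped object is what allows the original request to be restored in a finally block after collection.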

%%skip not $to_quantize.value

from itertools import islice

from datasets import load_dataset
from optimum.intel.openvino.quantization import InferRequestWrapper
from tqdm.notebook import tqdm


def collect_calibration_dataset(ov_model: OVModelForSpeechSeq2Seq, calibration_dataset_size: int):
    # Overwrite model request properties, saving the original ones for restoring later
    encoder_calibration_data = []
    decoder_calibration_data = []
    ov_model.encoder.request = InferRequestWrapper(ov_model.encoder.request, encoder_calibration_data, apply_caching=True)
    ov_model.decoder_with_past.request = InferRequestWrapper(ov_model.decoder_with_past.request,
                                                             decoder_calibration_data,
                                                             apply_caching=True)

    pipe = pipeline(
        "automatic-speech-recognition",
        model=ov_model,
        chunk_length_s=30,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
    )
    try:
        calibration_dataset = load_dataset("openslr/librispeech_asr", "clean", split="validation", streaming=True, trust_remote_code=True)
        for sample in tqdm(islice(calibration_dataset, calibration_dataset_size), desc="Collecting calibration data",
                           total=calibration_dataset_size):
            pipe(sample["audio"], generate_kwargs={"task": task.value}, return_timestamps=True)
    finally:
        ov_model.encoder.request = ov_model.encoder.request.request
        ov_model.decoder_with_past.request = ov_model.decoder_with_past.request.request

    return encoder_calibration_data, decoder_calibration_data

Quantize Whisper encoder and decoder models#

Below we run the quantize function which calls nncf.quantize on Whisper encoder and decoder-with-past models. We don’t quantize first-step-decoder because its share in whole inference time is negligible.

%%skip not $to_quantize.value

import gc
import shutil
import nncf
from datasets import load_dataset
from tqdm.notebook import tqdm

def extract_input_features(sample):
    input_features = processor(
        sample["audio"]["array"],
        sampling_rate=sample["audio"]["sampling_rate"],
        return_tensors="pt",
    ).input_features
    return input_features



CALIBRATION_DATASET_SIZE = 50
quantized_model_path = Path(f"{model_dir}_quantized")


def quantize(ov_model: OVModelForSpeechSeq2Seq, calibration_dataset_size: int):
    if not quantized_model_path.exists():
        encoder_calibration_data, decoder_calibration_data = collect_calibration_dataset(
            ov_model, calibration_dataset_size
        )
        print("Quantizing encoder")
        quantized_encoder = nncf.quantize(
            ov_model.encoder.model,
            nncf.Dataset(encoder_calibration_data),
            subset_size=len(encoder_calibration_data),
            model_type=nncf.ModelType.TRANSFORMER,
            # Smooth Quant algorithm reduces activation quantization error; optimal alpha value was obtained through grid search
            advanced_parameters=nncf.AdvancedQuantizationParameters(smooth_quant_alpha=0.50)
        )
        ov.save_model(quantized_encoder, quantized_model_path / "openvino_encoder_model.xml")
        del quantized_encoder
        del encoder_calibration_data
        gc.collect()

        print("Quantizing decoder with past")
        quantized_decoder_with_past = nncf.quantize(
            ov_model.decoder_with_past.model,
            nncf.Dataset(decoder_calibration_data),
            subset_size=len(decoder_calibration_data),
            model_type=nncf.ModelType.TRANSFORMER,
            # Smooth Quant algorithm reduces activation quantization error; optimal alpha value was obtained through grid search
            advanced_parameters=nncf.AdvancedQuantizationParameters(smooth_quant_alpha=0.96)
        )
        ov.save_model(quantized_decoder_with_past, quantized_model_path / "openvino_decoder_with_past_model.xml")
        del quantized_decoder_with_past
        del decoder_calibration_data
        gc.collect()

        # Copy the config file and the first-step-decoder manually
        model_path = Path(model_dir)
        shutil.copy(model_path / "config.json", quantized_model_path / "config.json")
        shutil.copy(model_path / "generation_config.json", quantized_model_path / "generation_config.json")
        shutil.copy(model_path / "openvino_decoder_model.xml", quantized_model_path / "openvino_decoder_model.xml")
        shutil.copy(model_path / "openvino_decoder_model.bin", quantized_model_path / "openvino_decoder_model.bin")

    quantized_ov_model = OVModelForSpeechSeq2Seq.from_pretrained(quantized_model_path, compile=False)
    quantized_ov_model.to(device.value)
    quantized_ov_model.compile()
    return quantized_ov_model


ov_quantized_model = quantize(ov_model, CALIBRATION_DATASET_SIZE)
Collecting calibration data:   0%|          | 0/50 [00:00<?, ?it/s]
Quantizing encoder
INFO:nncf:12 ignored nodes were found by name in the NNCFGraph
INFO:nncf:16 ignored nodes were found by name in the NNCFGraph
Quantizing decoder with past
INFO:nncf:24 ignored nodes were found by name in the NNCFGraph
INFO:nncf:24 ignored nodes were found by name in the NNCFGraph
Compiling the encoder to AUTO ...
Compiling the decoder to AUTO ...
Compiling the decoder to AUTO ...

Run quantized model inference#

Let’s compare the transcription results for original and quantized models.

if ov_quantized_model is not None:
    int8_pipe = pipeline(
        "automatic-speech-recognition",
        model=ov_quantized_model,
        chunk_length_s=30,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
    )
    inputs, duration = get_audio(output_file)
    transcription = int8_pipe(inputs, generate_kwargs={"task": task.value}, return_timestamps=True)["chunks"]
    srt_lines = prepare_srt(transcription, filter_duration=duration)
    print("".join(srt_lines))
    widgets.Video.from_file(output_file, loop=False, width=800, height=800)
1
00:00:00,000 --> 00:00:05,000
 What's that?

2
00:00:05,000 --> 00:00:07,000
 Oh, wow.

3
00:00:09,000 --> 00:00:11,000
 Hello humans.

4
00:00:14,000 --> 00:00:15,000
 Focus on me.

5
00:00:15,000 --> 00:00:16,000
 Focus on the guard.

6
00:00:18,000 --> 00:00:20,000
 Don't tell anyone what you're seen in here.

7
00:00:22,000 --> 00:00:24,000
 Have you seen what's in there?

8
00:00:24,000 --> 00:00:25,000
 They have intel.

9
00:00:25,000 --> 00:00:27,000
 This is where it all changes.

Compare performance and accuracy of the original and quantized models#

Finally, we compare the original and quantized Whisper models in terms of accuracy and performance.

To measure accuracy, we use 1 - WER as a metric, where WER stands for Word Error Rate.
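
For intuition, WER is the word-level edit distance between the reference and the hypothesis, divided by the number of reference words. The sketch below computes it with dynamic programming; it is a simplified stand-in that only lowercases and splits on whitespace, whereas the notebook relies on jiwer with its standardization transforms. The function name `word_error_rate` is hypothetical.

```python
def word_error_rate(reference, hypothesis):
    """Word-level Levenshtein distance divided by the number of reference words."""
    ref = reference.lower().split()
    hyp = hypothesis.lower().split()
    if not ref:
        raise ValueError("reference must contain at least one word")

    # prev[j] holds the edit distance between the processed reference prefix and hyp[:j]
    prev = list(range(len(hyp) + 1))
    for i, ref_word in enumerate(ref, start=1):
        curr = [i]
        for j, hyp_word in enumerate(hyp, start=1):
            substitution = prev[j - 1] + (0 if ref_word == hyp_word else 1)
            deletion = prev[j] + 1
            insertion = curr[j - 1] + 1
            curr.append(min(substitution, deletion, insertion))
        prev = curr
    return prev[-1] / len(ref)


reference = "don't tell anyone what you're seeing in here"
hypothesis = "don't tell anyone what you're seen in here"
wer_value = word_error_rate(reference, hypothesis)
print(f"WER: {wer_value:.3f}, word accuracy: {(1 - wer_value) * 100:.1f}%")
```

Here one substituted word out of eight gives a WER of 0.125, that is, a word accuracy of 87.5%.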

We measure inference time separately for the encoder and decoder-with-past forward passes, as well as for the whole model inference.

%%skip not $to_quantize.value

import time
from contextlib import contextmanager
from jiwer import wer, wer_standardize


TEST_DATASET_SIZE = 50
MEASURE_TIME = False

@contextmanager
def time_measurement():
    global MEASURE_TIME
    try:
        MEASURE_TIME = True
        yield
    finally:
        MEASURE_TIME = False

def time_fn(obj, fn_name, time_list):
    original_fn = getattr(obj, fn_name)

    def wrapper(*args, **kwargs):
        # Outside the time_measurement() context, call the function without timing it
        if not MEASURE_TIME:
            return original_fn(*args, **kwargs)
        start_time = time.perf_counter()
        result = original_fn(*args, **kwargs)
        end_time = time.perf_counter()
        time_list.append(end_time - start_time)
        return result

    setattr(obj, fn_name, wrapper)

def calculate_transcription_time_and_accuracy(ov_model, test_samples):
    encoder_infer_times = []
    decoder_with_past_infer_times = []
    whole_infer_times = []
    time_fn(ov_model, "generate", whole_infer_times)
    time_fn(ov_model.encoder, "forward", encoder_infer_times)
    time_fn(ov_model.decoder_with_past, "forward", decoder_with_past_infer_times)

    ground_truths = []
    predictions = []
    for data_item in tqdm(test_samples, desc="Measuring performance and accuracy"):
        input_features = extract_input_features(data_item)

        with time_measurement():
            predicted_ids = ov_model.generate(input_features)
        transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)

        ground_truths.append(data_item["text"])
        predictions.append(transcription[0])

    word_accuracy = (1 - wer(ground_truths, predictions, reference_transform=wer_standardize,
                             hypothesis_transform=wer_standardize)) * 100
    # Total (summed) inference times; only their ratios are used for the speedup figures
    total_whole_infer_time = sum(whole_infer_times)
    total_encoder_infer_time = sum(encoder_infer_times)
    total_decoder_with_past_infer_time = sum(decoder_with_past_infer_times)
    return word_accuracy, (total_whole_infer_time, total_encoder_infer_time, total_decoder_with_past_infer_time)

test_dataset = load_dataset("openslr/librispeech_asr", "clean", split="validation", streaming=True, trust_remote_code=True)
test_dataset = test_dataset.shuffle(seed=42).take(TEST_DATASET_SIZE)
test_samples = [sample for sample in test_dataset]

accuracy_original, times_original = calculate_transcription_time_and_accuracy(ov_model, test_samples)
accuracy_quantized, times_quantized = calculate_transcription_time_and_accuracy(ov_quantized_model, test_samples)
print(f"Encoder performance speedup: {times_original[1] / times_quantized[1]:.3f}")
print(f"Decoder with past performance speedup: {times_original[2] / times_quantized[2]:.3f}")
print(f"Whole pipeline performance speedup: {times_original[0] / times_quantized[0]:.3f}")
print(f"Whisper transcription word accuracy. Original model: {accuracy_original:.2f}%. Quantized model: {accuracy_quantized:.2f}%.")
print(f"Accuracy drop: {accuracy_original - accuracy_quantized:.2f}%.")
Measuring performance and accuracy:   0%|          | 0/50 [00:00<?, ?it/s]
Measuring performance and accuracy:   0%|          | 0/50 [00:00<?, ?it/s]
Encoder performance speedup: 1.352
Decoder with past performance speedup: 1.342
Whole pipeline performance speedup: 1.350
Whisper transcription word accuracy. Original model: 81.67%. Quantized model: 83.67%.
Accuracy drop: -1.99%.

Interactive demo#

def transcribe(url, task, use_int8):
    output_file = Path("downloaded_video.mp4")
    ydl_ops = {"format": "best[ext=mp4]", "outtmpl": output_file.as_posix()}
    with yt_dlp.YoutubeDL(ydl_ops) as ydl:
        # Download the URL passed to the demo rather than the notebook's `link` widget
        ydl.download([url])
    inputs, duration = get_audio(output_file)
    m_pipe = int8_pipe if use_int8 else pipe
    transcription = m_pipe(inputs, generate_kwargs={"task": task.lower()}, return_timestamps=True)["chunks"]
    srt_lines = prepare_srt(transcription, duration)
    with output_file.with_suffix(".srt").open("w") as f:
        f.writelines(srt_lines)
    return [str(output_file), str(output_file.with_suffix(".srt"))]


if not Path("gradio_helper.py").exists():
    r = requests.get(url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/notebooks/whisper-subtitles-generation/gradio_helper.py")
    open("gradio_helper.py", "w").write(r.text)

from gradio_helper import make_demo

demo = make_demo(fn=transcribe, quantized=ov_quantized_model is not None)

try:
    demo.launch(debug=False)
except Exception:
    demo.launch(share=True, debug=False)
# if you are launching remotely, specify server_name and server_port
# demo.launch(server_name='your server name', server_port='server port in int')
# Read more in the docs: https://gradio.app/docs/