Video Super Resolution with OpenVINO™

This Jupyter notebook can be launched on-line, opening an interactive environment in a browser window. You can also make a local installation. Choose one of the following options:

BinderGoogle ColabGithub

Super Resolution is the process of enhancing the quality of an image by increasing the pixel count using deep learning. This notebook applies Single Image Super Resolution (SISR) to frames in a 360p (480×360) video in 360p resolution. A model called single-image-super-resolution-1032, which is available in Open Model Zoo, is used in this tutorial. It is based on the research paper cited below.

Y. Liu et al., “An Attention-Based Approach for Single Image Super Resolution,” 2018 24th International Conference on Pattern Recognition (ICPR), 2018, pp. 2777-2784, doi: 10.1109/ICPR.2018.8545760.

NOTE: The Single Image Super Resolution (SISR) model used in this demo is not optimized for a video. Results may vary depending on the video.

Table of contents:

Preparation

Install requirements

%pip install -q "openvino>=2023.1.0"
%pip install -q opencv-python
%pip install -q "pytube>=12.1.0"
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.

Imports

import time
from pathlib import Path

import cv2
import numpy as np
from IPython.display import (
    HTML,
    FileLink,
    Pretty,
    ProgressBar,
    Video,
    clear_output,
    display,
)
import openvino as ov
from pytube import YouTube
# Define a download file helper function
def download_file(url: str, path: Path) -> None:
    """Download file."""
    import urllib.request
    path.parent.mkdir(parents=True, exist_ok=True)
    urllib.request.urlretrieve(url, path)

Settings

Select inference device

select device from dropdown list for running inference using OpenVINO

import ipywidgets as widgets

core = ov.Core()
device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value='AUTO',
    description='Device:',
    disabled=False,
)

device
Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')
# 1032: 4x superresolution, 1033: 3x superresolution
model_name = 'single-image-super-resolution-1032'

base_model_dir = Path('./model').expanduser()

model_xml_name = f'{model_name}.xml'
model_bin_name = f'{model_name}.bin'

model_xml_path = base_model_dir / model_xml_name
model_bin_path = base_model_dir / model_bin_name

if not model_xml_path.exists():
    base_url = f'https://storage.openvinotoolkit.org/repositories/open_model_zoo/2023.0/models_bin/1/{model_name}/FP16/'
    model_xml_url = base_url + model_xml_name
    model_bin_url = base_url + model_bin_name

    download_file(model_xml_url, model_xml_path)
    download_file(model_bin_url, model_bin_path)
else:
    print(f'{model_name} already downloaded to {base_model_dir}')
single-image-super-resolution-1032 already downloaded to model
def convert_result_to_image(result) -> np.ndarray:
    """
    Convert network result of floating point numbers to image with integer
    values from 0-255. Values outside this range are clipped to 0 and 255.

    :param result: a single superresolution network result in N,C,H,W shape
    """
    result = result.squeeze(0).transpose(1, 2, 0)
    result *= 255
    result[result < 0] = 0
    result[result > 255] = 255
    result = result.astype(np.uint8)
    return result

Load the Superresolution Model

Load the model in OpenVINO Runtime with core.read_model and compile it for the specified device with core.compile_model.

core = ov.Core()
model = core.read_model(model=model_xml_path)
compiled_model = core.compile_model(model=model, device_name=device.value)

Get information about network inputs and outputs. The Super Resolution model expects two inputs: the input image and a bicubic interpolation of the input image to the target size of 1920x1080. It returns the super resolution version of the image in 1920x1080.

# Network inputs and outputs are dictionaries. Get the keys for the
# dictionaries.
original_image_key, bicubic_image_key = compiled_model.inputs
output_key = compiled_model.output(0)

# Get the expected input and target shape. The `.dims[2:]` function returns the height
# and width.The `resize` function of  OpenCV expects the shape as (width, height),
# so reverse the shape with `[::-1]` and convert it to a tuple.
input_height, input_width = list(original_image_key.shape)[2:]
target_height, target_width = list(bicubic_image_key.shape)[2:]

upsample_factor = int(target_height / input_height)

print(f"The network expects inputs with a width of {input_width}, " f"height of {input_height}")
print(f"The network returns images with a width of {target_width}, " f"height of {target_height}")

print(
    f"The image sides are upsampled by a factor of {upsample_factor}. "
    f"The new image is {upsample_factor**2} times as large as the "
    "original image"
)
The network expects inputs with a width of 480, height of 270
The network returns images with a width of 1920, height of 1080
The image sides are upsampled by a factor of 4. The new image is 16 times as large as the original image

Superresolution on Video

Download a YouTube video with PyTube and enhance the video quality with superresolution.

By default, only the first 100 frames of the video are processed. Change NUM_FRAMES in the cell below to modify this.

NOTE: The resulting video does not contain audio. The input video should be a landscape video and have an input resolution of 360p (640x360) for the 1032 model, or 480p (720x480) for the 1033 model.

Settings

OUTPUT_DIR = "output"

Path(OUTPUT_DIR).mkdir(exist_ok=True)
# Maximum number of frames to read from the input video. Set to 0 to read all frames.
NUM_FRAMES = 100
# The format for saving the result videos. The `vp09` codec is slow, but widely available.
# If you have FFMPEG installed, you can change FOURCC to `*"THEO"` to improve video writing speed.
FOURCC = cv2.VideoWriter_fourcc(*"vp09")

Download and Prepare Video

# Use pytube to download a video. It downloads to the videos subdirectory.
# You can also place a local video there and comment out the following lines
VIDEO_URL = "https://www.youtube.com/watch?v=V8yS3WIkOrA"
yt = YouTube(VIDEO_URL)
# Use `yt.streams` to see all available streams. See the PyTube documentation
# https://python-pytube.readthedocs.io/en/latest/api.html for advanced
# filtering options
stream = yt.streams.filter(resolution="360p").first()
filename = Path(stream.default_filename.encode("ascii", "ignore").decode("ascii")).stem
stream.download(output_path=OUTPUT_DIR, filename=filename)
print(f"Video {filename} downloaded to {OUTPUT_DIR}")

# Create Path objects for the input video and the resulting videos.
video_path = Path(stream.get_file_path(filename, OUTPUT_DIR))

# Path names for the result videos.
superres_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_superres.mp4")
bicubic_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_bicubic.mp4")
comparison_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_superres_comparison.mp4")
Video Leading Intel with CEO Pat Gelsinger downloaded to output
# Open the video and get the dimensions and the FPS.
cap = cv2.VideoCapture(filename=str(video_path))
ret, image = cap.read()
if not ret:
    raise ValueError(f"The video at '{video_path}' cannot be read.")
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)

if NUM_FRAMES == 0:
    total_frames = frame_count
else:
    total_frames = min(frame_count, NUM_FRAMES)

original_frame_height, original_frame_width = image.shape[:2]

cap.release()
print(
    f"The input video has a frame width of {original_frame_width}, "
    f"frame height of {original_frame_height} and runs at {fps:.2f} fps"
)
The input video has a frame width of 640, frame height of 360 and runs at 29.97 fps

Create a superresolution video, a bicubic video and a comparison video. The superresolution video contains the enhanced video, upsampled with superresolution, the bicubic video is the input video upsampled with bicubic interpolation, the comparison video sets the bicubic video and the superresolution side by side.

superres_video = cv2.VideoWriter(
    filename=str(superres_video_path),
    fourcc=FOURCC,
    fps=fps,
    frameSize=(target_width, target_height),
)
bicubic_video = cv2.VideoWriter(
    filename=str(bicubic_video_path),
    fourcc=FOURCC,
    fps=fps,
    frameSize=(target_width, target_height),
)
comparison_video = cv2.VideoWriter(
    filename=str(comparison_video_path),
    fourcc=FOURCC,
    fps=fps,
    frameSize=(target_width * 2, target_height),
)

Do Inference

Read video frames and enhance them with superresolution. Save the superresolution video, the bicubic video and the comparison video to a file.

The code below reads the video frame by frame. Each frame is resized and reshaped to the network input shape and upsampled with bicubic interpolation to the target shape. Both the original and the bicubic images are propagated through the network. The network result is a numpy array with floating point values, with a shape of (1,3,1920,1080). This array is converted to an 8-bit image with the (1080,1920,3) shape and written to a superres_video. The bicubic image is written to a bicubic_video for comparison. Finally, the bicubic and result frames are combined side by side and written to a comparison_video. A progress bar shows the progress of the process. Both inference time and total time to process each frame are measured. That also includes inference time as well as the time it takes to process and write the video.

start_time = time.perf_counter()
frame_nr = 0
total_inference_duration = 0

progress_bar = ProgressBar(total=total_frames)
progress_bar.display()

cap = cv2.VideoCapture(filename=str(video_path))
try:
    while cap.isOpened():
        ret, image = cap.read()
        if not ret:
            cap.release()
            break

        if frame_nr >= total_frames:
            break

        # Resize the input image to the network shape and convert it from (H,W,C) to
        # (N,C,H,W).
        resized_image = cv2.resize(src=image, dsize=(input_width, input_height))
        input_image_original = np.expand_dims(resized_image.transpose(2, 0, 1), axis=0)

        # Resize and reshape the image to the target shape with bicubic
        # interpolation.
        bicubic_image = cv2.resize(
            src=image, dsize=(target_width, target_height), interpolation=cv2.INTER_CUBIC
        )
        input_image_bicubic = np.expand_dims(bicubic_image.transpose(2, 0, 1), axis=0)

        # Do inference.
        inference_start_time = time.perf_counter()
        result = compiled_model(
            {
                original_image_key.any_name: input_image_original,
                bicubic_image_key.any_name: input_image_bicubic,
            }
        )[output_key]
        inference_stop_time = time.perf_counter()
        inference_duration = inference_stop_time - inference_start_time
        total_inference_duration += inference_duration

        # Transform the inference result into an image.
        result_frame = convert_result_to_image(result=result)

        # Write the result image and the bicubic image to a video file.
        superres_video.write(image=result_frame)
        bicubic_video.write(image=bicubic_image)

        stacked_frame = np.hstack((bicubic_image, result_frame))
        comparison_video.write(image=stacked_frame)

        frame_nr = frame_nr + 1

        # Update the progress bar and the status message.
        progress_bar.progress = frame_nr
        progress_bar.update()
        if frame_nr % 10 == 0 or frame_nr == total_frames:
            clear_output(wait=True)
            progress_bar.display()
            display(
                Pretty(
                    f"Processed frame {frame_nr}. Inference time: "
                    f"{inference_duration:.2f} seconds "
                    f"({1/inference_duration:.2f} FPS)"
                )
            )


except KeyboardInterrupt:
    print("Processing interrupted.")
finally:
    superres_video.release()
    bicubic_video.release()
    comparison_video.release()
    end_time = time.perf_counter()
    duration = end_time - start_time
    print(f"Video's saved to {comparison_video_path.parent} directory.")
    print(
        f"Processed {frame_nr} frames in {duration:.2f} seconds. Total FPS "
        f"(including video processing): {frame_nr/duration:.2f}. "
        f"Inference FPS: {frame_nr/total_inference_duration:.2f}."
    )
Processed frame 100. Inference time: 0.06 seconds (17.00 FPS)
Video's saved to output directory.
Processed 100 frames in 243.08 seconds. Total FPS (including video processing): 0.41. Inference FPS: 17.69.

Show Side-by-Side Video of Bicubic and Superresolution Version

if not comparison_video_path.exists():
    raise ValueError("The comparison video does not exist.")
else:
    video_link = FileLink(comparison_video_path)
    video_link.html_link_str = "<a href='%s' download>%s</a>"
    display(
        HTML(
            f"Showing side by side comparison. If you cannot see the video in "
            "your browser, please click on the following link to download "
            f"the video<br>{video_link._repr_html_()}"
        )
    )
    display(Video(comparison_video_path, width=800, embed=True))
Showing side by side comparison. If you cannot see the video in your browser, please click on the following link to download the video
output/Leading Intel with CEO Pat Gelsinger_superres_comparison.mp4