Video Super Resolution with OpenVINO

This tutorial is also available as a Jupyter notebook that can be cloned directly from GitHub. See the installation guide for instructions to run this tutorial locally on Windows, Linux or macOS. To run without installing anything, click the launch binder button.


Super Resolution is the process of enhancing the quality of an image by increasing the pixel count using deep learning. This notebook applies Single Image Super Resolution (SISR) to the frames of a 360p (640×360) video. It uses the single-image-super-resolution-1032 model, which is available from the Open Model Zoo and is based on the research paper cited below.

Y. Liu et al., “An Attention-Based Approach for Single Image Super Resolution,” 2018 24th International Conference on Pattern Recognition (ICPR), 2018, pp. 2777-2784, doi: 10.1109/ICPR.2018.8545760.

NOTE: The Single Image Super Resolution (SISR) model used in this demo is not optimized for video. Results may vary depending on the video.

Preparation

Imports

import time
import urllib.request
from pathlib import Path

import cv2
import numpy as np
from IPython.display import (
    HTML,
    FileLink,
    Pretty,
    ProgressBar,
    Video,
    clear_output,
    display,
)
from openvino.runtime import Core
from pytube import YouTube

Settings

# Device to use for inference. For example, "CPU", or "GPU"
DEVICE = "CPU"
# 1032: 4x superresolution, 1033: 3x superresolution
MODEL_FILE = "model/single-image-super-resolution-1032.xml"
model_name = Path(MODEL_FILE).name
model_xml_path = Path(MODEL_FILE).with_suffix(".xml")

Functions

def write_text_on_image(image: np.ndarray, text: str) -> np.ndarray:
    """
    Write the specified text in the top left corner of the image
    as white text with a black border.

    :param image: image as numpy array with HWC shape, RGB or BGR
    :param text: text to write
    :return: image with written text, as numpy array
    """
    font = cv2.FONT_HERSHEY_PLAIN
    org = (20, 20)
    font_scale = 4
    font_color = (255, 255, 255)
    line_type = 1
    font_thickness = 2
    text_color_bg = (0, 0, 0)
    x, y = org

    image = cv2.UMat(image)
    (text_w, text_h), _ = cv2.getTextSize(
        text=text, fontFace=font, fontScale=font_scale, thickness=font_thickness
    )
    result_im = cv2.rectangle(
        img=image, pt1=org, pt2=(x + text_w, y + text_h), color=text_color_bg, thickness=-1
    )

    textim = cv2.putText(
        img=result_im,
        text=text,
        org=(x, y + text_h + font_scale - 1),
        fontFace=font,
        fontScale=font_scale,
        color=font_color,
        thickness=font_thickness,
        lineType=line_type,
    )
    return textim.get()


def load_image(path: str) -> np.ndarray:
    """
    Loads an image from `path` and returns it as BGR numpy array.

    :param path: path to an image filename or url
    :return: image as numpy array, with BGR channel order
    """
    if path.startswith("http"):
        # Set User-Agent to Mozilla because some websites block requests
        # with User-Agent Python
        request = urllib.request.Request(url=path, headers={"User-Agent": "Mozilla/5.0"})
        response = urllib.request.urlopen(url=request)
        array = np.asarray(bytearray(response.read()), dtype="uint8")
        image = cv2.imdecode(buf=array, flags=-1)  # Loads the image as BGR
    else:
        image = cv2.imread(filename=path)
    return image


def convert_result_to_image(result) -> np.ndarray:
    """
    Convert network result of floating point numbers to image with integer
    values from 0-255. Values outside this range are clipped to 0 and 255.

    :param result: a single superresolution network result in N,C,H,W shape
    """
    result = result.squeeze(0).transpose(1, 2, 0)
    result *= 255
    result[result < 0] = 0
    result[result > 255] = 255
    result = result.astype(np.uint8)
    return result
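
As a quick check of the conversion step, the sketch below applies the same operations to a small synthetic array instead of a real network result. The function name to_uint8_image and the sample values are assumptions made for illustration. Unlike convert_result_to_image, it uses np.clip on a new array, so the input is not modified in place; treat it as a variation, not as the tutorial's implementation.

```python
import numpy as np


def to_uint8_image(result: np.ndarray) -> np.ndarray:
    """Convert a (1, C, H, W) float array to an (H, W, C) uint8 image."""
    # Drop the batch dimension and move channels last: (1,C,H,W) -> (H,W,C)
    image = result.squeeze(0).transpose(1, 2, 0)
    # Scale to 0-255 and clip out-of-range values. `image * 255` creates a
    # new array, so the caller's data is left untouched.
    return np.clip(image * 255, 0, 255).astype(np.uint8)


# Synthetic "network output" (hypothetical values): 3 identical channels,
# 2x2 pixels, including values below 0 and above 1
fake_result = np.array([-0.5, 0.0, 0.5, 1.5], dtype=np.float32).reshape(1, 1, 2, 2)
fake_result = np.repeat(fake_result, 3, axis=1)

converted = to_uint8_image(fake_result)
print(converted.shape, converted.dtype)
print(converted[:, :, 0])
```

Values below 0 end up as 0 and values above 1 end up as 255, which matches the clipping described in the docstring of convert_result_to_image.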

Load the Superresolution Model

Load the model in OpenVINO Runtime with ie.read_model and compile it for the specified device with ie.compile_model.

ie = Core()
model = ie.read_model(model=model_xml_path)
compiled_model = ie.compile_model(model=model, device_name=DEVICE)

Get information about network inputs and outputs. The Super Resolution model expects two inputs: 1) the input image, 2) a bicubic interpolation of the input image to the target size 1920x1080. It returns the super resolution version of the image in 1920x1080.

# `compiled_model.inputs` is a list with the two model inputs (original
# image and bicubic image); `compiled_model.output(0)` is the single output.
original_image_key, bicubic_image_key = compiled_model.inputs
output_key = compiled_model.output(0)

# Get the expected input and target shape. The shapes are in N,C,H,W order,
# so `[2:]` returns the height and width. Note that OpenCV's resize
# function expects the size as (width, height).
input_height, input_width = list(original_image_key.shape)[2:]
target_height, target_width = list(bicubic_image_key.shape)[2:]

upsample_factor = int(target_height / input_height)

print(f"The network expects inputs with a width of {input_width}, " f"height of {input_height}")
print(f"The network returns images with a width of {target_width}, " f"height of {target_height}")

print(
    f"The image sides are upsampled by a factor {upsample_factor}. "
    f"The new image is {upsample_factor**2} times as large as the "
    "original image"
)
The network expects inputs with a width of 480, height of 270
The network returns images with a width of 1920, height of 1080
The image sides are upsampled by a factor 4. The new image is 16 times as large as the original image

Superresolution on Video

Download a YouTube* video with PyTube and enhance the video quality with superresolution.

By default only the first 100 frames of the video are processed. Change NUM_FRAMES in the cell below to modify this.

Note:
- The resulting video does not contain audio.
- The input video should be a landscape video and have an input resolution of 360p (640x360) for the 1032 model, or 480p (720x480) for the 1033 model.

Settings

VIDEO_DIR = "data"
OUTPUT_DIR = "output"

Path(OUTPUT_DIR).mkdir(exist_ok=True)
# Maximum number of frames to read from the input video. Set to 0 to read all frames.
NUM_FRAMES = 100
# The format for saving the result videos. vp09 is slow, but widely available.
# If you have FFMPEG installed, you can change FOURCC to `*"THEO"` to improve video writing speed.
FOURCC = cv2.VideoWriter_fourcc(*"vp09")
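
The NUM_FRAMES setting acts as an upper limit, with 0 meaning "no limit". The sketch below shows that rule in isolation. The helper name frames_to_process and the frame counts are hypothetical and only serve to illustrate the behaviour.

```python
def frames_to_process(frame_count: int, num_frames: int) -> int:
    """Return how many frames are processed for a given frame limit."""
    # A limit of 0 means "process the whole video"
    if num_frames == 0:
        return frame_count
    # Otherwise never process more frames than the video contains
    return min(frame_count, num_frames)


# Hypothetical frame counts, chosen only for illustration
print(frames_to_process(frame_count=3000, num_frames=100))
print(frames_to_process(frame_count=3000, num_frames=0))
print(frames_to_process(frame_count=60, num_frames=100))
```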

Download and Prepare Video

# Use pytube to download a video. It downloads to the OUTPUT_DIR directory.
# If the download fails, a local video from VIDEO_DIR is used instead.
VIDEO_URL = "https://www.youtube.com/watch?v=V8yS3WIkOrA"
yt = YouTube(VIDEO_URL)
# Use `yt.streams` to see all available streams. See the PyTube documentation
# https://python-pytube.readthedocs.io/en/latest/api.html for advanced
# filtering options
try:
    Path(VIDEO_DIR).mkdir(exist_ok=True)
    stream = yt.streams.filter(resolution="360p").first()
    filename = Path(stream.default_filename.encode("ascii", "ignore").decode("ascii")).stem
    stream.download(output_path=OUTPUT_DIR, filename=filename)
    print(f"Video {filename} downloaded to {OUTPUT_DIR}")

    # Create Path objects for the input video and the resulting videos
    video_path = Path(stream.get_file_path(filename, OUTPUT_DIR))
except Exception:
    # If PyTube fails, use a local video stored in the VIDEO_DIR directory
    video_path = Path(rf"{VIDEO_DIR}/CEO Pat Gelsinger on Leading Intel.mp4")

# Path names for the result videos
superres_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_superres.mp4")
bicubic_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_bicubic.mp4")
comparison_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_superres_comparison.mp4")
Video Leading Intel with CEO Pat Gelsinger downloaded to output
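
The download code derives the output filename by dropping non-ASCII characters and the file extension from the stream's default filename. The sketch below isolates that step using only the standard library. The helper name ascii_stem and the second filename are hypothetical examples.

```python
from pathlib import Path


def ascii_stem(default_filename: str) -> str:
    """Drop non-ASCII characters and the file extension from a filename."""
    # encode(..., "ignore") silently removes characters that are not ASCII
    ascii_name = default_filename.encode("ascii", "ignore").decode("ascii")
    # .stem removes the final suffix, for example ".mp4"
    return Path(ascii_name).stem


print(ascii_stem("Leading Intel with CEO Pat Gelsinger.mp4"))
# Hypothetical filename with accented characters
print(ascii_stem("Vidéo démo.mp4"))
```

Note that removed characters are not replaced, so a title with many non-ASCII characters can become much shorter than the original.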
# Open the video and get the dimensions and the FPS
cap = cv2.VideoCapture(filename=str(video_path))
ret, image = cap.read()
if not ret:
    raise ValueError(f"The video at '{video_path}' cannot be read.")
fps = cap.get(cv2.CAP_PROP_FPS)
# cap.get returns a float; convert it to int because it is used as a count
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

if NUM_FRAMES == 0:
    total_frames = frame_count
else:
    total_frames = min(frame_count, NUM_FRAMES)

original_frame_height, original_frame_width = image.shape[:2]

cap.release()
print(
    f"The input video has a frame width of {original_frame_width}, "
    f"frame height of {original_frame_height} and runs at {fps:.2f} fps"
)
The input video has a frame width of 640, frame height of 360 and runs at 29.97 fps
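
The input video (640x360) is larger than the network input (480x270), so every frame is resized before inference. The sketch below, which uses only sizes printed in this tutorial, checks that the video, the network input and the network output share the same aspect ratio, so resizing should not distort the picture. The helper name aspect_ratio is an assumption introduced here.

```python
from fractions import Fraction


def aspect_ratio(width: int, height: int) -> Fraction:
    """Return width / height as an exact, reduced fraction."""
    return Fraction(width, height)


video_ratio = aspect_ratio(640, 360)      # input video
network_ratio = aspect_ratio(480, 270)    # network input
target_ratio = aspect_ratio(1920, 1080)   # network output

print(video_ratio, network_ratio, target_ratio)
print(video_ratio == network_ratio == target_ratio)
```

For a video with a different aspect ratio, the same check would show a mismatch, and cv2.resize would stretch the frame to fit the network input.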

Create the superresolution video, the bicubic video and the comparison video. The superresolution video contains the input video upsampled with superresolution, the bicubic video contains the input video upsampled with bicubic interpolation, and the comparison video shows the bicubic and superresolution videos side by side.

superres_video = cv2.VideoWriter(
    filename=str(superres_video_path),
    fourcc=FOURCC,
    fps=fps,
    frameSize=(target_width, target_height),
)
bicubic_video = cv2.VideoWriter(
    filename=str(bicubic_video_path),
    fourcc=FOURCC,
    fps=fps,
    frameSize=(target_width, target_height),
)
comparison_video = cv2.VideoWriter(
    filename=str(comparison_video_path),
    fourcc=FOURCC,
    fps=fps,
    frameSize=(target_width * 2, target_height),
)
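
The comparison writer is created with twice the target width because each comparison frame is two target-sized frames placed next to each other. The sketch below reproduces that with blank placeholder frames. The frame contents are made up; only the sizes come from this tutorial. Keep in mind that numpy shapes are (height, width, channels), while frameSize is (width, height).

```python
import numpy as np

target_width, target_height = 1920, 1080

# Placeholder frames (assumed contents): black for bicubic, white for superres
bicubic_frame = np.zeros((target_height, target_width, 3), dtype=np.uint8)
superres_frame = np.full((target_height, target_width, 3), 255, dtype=np.uint8)

# Stack horizontally: bicubic on the left, superresolution on the right
stacked = np.hstack((bicubic_frame, superres_frame))
stacked_height, stacked_width = stacked.shape[:2]

# This (width, height) pair is what the comparison writer's frameSize expects
print((stacked_width, stacked_height))
```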

Do Inference

Read video frames and enhance them with superresolution. Save the superresolution video, the bicubic video and the comparison video to file.

The code in this cell reads the video frame by frame. Each frame is resized and reshaped to the network input shape, and also upsampled with bicubic interpolation to the target shape. Both the resized frame and the bicubic frame are passed to the network. The network result is a numpy array of floating point values with a shape of (1,3,1080,1920). This array is converted to an 8-bit image with shape (1080,1920,3) and written to superres_video. The bicubic image is written to bicubic_video for comparison. Lastly, the bicubic and result frames are combined side by side and written to comparison_video. A progress bar shows the progress. Both the inference time and the total time per frame are measured; the total time includes inference as well as reading, processing and writing the video.
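
The reshaping step mentioned above converts an OpenCV frame in (H,W,C) layout to the (N,C,H,W) layout used for the network inputs. The sketch below shows this conversion and its inverse on a synthetic frame of the network input size. The frame contents are made up for illustration.

```python
import numpy as np

input_width, input_height = 480, 270

# Synthetic frame in OpenCV layout (H, W, C), filled with distinct values
frame = np.arange(input_height * input_width * 3).reshape(input_height, input_width, 3)

# (H, W, C) -> (C, H, W), then add a batch dimension -> (N, C, H, W)
nchw = np.expand_dims(frame.transpose(2, 0, 1), axis=0)
print(nchw.shape)

# Inverse: drop the batch dimension and move channels last again
restored = nchw.squeeze(0).transpose(1, 2, 0)
print(restored.shape)
print(np.array_equal(restored, frame))
```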

start_time = time.perf_counter()
frame_nr = 0
total_inference_duration = 0

progress_bar = ProgressBar(total=total_frames)
progress_bar.display()

cap = cv2.VideoCapture(filename=str(video_path))
try:
    while cap.isOpened():
        ret, image = cap.read()
        if not ret:
            cap.release()
            break

        if frame_nr >= total_frames:
            break

        # Resize the input image to network shape and convert from (H,W,C) to
        # (N,C,H,W)
        resized_image = cv2.resize(src=image, dsize=(input_width, input_height))
        input_image_original = np.expand_dims(resized_image.transpose(2, 0, 1), axis=0)

        # Resize and reshape the image to the target shape with bicubic
        # interpolation
        bicubic_image = cv2.resize(
            src=image, dsize=(target_width, target_height), interpolation=cv2.INTER_CUBIC
        )
        input_image_bicubic = np.expand_dims(bicubic_image.transpose(2, 0, 1), axis=0)

        # Do inference
        inference_start_time = time.perf_counter()
        result = compiled_model(
            {
                original_image_key.any_name: input_image_original,
                bicubic_image_key.any_name: input_image_bicubic,
            }
        )[output_key]
        inference_stop_time = time.perf_counter()
        inference_duration = inference_stop_time - inference_start_time
        total_inference_duration += inference_duration

        # Transform inference result into an image
        result_frame = convert_result_to_image(result=result)

        # Write resulting image and bicubic image to video
        superres_video.write(image=result_frame)
        bicubic_video.write(image=bicubic_image)

        stacked_frame = np.hstack((bicubic_image, result_frame))
        comparison_video.write(image=stacked_frame)

        frame_nr = frame_nr + 1

        # Update progress bar and status message
        progress_bar.progress = frame_nr
        progress_bar.update()
        if frame_nr % 10 == 0 or frame_nr == total_frames:
            clear_output(wait=True)
            progress_bar.display()
            display(
                Pretty(
                    f"Processed frame {frame_nr}. Inference time: "
                    f"{inference_duration:.2f} seconds "
                    f"({1/inference_duration:.2f} FPS)"
                )
            )


except KeyboardInterrupt:
    print("Processing interrupted.")
finally:
    # Release the input video as well; the loop does not release it when it
    # stops at the frame limit
    cap.release()
    superres_video.release()
    bicubic_video.release()
    comparison_video.release()
    end_time = time.perf_counter()
    duration = end_time - start_time
    print(f"Videos saved to {comparison_video_path.parent} directory.")
    print(
        f"Processed {frame_nr} frames in {duration:.2f} seconds. Total FPS "
        f"(including video processing): {frame_nr/duration:.2f}. "
        f"Inference FPS: {frame_nr/total_inference_duration:.2f}."
    )
Processed frame 100. Inference time: 0.12 seconds (8.42 FPS)
Videos saved to output directory.
Processed 100 frames in 245.67 seconds. Total FPS (including video processing): 0.41. Inference FPS: 8.29.
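
The two FPS figures in this output differ by a large factor. The sketch below uses only the numbers printed above to estimate how much of the total time was spent on inference. It is a back-of-the-envelope calculation based on rounded values, not a profiling result.

```python
# Values taken from the printed output above
frames = 100
total_seconds = 245.67
inference_fps = 8.29

# Total inference time implied by the reported inference FPS
inference_seconds = frames / inference_fps
# Overall throughput, including reading, resizing and writing video
total_fps = frames / total_seconds
# Fraction of the total run time spent in the network
inference_share = inference_seconds / total_seconds

print(f"Total FPS: {total_fps:.2f}")
print(f"Inference time: {inference_seconds:.1f} s")
print(f"Inference share of total time: {inference_share:.0%}")
```

With these numbers, inference accounts for roughly 5% of the run time, so most of the time in this run went to steps outside the network, such as processing and encoding the three output videos.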

Show Side-by-Side Video of Bicubic and Superresolution Version

if not comparison_video_path.exists():
    raise ValueError("The comparison video does not exist.")
else:
    video_link = FileLink(comparison_video_path)
    video_link.html_link_str = "<a href='%s' download>%s</a>"
    display(
        HTML(
            f"Showing side by side comparison. If you cannot see the video in "
            "your browser, please click on the following link to download "
            f"the video<br>{video_link._repr_html_()}"
        )
    )
    display(Video(comparison_video_path, width=800, embed=True))
Showing side by side comparison. If you cannot see the video in your browser, please click on the following link to download the video
output/Leading Intel with CEO Pat Gelsinger_superres_comparison.mp4