Magika: AI-powered fast and efficient file type identification using OpenVINO

This Jupyter notebook can be launched online, opening an interactive environment in a browser window. You can also install it locally. Choose one of the following options:

Binder, Google Colab, GitHub

Magika is a novel AI-powered file type detection tool that relies on recent advances in deep learning to provide accurate detection. Under the hood, Magika employs a custom, highly optimized model that weighs only about 1 MB and enables precise file identification within milliseconds, even when running on a single CPU.
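
For reference, the upstream Magika Python package exposes a compact API for this; a minimal usage sketch (the magika package itself is installed in the Prerequisites step below):

from magika import Magika

magika = Magika()
result = magika.identify_bytes(b"# Example\nThis is an example of markdown!")
print(result.output.ct_label)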

Why identifying file type is difficult

Since the early days of computing, accurately detecting file types has been crucial in determining how to process files. Linux comes equipped with libmagic and the file utility, which have served as the de facto standard for file type identification for over 50 years. Today web browsers, code editors, and countless other software rely on file-type detection to decide how to properly render a file. For example, modern code editors use file-type detection to choose which syntax coloring scheme to use as the developer starts typing in a new file.

Accurate file-type detection is a notoriously difficult problem because each file format has a different structure, or no structure at all. This is particularly challenging for textual formats and programming languages as they have very similar constructs. So far, libmagic and most other file-type-identification software have been relying on a handcrafted collection of heuristics and custom rules to detect each file format.

This manual approach is both time-consuming and error-prone, as it is hard for humans to create generalized rules by hand. For security applications in particular, creating dependable detection is especially challenging, as attackers are constantly attempting to confuse detection with adversarially crafted payloads.

To address this issue and provide fast and accurate file-type detection, Magika was developed. More details about the approach and the model can be found in the original repo and Google’s blog post.

In this tutorial, we consider how to bring the power of OpenVINO into Magika.

Prerequisites

%pip install -q magika "openvino>=2024.1.0" "gradio>=4.19"
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
tensorflow 2.12.0 requires numpy<1.24,>=1.22, but you have numpy 1.24.4 which is incompatible.
Note: you may need to restart the kernel to use updated packages.

Define model loading class

At inference time, Magika uses ONNX as an inference engine to ensure files are identified in a matter of milliseconds, almost as fast as a non-AI tool, even on CPU. The code below extends the original Magika inference class with the OpenVINO API. The provided code is fully compatible with the original Magika Python API.

import time
from pathlib import Path
from functools import partial
from typing import List, Tuple, Optional, Dict

from magika import Magika
from magika.types import ModelFeatures, ModelOutput, MagikaResult
from magika.prediction_mode import PredictionMode
import numpy.typing as npt
import numpy as np

import openvino as ov


class OVMagika(Magika):
    def __init__(
        self,
        model_dir: Optional[Path] = None,
        prediction_mode: PredictionMode = PredictionMode.HIGH_CONFIDENCE,
        no_dereference: bool = False,
        verbose: bool = False,
        debug: bool = False,
        use_colors: bool = False,
        device="CPU",
    ) -> None:
        self._device = device
        super().__init__(model_dir, prediction_mode, no_dereference, verbose, debug, use_colors)

    def _init_onnx_session(self):
        # overload model loading using OpenVINO
        start_time = time.time()
        core = ov.Core()
        ov_model = core.compile_model(self._model_path, self._device.upper())
        elapsed_time = 1000 * (time.time() - start_time)
        self._log.debug(f'OpenVINO model "{self._model_path}" loaded in {elapsed_time:.03f} ms on {self._device}')
        return ov_model

    def _get_raw_predictions(self, features: List[Tuple[Path, ModelFeatures]]) -> npt.NDArray:
        """
        Given a list of (path, features), return a (files_num, features_size)
        matrix encoding the predictions.
        """

        dataset_format = self._model_config["train_dataset_info"]["dataset_format"]
        assert dataset_format == "int-concat/one-hot"
        start_time = time.time()
        X_bytes = []
        for _, fs in features:
            sample_bytes = []
            if self._input_sizes["beg"] > 0:
                sample_bytes.extend(fs.beg[: self._input_sizes["beg"]])
            if self._input_sizes["mid"] > 0:
                sample_bytes.extend(fs.mid[: self._input_sizes["mid"]])
            if self._input_sizes["end"] > 0:
                sample_bytes.extend(fs.end[-self._input_sizes["end"] :])
            X_bytes.append(sample_bytes)
        X = np.array(X_bytes).astype(np.float32)
        elapsed_time = time.time() - start_time
        self._log.debug(f"DL input prepared in {elapsed_time:.03f} seconds")

        start_time = time.time()
        raw_predictions_list = []
        samples_num = X.shape[0]

        max_internal_batch_size = 1000
        batches_num = samples_num // max_internal_batch_size
        if samples_num % max_internal_batch_size != 0:
            batches_num += 1

        for batch_idx in range(batches_num):
            self._log.debug(f"Getting raw predictions for (internal) batch {batch_idx+1}/{batches_num}")
            start_idx = batch_idx * max_internal_batch_size
            end_idx = min((batch_idx + 1) * max_internal_batch_size, samples_num)
            batch_raw_predictions = self._onnx_session({"bytes": X[start_idx:end_idx, :]})["target_label"]
            raw_predictions_list.append(batch_raw_predictions)
        elapsed_time = time.time() - start_time
        self._log.debug(f"DL raw prediction in {elapsed_time:.03f} seconds")
        return np.concatenate(raw_predictions_list)

    def _get_topk_model_outputs_from_features(self, all_features: List[Tuple[Path, ModelFeatures]], k: int = 5) -> List[Tuple[Path, List[ModelOutput]]]:
        """
        Helper function for getting the top-k highest ranked model results for each feature
        """
        raw_preds = self._get_raw_predictions(all_features)
        top_preds_idxs = np.argsort(raw_preds, axis=1)[:, -k:][:, ::-1]
        results = []
        for (path, _), sample_scores, top_idxes in zip(all_features, raw_preds, top_preds_idxs):
            model_outputs_for_path = []
            for idx in top_idxes:
                ct_label = self._target_labels_space_np[idx]
                score = sample_scores[idx]
                model_outputs_for_path.append(ModelOutput(ct_label=ct_label, score=float(score)))
            results.append((path, model_outputs_for_path))
        return results

    def _get_results_from_features_topk(self, all_features: List[Tuple[Path, ModelFeatures]], top_k=5) -> Dict[str, List[MagikaResult]]:
        """
        Helper function for getting the top-k highest ranked model results for each feature
        """
        # We now do inference for those files that need it.

        if len(all_features) == 0:
            # nothing to be done
            return {}

        outputs: Dict[str, MagikaResult] = {}

        for path, model_output in self._get_topk_model_outputs_from_features(all_features, top_k):
            # In addition to the content type label from the DL model, we
            # also allow other logic to overwrite such a result. For
            # debugging and information purposes, the JSON output stores
            # both the raw DL model output and the final output we return to
            # the user.
            results = []
            for out in model_output:
                output_ct_label = self._get_output_ct_label_from_dl_result(out.ct_label, out.score)

                results.append(
                    self._get_result_from_labels_and_score(
                        path,
                        dl_ct_label=out.ct_label,
                        output_ct_label=output_ct_label,
                        score=out.score,
                    )
                )
            outputs[str(path)] = results

        return outputs

    def identify_bytes_topk(self, content: bytes, top_k=5) -> List[MagikaResult]:
        # Helper function for getting the top-k results from bytes
        _get_results_from_features = self._get_results_from_features
        self._get_results_from_features = partial(self._get_results_from_features_topk, top_k=top_k)
        result = super().identify_bytes(content)
        self._get_results_from_features = _get_results_from_features
        return result

Run OpenVINO model inference

Now let’s check the model inference results.

To get started, please select one of the available devices from the dropdown list.

import ipywidgets as widgets

core = ov.Core()

device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value="AUTO",
    description="Device:",
    disabled=False,
)

device
Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')

As we discussed above, our OpenVINO-extended OVMagika class has the same API as the original one. Let’s create a class instance and try it on different input formats.

ov_magika = OVMagika(device=device.value)
result = ov_magika.identify_bytes(b"# Example\nThis is an example of markdown!")
print(f"Content type: {result.output.ct_label} - {result.output.score * 100:.4}%")
Content type: markdown - 99.29%
import requests

input_file = Path("./README.md")
if not input_file.exists():
    r = requests.get("https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/README.md")
    with open("README.md", "w") as f:
        f.write(r.text)
result = ov_magika.identify_path(input_file)
print(f"Content type: {result.output.ct_label} - {result.output.score * 100:.4}%")
Content type: markdown - 100.0%
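
We can also request several candidate content types at once via the identify_bytes_topk helper defined above. A minimal sketch (the helper returns a list of results, one per candidate label, which is also how the interactive demo below consumes it):

topk_results = ov_magika.identify_bytes_topk(b"# Example\nThis is an example of markdown!", top_k=3)
for res in topk_results:
    print(f"{res.dl.ct_label}: {res.output.score * 100:.2f}%")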

Interactive demo

Now you can try the model on your own files. Upload a file into the input file window, click the submit button, and look at the predicted file types.

import gradio as gr


def classify(file_content):
    """Classify an uploaded file and return the top predicted content types.

    Args:
        file_content (bytes): raw bytes of the uploaded file.
    Returns:
        (dict): Mapping between content type labels and their probabilities.
    """
    results = ov_magika.identify_bytes_topk(file_content)

    return {result.dl.ct_label: float(result.output.score) for result in results}


demo = gr.Interface(
    classify,
    [
        gr.File(label="Input file", type="binary"),
    ],
    gr.Label(label="Result"),
    examples=[["./README.md"]],
    allow_flagging="never",
)
try:
    demo.launch(debug=False)
except Exception:
    demo.launch(share=True, debug=False)
# if you are launching remotely, specify server_name and server_port
# demo.launch(server_name='your server name', server_port='server port in int')
# Read more in the docs: https://gradio.app/docs/
Running on local URL:  http://127.0.0.1:7860

To create a public link, set share=True in launch().
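
When you are finished with the demo, the Gradio server can be stopped to free the port; a minimal sketch:

# stop the Gradio server started by demo.launch()
demo.close()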