Kosmos-2: Multimodal Large Language Model and OpenVINO#
This Jupyter notebook can be launched after a local installation only.
KOSMOS-2 is a multimodal large language model (MLLM) that has new capabilities of multimodal grounding and referring. KOSMOS-2 can understand multimodal input, follow instructions, perceive object descriptions (e.g., bounding boxes), and ground language to the visual world.
Multimodal Large Language Models (MLLMs) have successfully played a role as a general-purpose interface across a wide range of tasks, such as language, vision, and vision-language tasks. MLLMs can perceive general modalities, including texts, images, and audio, and generate responses using free-form texts under zero-shot and few-shot settings.
In this work, authors unlock the grounding capability for multimodal large language models. Grounding capability can provide a more convenient and efficient human-AI interaction for vision-language tasks. It enables the user to point to the object or region in the image directly rather than input detailed text descriptions to refer to it, the model can understand that image region with its spatial locations. Grounding capability also enables the model to respond with visual answers (i.e., bounding boxes), which can support more vision-language tasks such as referring expression comprehension. Visual answers are more accurate and resolve the coreference ambiguity compared with text-only responses. In addition, grounding capability can link noun phrases and referring expressions in the generated free-form text response to the image regions, providing more accurate, informational, and comprehensive answers.
Table of contents:
This is a self-contained example that relies solely on its own code.
We recommend running the notebook in a virtual environment. You only need a Jupyter server to start. For details, please refer to Installation Guide.
Install requirements#
%pip install --upgrade pip
%pip install -q "openvino>=2024.0.0" "nncf>=2.11.0" "datasets>=2.20.0"
%pip install -q "transformers>=4.35" Pillow "gradio>=4.19" opencv-python
%pip install -q --extra-index-url https://download.pytorch.org/whl/cpu torch torchvision
Original model inference#
Let’s take the original example
import requests
from PIL import Image
from transformers import AutoProcessor, AutoModelForVision2Seq
model = AutoModelForVision2Seq.from_pretrained("microsoft/kosmos-2-patch14-224")
processor = AutoProcessor.from_pretrained("microsoft/kosmos-2-patch14-224")
prompt = "<grounding>An image of" # <grounding> is used to prompt the model to generate locations tokens
url = "https://huggingface.co/microsoft/kosmos-2-patch14-224/resolve/main/snowman.png"
image = Image.open(requests.get(url, stream=True).raw)
# The original Kosmos-2 demo saves the image first then reload it. For some images, this will give slightly different image input and change the generation outputs.
image.save("new_image.jpg")
image = Image.open("new_image.jpg")
inputs = processor(text=prompt, images=image, return_tensors="pt")
generated_ids = model.generate(
pixel_values=inputs["pixel_values"],
input_ids=inputs["input_ids"],
attention_mask=inputs["attention_mask"],
image_embeds=None,
image_embeds_position_mask=inputs["image_embeds_position_mask"],
use_cache=True,
max_new_tokens=128,
)
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
# Specify `cleanup_and_extract=False` in order to see the raw model generation.
processed_text = processor.post_process_generation(generated_text, cleanup_and_extract=False)
print(f"Raw model generation: {processed_text}")
# `<grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863></object> warming himself by<phrase> a fire</phrase><object><patch_index_0005><patch_index_0911></object>.`
# By default, the generated text is cleanup and the entities are extracted.
processed_text, entities = processor.post_process_generation(generated_text)
print(f"Cleaned up generated text: {processed_text=}")
# `An image of a snowman warming himself by a fire.`
print(f"Extracted entities: {entities}")
# `[('a snowman', (12, 21), [(0.390625, 0.046875, 0.984375, 0.828125)]), ('a fire', (41, 47), [(0.171875, 0.015625, 0.484375, 0.890625)])]`
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Raw model generation: <grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863></object> warming himself by<phrase> a fire</phrase><object><patch_index_0005><patch_index_0911></object>.
Cleaned up generated text: processed_text='An image of a snowman warming himself by a fire.'
Extracted entities: [('a snowman', (12, 21), [(0.390625, 0.046875, 0.984375, 0.828125)]), ('a fire', (41, 47), [(0.171875, 0.015625, 0.484375, 0.890625)])]
Once you have the entities, you can use the following helper function to draw their bounding bboxes on the image:
import cv2
import numpy as np
from PIL import Image
def is_overlapping(rect1, rect2):
x1, y1, x2, y2 = rect1
x3, y3, x4, y4 = rect2
return not (x2 < x3 or x1 > x4 or y2 < y3 or y1 > y4)
def draw_entity_boxes_on_image(image, entities):
"""_summary_
Args:
image (_type_): image or image path
collect_entity_location (_type_): _description_
"""
if isinstance(image, Image.Image):
image_h = image.height
image_w = image.width
image = np.array(image)[:, :, [2, 1, 0]]
else:
raise ValueError(f"invaild image format, {type(image)} for {image}")
if len(entities) == 0:
return image
new_image = image.copy()
previous_bboxes = []
# size of text
text_size = 1
# thickness of text
text_line = 1 # int(max(1 * min(image_h, image_w) / 512, 1))
box_line = 3
(c_width, text_height), _ = cv2.getTextSize("F", cv2.FONT_HERSHEY_COMPLEX, text_size, text_line)
base_height = int(text_height * 0.675)
text_offset_original = text_height - base_height
text_spaces = 3
for entity_name, (start, end), bboxes in entities:
for x1_norm, y1_norm, x2_norm, y2_norm in bboxes:
orig_x1, orig_y1, orig_x2, orig_y2 = (
int(x1_norm * image_w),
int(y1_norm * image_h),
int(x2_norm * image_w),
int(y2_norm * image_h),
)
# draw bbox
# random color
color = tuple(np.random.randint(0, 255, size=3).tolist())
new_image = cv2.rectangle(new_image, (orig_x1, orig_y1), (orig_x2, orig_y2), color, box_line)
l_o, r_o = box_line // 2 + box_line % 2, box_line // 2 + box_line % 2 + 1
x1 = orig_x1 - l_o
y1 = orig_y1 - l_o
if y1 < text_height + text_offset_original + 2 * text_spaces:
y1 = orig_y1 + r_o + text_height + text_offset_original + 2 * text_spaces
x1 = orig_x1 + r_o
# add text background
(text_width, text_height), _ = cv2.getTextSize(f" {entity_name}", cv2.FONT_HERSHEY_COMPLEX, text_size, text_line)
text_bg_x1, text_bg_y1, text_bg_x2, text_bg_y2 = (
x1,
y1 - (text_height + text_offset_original + 2 * text_spaces),
x1 + text_width,
y1,
)
for prev_bbox in previous_bboxes:
while is_overlapping((text_bg_x1, text_bg_y1, text_bg_x2, text_bg_y2), prev_bbox):
text_bg_y1 += text_height + text_offset_original + 2 * text_spaces
text_bg_y2 += text_height + text_offset_original + 2 * text_spaces
y1 += text_height + text_offset_original + 2 * text_spaces
if text_bg_y2 >= image_h:
text_bg_y1 = max(
0,
image_h - (text_height + text_offset_original + 2 * text_spaces),
)
text_bg_y2 = image_h
y1 = image_h
break
alpha = 0.5
for i in range(text_bg_y1, text_bg_y2):
for j in range(text_bg_x1, text_bg_x2):
if i < image_h and j < image_w:
if j < text_bg_x1 + 1.35 * c_width:
# original color
bg_color = color
else:
# white
bg_color = [255, 255, 255]
new_image[i, j] = (alpha * new_image[i, j] + (1 - alpha) * np.array(bg_color)).astype(np.uint8)
cv2.putText(
new_image,
f" {entity_name}",
(x1, y1 - text_offset_original - 1 * text_spaces),
cv2.FONT_HERSHEY_COMPLEX,
text_size,
(0, 0, 0),
text_line,
cv2.LINE_AA,
)
# previous_locations.append((x1, y1))
previous_bboxes.append((text_bg_x1, text_bg_y1, text_bg_x2, text_bg_y2))
pil_image = Image.fromarray(new_image[:, :, [2, 1, 0]])
return pil_image
# Draw the bounding bboxes
new_image = draw_entity_boxes_on_image(image, entities)
display(new_image)
Convert models to OpenVINO Intermediate representation (IR) format#
The original model includes 3 models: vision model
Kosmos2VisionModel
, Kosmos2ImageToTextProjection
that is the
layer that transforms the image model’s output to part of the text
model’s input (namely, image features), and transformer based text model
Kosmos2TextForCausalLM
. We will convert all of them and then replace
the original models.
Define paths for converted models:
from pathlib import Path
models_base_folder = Path("models")
VISION_MODEL_IR_PATH = models_base_folder / "vision_model.xml"
IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH = models_base_folder / "image_to_text_projection_model.xml"
FIRST_STAGE_MODEL_PATH = models_base_folder / "kosmos_input_embed.xml"
SECOND_STAGE_MODEL_PATH = models_base_folder / "kosmos_with_past.xml"
Define the conversion function for PyTorch modules. We use
ov.convert_model
function to obtain OpenVINO Intermediate
Representation object and ov.save_model
function to save it as XML
file.
import gc
import torch
import openvino as ov
def cleanup_torchscript_cache():
# cleanup memory
torch._C._jit_clear_class_registry()
torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore()
torch.jit._state._clear_class_state()
gc.collect()
def convert(model: torch.nn.Module, xml_path: str, example_input):
xml_path = Path(xml_path)
if not xml_path.exists():
xml_path.parent.mkdir(parents=True, exist_ok=True)
with torch.no_grad():
converted_model = ov.convert_model(model, example_input=example_input)
ov.save_model(converted_model, xml_path, compress_to_fp16=False)
cleanup_torchscript_cache()
Convert the vision model#
Vision model accept pixel_values
and returns image_embeds
.
convert(model.vision_model, VISION_MODEL_IR_PATH, inputs["pixel_values"])
Convert Image To Text Projection model#
from torch import nn
def get_image_embeds(pixel_values):
vision_model_output = model.vision_model(pixel_values)
image_embeds = model.vision_model.model.post_layernorm(vision_model_output[0])
image_embeds = nn.functional.normalize(image_embeds, dim=-1)
return image_embeds
image_embeds = get_image_embeds(inputs["pixel_values"])
convert(model.image_to_text_projection, IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH, image_embeds)
Convert Text model#
The Text Model performs in generation pipeline and we can separate it
into two stage. In the first stage the model transforms image_embeds
into output for the second stage. In the second stage the model produces
tokens during several runs that can be transformed into raw model
generated text by AutoProcessor
.
from typing import Optional, List
from transformers.models.kosmos2.modeling_kosmos2 import (
create_position_ids_from_input_ids,
)
def get_projecton_image_embeds(pixel_values):
vision_model_output = model.vision_model(pixel_values)
image_embeds = model.vision_model.model.post_layernorm(vision_model_output[0])
image_embeds = nn.functional.normalize(image_embeds, dim=-1)
image_embeds, _ = model.image_to_text_projection(image_embeds)
return image_embeds
def flattenize_inputs(inputs):
"""
Helper function for making nested inputs flattens
"""
flatten_inputs = []
for input_data in inputs:
if input_data is None:
continue
if isinstance(input_data, (list, tuple)):
flatten_inputs.extend(flattenize_inputs(input_data))
else:
flatten_inputs.append(input_data)
return flatten_inputs
def postprocess_converted_model(
ov_model,
example_input=None,
input_names=None,
output_names=None,
dynamic_shapes=None,
):
"""
Helper function for appling postprocessing on converted model with updating input names, shapes and output names
acording to requested specification
"""
flatten_example_inputs = flattenize_inputs(example_input) if example_input else []
if input_names:
for inp_name, m_input, input_data in zip(input_names, ov_model.inputs, flatten_example_inputs):
m_input.get_tensor().set_names({inp_name})
if output_names:
for out, out_name in zip(ov_model.outputs, output_names):
out.get_tensor().set_names({out_name})
return ov_model
def convert_text_model():
model.text_model.model.config.torchscript = True
model.text_model.config.torchscript = True
image_embeds = get_projecton_image_embeds(inputs["pixel_values"])
conv_inputs = {
"input_ids": inputs["input_ids"],
"attention_mask": inputs["attention_mask"],
"image_embeds": image_embeds,
"image_embeds_position_mask": inputs["image_embeds_position_mask"],
}
outs = model.text_model.model(**conv_inputs)
inputs_ = ["input_ids", "attention_mask"]
outputs = ["logits"]
dynamic_shapes = {
"input_ids": {1: "seq_len"},
"attention_mask": {1: "seq_len"},
"position_ids": {0: "seq_len"},
}
for idx in range(len(outs[1])):
inputs_.extend([f"past_key_values.{idx}.key", f"past_key_values.{idx}.value"])
dynamic_shapes[inputs_[-1]] = {2: "past_sequence + sequence"}
dynamic_shapes[inputs_[-2]] = {2: "past_sequence + sequence"}
outputs.extend([f"present.{idx}.key", f"present.{idx}.value"])
if not FIRST_STAGE_MODEL_PATH.exists():
ov_model = ov.convert_model(model.text_model.model, example_input=conv_inputs)
ov_model = postprocess_converted_model(ov_model, output_names=outputs)
ov.save_model(ov_model, FIRST_STAGE_MODEL_PATH)
del ov_model
cleanup_torchscript_cache()
if not SECOND_STAGE_MODEL_PATH.exists():
position_ids = create_position_ids_from_input_ids(
inputs["input_ids"],
padding_idx=model.text_model.config.pad_token_id,
past_key_values_length=0,
)[:, -1:]
example_input_second_stage = {
"input_ids": inputs["input_ids"][:, -1:],
"attention_mask": inputs["input_ids"].new_ones(1, inputs["input_ids"].shape[1] + 1),
"position_ids": position_ids,
"past_key_values": outs[1],
}
ov_model = ov.convert_model(model.text_model.model, example_input=example_input_second_stage)
ov_model = postprocess_converted_model(
ov_model,
example_input=example_input_second_stage.values(),
input_names=inputs_,
output_names=outputs,
dynamic_shapes=dynamic_shapes,
)
ov.save_model(ov_model, SECOND_STAGE_MODEL_PATH)
del ov_model
cleanup_torchscript_cache()
convert_text_model()
Compiling models and prepare pipeline#
Select device that will be used to do models inference using OpenVINO from the dropdown list:
import ipywidgets as widgets
core = ov.Core()
device = widgets.Dropdown(
options=core.available_devices + ["AUTO"],
value="AUTO",
description="Device:",
disabled=False,
)
device
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using tokenizers before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Dropdown(description='Device:', index=4, options=('CPU', 'GPU.0', 'GPU.1', 'GPU.2', 'AUTO'), value='AUTO')
Let’s create callable wrapper classes for compiled models to allow
interaction with original pipeline. Note that all of wrapper classes
return torch.Tensor
s instead of np.array
s.
class WraperInternalVisionModel:
post_layernorm = model.vision_model.model.post_layernorm
class VisionModelWrapper(torch.nn.Module):
def __init__(self, model_ir_path):
super().__init__()
self.model = WraperInternalVisionModel()
self.vision_model = core.compile_model(model_ir_path, device.value)
def forward(self, pixel_values, **kwargs):
vision_model_output = self.vision_model(pixel_values)[0]
return [torch.from_numpy(vision_model_output)]
class ImageToTextProjectionModelWrapper(torch.nn.Module):
def __init__(self, model_ir_path):
super().__init__()
self.image_to_text_projection = core.compile_model(model_ir_path, device.value)
def forward(self, image_embeds):
output = self.image_to_text_projection(image_embeds)
image_embeds = output[0]
projection_attentions = output[1]
return image_embeds, projection_attentions
from transformers.generation import GenerationConfig, GenerationMixin
from transformers.models.kosmos2.modeling_kosmos2 import (
Kosmos2ForConditionalGenerationModelOutput,
)
class KosmosForCausalLMWrapper(GenerationMixin):
def __init__(self, first_stage_model_path, second_stage_model_path, device):
self.model_stage_1 = core.compile_model(first_stage_model_path, device.value)
self.model_stage_2 = core.read_model(second_stage_model_path)
self.input_names = {key.get_any_name(): idx for idx, key in enumerate(self.model_stage_2.inputs)}
self.output_names = {key.get_any_name(): idx for idx, key in enumerate(self.model_stage_2.outputs)}
self.key_value_input_names = [key for key in self.input_names if "key_values" in key]
self.key_value_output_names = [key for key in self.output_names if "present" in key]
self.model_stage_2 = core.compile_model(self.model_stage_2, device.value)
self.request = self.model_stage_2.create_infer_request()
self.config = model.config
self.generation_config = GenerationConfig.from_model_config(model.config)
self.main_input_name = "input_ids"
self.device = torch.device("cpu")
self.num_pkv = 2
self.lm_head = nn.Linear(
in_features=model.text_model.config.embed_dim,
out_features=model.text_model.config.vocab_size,
bias=False,
)
self._supports_cache_class = False
def get_input_embeddings(self) -> nn.Module:
return self.model.embed_tokens
def set_input_embeddings(self, value):
self.model.embed_tokens = value
def get_output_embeddings(self) -> nn.Module:
return self.lm_head
def set_output_embeddings(self, new_embeddings):
self.lm_head = new_embeddings
def can_generate(self):
"""Returns True to validate the check that the model using `GenerationMixin.generate()` can indeed generate."""
return True
def __call__(
self,
input_ids,
attention_mask: Optional[torch.Tensor] = None,
image_embeds: Optional[torch.Tensor] = None,
image_embeds_position_mask: Optional[torch.Tensor] = None,
position_ids=None,
past_key_values: Optional[List[torch.FloatTensor]] = None,
**kwargs,
):
return self.forward(
input_ids,
attention_mask,
image_embeds,
image_embeds_position_mask,
position_ids,
past_key_values,
)
def forward(
self,
input_ids,
attention_mask: Optional[torch.Tensor] = None,
image_embeds: Optional[torch.Tensor] = None,
image_embeds_position_mask: Optional[torch.Tensor] = None,
position_ids=None,
past_key_values: Optional[List[torch.FloatTensor]] = None,
**kwargs,
):
if past_key_values is None:
outs = self.model_stage_1(
{
"input_ids": input_ids,
"attention_mask": attention_mask,
"image_embeds": image_embeds,
"image_embeds_position_mask": image_embeds_position_mask,
}
)
lm_logits = model.text_model.lm_head(torch.from_numpy(outs[0]))
pkv = list(outs.values())[1:]
pkv = tuple(pkv[i : i + 2] for i in range(0, len(pkv), 2))
return Kosmos2ForConditionalGenerationModelOutput(logits=lm_logits, past_key_values=pkv)
if past_key_values is not None:
past_key_values = tuple(past_key_value for pkv_per_layer in past_key_values for past_key_value in pkv_per_layer)
inputs_ = {
"input_ids": input_ids[:, -1].unsqueeze(-1),
"attention_mask": attention_mask,
"position_ids": position_ids,
}
inputs_.update(dict(zip(self.key_value_input_names, past_key_values)))
# Run inference
self.request.start_async(inputs_, share_inputs=True)
self.request.wait()
logits = torch.from_numpy(self.request.get_tensor("logits").data)
logits = model.text_model.lm_head(logits)
# Tuple of length equal to : number of layer * number of past_key_value per decoder layer (2 corresponds to the self-attention layer)
past_key_values = tuple(self.request.get_tensor(key).data for key in self.key_value_output_names)
# Tuple of tuple of length `n_layers`, with each tuple of length equal to 2 (k/v of self-attention)
past_key_values = tuple(past_key_values[i : i + self.num_pkv] for i in range(0, len(past_key_values), self.num_pkv))
return Kosmos2ForConditionalGenerationModelOutput(logits=logits, past_key_values=past_key_values)
def prepare_inputs_for_generation(
self,
input_ids,
image_embeds=None,
image_embeds_position_mask=None,
past_key_values=None,
attention_mask=None,
use_cache=None,
**kwargs,
):
input_shape = input_ids.shape
# if model is used as a decoder in encoder-decoder model, the decoder attention mask is created on the fly
if attention_mask is None:
attention_mask = input_ids.new_ones(input_shape)
position_ids = None
# cut input_ids if past_key_values is used
if past_key_values is not None:
position_ids = create_position_ids_from_input_ids(
input_ids,
padding_idx=model.text_model.config.pad_token_id,
past_key_values_length=0,
)[:, -1:]
input_ids = input_ids[:, -1:]
image_embeds = None
image_embeds_position_mask = None
elif image_embeds_position_mask is not None:
batch_size, seq_len = input_ids.size()
mask_len = image_embeds_position_mask.size()[-1]
image_embeds_position_mask = torch.cat(
(
image_embeds_position_mask,
torch.zeros(
size=(batch_size, seq_len - mask_len),
dtype=torch.bool,
device=input_ids.device,
),
),
dim=1,
)
return {
"input_ids": input_ids,
"image_embeds": image_embeds,
"image_embeds_position_mask": image_embeds_position_mask,
"position_ids": position_ids,
"past_key_values": past_key_values,
"attention_mask": attention_mask,
}
@staticmethod
# Copied from transformers.models.umt5.modeling_umt5.UMT5ForConditionalGeneration._reorder_cache
def _reorder_cache(past_key_values, beam_idx):
reordered_past = ()
for layer_past in past_key_values:
reordered_past += (tuple(past_state.index_select(0, beam_idx.to(past_state.device)) for past_state in layer_past),)
return reordered_past
class Kosmos2ForConditionalGenerationWrapper:
def __init__(
self,
vision_model_path,
image_to_text_projection_model_path,
first_stage_model_path,
second_stage_model_path,
device,
):
self.vision_model = VisionModelWrapper(vision_model_path)
self.image_to_text_projection = ImageToTextProjectionModelWrapper(image_to_text_projection_model_path)
self.text_model = KosmosForCausalLMWrapper(first_stage_model_path, second_stage_model_path, device)
def generate(
self,
pixel_values=None,
image_embeds_position_mask=None,
input_ids=None,
attention_mask=None,
image_embeds=None,
**kwargs,
):
vision_model_output = self.vision_model(pixel_values)
image_embeds = model.vision_model.model.post_layernorm(vision_model_output[0])
# normalized features
image_embeds = nn.functional.normalize(image_embeds, dim=-1)
image_embeds, projection_attentions = self.image_to_text_projection(image_embeds.detach().numpy())
output = self.text_model.generate(
input_ids,
attention_mask=attention_mask,
image_embeds=image_embeds,
image_embeds_position_mask=image_embeds_position_mask,
**kwargs,
)
return output
ov_model = Kosmos2ForConditionalGenerationWrapper(
VISION_MODEL_IR_PATH,
IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH,
FIRST_STAGE_MODEL_PATH,
SECOND_STAGE_MODEL_PATH,
device,
)
Inference#
def generate_entities(model):
generated_ids = model.generate(
pixel_values=inputs["pixel_values"],
input_ids=inputs["input_ids"],
attention_mask=inputs["attention_mask"],
image_embeds=None,
image_embeds_position_mask=inputs["image_embeds_position_mask"],
max_new_tokens=128,
)
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
# Specify `cleanup_and_extract=False` in order to see the raw model generation.
processed_text = processor.post_process_generation(generated_text, cleanup_and_extract=False)
print(f"Raw model generation: {processed_text}")
# `<grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863></object> warming himself by<phrase> a fire</phrase><object><patch_index_0005><patch_index_0911></object>.`
# By default, the generated text is cleanup and the entities are extracted.
processed_text, entities = processor.post_process_generation(generated_text)
print(f"Cleaned up generated text: {processed_text=}")
# `An image of a snowman warming himself by a fire.`
print(f"Extracted entities: {entities}")
# `[('a snowman', (12, 21), [(0.390625, 0.046875, 0.984375, 0.828125)]), ('a fire', (41, 47), [(0.171875, 0.015625, 0.484375, 0.890625)])]`
return entities
entities = generate_entities(ov_model)
new_image = draw_entity_boxes_on_image(image, entities)
display(new_image)
Raw model generation: <grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863></object> warming himself by<phrase> a fire</phrase><object><patch_index_0005><patch_index_0911></object>.
Cleaned up generated text: processed_text='An image of a snowman warming himself by a fire.'
Extracted entities: [('a snowman', (12, 21), [(0.390625, 0.046875, 0.984375, 0.828125)]), ('a fire', (41, 47), [(0.171875, 0.015625, 0.484375, 0.890625)])]
Quantization#
NNCF enables
post-training quantization by adding quantization layers into model
graph and then using a subset of the training dataset to initialize the
parameters of these additional quantization layers. Quantized operations
are executed in INT8
instead of FP32
/FP16
making model
inference faster.
Please select below whether you would like to run quantization to improve model inference speed.
NOTE: Quantization is time and memory consuming operation. Running quantization code below may take some time.
to_quantize = widgets.Checkbox(value=True, description="Quantization")
to_quantize
Checkbox(value=True, description='Quantization')
Let’s load skip magic
extension to skip quantization if
to_quantize
is not selected
# Fetch `skip_kernel_extension` module
import requests
r = requests.get(
url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/skip_kernel_extension.py",
)
open("skip_kernel_extension.py", "w").write(r.text)
ov_optimized_model = None
%load_ext skip_kernel_extension
Prepare calibration datasets#
We use a portion of KoalaAI/StockImages-CC0 dataset from Hugging Face as calibration data.
%%skip not $to_quantize.value
INT8_VISION_MODEL_IR_PATH = models_base_folder / "vision_model_int8.xml"
INT8_IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH = models_base_folder / "image_to_text_projection_model_int8.xml"
INT4_FIRST_STAGE_MODEL_PATH = models_base_folder / "kosmos_input_embed_int4.xml"
INT4_SECOND_STAGE_MODEL_PATH = models_base_folder / "kosmos_with_past_int4.xml"
%%skip not $to_quantize.value
import datasets
prompt = "<grounding>An image of"
subset_size = 200
dataset = datasets.load_dataset("KoalaAI/StockImages-CC0", split="train", streaming=True, trust_remote_code=True)
dataset = dataset.shuffle(seed=42).take(subset_size).select_columns(["image"])
To collect intermediate model inputs for calibration we should customize
CompiledModel
and InferRequest
.
%%skip not $to_quantize.value
from tqdm.notebook import tqdm
from transformers import set_seed
set_seed(42)
def collect_calibration_data(pipeline, dataset, subset_size):
calibration_dataset = {
"vision_model": [],
"image_to_text_proj": [],
}
for data in tqdm(dataset, total=subset_size, desc="Collecting calibration dataset"):
img = data["image"]
pixel_values = processor(text=prompt, images=img, return_tensors="pt")["pixel_values"]
vision_model_output = pipeline.vision_model(pixel_values)
image_embeds = model.vision_model.model.post_layernorm(vision_model_output[0])
image_embeds = nn.functional.normalize(image_embeds, dim=-1)
calibration_dataset["vision_model"].append(pixel_values)
calibration_dataset["image_to_text_proj"].append(image_embeds.detach().numpy())
return calibration_dataset
%%skip not $to_quantize.value
if not (INT8_VISION_MODEL_IR_PATH.exists() and INT8_IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH.exists()):
calibration_dataset = collect_calibration_data(ov_model, dataset, subset_size)
Run Quantization#
%%skip not $to_quantize.value
import nncf
if not INT8_VISION_MODEL_IR_PATH.exists():
vision_model = core.read_model(VISION_MODEL_IR_PATH)
quantized_model = nncf.quantize(
model=vision_model,
calibration_dataset=nncf.Dataset(calibration_dataset["vision_model"]),
subset_size=subset_size,
model_type=nncf.ModelType.TRANSFORMER,
)
ov.save_model(quantized_model, INT8_VISION_MODEL_IR_PATH)
%%skip not $to_quantize.value
if not INT8_IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH.exists():
image_to_text_proj_model = core.read_model(IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH)
quantized_model = nncf.quantize(
model=image_to_text_proj_model,
calibration_dataset=nncf.Dataset(calibration_dataset["image_to_text_proj"]),
subset_size=subset_size,
model_type=nncf.ModelType.TRANSFORMER,
)
ov.save_model(quantized_model, INT8_IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH)
Run Weights Compression#
Quantizing of the Text Model does not significantly improve inference performance but can lead to a substantial degradation of accuracy. The weight compression will be applied to footprint reduction.
%%skip not $to_quantize.value
import nncf
if not INT4_FIRST_STAGE_MODEL_PATH.exists():
model_stage_1 = core.read_model(FIRST_STAGE_MODEL_PATH)
quantized_model = nncf.compress_weights(model_stage_1, nncf.CompressWeightsMode.INT4_ASYM)
ov.save_model(quantized_model, INT4_FIRST_STAGE_MODEL_PATH)
if not INT4_SECOND_STAGE_MODEL_PATH.exists():
model_stage_2 = core.read_model(SECOND_STAGE_MODEL_PATH)
quantized_model = nncf.compress_weights(model_stage_2, nncf.CompressWeightsMode.INT4_ASYM)
ov.save_model(quantized_model, INT4_SECOND_STAGE_MODEL_PATH)
Let’s compare the images generated by the original and optimized pipelines.
%%skip not $to_quantize.value
ov_optimized_model = Kosmos2ForConditionalGenerationWrapper(
INT8_VISION_MODEL_IR_PATH,
INT8_IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH,
INT4_FIRST_STAGE_MODEL_PATH,
INT4_SECOND_STAGE_MODEL_PATH,
device,
)
%%skip not $to_quantize.value
import matplotlib.pyplot as plt
def visualize_results(orig_img: Image, optimized_img: Image):
"""
Helper function for results visualization
Parameters:
orig_img (Image.Image): generated image using FP16 models
optimized_img (Image.Image): generated image using quantized models
Returns:
fig (matplotlib.pyplot.Figure): matplotlib generated figure contains drawing result
"""
orig_title = "FP32 model"
control_title = "INT8 model"
figsize = (20, 20)
fig, axs = plt.subplots(1, 2, figsize=figsize, sharex="all", sharey="all")
list_axes = list(axs.flat)
for a in list_axes:
a.set_xticklabels([])
a.set_yticklabels([])
a.get_xaxis().set_visible(False)
a.get_yaxis().set_visible(False)
a.grid(False)
list_axes[0].imshow(np.array(orig_img))
list_axes[1].imshow(np.array(optimized_img))
list_axes[0].set_title(orig_title, fontsize=15)
list_axes[1].set_title(control_title, fontsize=15)
fig.subplots_adjust(wspace=0.01, hspace=0.01)
fig.tight_layout()
%%skip not $to_quantize.value
int_entities = generate_entities(ov_optimized_model)
int_image = draw_entity_boxes_on_image(image, int_entities)
visualize_results(new_image, int_image)
Raw model generation: <grounding> An image of<phrase> a snowman</phrase><object><patch_index_0044><patch_index_0863></object> warming himself by<phrase> a fire</phrase><object><patch_index_0006><patch_index_0879></object>.
Cleaned up generated text: processed_text='An image of a snowman warming himself by a fire.'
Extracted entities: [('a snowman', (12, 21), [(0.390625, 0.046875, 0.984375, 0.828125)]), ('a fire', (41, 47), [(0.203125, 0.015625, 0.484375, 0.859375)])]
Compare model file sizes#
%%skip not $to_quantize.value
fp32_model_paths = [VISION_MODEL_IR_PATH, IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH, FIRST_STAGE_MODEL_PATH, SECOND_STAGE_MODEL_PATH]
int8_model_paths = [INT8_VISION_MODEL_IR_PATH, INT8_IMAGE_TO_TEXT_PROJECTION_MODEL_IR_PATH, INT4_FIRST_STAGE_MODEL_PATH, INT4_SECOND_STAGE_MODEL_PATH]
for fp32_path, int8_path in zip(fp32_model_paths, int8_model_paths):
fp32_ir_model_size = fp32_path.with_suffix(".bin").stat().st_size
int8_model_size = int8_path.with_suffix(".bin").stat().st_size
print(f"{fp32_path.stem} compression rate: {fp32_ir_model_size / int8_model_size:.3f}")
vision_model compression rate: 3.956
image_to_text_projection_model compression rate: 3.899
kosmos_input_embed compression rate: 3.475
kosmos_with_past compression rate: 3.475
Compare inference time of the FP32 and optimized pipelines#
To measure the inference performance of the FP32
and optimized
pipelines, we use mean inference time on 7 samples.
NOTE: For the most accurate performance estimation, it is recommended to run
benchmark_app
in a terminal/command prompt after closing other applications.
%%skip not $to_quantize.value
import time
def calculate_inference_time(pipeline, dataset):
inference_time = []
for data in dataset.take(7):
img = data["image"]
inputs = processor(text=prompt, images=img, return_tensors="pt")
start = time.perf_counter()
_ = pipeline.generate(
pixel_values=inputs["pixel_values"],
input_ids=inputs["input_ids"],
attention_mask=inputs["attention_mask"],
image_embeds=None,
image_embeds_position_mask=inputs["image_embeds_position_mask"],
max_new_tokens=128,
)
end = time.perf_counter()
delta = end - start
inference_time.append(delta)
return np.mean(inference_time)
%%skip not $to_quantize.value
fp_latency = calculate_inference_time(ov_model, dataset)
print(f"FP32 pipeline: {fp_latency:.3f} seconds")
int_latency = calculate_inference_time(ov_optimized_model, dataset)
print(f"Optimized pipeline: {int_latency:.3f} seconds")
print(f"Performance speed-up: {fp_latency / int_latency:.3f}")
FP32 pipeline: 2.626 seconds
Optimized pipeline: 1.184 seconds
Performance speed-up: 2.217
Interactive inference#
Please select below whether you would like to use the quantized models to launch the interactive demo.
quantized_models_present = ov_optimized_model is not None
use_quantized_models = widgets.Checkbox(
value=quantized_models_present,
description="Use quantized models",
disabled=not quantized_models_present,
)
use_quantized_models
Checkbox(value=True, description='Use quantized models')
import gradio as gr
pipeline = ov_optimized_model if use_quantized_models.value else ov_model
def generate(image, prompt, use_bbox, _=gr.Progress(track_tqdm=True)):
if use_bbox:
prompt = "<grounding> " + prompt
inputs = processor(text=prompt, images=image, return_tensors="pt")
generated_ids_ = pipeline.generate(
pixel_values=inputs["pixel_values"],
input_ids=inputs["input_ids"],
attention_mask=inputs["attention_mask"],
image_embeds=None,
image_embeds_position_mask=inputs["image_embeds_position_mask"],
max_new_tokens=128,
)
generated_text = processor.batch_decode(generated_ids_, skip_special_tokens=True)[0]
processed_text, entities = processor.post_process_generation(generated_text)
new_image = draw_entity_boxes_on_image(Image.fromarray(image), entities)
return new_image, processed_text
if not Path("gradio_helper.py").exists():
r = requests.get(
url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/notebooks/kosmos2-multimodal-large-language-model/gradio_helper.py"
)
open("gradio_helper.py", "w").write(r.text)
from gradio_helper import make_demo
demo = make_demo(fn=generate)
try:
demo.queue().launch(debug=True)
except Exception:
demo.queue().launch(debug=True, share=True)
# if you are launching remotely, specify server_name and server_port
# demo.launch(server_name='your server name', server_port='server port in int')
# Read more in the docs: https://gradio.app/docs/
# please uncomment and run this cell for stopping gradio interface
# demo.close()