Image generation with Segmind Stable Diffusion 1B (SSD-1B) model and OpenVINO¶
This Jupyter notebook can be launched after a local installation only.
The Segmind Stable Diffusion Model (SSD-1B) is a distilled 50% smaller version of the Stable Diffusion XL (SDXL), offering a 60% speedup while maintaining high-quality text-to-image generation capabilities. It has been trained on diverse datasets, including Grit and Midjourney scrape data, to enhance its ability to create a wide range of visual content based on textual prompts. This model employs a knowledge distillation strategy, where it leverages the teachings of several expert models in succession, including SDXL, ZavyChromaXL, and JuggernautXL, to combine their strengths and produce impressive visual outputs.
In this tutorial, we consider how to run the SSD-1B model using OpenVINO. Then we will consider the LCM-distilled version of segmind/SSD-1B, which reduces the number of inference steps to only 2 - 8 steps.
We will use a pre-trained model from the Hugging Face Diffusers library. To simplify the user experience, the Hugging Face Optimum Intel library is used to convert the models to OpenVINO™ IR format.
Install prerequisites¶
%pip install -q "git+https://github.com/huggingface/optimum-intel.git"
%pip install -q "openvino>=2023.1.0"
%pip install -q --upgrade-strategy eager "invisible-watermark>=0.2.0" "transformers>=4.33" "accelerate" "onnx" "onnxruntime" safetensors "diffusers>=0.22.0"
%pip install -q gradio
SSD-1B Base model¶
We will start with the base model part, which is responsible for the generation of images of the desired output size. SSD-1B is available for download via the Hugging Face Hub. With Optimum Intel, it can be converted to the OpenVINO IR format and used as a drop-in replacement for the original pipeline.
To load an OpenVINO model and run inference with OpenVINO Runtime, you need to replace the diffusers StableDiffusionXLPipeline with the Optimum OVStableDiffusionXLPipeline. If you want to load a PyTorch model and convert it to the OpenVINO format on the fly, you can set export=True. You can save the model on disk using the save_pretrained method.
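The snippet below is a minimal sketch of this pattern (load the PyTorch checkpoint, convert it on the fly, then save the OpenVINO model); the full cell used in this notebook, which additionally selects an inference device and converts the weights to FP16, follows below.

from optimum.intel.openvino import OVStableDiffusionXLPipeline

# Load the PyTorch checkpoint and convert it to OpenVINO IR on the fly
pipeline = OVStableDiffusionXLPipeline.from_pretrained("segmind/SSD-1B", export=True)

# Save the converted model so the export step can be skipped next time
pipeline.save_pretrained("openvino-ssd-1b")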
from pathlib import Path
from optimum.intel.openvino import OVStableDiffusionXLPipeline
model_id = "segmind/SSD-1B"
model_dir = Path("openvino-ssd-1b")
Select inference device for SSD-1B Base model¶
Select the device from the dropdown list for running inference using OpenVINO.
import ipywidgets as widgets
import openvino as ov
core = ov.Core()
device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value='AUTO',
    description='Device:',
    disabled=False,
)
device
Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')
import gc
if not model_dir.exists():
    text2image_pipe = OVStableDiffusionXLPipeline.from_pretrained(model_id, compile=False, device=device.value, export=True, load_in_8bit=False)
    text2image_pipe.half()
    text2image_pipe.save_pretrained(model_dir)
    text2image_pipe.compile()
    gc.collect()
else:
    text2image_pipe = OVStableDiffusionXLPipeline.from_pretrained(model_dir, device=device.value)
Compiling the vae_decoder to AUTO ...
Compiling the unet to AUTO ...
Compiling the text_encoder_2 to AUTO ...
Compiling the vae_encoder to AUTO ...
Compiling the text_encoder to AUTO ...
Run Text2Image generation pipeline¶
Now, we can run the model to generate images from text prompts. To speed up evaluation and reduce the required memory, we decrease num_inference_steps and the image size (using height and width). You can modify them to suit your needs, depending on the target hardware. We also specified a generator parameter based on a numpy random state with a specific seed for reproducible results.
NOTE: Generating a default size 1024x1024 image requires about 53GB of RAM for the SSD-1B model when the converted model is loaded from disk, and up to 64GB of RAM for the SDXL model after exporting.
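If you are not sure whether your machine has enough memory for the chosen resolution, a quick check such as the following may help (a small illustrative sketch; it assumes the psutil package is available in your environment):

import psutil

# Report the currently available system RAM in gigabytes, so you can decide
# between the default 1024x1024 resolution and a smaller one such as 512x512.
available_gb = psutil.virtual_memory().available / 1024 ** 3
print(f"Available RAM: {available_gb:.1f} GB")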
prompt = "An astronaut riding a green horse" # Your prompt here
neg_prompt = "ugly, blurry, poor quality" # Negative prompt here
image = text2image_pipe(prompt=prompt, num_inference_steps=15, negative_prompt=neg_prompt).images[0]
image
0%| | 0/15 [00:00<?, ?it/s]
Generating a 512x512 image requires about 27GB of RAM for the SSD-1B model and about 42GB of RAM for the SDXL model when the converted model is loaded from disk.
import numpy as np
prompt = "cute cat 4k, high-res, masterpiece, best quality, soft lighting, dynamic angle"
image = text2image_pipe(prompt, num_inference_steps=15, height=512, width=512, generator=np.random.RandomState(314)).images[0]
image
0%| | 0/15 [00:00<?, ?it/s]
Text2Image Generation Interactive Demo¶
import gradio as gr
prompt = "An astronaut riding a green horse"
neg_prompt = "ugly, blurry, poor quality"
def generate_from_text(text_prompt, neg_prompt, seed, num_steps):
    result = text2image_pipe(text_prompt, negative_prompt=neg_prompt, num_inference_steps=num_steps, generator=np.random.RandomState(seed), height=512, width=512).images[0]
    return result


with gr.Blocks() as demo:
    with gr.Column():
        positive_input = gr.Textbox(label="Text prompt")
        neg_input = gr.Textbox(label="Negative prompt")
        with gr.Row():
            seed_input = gr.Slider(0, 10_000_000, value=42, label="Seed")
            steps_input = gr.Slider(label="Steps", value=10, step=1)
            btn = gr.Button()
    out = gr.Image(label="Result", type="pil", width=512)
    btn.click(generate_from_text, [positive_input, neg_input, seed_input, steps_input], out)
    gr.Examples([
        [prompt, neg_prompt, 999, 20],
        ["underwater world coral reef, colorful jellyfish, 35mm, cinematic lighting, shallow depth of field, ultra quality, masterpiece, realistic", neg_prompt, 89, 20],
        ["a photo realistic happy white poodle dog playing in the grass, extremely detailed, high res, 8k, masterpiece, dynamic angle", neg_prompt, 1569, 15],
        ["Astronaut on Mars watching sunset, best quality, cinematic effects,", neg_prompt, 65245, 12],
        ["Black and white street photography of a rainy night in New York, reflections on wet pavement", neg_prompt, 48199, 10]
    ], [positive_input, neg_input, seed_input, steps_input])
try:
    demo.queue().launch(debug=False)
except Exception:
    demo.queue().launch(debug=False, share=True)
# if you are launching remotely, specify server_name and server_port
# demo.launch(server_name='your server name', server_port='server port in int')
# Read more in the docs: https://gradio.app/docs/
Latent Consistency Model (LCM)¶
Latent Consistency Model (LCM) was proposed in Latent Consistency Models: Synthesizing High-Resolution Images with Few-Step Inference, and the same approach was successfully applied to create an LCM for SDXL.
This checkpoint is an LCM-distilled version of segmind/SSD-1B that reduces the number of inference steps to only 2 - 8 steps.
The model can be loaded with its base pipeline segmind/SSD-1B. Next, the scheduler needs to be changed to LCMScheduler, so we can reduce the number of inference steps to just 2 to 8 steps.
Infer the original model¶
from diffusers import UNet2DConditionModel, DiffusionPipeline, LCMScheduler
unet = UNet2DConditionModel.from_pretrained("latent-consistency/lcm-ssd-1b", variant="fp16")
pipe = DiffusionPipeline.from_pretrained("segmind/SSD-1B", unet=unet, variant="fp16")
pipe.scheduler = LCMScheduler.from_config(pipe.scheduler.config)
pipe.to("cpu")
prompt = "a close-up picture of an old man standing in the rain"
image = pipe(prompt, num_inference_steps=4, guidance_scale=1.0).images[0]
image
Loading pipeline components...: 0%| | 0/7 [00:00<?, ?it/s]
The config attributes {'skip_prk_steps': True} were passed to LCMScheduler, but are not expected and will be ignored. Please verify your scheduler_config.json configuration file.
0%| | 0/4 [00:00<?, ?it/s]
Convert the model to OpenVINO IR¶
The pipeline consists of four important parts:
Two Text Encoders to create the conditioning for image generation from a text prompt.
A U-Net for step-by-step denoising of the latent image representation.
An Autoencoder (VAE) for decoding the latent space into an image.
Let us convert each part:
Imports¶
from pathlib import Path
import numpy as np
import torch
import openvino as ov
Let’s define the conversion function for PyTorch modules. We use the ov.convert_model function to obtain an OpenVINO Intermediate Representation object and the ov.save_model function to save it as an XML file.
def convert(model: torch.nn.Module, xml_path: str, example_input):
    xml_path = Path(xml_path)
    if not xml_path.exists():
        xml_path.parent.mkdir(parents=True, exist_ok=True)
        with torch.no_grad():
            converted_model = ov.convert_model(model, example_input=example_input)
        ov.save_model(converted_model, xml_path)

        # cleanup memory
        torch._C._jit_clear_class_registry()
        torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore()
        torch.jit._state._clear_class_state()
Convert VAE¶
The VAE model has two parts, an encoder and a decoder. The encoder is used to convert the image into a low dimensional latent representation, which will serve as the input to the U-Net model. The decoder, conversely, transforms the latent representation back into an image.
During latent diffusion training, the encoder is used to get the latent representations (latents) of the images for the forward diffusion process, which applies more and more noise at each step. During inference, the denoised latents generated by the reverse diffusion process are converted back into images using the VAE decoder. When you run inference for Text-to-Image, there is no initial image as a starting point. You can skip this step and directly generate initial random noise.
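As a small illustration (not part of the conversion code in this section), the initial latents for Text-to-Image are plain random noise at the VAE's reduced spatial resolution, which is why the encoder is never called; the 1/8 downscaling factor and 4 latent channels below are the values used by the SDXL/SSD-1B VAE.

import torch

# Sketch: initial latents for a 1024x1024 text-to-image run are random noise
# with 4 channels and 1/8 of the image resolution in each spatial dimension.
height, width = 1024, 1024
latent_channels = 4
vae_scale_factor = 8
latents = torch.randn(1, latent_channels, height // vae_scale_factor, width // vae_scale_factor)
print(latents.shape)  # torch.Size([1, 4, 128, 128])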
When running the Text-to-Image pipeline, we will see that only the VAE decoder is needed.
VAE_OV_PATH = Path('model/vae_decoder.xml')
class VAEDecoderWrapper(torch.nn.Module):
    def __init__(self, vae):
        super().__init__()
        self.vae = vae

    def forward(self, latents):
        return self.vae.decode(latents)
pipe.vae.eval()
vae_decoder = VAEDecoderWrapper(pipe.vae)
latents = torch.zeros((1, 4, 64, 64))
convert(vae_decoder, str(VAE_OV_PATH), latents)
Convert U-NET¶
The U-Net model gradually denoises the latent image representation, guided by the text encoder hidden states.
UNET_OV_PATH = Path('model/unet_ir.xml')
class UNETWrapper(torch.nn.Module):
    def __init__(self, unet):
        super().__init__()
        self.unet = unet

    def forward(self, sample=None, timestep=None, encoder_hidden_states=None, timestep_cond=None, text_embeds=None, time_ids=None):
        return self.unet.forward(
            sample,
            timestep,
            encoder_hidden_states,
            timestep_cond=timestep_cond,
            added_cond_kwargs={'text_embeds': text_embeds, 'time_ids': time_ids}
        )


example_input = {
    'sample': torch.rand([1, 4, 128, 128], dtype=torch.float32),
    'timestep': torch.from_numpy(np.array(1, dtype=float)),
    'encoder_hidden_states': torch.rand([1, 77, 2048], dtype=torch.float32),
    'timestep_cond': torch.rand([1, 256], dtype=torch.float32),
    'text_embeds': torch.rand([1, 1280], dtype=torch.float32),
    'time_ids': torch.rand([1, 6], dtype=torch.float32),
}
pipe.unet.eval()
w_unet = UNETWrapper(pipe.unet)
convert(w_unet, UNET_OV_PATH, example_input)
Convert Encoders¶
The text-encoder is responsible for transforming the input prompt into an embedding space that can be understood by the U-Net. It is usually a simple transformer-based encoder that maps a sequence of input tokens to a sequence of latent text embeddings.
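For reference, the fixed sequence length of 77 that appears in the example inputs below comes from the CLIP tokenizers' maximum length. A quick way to see this (a small illustrative sketch, not required for the conversion) is to tokenize a prompt with the pipeline's first tokenizer:

# Sketch: prompts are padded/truncated to the tokenizer's model_max_length (77 for CLIP),
# which is where the (1, 77) input_ids shape used below comes from.
tokens = pipe.tokenizer(
    "a close-up picture of an old man standing in the rain",
    padding="max_length",
    max_length=pipe.tokenizer.model_max_length,
    truncation=True,
    return_tensors="pt",
)
print(tokens.input_ids.shape)  # torch.Size([1, 77])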
class EncoderWrapper(torch.nn.Module):
    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder

    def forward(
        self,
        input_ids=None,
        output_hidden_states=None,
    ):
        encoder_outputs = self.encoder(input_ids, output_hidden_states=output_hidden_states, return_dict=torch.tensor(True))
        return encoder_outputs[0], list(encoder_outputs.hidden_states)
TEXT_ENCODER_1_OV_PATH = Path('model/text_encoder_1.xml')
TEXT_ENCODER_2_OV_PATH = Path('model/text_encoder_2.xml')
inputs = {
    'input_ids': torch.ones((1, 77), dtype=torch.long),
    'output_hidden_states': torch.tensor(True),
}
pipe.text_encoder.eval()
w_encoder = EncoderWrapper(pipe.text_encoder)
convert(w_encoder, TEXT_ENCODER_1_OV_PATH, inputs)
pipe.text_encoder_2.eval()
w_encoder = EncoderWrapper(pipe.text_encoder_2)
convert(w_encoder, TEXT_ENCODER_2_OV_PATH, inputs)
Select the device from the dropdown list for running inference using OpenVINO.
import ipywidgets as widgets
core = ov.Core()
device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value='CPU',
    description='Device:',
    disabled=False,
)
device
Dropdown(description='Device:', options=('CPU', 'AUTO'), value='CPU')
compiled_unet = core.compile_model(UNET_OV_PATH, device.value)
compiled_text_encoder = core.compile_model(TEXT_ENCODER_1_OV_PATH, device.value)
compiled_text_encoder_2 = core.compile_model(TEXT_ENCODER_2_OV_PATH, device.value)
compiled_vae = core.compile_model(VAE_OV_PATH, device.value)
Let’s create callable wrapper classes for the compiled models to allow interaction with the original DiffusionPipeline class.
from collections import namedtuple
class EncoderWrapper:
    dtype = torch.float32  # accessed in the original workflow

    def __init__(self, encoder, orig_encoder):
        self.encoder = encoder
        self.modules = orig_encoder.modules  # accessed in the original workflow
        self.config = orig_encoder.config  # accessed in the original workflow

    def __call__(self, input_ids, **kwargs):
        output_hidden_states = kwargs['output_hidden_states']
        inputs = {
            'input_ids': input_ids,
            'output_hidden_states': output_hidden_states
        }
        output = self.encoder(inputs)
        hidden_states = []
        hidden_states_len = len(output)
        for i in range(1, hidden_states_len):
            hidden_states.append(torch.from_numpy(output[i]))
        BaseModelOutputWithPooling = namedtuple("BaseModelOutputWithPooling", 'last_hidden_state hidden_states')
        output = BaseModelOutputWithPooling(torch.from_numpy(output[0]), hidden_states)
        return output


class UnetWrapper:
    def __init__(self, unet, unet_orig):
        self.unet = unet
        self.config = unet_orig.config  # accessed in the original workflow
        self.add_embedding = unet_orig.add_embedding  # accessed in the original workflow

    def __call__(self, *args, **kwargs):
        latent_model_input, t = args
        inputs = {
            'sample': latent_model_input,
            'timestep': t,
            'encoder_hidden_states': kwargs['encoder_hidden_states'],
            'timestep_cond': kwargs['timestep_cond'],
            'text_embeds': kwargs['added_cond_kwargs']['text_embeds'],
            'time_ids': kwargs['added_cond_kwargs']['time_ids']
        }
        output = self.unet(inputs)
        return torch.from_numpy(output[0])


class VAEWrapper:
    dtype = torch.float32  # accessed in the original workflow

    def __init__(self, vae, vae_orig):
        self.vae = vae
        self.config = vae_orig.config  # accessed in the original workflow

    def decode(self, latents, return_dict=False):
        output = self.vae(latents)[0]
        output = torch.from_numpy(output)
        return [output]
And insert the wrapper instances into the pipeline:
pipe.unet = UnetWrapper(compiled_unet, pipe.unet)
pipe.text_encoder = EncoderWrapper(compiled_text_encoder, pipe.text_encoder)
pipe.text_encoder_2 = EncoderWrapper(compiled_text_encoder_2, pipe.text_encoder_2)
pipe.vae = VAEWrapper(compiled_vae, pipe.vae)
image = pipe(prompt, num_inference_steps=4, guidance_scale=1.0).images[0]
image
0%| | 0/4 [00:00<?, ?it/s]
import gradio as gr
prompt = "An astronaut riding a green horse"
neg_prompt = "ugly, blurry, poor quality"
def generate_from_text(text_prompt, neg_prompt, seed, num_steps):
    result = pipe(text_prompt, negative_prompt=neg_prompt, num_inference_steps=num_steps, guidance_scale=1.0, generator=torch.Generator().manual_seed(seed), height=1024, width=1024).images[0]
    return result


with gr.Blocks() as demo:
    with gr.Column():
        positive_input = gr.Textbox(label="Text prompt")
        neg_input = gr.Textbox(label="Negative prompt")
        with gr.Row():
            seed_input = gr.Slider(0, 10_000_000, value=42, label="Seed")
            steps_input = gr.Slider(label="Steps", value=4, minimum=2, maximum=8, step=1)
            btn = gr.Button()
    out = gr.Image(label="Result", type="pil", width=1024)
    btn.click(generate_from_text, [positive_input, neg_input, seed_input, steps_input], out)
    gr.Examples([
        [prompt, neg_prompt, 999, 4],
        ["underwater world coral reef, colorful jellyfish, 35mm, cinematic lighting, shallow depth of field, ultra quality, masterpiece, realistic", neg_prompt, 89, 4],
        ["a photo realistic happy white poodle dog playing in the grass, extremely detailed, high res, 8k, masterpiece, dynamic angle", neg_prompt, 1569, 4],
        ["Astronaut on Mars watching sunset, best quality, cinematic effects,", neg_prompt, 65245, 4],
        ["Black and white street photography of a rainy night in New York, reflections on wet pavement", neg_prompt, 48199, 4]
    ], [positive_input, neg_input, seed_input, steps_input])
try:
    demo.queue().launch(debug=False)
except Exception:
    demo.queue().launch(debug=False, share=True)
# if you are launching remotely, specify server_name and server_port
# demo.launch(server_name='your server name', server_port='server port in int')
# Read more in the docs: https://gradio.app/docs/