Text-to-speech generation using Bark and OpenVINO#
This Jupyter notebook can be launched after a local installation only.
🐶 Bark is a transformer-based text-to-audio model created by Suno. Bark can generate highly realistic, multilingual speech as well as other audio - including music, background noise and simple sound effects. The model can also produce nonverbal communications like laughing, sighing and crying.
With Bark, users can also produce nonverbal communications like laughing, sighing, and crying, making it a versatile tool for a variety of applications.
Bark is a cutting-edge text-to-speech (TTS) technology that has taken the AI world by storm. Unlike the typical TTS engines that sound robotic and mechanic, Bark offers human-like voices that are highly realistic and natural sounding. Bark uses GPT-style models to generate speech with minimal tweaking, producing highly expressive and emotive voices that can capture nuances such as tone, pitch, and rhythm. It offers a fantastic experience that can leave you wondering if you’re listening to human beings.
Notably, Bark supports multiple languages and can generate speech in Mandarin, French, Italian, Spanish, and other languages with impressive clarity and accuracy. With Bark, you can easily switch between languages and still enjoy high-quality sound effects.
Bark is not only intelligent but also intuitive, making it an ideal tool for individuals and businesses looking to create high-quality voice content for their platforms. Whether you’re looking to create podcasts, audiobooks, video game sounds, or any other form of voice content, Bark has you covered.
So, if you’re looking for a revolutionary text-to-speech technology that can elevate your voice content, Bark is the way to go! In this tutorial we consider how to convert and run bark with OpenVINO.
About model#
Bark uses GPT-style models to generate audio from scratch, but the initial text prompt is embedded into high-level semantic tokens without the use of phonemes. This allows Bark to generalize to arbitrary instructions beyond speech that occur in the training data, such as music lyrics, sound effects, or other non-speech sounds.
A subsequent second model is used to convert the generated semantic tokens into audio codec tokens to generate the full waveform. To enable the community to use Bark via public code, EnCodec codec from Facebook is used to act as an audio representation.
Table of contents:
Installation Instructions#
This is a self-contained example that relies solely on its own code.
We recommend running the notebook in a virtual environment. You only need a Jupyter server to start. For details, please refer to Installation Guide.
Prerequisites#
%pip install -q "torch" "torchvision" "torchaudio" --extra-index-url https://download.pytorch.org/whl/cpu
%pip install -q "openvino>=2023.1.0" "gradio>=4.19"
%pip install -q "git+https://github.com/suno-ai/bark.git" --extra-index-url https://download.pytorch.org/whl/cpu
import requests
r = requests.get(
url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/notebook_utils.py",
)
open("notebook_utils.py", "w").write(r.text)
Download and Convert models#
from pathlib import Path
from bark.generation import load_model, codec_decode, _flatten_codebooks
models_dir = Path("models")
models_dir.mkdir(exist_ok=True)
Text Encoder#
Text encoder is responsible for embedding initial text prompt into high-level semantic tokens. it uses tokenizer for conversion input text to token ids and predicts semantic text tokens that capture the meaning of the text. There are some differences between text encoder behavior on first step and others. It is the reason why we need to use separated models for that.
text_use_small = True
text_encoder = load_model(model_type="text", use_gpu=False, use_small=text_use_small, force_reload=False)
text_encoder_model = text_encoder["model"]
tokenizer = text_encoder["tokenizer"]
import torch
import openvino as ov
text_model_suffix = "_small" if text_use_small else ""
text_model_dir = models_dir / f"text_encoder{text_model_suffix}"
text_model_dir.mkdir(exist_ok=True)
text_encoder_path1 = text_model_dir / "bark_text_encoder_1.xml"
text_encoder_path0 = text_model_dir / "bark_text_encoder_0.xml"
class TextEncoderModel(torch.nn.Module):
def __init__(self, encoder):
super().__init__()
self.encoder = encoder
def forward(self, idx, past_kv=None):
return self.encoder(idx, merge_context=True, past_kv=past_kv, use_cache=True)
if not text_encoder_path0.exists() or not text_encoder_path1.exists():
text_encoder_exportable = TextEncoderModel(text_encoder_model)
ov_model = ov.convert_model(text_encoder_exportable, example_input=torch.ones((1, 513), dtype=torch.int64))
ov.save_model(ov_model, text_encoder_path0)
logits, kv_cache = text_encoder_exportable(torch.ones((1, 513), dtype=torch.int64))
ov_model = ov.convert_model(
text_encoder_exportable,
example_input=(torch.ones((1, 1), dtype=torch.int64), kv_cache),
)
ov.save_model(ov_model, text_encoder_path1)
del ov_model
del text_encoder_exportable
del text_encoder_model, text_encoder
/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/bark/model.py:176: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
assert(idx.shape[1] >= 256+256+1)
/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/bark/model.py:199: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
assert position_ids.shape == (1, t)
/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/bark/model.py:172: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
assert t == 1
/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/torch/jit/_trace.py:160: UserWarning: The .grad attribute of a Tensor that is not a leaf Tensor is being accessed. Its .grad attribute won't be populated during autograd.backward(). If you indeed want the .grad field to be populated for a non-leaf Tensor, use .retain_grad() on the non-leaf Tensor. If you access the non-leaf Tensor by mistake, make sure you access the leaf Tensor instead. See github.com/pytorch/pytorch/pull/30531 for more informations. (Triggered internally at aten/src/ATen/core/TensorBody.h:489.)
if a.grad is not None:
Coarse encoder#
Coarse encoder is a causal autoregressive transformer, that takes as input the results of the text encoder model. It aims at predicting the first two audio codebooks necessary for EnCodec. Coarse encoder is autoregressive model, it means that for making prediction on next step, it uses own output from previous step. For reducing model complexity and optimization, caching key and values for attention blocks can be used. past_key_values contains set of precomputed attention keys and values for each attention module in the model from previous step as they will be not changed from step to step and allow us calculate only update for the current step and join to previous. For avoiding to have separated model for first inference, where model does not have “past”, we will provide empty tensor on the first step.
coarse_use_small = True
coarse_model = load_model(
model_type="coarse",
use_gpu=False,
use_small=coarse_use_small,
force_reload=False,
)
coarse_model_suffix = "_small" if coarse_use_small else ""
coarse_model_dir = models_dir / f"coarse{coarse_model_suffix}"
coarse_model_dir.mkdir(exist_ok=True)
coarse_encoder_path = coarse_model_dir / "bark_coarse_encoder.xml"
import types
class CoarseEncoderModel(torch.nn.Module):
def __init__(self, encoder):
super().__init__()
self.encoder = encoder
def forward(self, idx, past_kv=None):
return self.encoder(idx, past_kv=past_kv, use_cache=True)
def casual_self_attention_forward(self, x, past_kv=None, use_cache=False):
B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)
# calculate query, key, values for all heads in batch and move head forward to be the batch dim
q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
past_len = 0
if past_kv is not None:
past_key = past_kv[0]
past_value = past_kv[1]
k = torch.cat((past_key, k), dim=-2)
v = torch.cat((past_value, v), dim=-2)
past_len = past_key.shape[-2]
FULL_T = k.shape[-2]
if use_cache is True:
present = (k, v)
else:
present = None
# causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
if self.flash:
# efficient attention using Flash Attention
full_attention_mask = torch.ones(
B,
T,
T,
device=x.device,
dtype=torch.float,
) * float("-inf")
full_attention_mask.triu_(diagonal=1)
if past_len > 0:
full_attention_mask = torch.cat(
(
torch.zeros(B, T, past_len, device=x.device),
full_attention_mask,
),
dim=-1,
)
y = torch.nn.functional.scaled_dot_product_attention(q, k, v, dropout_p=self.dropout, attn_mask=full_attention_mask)
else:
# manual implementation of attention
att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
att = att.masked_fill(self.bias[:, :, FULL_T - T : FULL_T, :FULL_T] == 0, float("-inf"))
att = F.softmax(att, dim=-1)
att = self.attn_dropout(att)
y = att @ v # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
y = y.transpose(1, 2).contiguous().view(B, T, C) # re-assemble all head outputs side by side
# output projection
y = self.resid_dropout(self.c_proj(y))
return (y, present)
if not coarse_encoder_path.exists():
coarse_encoder_exportable = CoarseEncoderModel(coarse_model)
for block in coarse_encoder_exportable.encoder.transformer.h:
block.attn.forward = types.MethodType(casual_self_attention_forward, block.attn)
logits, kv_cache = coarse_encoder_exportable(torch.ones((1, 886), dtype=torch.int64))
ov_model = ov.convert_model(
coarse_encoder_exportable,
example_input=(torch.ones((1, 1), dtype=torch.int64), kv_cache),
)
ov.save_model(ov_model, coarse_encoder_path)
del ov_model
del coarse_encoder_exportable
del coarse_model
/tmp/ipykernel_1046533/1432960018.py:47: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
if past_len > 0:
fine_use_small = False
fine_model = load_model(model_type="fine", use_gpu=False, use_small=fine_use_small, force_reload=False)
fine_model_suffix = "_small" if fine_use_small else ""
fine_model_dir = models_dir / f"fine_model{fine_model_suffix}"
fine_model_dir.mkdir(exist_ok=True)
class FineModel(torch.nn.Module):
def __init__(self, model):
super().__init__()
self.model = model
def forward(self, pred_idx, idx):
b, t, codes = idx.size()
pos = torch.arange(0, t, dtype=torch.long).unsqueeze(0) # shape (1, t)
# forward the GPT model itself
tok_embs = [wte(idx[:, :, i]).unsqueeze(-1) for i, wte in enumerate(self.model.transformer.wtes)] # token embeddings of shape (b, t, n_embd)
tok_emb = torch.cat(tok_embs, dim=-1)
pos_emb = self.model.transformer.wpe(pos) # position embeddings of shape (1, t, n_embd)
x = tok_emb[:, :, :, : pred_idx + 1].sum(dim=-1)
x = self.model.transformer.drop(x + pos_emb)
for block in self.model.transformer.h:
x = block(x)
x = self.model.transformer.ln_f(x)
return x
fine_feature_extractor_path = fine_model_dir / "bark_fine_feature_extractor.xml"
Fine encoder#
Fine encoder is time a non-causal autoencoder transformer, which iteratively predicts the last codebooks based on the sum of the previous codebooks embeddings obtained using Coarse encoder.
if not fine_feature_extractor_path.exists():
lm_heads = fine_model.lm_heads
fine_feature_extractor = FineModel(fine_model)
feature_extractor_out = fine_feature_extractor(3, torch.zeros((1, 1024, 8), dtype=torch.int32))
ov_model = ov.convert_model(
fine_feature_extractor,
example_input=(
torch.ones(1, dtype=torch.long),
torch.zeros((1, 1024, 8), dtype=torch.long),
),
)
ov.save_model(ov_model, fine_feature_extractor_path)
for i, lm_head in enumerate(lm_heads):
ov.save_model(
ov.convert_model(lm_head, example_input=feature_extractor_out),
fine_model_dir / f"bark_fine_lm_{i}.xml",
)
Prepare Inference pipeline#
For better usability, classes for working with models provided below.
class OVBarkTextEncoder:
def __init__(self, core, device, model_path1, model_path2):
self.compiled_model1 = core.compile_model(model_path1, device)
self.compiled_model2 = core.compile_model(model_path2, device)
def __call__(self, input_ids, past_kv=None):
if past_kv is None:
outputs = self.compiled_model1(input_ids, share_outputs=True)
else:
outputs = self.compiled_model2([input_ids, *past_kv], share_outputs=True)
logits, kv_cache = self.postprocess_outputs(outputs, past_kv is None)
return logits, kv_cache
def postprocess_outputs(self, outs, is_first_stage):
net_outs = self.compiled_model1.outputs if is_first_stage else self.compiled_model2.outputs
logits = outs[net_outs[0]]
kv_cache = []
for out_tensor in net_outs[1:]:
kv_cache.append(outs[out_tensor])
return logits, kv_cache
class OVBarkEncoder:
def __init__(self, core, device, model_path):
self.compiled_model = core.compile_model(model_path, device)
def __call__(self, idx, past_kv=None):
if past_kv is None:
past_kv = self._init_past_kv()
outs = self.compiled_model([idx, *past_kv], share_outputs=True)
return self.postprocess_outputs(outs)
def postprocess_outputs(self, outs):
net_outs = self.compiled_model.outputs
logits = outs[net_outs[0]]
kv_cache = []
for out_tensor in net_outs[1:]:
kv_cache.append(outs[out_tensor])
return logits, kv_cache
def _init_past_kv(self):
inputs = []
for input_t in self.compiled_model.inputs[1:]:
input_shape = input_t.partial_shape
input_shape[0] = 1
input_shape[2] = 0
inputs.append(ov.Tensor(ov.Type.f32, input_shape.get_shape()))
return inputs
class OVBarkFineEncoder:
def __init__(self, core, device, model_dir, num_lm_heads=7):
self.feats_compiled_model = core.compile_model(model_dir / "bark_fine_feature_extractor.xml", device)
self.feats_out = self.feats_compiled_model.output(0)
lm_heads = []
for i in range(num_lm_heads):
lm_heads.append(core.compile_model(model_dir / f"bark_fine_lm_{i}.xml", device))
self.lm_heads = lm_heads
def __call__(self, pred_idx, idx):
feats = self.feats_compiled_model([ov.Tensor(pred_idx), ov.Tensor(idx)])[self.feats_out]
lm_id = pred_idx - 1
logits = self.lm_heads[int(lm_id)](feats)[0]
return logits
generate_audio
function is the main function for starting audio
generation process. It accepts input text and optionally history prompt,
provided by user and run inference pipeline. The inference pipeline
consists from several steps, illustrated on the diagram below:
Generation semantic tokens from input text using Text Encoder
Generation coarse acoustic codebooks from semantic tokens using Coarse Encoder
Generation fine acoustic codebooks from coarse codebooks using Fine Encoder
Decode codebooks to audio waveform
from typing import Optional, Union, Dict
import tqdm
import numpy as np
def generate_audio(
text: str,
history_prompt: Optional[Union[Dict, str]] = None,
text_temp: float = 0.7,
waveform_temp: float = 0.7,
silent: bool = False,
):
"""Generate audio array from input text.
Args:
text: text to be turned into audio
history_prompt: history choice for audio cloning
text_temp: generation temperature (1.0 more diverse, 0.0 more conservative)
waveform_temp: generation temperature (1.0 more diverse, 0.0 more conservative)
silent: disable progress bar
Returns:
numpy audio array at sample frequency 24khz
"""
semantic_tokens = text_to_semantic(
text,
history_prompt=history_prompt,
temp=text_temp,
silent=silent,
)
out = semantic_to_waveform(
semantic_tokens,
history_prompt=history_prompt,
temp=waveform_temp,
silent=silent,
)
return out
def text_to_semantic(
text: str,
history_prompt: Optional[Union[Dict, str]] = None,
temp: float = 0.7,
silent: bool = False,
):
"""Generate semantic array from text.
Args:
text: text to be turned into audio
history_prompt: history choice for audio cloning
temp: generation temperature (1.0 more diverse, 0.0 more conservative)
silent: disable progress bar
Returns:
numpy semantic array to be fed into `semantic_to_waveform`
"""
x_semantic = generate_text_semantic(
text,
history_prompt=history_prompt,
temp=temp,
silent=silent,
)
return x_semantic
from bark.generation import (
_load_history_prompt,
_tokenize,
_normalize_whitespace,
TEXT_PAD_TOKEN,
TEXT_ENCODING_OFFSET,
SEMANTIC_VOCAB_SIZE,
SEMANTIC_PAD_TOKEN,
SEMANTIC_INFER_TOKEN,
COARSE_RATE_HZ,
SEMANTIC_RATE_HZ,
N_COARSE_CODEBOOKS,
COARSE_INFER_TOKEN,
CODEBOOK_SIZE,
N_FINE_CODEBOOKS,
COARSE_SEMANTIC_PAD_TOKEN,
)
import torch.nn.functional as F
from typing import List, Optional, Union, Dict
def generate_text_semantic(
text: str,
history_prompt: List[str] = None,
temp: float = 0.7,
top_k: int = None,
top_p: float = None,
silent: bool = False,
min_eos_p: float = 0.2,
max_gen_duration_s: int = None,
allow_early_stop: bool = True,
):
"""
Generate semantic tokens from text.
Args:
text: text to be turned into audio
history_prompt: history choice for audio cloning
temp: generation temperature (1.0 more diverse, 0.0 more conservative)
top_k: top k number of probabilities for considering during generation
top_p: top probabilities higher than p for considering during generation
silent: disable progress bar
min_eos_p: minimum probability to select end of string token
max_gen_duration_s: maximum duration for generation in seconds
allow_early_stop: allow to stop generation if maximum duration is not reached
Returns:
numpy semantic array to be fed into `semantic_to_waveform`
"""
text = _normalize_whitespace(text)
if history_prompt is not None:
history_prompt = _load_history_prompt(history_prompt)
semantic_history = history_prompt["semantic_prompt"]
else:
semantic_history = None
encoded_text = np.ascontiguousarray(_tokenize(tokenizer, text)) + TEXT_ENCODING_OFFSET
if len(encoded_text) > 256:
p = round((len(encoded_text) - 256) / len(encoded_text) * 100, 1)
logger.warning(f"warning, text too long, lopping of last {p}%")
encoded_text = encoded_text[:256]
encoded_text = np.pad(
encoded_text,
(0, 256 - len(encoded_text)),
constant_values=TEXT_PAD_TOKEN,
mode="constant",
)
if semantic_history is not None:
semantic_history = semantic_history.astype(np.int64)
# lop off if history is too long, pad if needed
semantic_history = semantic_history[-256:]
semantic_history = np.pad(
semantic_history,
(0, 256 - len(semantic_history)),
constant_values=SEMANTIC_PAD_TOKEN,
mode="constant",
)
else:
semantic_history = np.array([SEMANTIC_PAD_TOKEN] * 256)
x = np.hstack([encoded_text, semantic_history, np.array([SEMANTIC_INFER_TOKEN])]).astype(np.int64)[None]
assert x.shape[1] == 256 + 256 + 1
n_tot_steps = 768
# custom tqdm updates since we don't know when eos will occur
pbar = tqdm.tqdm(disable=silent, total=100)
pbar_state = 0
tot_generated_duration_s = 0
kv_cache = None
for n in range(n_tot_steps):
if kv_cache is not None:
x_input = x[:, [-1]]
else:
x_input = x
logits, kv_cache = ov_text_model(ov.Tensor(x_input), kv_cache)
relevant_logits = logits[0, 0, :SEMANTIC_VOCAB_SIZE]
if allow_early_stop:
relevant_logits = np.hstack((relevant_logits, logits[0, 0, [SEMANTIC_PAD_TOKEN]])) # eos
if top_p is not None:
sorted_indices = np.argsort(relevant_logits)[::-1]
sorted_logits = relevant_logits[sorted_indices]
cumulative_probs = np.cumsum(F.softmax(sorted_logits))
sorted_indices_to_remove = cumulative_probs > top_p
sorted_indices_to_remove[1:] = sorted_indices_to_remove[:-1].copy()
sorted_indices_to_remove[0] = False
relevant_logits[sorted_indices[sorted_indices_to_remove]] = -np.inf
relevant_logits = torch.from_numpy(relevant_logits)
if top_k is not None:
relevant_logits = torch.from_numpy(relevant_logits)
v, _ = torch.topk(relevant_logits, min(top_k, relevant_logits.size(-1)))
relevant_logits[relevant_logits < v[-1]] = -float("Inf")
probs = F.softmax(torch.from_numpy(relevant_logits) / temp, dim=-1)
item_next = torch.multinomial(probs, num_samples=1)
if allow_early_stop and (item_next == SEMANTIC_VOCAB_SIZE or (min_eos_p is not None and probs[-1] >= min_eos_p)):
# eos found, so break
pbar.update(100 - pbar_state)
break
x = torch.cat((torch.from_numpy(x), item_next[None]), dim=1).numpy()
tot_generated_duration_s += 1 / SEMANTIC_RATE_HZ
if max_gen_duration_s is not None and tot_generated_duration_s > max_gen_duration_s:
pbar.update(100 - pbar_state)
break
if n == n_tot_steps - 1:
pbar.update(100 - pbar_state)
break
del logits, relevant_logits, probs, item_next
req_pbar_state = np.min([100, int(round(100 * n / n_tot_steps))])
if req_pbar_state > pbar_state:
pbar.update(req_pbar_state - pbar_state)
pbar_state = req_pbar_state
pbar.close()
out = x.squeeze()[256 + 256 + 1 :]
return out
def semantic_to_waveform(
semantic_tokens: np.ndarray,
history_prompt: Optional[Union[Dict, str]] = None,
temp: float = 0.7,
silent: bool = False,
):
"""Generate audio array from semantic input.
Args:
semantic_tokens: semantic token output from `text_to_semantic`
history_prompt: history choice for audio cloning
temp: generation temperature (1.0 more diverse, 0.0 more conservative)
silent: disable progress bar
Returns:
numpy audio array at sample frequency 24khz
"""
coarse_tokens = generate_coarse(
semantic_tokens,
history_prompt=history_prompt,
temp=temp,
silent=silent,
)
fine_tokens = generate_fine(
coarse_tokens,
history_prompt=history_prompt,
temp=0.5,
)
audio_arr = codec_decode(fine_tokens)
return audio_arr
def generate_coarse(
x_semantic: np.ndarray,
history_prompt: Optional[Union[Dict, str]] = None,
temp: float = 0.7,
top_k: int = None,
top_p: float = None,
silent: bool = False,
max_coarse_history: int = 630, # min 60 (faster), max 630 (more context)
sliding_window_len: int = 60,
):
"""
Generate coarse audio codes from semantic tokens.
Args:
x_semantic: semantic token output from `text_to_semantic`
history_prompt: history prompt, will be prepened to generated if provided
temp: generation temperature (1.0 more diverse, 0.0 more conservative)
top_k: top k number of probabilities for considering during generation
top_p: top probabilities higher than p for considering during generation
silent: disable progress bar
max_coarse_history: threshold for cutting coarse history (minimum 60 for faster generation, maximum 630 for more context)
sliding_window_len: size of sliding window for generation cycle
Returns:
numpy audio array with coarse audio codes
"""
semantic_to_coarse_ratio = COARSE_RATE_HZ / SEMANTIC_RATE_HZ * N_COARSE_CODEBOOKS
max_semantic_history = int(np.floor(max_coarse_history / semantic_to_coarse_ratio))
if history_prompt is not None:
history_prompt = _load_history_prompt(history_prompt)
x_semantic_history = history_prompt["semantic_prompt"]
x_coarse_history = history_prompt["coarse_prompt"]
x_coarse_history = _flatten_codebooks(x_coarse_history) + SEMANTIC_VOCAB_SIZE
# trim histories correctly
n_semantic_hist_provided = np.min(
[
max_semantic_history,
len(x_semantic_history) - len(x_semantic_history) % 2,
int(np.floor(len(x_coarse_history) / semantic_to_coarse_ratio)),
]
)
n_coarse_hist_provided = int(round(n_semantic_hist_provided * semantic_to_coarse_ratio))
x_semantic_history = x_semantic_history[-n_semantic_hist_provided:].astype(np.int32)
x_coarse_history = x_coarse_history[-n_coarse_hist_provided:].astype(np.int32)
x_coarse_history = x_coarse_history[:-2]
else:
x_semantic_history = np.array([], dtype=np.int32)
x_coarse_history = np.array([], dtype=np.int32)
# start loop
n_steps = int(round(np.floor(len(x_semantic) * semantic_to_coarse_ratio / N_COARSE_CODEBOOKS) * N_COARSE_CODEBOOKS))
x_semantic = np.hstack([x_semantic_history, x_semantic]).astype(np.int32)
x_coarse = x_coarse_history.astype(np.int32)
base_semantic_idx = len(x_semantic_history)
x_semantic_in = x_semantic[None]
x_coarse_in = x_coarse[None]
n_window_steps = int(np.ceil(n_steps / sliding_window_len))
n_step = 0
for _ in tqdm.tqdm(range(n_window_steps), total=n_window_steps, disable=silent):
semantic_idx = base_semantic_idx + int(round(n_step / semantic_to_coarse_ratio))
# pad from right side
x_in = x_semantic_in[:, np.max([0, semantic_idx - max_semantic_history]) :]
x_in = x_in[:, :256]
x_in = F.pad(
torch.from_numpy(x_in),
(0, 256 - x_in.shape[-1]),
"constant",
COARSE_SEMANTIC_PAD_TOKEN,
)
x_in = torch.hstack(
[
x_in,
torch.tensor([COARSE_INFER_TOKEN])[None],
torch.from_numpy(x_coarse_in[:, -max_coarse_history:]),
]
).numpy()
kv_cache = None
for _ in range(sliding_window_len):
if n_step >= n_steps:
continue
is_major_step = n_step % N_COARSE_CODEBOOKS == 0
if kv_cache is not None:
x_input = x_in[:, [-1]]
else:
x_input = x_in
logits, kv_cache = ov_coarse_model(x_input, past_kv=kv_cache)
logit_start_idx = SEMANTIC_VOCAB_SIZE + (1 - int(is_major_step)) * CODEBOOK_SIZE
logit_end_idx = SEMANTIC_VOCAB_SIZE + (2 - int(is_major_step)) * CODEBOOK_SIZE
relevant_logits = logits[0, 0, logit_start_idx:logit_end_idx]
if top_p is not None:
sorted_indices = np.argsort(relevant_logits)[::-1]
sorted_logits = relevant_logits[sorted_indices]
cumulative_probs = np.cumsum(F.softmax(sorted_logits))
sorted_indices_to_remove = cumulative_probs > top_p
sorted_indices_to_remove[1:] = sorted_indices_to_remove[:-1].copy()
sorted_indices_to_remove[0] = False
relevant_logits[sorted_indices[sorted_indices_to_remove]] = -np.inf
relevant_logits = torch.from_numpy(relevant_logits)
if top_k is not None:
relevant_logits = torch.from_numpy(relevant_logits)
v, _ = torch.topk(relevant_logits, min(top_k, relevant_logits.size(-1)))
relevant_logits[relevant_logits < v[-1]] = -float("Inf")
probs = F.softmax(torch.from_numpy(relevant_logits) / temp, dim=-1)
item_next = torch.multinomial(probs, num_samples=1)
item_next = item_next
item_next += logit_start_idx
x_coarse_in = torch.cat((torch.from_numpy(x_coarse_in), item_next[None]), dim=1).numpy()
x_in = torch.cat((torch.from_numpy(x_in), item_next[None]), dim=1).numpy()
del logits, relevant_logits, probs, item_next
n_step += 1
del x_in
del x_semantic_in
gen_coarse_arr = x_coarse_in.squeeze()[len(x_coarse_history) :]
del x_coarse_in
gen_coarse_audio_arr = gen_coarse_arr.reshape(-1, N_COARSE_CODEBOOKS).T - SEMANTIC_VOCAB_SIZE
for n in range(1, N_COARSE_CODEBOOKS):
gen_coarse_audio_arr[n, :] -= n * CODEBOOK_SIZE
return gen_coarse_audio_arr
def generate_fine(
x_coarse_gen: np.ndarray,
history_prompt: Optional[Union[Dict, str]] = None,
temp: float = 0.5,
silent: bool = True,
):
"""
Generate full audio codes from coarse audio codes.
Args:
x_coarse_gen: generated coarse codebooks from `generate_coarse`
history_prompt: history prompt, will be prepended to generated
temp: generation temperature (1.0 more diverse, 0.0 more conservative)
silent: disable progress bar
Returns:
numpy audio array with coarse audio codes
"""
if history_prompt is not None:
history_prompt = _load_history_prompt(history_prompt)
x_fine_history = history_prompt["fine_prompt"]
else:
x_fine_history = None
n_coarse = x_coarse_gen.shape[0]
# make input arr
in_arr = np.vstack(
[
x_coarse_gen,
np.zeros((N_FINE_CODEBOOKS - n_coarse, x_coarse_gen.shape[1])) + CODEBOOK_SIZE,
]
).astype(
np.int32
) # padding
# prepend history if available (max 512)
if x_fine_history is not None:
x_fine_history = x_fine_history.astype(np.int32)
in_arr = np.hstack([x_fine_history[:, -512:].astype(np.int32), in_arr])
n_history = x_fine_history[:, -512:].shape[1]
else:
n_history = 0
n_remove_from_end = 0
# need to pad if too short (since non-causal model)
if in_arr.shape[1] < 1024:
n_remove_from_end = 1024 - in_arr.shape[1]
in_arr = np.hstack(
[
in_arr,
np.zeros((N_FINE_CODEBOOKS, n_remove_from_end), dtype=np.int32) + CODEBOOK_SIZE,
]
)
n_loops = np.max([0, int(np.ceil((x_coarse_gen.shape[1] - (1024 - n_history)) / 512))]) + 1
in_arr = in_arr.T
for n in tqdm.tqdm(range(n_loops), disable=silent):
start_idx = np.min([n * 512, in_arr.shape[0] - 1024])
start_fill_idx = np.min([n_history + n * 512, in_arr.shape[0] - 512])
rel_start_fill_idx = start_fill_idx - start_idx
in_buffer = in_arr[start_idx : start_idx + 1024, :][None]
for nn in range(n_coarse, N_FINE_CODEBOOKS):
logits = ov_fine_model(np.array([nn]).astype(np.int64), in_buffer.astype(np.int64))
if temp is None:
relevant_logits = logits[0, rel_start_fill_idx:, :CODEBOOK_SIZE]
codebook_preds = torch.argmax(relevant_logits, -1)
else:
relevant_logits = logits[0, :, :CODEBOOK_SIZE] / temp
probs = F.softmax(torch.from_numpy(relevant_logits), dim=-1)
codebook_preds = torch.hstack([torch.multinomial(probs[nnn], num_samples=1) for nnn in range(rel_start_fill_idx, 1024)])
in_buffer[0, rel_start_fill_idx:, nn] = codebook_preds.numpy()
del logits, codebook_preds
for nn in range(n_coarse, N_FINE_CODEBOOKS):
in_arr[start_fill_idx : start_fill_idx + (1024 - rel_start_fill_idx), nn] = in_buffer[0, rel_start_fill_idx:, nn]
del in_buffer
gen_fine_arr = in_arr.squeeze().T
del in_arr
gen_fine_arr = gen_fine_arr[:, n_history:]
if n_remove_from_end > 0:
gen_fine_arr = gen_fine_arr[:, :-n_remove_from_end]
return gen_fine_arr
Run model inference#
Now is time to see model in action. We need only wrap our models to
classes and run generate_audio
function.
Select Inference device#
select device from dropdown list for running inference using OpenVINO
from notebook_utils import device_widget
device = device_widget()
device
Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')
core = ov.Core()
ov_text_model = OVBarkTextEncoder(core, device.value, text_encoder_path0, text_encoder_path1)
ov_coarse_model = OVBarkEncoder(core, device.value, coarse_encoder_path)
ov_fine_model = OVBarkFineEncoder(core, device.value, fine_model_dir)
import time
from bark import SAMPLE_RATE
torch.manual_seed(42)
t0 = time.time()
text = "Hello, my name is Suno. And, uh — and I like banana and apples. [laughs] But I also have other interests such as playing tic tac toe."
audio_array = generate_audio(text)
generation_duration_s = time.time() - t0
audio_duration_s = audio_array.shape[0] / SAMPLE_RATE
print(f"took {generation_duration_s:.0f}s to generate {audio_duration_s:.0f}s of audio")
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [00:10<00:00, 9.63it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 32/32 [00:37<00:00, 1.17s/it]
/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/torch/nn/utils/weight_norm.py:30: UserWarning: torch.nn.utils.weight_norm is deprecated in favor of torch.nn.utils.parametrizations.weight_norm.
warnings.warn("torch.nn.utils.weight_norm is deprecated in favor of torch.nn.utils.parametrizations.weight_norm.")
took 53s to generate 13s of audio
from IPython.display import Audio
from bark import SAMPLE_RATE
Audio(audio_array, rate=SAMPLE_RATE)
Interactive demo#
import numpy as np
from bark import SAMPLE_RATE
from bark.generation import SUPPORTED_LANGS
AVAILABLE_PROMPTS = ["Unconditional", "Announcer"]
PROMPT_LOOKUP = {}
for _, lang in SUPPORTED_LANGS:
for n in range(10):
label = f"Speaker {n} ({lang})"
AVAILABLE_PROMPTS.append(label)
PROMPT_LOOKUP[label] = f"{lang}_speaker_{n}"
PROMPT_LOOKUP["Unconditional"] = None
PROMPT_LOOKUP["Announcer"] = "announcer"
def gen_tts(text, history_prompt):
history_prompt = PROMPT_LOOKUP[history_prompt]
audio_arr = generate_audio(text, history_prompt=history_prompt)
audio_arr = (audio_arr * 32767).astype(np.int16)
return (SAMPLE_RATE, audio_arr)
import requests
if not Path("gradio_helper.py").exists():
r = requests.get(url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/notebooks/bark-text-to-audio/gradio_helper.py")
open("gradio_helper.py", "w").write(r.text)
from gradio_helper import make_demo
demo = make_demo(fn=gen_tts, available_prompts=AVAILABLE_PROMPTS)
try:
demo.launch(debug=False)
except Exception:
demo.launch(share=True, debug=False)
# if you are launching remotely, specify server_name and server_port
# demo.launch(server_name='your server name', server_port='server port in int')
# Read more in the docs: https://gradio.app/docs/