Text-to-speech generation using Bark and OpenVINO#

This Jupyter notebook can be launched after a local installation only.


🐶 Bark is a transformer-based text-to-audio model created by Suno. Bark can generate highly realistic, multilingual speech as well as other audio, including music, background noise and simple sound effects. The model can also produce nonverbal communications like laughing, sighing and crying, making it a versatile tool for a variety of applications.

Bark is a cutting-edge text-to-speech (TTS) technology that has taken the AI world by storm. Unlike typical TTS engines that sound robotic and mechanical, Bark offers human-like voices that are highly realistic and natural sounding. Bark uses GPT-style models to generate speech with minimal tweaking, producing highly expressive and emotive voices that can capture nuances such as tone, pitch, and rhythm. It offers a fantastic experience that can leave you wondering if you’re listening to human beings.

Notably, Bark supports multiple languages and can generate speech in Mandarin, French, Italian, Spanish, and other languages with impressive clarity and accuracy. With Bark, you can easily switch between languages and still enjoy high-quality sound effects.

Bark is not only intelligent but also intuitive, making it an ideal tool for individuals and businesses looking to create high-quality voice content for their platforms. Whether you’re looking to create podcasts, audiobooks, video game sounds, or any other form of voice content, Bark has you covered.

So, if you’re looking for a revolutionary text-to-speech technology that can elevate your voice content, Bark is the way to go! In this tutorial, we consider how to convert and run Bark with OpenVINO.

About model#

Bark uses GPT-style models to generate audio from scratch, but the initial text prompt is embedded into high-level semantic tokens without the use of phonemes. This allows Bark to generalize to arbitrary instructions beyond speech that occur in the training data, such as music lyrics, sound effects, or other non-speech sounds.

A second model is then used to convert the generated semantic tokens into audio codec tokens, from which the full waveform is generated. To enable the community to use Bark via public code, the EnCodec codec from Facebook is used as the audio representation.

Installation Instructions#

This is a self-contained example that relies solely on its own code.

We recommend running the notebook in a virtual environment. You only need a Jupyter server to start. For details, please refer to Installation Guide.

Prerequisites#

%pip install -q "torch" "torchvision" "torchaudio" --extra-index-url https://download.pytorch.org/whl/cpu
%pip install -q "openvino>=2023.1.0" "gradio>=4.19"
%pip install -q "git+https://github.com/suno-ai/bark.git" --extra-index-url https://download.pytorch.org/whl/cpu
import requests

r = requests.get(
    url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/notebook_utils.py",
)
open("notebook_utils.py", "w").write(r.text)

Download and Convert models#

from pathlib import Path
from bark.generation import load_model, codec_decode, _flatten_codebooks

models_dir = Path("models")
models_dir.mkdir(exist_ok=True)

Text Encoder#

The text encoder is responsible for embedding the initial text prompt into high-level semantic tokens. It uses a tokenizer to convert the input text into token IDs and predicts semantic tokens that capture the meaning of the text. The text encoder behaves differently on the first step than on subsequent steps, which is why we use separate models for the two cases.
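To make the input layout concrete, the sketch below assembles a first-step prompt in the same way as the inference code later in this notebook: 256 text token slots, 256 semantic history slots and one inference marker. This is why the first-step model is traced with an input of shape (1, 513), while the second model receives a single token plus the cache. The sketch uses plain Python lists, and the token ids (TEXT_PAD, HISTORY_PAD, INFER) are hypothetical placeholders, not the real values from bark.generation.

```python
# Hypothetical placeholder ids, for illustration only; the real values are
# TEXT_PAD_TOKEN, SEMANTIC_PAD_TOKEN and SEMANTIC_INFER_TOKEN in bark.generation.
TEXT_PAD = -1
HISTORY_PAD = -2
INFER = -3

TEXT_LEN = 256
HISTORY_LEN = 256


def build_first_step_prompt(text_ids, history_ids=None):
    """Return the flat prompt: padded text + padded history + inference marker."""
    text = list(text_ids)[:TEXT_LEN]  # over-long text is cut off
    text += [TEXT_PAD] * (TEXT_LEN - len(text))  # pad on the right
    history = list(history_ids or [])[-HISTORY_LEN:]  # keep the most recent history
    history += [HISTORY_PAD] * (HISTORY_LEN - len(history))
    return text + history + [INFER]


prompt = build_first_step_prompt([11, 12, 13])
print(len(prompt))  # 513

# On later steps only the newly sampled token is fed, together with the cache.
sampled = 42  # hypothetical sampled semantic token
prompt.append(sampled)
next_step_input = prompt[-1:]
print(len(next_step_input))  # 1
```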

text_use_small = True

text_encoder = load_model(model_type="text", use_gpu=False, use_small=text_use_small, force_reload=False)

text_encoder_model = text_encoder["model"]
tokenizer = text_encoder["tokenizer"]
import torch
import openvino as ov

text_model_suffix = "_small" if text_use_small else ""
text_model_dir = models_dir / f"text_encoder{text_model_suffix}"
text_model_dir.mkdir(exist_ok=True)
text_encoder_path1 = text_model_dir / "bark_text_encoder_1.xml"
text_encoder_path0 = text_model_dir / "bark_text_encoder_0.xml"
class TextEncoderModel(torch.nn.Module):
    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder

    def forward(self, idx, past_kv=None):
        return self.encoder(idx, merge_context=True, past_kv=past_kv, use_cache=True)


if not text_encoder_path0.exists() or not text_encoder_path1.exists():
    text_encoder_exportable = TextEncoderModel(text_encoder_model)
    ov_model = ov.convert_model(text_encoder_exportable, example_input=torch.ones((1, 513), dtype=torch.int64))
    ov.save_model(ov_model, text_encoder_path0)
    logits, kv_cache = text_encoder_exportable(torch.ones((1, 513), dtype=torch.int64))
    ov_model = ov.convert_model(
        text_encoder_exportable,
        example_input=(torch.ones((1, 1), dtype=torch.int64), kv_cache),
    )
    ov.save_model(ov_model, text_encoder_path1)
    del ov_model
    del text_encoder_exportable
del text_encoder_model, text_encoder
/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/bark/model.py:176: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
  assert(idx.shape[1] >= 256+256+1)
/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/bark/model.py:199: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
  assert position_ids.shape == (1, t)
/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/bark/model.py:172: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
  assert t == 1
/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/torch/jit/_trace.py:160: UserWarning: The .grad attribute of a Tensor that is not a leaf Tensor is being accessed. Its .grad attribute won't be populated during autograd.backward(). If you indeed want the .grad field to be populated for a non-leaf Tensor, use .retain_grad() on the non-leaf Tensor. If you access the non-leaf Tensor by mistake, make sure you access the leaf Tensor instead. See github.com/pytorch/pytorch/pull/30531 for more informations. (Triggered internally at aten/src/ATen/core/TensorBody.h:489.)
  if a.grad is not None:

Coarse encoder#

The coarse encoder is a causal autoregressive transformer that takes the results of the text encoder model as input. It aims to predict the first two audio codebooks required by EnCodec. Because the coarse encoder is autoregressive, it uses its own output from the previous step to make the prediction for the next step. To reduce model complexity and speed up inference, the keys and values of the attention blocks can be cached. past_key_values contains the precomputed attention keys and values for each attention module in the model from the previous steps. They do not change from step to step, so we only need to compute the update for the current step and append it to the cached values. To avoid having a separate model for the first inference, where the model has no “past”, we provide an empty tensor on the first step.
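The caching idea can be illustrated with a toy example. The sketch below is not Bark's attention: it is a single-head attention in plain Python with identity projections (q = k = v = x), an assumption made only to keep the code short. It shows that decoding token by token with a cache, starting from an empty cache, gives the same result for the last position as recomputing attention over the whole sequence.

```python
import math


def attend(q, keys, values):
    """Softmax attention of one query vector over lists of key/value vectors."""
    scale = 1.0 / math.sqrt(len(q))
    scores = [scale * sum(a * b for a, b in zip(q, k)) for k in keys]
    m = max(scores)
    weights = [math.exp(s - m) for s in scores]
    total = sum(weights)
    dim = len(values[0])
    return [sum(w * v[d] for w, v in zip(weights, values)) / total for d in range(dim)]


def step(x, past_k, past_v):
    """Process one token; toy projections q = k = v = x (not Bark's weights)."""
    keys = past_k + [x]  # with an empty past this is just [x]
    values = past_v + [x]
    return attend(x, keys, values), keys, values


tokens = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]

# Incremental decoding: the first call receives an empty cache.
past_k, past_v = [], []
for x in tokens:
    out_cached, past_k, past_v = step(x, past_k, past_v)

# Full recomputation for the last position over the whole sequence.
out_full = attend(tokens[-1], tokens, tokens)

print(out_cached)
print(out_full)
```

Because an empty cache concatenates to nothing, the first step needs no special code path, which is the same reason a single converted model can serve both the first and the following steps.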

coarse_use_small = True

coarse_model = load_model(
    model_type="coarse",
    use_gpu=False,
    use_small=coarse_use_small,
    force_reload=False,
)

coarse_model_suffix = "_small" if coarse_use_small else ""
coarse_model_dir = models_dir / f"coarse{coarse_model_suffix}"
coarse_model_dir.mkdir(exist_ok=True)
coarse_encoder_path = coarse_model_dir / "bark_coarse_encoder.xml"
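import math
import types

# math and F are used by the manual (non-flash) attention branch below
import torch.nn.functional as F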


class CoarseEncoderModel(torch.nn.Module):
    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder

    def forward(self, idx, past_kv=None):
        return self.encoder(idx, past_kv=past_kv, use_cache=True)


def casual_self_attention_forward(self, x, past_kv=None, use_cache=False):
    B, T, C = x.size()  # batch size, sequence length, embedding dimensionality (n_embd)

    # calculate query, key, values for all heads in batch and move head forward to be the batch dim
    q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
    k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)  # (B, nh, T, hs)
    q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)  # (B, nh, T, hs)
    v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)  # (B, nh, T, hs)
    past_len = 0

    if past_kv is not None:
        past_key = past_kv[0]
        past_value = past_kv[1]
        k = torch.cat((past_key, k), dim=-2)
        v = torch.cat((past_value, v), dim=-2)
        past_len = past_key.shape[-2]

    FULL_T = k.shape[-2]

    if use_cache is True:
        present = (k, v)
    else:
        present = None

    # causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
    if self.flash:
        # efficient attention using Flash Attention
        full_attention_mask = torch.ones(
            B,
            T,
            T,
            device=x.device,
            dtype=torch.float,
        ) * float("-inf")
        full_attention_mask.triu_(diagonal=1)
        if past_len > 0:
            full_attention_mask = torch.cat(
                (
                    torch.zeros(B, T, past_len, device=x.device),
                    full_attention_mask,
                ),
                dim=-1,
            )

        y = torch.nn.functional.scaled_dot_product_attention(q, k, v, dropout_p=self.dropout, attn_mask=full_attention_mask)
    else:
        # manual implementation of attention
        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        att = att.masked_fill(self.bias[:, :, FULL_T - T : FULL_T, :FULL_T] == 0, float("-inf"))
        att = F.softmax(att, dim=-1)
        att = self.attn_dropout(att)
        y = att @ v  # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
    y = y.transpose(1, 2).contiguous().view(B, T, C)  # re-assemble all head outputs side by side

    # output projection
    y = self.resid_dropout(self.c_proj(y))
    return (y, present)


if not coarse_encoder_path.exists():
    coarse_encoder_exportable = CoarseEncoderModel(coarse_model)
    for block in coarse_encoder_exportable.encoder.transformer.h:
        block.attn.forward = types.MethodType(casual_self_attention_forward, block.attn)
    logits, kv_cache = coarse_encoder_exportable(torch.ones((1, 886), dtype=torch.int64))
    ov_model = ov.convert_model(
        coarse_encoder_exportable,
        example_input=(torch.ones((1, 1), dtype=torch.int64), kv_cache),
    )
    ov.save_model(ov_model, coarse_encoder_path)
    del ov_model
    del coarse_encoder_exportable
del coarse_model
/tmp/ipykernel_1046533/1432960018.py:47: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!
  if past_len > 0:
fine_use_small = False

fine_model = load_model(model_type="fine", use_gpu=False, use_small=fine_use_small, force_reload=False)

fine_model_suffix = "_small" if fine_use_small else ""
fine_model_dir = models_dir / f"fine_model{fine_model_suffix}"
fine_model_dir.mkdir(exist_ok=True)
class FineModel(torch.nn.Module):
    def __init__(self, model):
        super().__init__()
        self.model = model

    def forward(self, pred_idx, idx):
        b, t, codes = idx.size()
        pos = torch.arange(0, t, dtype=torch.long).unsqueeze(0)  # shape (1, t)

        # forward the GPT model itself
        tok_embs = [wte(idx[:, :, i]).unsqueeze(-1) for i, wte in enumerate(self.model.transformer.wtes)]  # token embeddings of shape (b, t, n_embd)
        tok_emb = torch.cat(tok_embs, dim=-1)
        pos_emb = self.model.transformer.wpe(pos)  # position embeddings of shape (1, t, n_embd)
        x = tok_emb[:, :, :, : pred_idx + 1].sum(dim=-1)
        x = self.model.transformer.drop(x + pos_emb)
        for block in self.model.transformer.h:
            x = block(x)
        x = self.model.transformer.ln_f(x)
        return x


fine_feature_extractor_path = fine_model_dir / "bark_fine_feature_extractor.xml"

Fine encoder#

The fine encoder is a non-causal autoencoder transformer, which iteratively predicts the last codebooks based on the sum of the previous codebooks' embeddings obtained using the coarse encoder.
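As a rough illustration of the "sum of codebook embeddings" step, the sketch below mirrors the slice `tok_emb[:, :, :, : pred_idx + 1].sum(dim=-1)` from `FineModel.forward` for a single time step, using plain Python lists. The toy 2-dimensional embeddings and the constants N_COARSE = 2 and N_FINE = 8 are assumptions chosen for the example.

```python
N_COARSE = 2  # codebooks produced by the coarse encoder (assumed)
N_FINE = 8  # total number of codebooks (assumed)


def summed_embedding(codebook_embs, pred_idx):
    """Sum the embeddings of codebooks 0..pred_idx for one time step."""
    used = codebook_embs[: pred_idx + 1]
    return [sum(values) for values in zip(*used)]


# Toy 2-dimensional embeddings: codebook c contributes [c, 1.0].
embs = [[float(c), 1.0] for c in range(N_FINE)]

# The fine codebooks are predicted one after another, each from a wider sum.
for pred_idx in range(N_COARSE, N_FINE):
    x = summed_embedding(embs, pred_idx)
    print(pred_idx, x)
```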

if not fine_feature_extractor_path.exists():
    lm_heads = fine_model.lm_heads
    fine_feature_extractor = FineModel(fine_model)
    feature_extractor_out = fine_feature_extractor(3, torch.zeros((1, 1024, 8), dtype=torch.int32))
    ov_model = ov.convert_model(
        fine_feature_extractor,
        example_input=(
            torch.ones(1, dtype=torch.long),
            torch.zeros((1, 1024, 8), dtype=torch.long),
        ),
    )
    ov.save_model(ov_model, fine_feature_extractor_path)
    for i, lm_head in enumerate(lm_heads):
        ov.save_model(
            ov.convert_model(lm_head, example_input=feature_extractor_out),
            fine_model_dir / f"bark_fine_lm_{i}.xml",
        )

Prepare Inference pipeline#

For better usability, the classes for working with the models are provided below.

class OVBarkTextEncoder:
    def __init__(self, core, device, model_path1, model_path2):
        self.compiled_model1 = core.compile_model(model_path1, device)
        self.compiled_model2 = core.compile_model(model_path2, device)

    def __call__(self, input_ids, past_kv=None):
        if past_kv is None:
            outputs = self.compiled_model1(input_ids, share_outputs=True)
        else:
            outputs = self.compiled_model2([input_ids, *past_kv], share_outputs=True)
        logits, kv_cache = self.postprocess_outputs(outputs, past_kv is None)
        return logits, kv_cache

    def postprocess_outputs(self, outs, is_first_stage):
        net_outs = self.compiled_model1.outputs if is_first_stage else self.compiled_model2.outputs
        logits = outs[net_outs[0]]
        kv_cache = []
        for out_tensor in net_outs[1:]:
            kv_cache.append(outs[out_tensor])
        return logits, kv_cache


class OVBarkEncoder:
    def __init__(self, core, device, model_path):
        self.compiled_model = core.compile_model(model_path, device)

    def __call__(self, idx, past_kv=None):
        if past_kv is None:
            past_kv = self._init_past_kv()
        outs = self.compiled_model([idx, *past_kv], share_outputs=True)
        return self.postprocess_outputs(outs)

    def postprocess_outputs(self, outs):
        net_outs = self.compiled_model.outputs
        logits = outs[net_outs[0]]
        kv_cache = []
        for out_tensor in net_outs[1:]:
            kv_cache.append(outs[out_tensor])
        return logits, kv_cache

    def _init_past_kv(self):
        inputs = []
        for input_t in self.compiled_model.inputs[1:]:
            input_shape = input_t.partial_shape
            input_shape[0] = 1
            input_shape[2] = 0
            inputs.append(ov.Tensor(ov.Type.f32, input_shape.get_shape()))
        return inputs


class OVBarkFineEncoder:
    def __init__(self, core, device, model_dir, num_lm_heads=7):
        self.feats_compiled_model = core.compile_model(model_dir / "bark_fine_feature_extractor.xml", device)
        self.feats_out = self.feats_compiled_model.output(0)
        lm_heads = []
        for i in range(num_lm_heads):
            lm_heads.append(core.compile_model(model_dir / f"bark_fine_lm_{i}.xml", device))
        self.lm_heads = lm_heads

    def __call__(self, pred_idx, idx):
        feats = self.feats_compiled_model([ov.Tensor(pred_idx), ov.Tensor(idx)])[self.feats_out]
        lm_id = pred_idx - 1
        logits = self.lm_heads[int(lm_id)](feats)[0]
        return logits

The generate_audio function is the main entry point for starting the audio generation process. It accepts the input text and, optionally, a history prompt provided by the user, and runs the inference pipeline. The inference pipeline consists of several steps, illustrated in the diagram below:

bark_pipeline

  1. Generate semantic tokens from the input text using the Text Encoder

  2. Generate coarse acoustic codebooks from the semantic tokens using the Coarse Encoder

  3. Generate fine acoustic codebooks from the coarse codebooks using the Fine Encoder

  4. Decode the codebooks to an audio waveform

from typing import Optional, Union, Dict
import tqdm
import numpy as np


def generate_audio(
    text: str,
    history_prompt: Optional[Union[Dict, str]] = None,
    text_temp: float = 0.7,
    waveform_temp: float = 0.7,
    silent: bool = False,
):
    """Generate audio array from input text.

    Args:
        text: text to be turned into audio
        history_prompt: history choice for audio cloning
        text_temp: generation temperature (1.0 more diverse, 0.0 more conservative)
        waveform_temp: generation temperature (1.0 more diverse, 0.0 more conservative)
        silent: disable progress bar

    Returns:
        numpy audio array at a sample rate of 24 kHz
    """
    semantic_tokens = text_to_semantic(
        text,
        history_prompt=history_prompt,
        temp=text_temp,
        silent=silent,
    )
    out = semantic_to_waveform(
        semantic_tokens,
        history_prompt=history_prompt,
        temp=waveform_temp,
        silent=silent,
    )
    return out
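The text_temp and waveform_temp arguments divide the logits before the softmax. The sketch below, in plain Python with made-up logits, shows the effect: lower temperatures concentrate probability on the most likely token, higher ones flatten the distribution. Note that a temperature of exactly 0.0 cannot be used with this formula, since it would divide by zero.

```python
import math


def softmax_with_temperature(logits, temp):
    """Softmax over logits / temp, computed in a numerically stable way."""
    scaled = [v / temp for v in logits]
    m = max(scaled)
    exps = [math.exp(v - m) for v in scaled]
    total = sum(exps)
    return [e / total for e in exps]


logits = [2.0, 1.0, 0.0]  # made-up values for illustration
for temp in (1.0, 0.7, 0.2):
    probs = softmax_with_temperature(logits, temp)
    print(temp, [round(p, 3) for p in probs])
```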
def text_to_semantic(
    text: str,
    history_prompt: Optional[Union[Dict, str]] = None,
    temp: float = 0.7,
    silent: bool = False,
):
    """Generate semantic array from text.

    Args:
        text: text to be turned into audio
        history_prompt: history choice for audio cloning
        temp: generation temperature (1.0 more diverse, 0.0 more conservative)
        silent: disable progress bar

    Returns:
        numpy semantic array to be fed into `semantic_to_waveform`
    """
    x_semantic = generate_text_semantic(
        text,
        history_prompt=history_prompt,
        temp=temp,
        silent=silent,
    )
    return x_semantic
from bark.generation import (
    _load_history_prompt,
    _tokenize,
    _normalize_whitespace,
    TEXT_PAD_TOKEN,
    TEXT_ENCODING_OFFSET,
    SEMANTIC_VOCAB_SIZE,
    SEMANTIC_PAD_TOKEN,
    SEMANTIC_INFER_TOKEN,
    COARSE_RATE_HZ,
    SEMANTIC_RATE_HZ,
    N_COARSE_CODEBOOKS,
    COARSE_INFER_TOKEN,
    CODEBOOK_SIZE,
    N_FINE_CODEBOOKS,
    COARSE_SEMANTIC_PAD_TOKEN,
)
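import logging

import torch.nn.functional as F
from typing import List, Optional, Union, Dict

# logger used by `generate_text_semantic` to warn about over-long input text
logger = logging.getLogger(__name__)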


def generate_text_semantic(
    text: str,
    history_prompt: List[str] = None,
    temp: float = 0.7,
    top_k: int = None,
    top_p: float = None,
    silent: bool = False,
    min_eos_p: float = 0.2,
    max_gen_duration_s: int = None,
    allow_early_stop: bool = True,
):
    """
    Generate semantic tokens from text.
    Args:
        text: text to be turned into audio
        history_prompt: history choice for audio cloning
        temp: generation temperature (1.0 more diverse, 0.0 more conservative)
        top_k: top k number of probabilities for considering during generation
        top_p: top probabilities higher than p for considering during generation
        silent: disable progress bar
        min_eos_p: minimum probability to select end of string token
        max_gen_duration_s: maximum duration for generation in seconds
        allow_early_stop: allow to stop generation if maximum duration is not reached
    Returns:
        numpy semantic array to be fed into `semantic_to_waveform`

    """
    text = _normalize_whitespace(text)
    if history_prompt is not None:
        history_prompt = _load_history_prompt(history_prompt)
        semantic_history = history_prompt["semantic_prompt"]
    else:
        semantic_history = None
    encoded_text = np.ascontiguousarray(_tokenize(tokenizer, text)) + TEXT_ENCODING_OFFSET
    if len(encoded_text) > 256:
        p = round((len(encoded_text) - 256) / len(encoded_text) * 100, 1)
        logger.warning(f"warning, text too long, lopping off last {p}%")
        encoded_text = encoded_text[:256]
    encoded_text = np.pad(
        encoded_text,
        (0, 256 - len(encoded_text)),
        constant_values=TEXT_PAD_TOKEN,
        mode="constant",
    )
    if semantic_history is not None:
        semantic_history = semantic_history.astype(np.int64)
        # lop off if history is too long, pad if needed
        semantic_history = semantic_history[-256:]
        semantic_history = np.pad(
            semantic_history,
            (0, 256 - len(semantic_history)),
            constant_values=SEMANTIC_PAD_TOKEN,
            mode="constant",
        )
    else:
        semantic_history = np.array([SEMANTIC_PAD_TOKEN] * 256)
    x = np.hstack([encoded_text, semantic_history, np.array([SEMANTIC_INFER_TOKEN])]).astype(np.int64)[None]
    assert x.shape[1] == 256 + 256 + 1
    n_tot_steps = 768
    # custom tqdm updates since we don't know when eos will occur
    pbar = tqdm.tqdm(disable=silent, total=100)
    pbar_state = 0
    tot_generated_duration_s = 0
    kv_cache = None
    for n in range(n_tot_steps):
        if kv_cache is not None:
            x_input = x[:, [-1]]
        else:
            x_input = x
        logits, kv_cache = ov_text_model(ov.Tensor(x_input), kv_cache)
        relevant_logits = logits[0, 0, :SEMANTIC_VOCAB_SIZE]
        if allow_early_stop:
            relevant_logits = np.hstack((relevant_logits, logits[0, 0, [SEMANTIC_PAD_TOKEN]]))  # eos
        if top_p is not None:
            sorted_indices = np.argsort(relevant_logits)[::-1]
            sorted_logits = relevant_logits[sorted_indices]
            # F.softmax expects a torch tensor and an explicit dim
            sorted_probs = F.softmax(torch.from_numpy(sorted_logits), dim=-1).numpy()
            cumulative_probs = np.cumsum(sorted_probs)
            sorted_indices_to_remove = cumulative_probs > top_p
            sorted_indices_to_remove[1:] = sorted_indices_to_remove[:-1].copy()
            sorted_indices_to_remove[0] = False
            relevant_logits[sorted_indices[sorted_indices_to_remove]] = -np.inf
        # convert to a tensor exactly once, so any combination of top_p and top_k works
        relevant_logits = torch.from_numpy(relevant_logits)
        if top_k is not None:
            v, _ = torch.topk(relevant_logits, min(top_k, relevant_logits.size(-1)))
            relevant_logits[relevant_logits < v[-1]] = -float("Inf")
        probs = F.softmax(relevant_logits / temp, dim=-1)
        item_next = torch.multinomial(probs, num_samples=1)
        if allow_early_stop and (item_next == SEMANTIC_VOCAB_SIZE or (min_eos_p is not None and probs[-1] >= min_eos_p)):
            # eos found, so break
            pbar.update(100 - pbar_state)
            break
        x = torch.cat((torch.from_numpy(x), item_next[None]), dim=1).numpy()
        tot_generated_duration_s += 1 / SEMANTIC_RATE_HZ
        if max_gen_duration_s is not None and tot_generated_duration_s > max_gen_duration_s:
            pbar.update(100 - pbar_state)
            break
        if n == n_tot_steps - 1:
            pbar.update(100 - pbar_state)
            break
        del logits, relevant_logits, probs, item_next
        req_pbar_state = np.min([100, int(round(100 * n / n_tot_steps))])
        if req_pbar_state > pbar_state:
            pbar.update(req_pbar_state - pbar_state)
        pbar_state = req_pbar_state
    pbar.close()
    out = x.squeeze()[256 + 256 + 1 :]
    return out
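The top_p branch above implements nucleus filtering: tokens are sorted by probability, and every token that comes after the cumulative probability has exceeded top_p is masked out, so the token that crosses the threshold is still kept. The sketch below reproduces that rule in plain Python (no NumPy or PyTorch) on made-up logits; the function name top_p_filter is a hypothetical helper, not part of Bark.

```python
import math


def top_p_filter(logits, top_p):
    """Return a copy of logits with tokens outside the nucleus set to -inf."""
    order = sorted(range(len(logits)), key=lambda i: logits[i], reverse=True)
    m = max(logits)
    exps = [math.exp(logits[i] - m) for i in order]
    total = sum(exps)
    filtered = list(logits)
    cumulative = 0.0
    remove_rest = False
    for i, e in zip(order, exps):
        if remove_rest:
            filtered[i] = -math.inf
        cumulative += e / total
        # once the cumulative probability exceeds top_p,
        # all remaining (less likely) tokens are dropped
        if cumulative > top_p:
            remove_rest = True
    return filtered


# Made-up logits whose softmax is (0.5, 0.3, 0.2).
logits = [math.log(0.5), math.log(0.3), math.log(0.2)]
print(top_p_filter(logits, 0.6))  # the least likely token becomes -inf
print(top_p_filter(logits, 0.4))  # only the most likely token survives
```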
def semantic_to_waveform(
    semantic_tokens: np.ndarray,
    history_prompt: Optional[Union[Dict, str]] = None,
    temp: float = 0.7,
    silent: bool = False,
):
    """Generate audio array from semantic input.

    Args:
        semantic_tokens: semantic token output from `text_to_semantic`
        history_prompt: history choice for audio cloning
        temp: generation temperature (1.0 more diverse, 0.0 more conservative)
        silent: disable progress bar

    Returns:
        numpy audio array at a sample rate of 24 kHz
    """
    coarse_tokens = generate_coarse(
        semantic_tokens,
        history_prompt=history_prompt,
        temp=temp,
        silent=silent,
    )
    fine_tokens = generate_fine(
        coarse_tokens,
        history_prompt=history_prompt,
        temp=0.5,
    )
    audio_arr = codec_decode(fine_tokens)
    return audio_arr
def generate_coarse(
    x_semantic: np.ndarray,
    history_prompt: Optional[Union[Dict, str]] = None,
    temp: float = 0.7,
    top_k: int = None,
    top_p: float = None,
    silent: bool = False,
    max_coarse_history: int = 630,  # min 60 (faster), max 630 (more context)
    sliding_window_len: int = 60,
):
    """
    Generate coarse audio codes from semantic tokens.
    Args:
         x_semantic: semantic token output from `text_to_semantic`
         history_prompt: history prompt, will be prepended to generated if provided
         temp: generation temperature (1.0 more diverse, 0.0 more conservative)
         top_k: top k number of probabilities for considering during generation
         top_p: top probabilities higher than p for considering during generation
         silent: disable progress bar
         max_coarse_history: threshold for cutting coarse history (minimum 60 for faster generation, maximum 630 for more context)
         sliding_window_len: size of sliding window for generation cycle
    Returns:
        numpy audio array with coarse audio codes

    """
    semantic_to_coarse_ratio = COARSE_RATE_HZ / SEMANTIC_RATE_HZ * N_COARSE_CODEBOOKS
    max_semantic_history = int(np.floor(max_coarse_history / semantic_to_coarse_ratio))
    if history_prompt is not None:
        history_prompt = _load_history_prompt(history_prompt)
        x_semantic_history = history_prompt["semantic_prompt"]
        x_coarse_history = history_prompt["coarse_prompt"]
        x_coarse_history = _flatten_codebooks(x_coarse_history) + SEMANTIC_VOCAB_SIZE
        # trim histories correctly
        n_semantic_hist_provided = np.min(
            [
                max_semantic_history,
                len(x_semantic_history) - len(x_semantic_history) % 2,
                int(np.floor(len(x_coarse_history) / semantic_to_coarse_ratio)),
            ]
        )
        n_coarse_hist_provided = int(round(n_semantic_hist_provided * semantic_to_coarse_ratio))
        x_semantic_history = x_semantic_history[-n_semantic_hist_provided:].astype(np.int32)
        x_coarse_history = x_coarse_history[-n_coarse_hist_provided:].astype(np.int32)
        x_coarse_history = x_coarse_history[:-2]
    else:
        x_semantic_history = np.array([], dtype=np.int32)
        x_coarse_history = np.array([], dtype=np.int32)
    # start loop
    n_steps = int(round(np.floor(len(x_semantic) * semantic_to_coarse_ratio / N_COARSE_CODEBOOKS) * N_COARSE_CODEBOOKS))
    x_semantic = np.hstack([x_semantic_history, x_semantic]).astype(np.int32)
    x_coarse = x_coarse_history.astype(np.int32)
    base_semantic_idx = len(x_semantic_history)
    x_semantic_in = x_semantic[None]
    x_coarse_in = x_coarse[None]
    n_window_steps = int(np.ceil(n_steps / sliding_window_len))
    n_step = 0
    for _ in tqdm.tqdm(range(n_window_steps), total=n_window_steps, disable=silent):
        semantic_idx = base_semantic_idx + int(round(n_step / semantic_to_coarse_ratio))
        # pad from right side
        x_in = x_semantic_in[:, np.max([0, semantic_idx - max_semantic_history]) :]
        x_in = x_in[:, :256]
        x_in = F.pad(
            torch.from_numpy(x_in),
            (0, 256 - x_in.shape[-1]),
            "constant",
            COARSE_SEMANTIC_PAD_TOKEN,
        )
        x_in = torch.hstack(
            [
                x_in,
                torch.tensor([COARSE_INFER_TOKEN])[None],
                torch.from_numpy(x_coarse_in[:, -max_coarse_history:]),
            ]
        ).numpy()
        kv_cache = None
        for _ in range(sliding_window_len):
            if n_step >= n_steps:
                continue
            is_major_step = n_step % N_COARSE_CODEBOOKS == 0

            if kv_cache is not None:
                x_input = x_in[:, [-1]]
            else:
                x_input = x_in

            logits, kv_cache = ov_coarse_model(x_input, past_kv=kv_cache)
            logit_start_idx = SEMANTIC_VOCAB_SIZE + (1 - int(is_major_step)) * CODEBOOK_SIZE
            logit_end_idx = SEMANTIC_VOCAB_SIZE + (2 - int(is_major_step)) * CODEBOOK_SIZE
            relevant_logits = logits[0, 0, logit_start_idx:logit_end_idx]
            if top_p is not None:
                sorted_indices = np.argsort(relevant_logits)[::-1]
                sorted_logits = relevant_logits[sorted_indices]
                # F.softmax expects a torch tensor and an explicit dim
                sorted_probs = F.softmax(torch.from_numpy(sorted_logits), dim=-1).numpy()
                cumulative_probs = np.cumsum(sorted_probs)
                sorted_indices_to_remove = cumulative_probs > top_p
                sorted_indices_to_remove[1:] = sorted_indices_to_remove[:-1].copy()
                sorted_indices_to_remove[0] = False
                relevant_logits[sorted_indices[sorted_indices_to_remove]] = -np.inf
            # convert to a tensor exactly once, so any combination of top_p and top_k works
            relevant_logits = torch.from_numpy(relevant_logits)
            if top_k is not None:
                v, _ = torch.topk(relevant_logits, min(top_k, relevant_logits.size(-1)))
                relevant_logits[relevant_logits < v[-1]] = -float("Inf")
            probs = F.softmax(relevant_logits / temp, dim=-1)
            item_next = torch.multinomial(probs, num_samples=1)
            item_next += logit_start_idx
            x_coarse_in = torch.cat((torch.from_numpy(x_coarse_in), item_next[None]), dim=1).numpy()
            x_in = torch.cat((torch.from_numpy(x_in), item_next[None]), dim=1).numpy()
            del logits, relevant_logits, probs, item_next
            n_step += 1
        del x_in
    del x_semantic_in
    gen_coarse_arr = x_coarse_in.squeeze()[len(x_coarse_history) :]
    del x_coarse_in
    gen_coarse_audio_arr = gen_coarse_arr.reshape(-1, N_COARSE_CODEBOOKS).T - SEMANTIC_VOCAB_SIZE
    for n in range(1, N_COARSE_CODEBOOKS):
        gen_coarse_audio_arr[n, :] -= n * CODEBOOK_SIZE
    return gen_coarse_audio_arr
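The index arithmetic in generate_coarse is easier to follow with numbers. The sketch below repeats the main formulas in plain Python. The constant values are assumptions based on what bark.generation is believed to define (49.9 Hz semantic rate, 75 Hz coarse rate, 2 coarse codebooks of 1024 entries, semantic vocabulary of 10,000); check them against your installed version. The helper names are hypothetical.

```python
import math

# Assumed values of the constants imported from bark.generation.
SEMANTIC_RATE_HZ = 49.9
COARSE_RATE_HZ = 75
N_COARSE_CODEBOOKS = 2
CODEBOOK_SIZE = 1024
SEMANTIC_VOCAB_SIZE = 10_000

# Roughly 3 coarse tokens are generated per semantic token.
ratio = COARSE_RATE_HZ / SEMANTIC_RATE_HZ * N_COARSE_CODEBOOKS
max_semantic_history = math.floor(630 / ratio)  # for max_coarse_history=630


def coarse_steps(n_semantic):
    """Number of coarse tokens to generate, rounded down to a full frame."""
    frames = math.floor(n_semantic * ratio / N_COARSE_CODEBOOKS)
    return int(round(frames * N_COARSE_CODEBOOKS))


def logit_range(n_step):
    """Slice of the logits that is valid at this step (codebook 0 or 1)."""
    is_major_step = n_step % N_COARSE_CODEBOOKS == 0
    start = SEMANTIC_VOCAB_SIZE + (1 - int(is_major_step)) * CODEBOOK_SIZE
    return start, start + CODEBOOK_SIZE


def unflatten(flat_tokens):
    """Split interleaved coarse tokens into per-codebook codes."""
    books = [flat_tokens[n::N_COARSE_CODEBOOKS] for n in range(N_COARSE_CODEBOOKS)]
    return [[t - SEMANTIC_VOCAB_SIZE - n * CODEBOOK_SIZE for t in book] for n, book in enumerate(books)]


print(round(ratio, 3), max_semantic_history)
print(coarse_steps(100), math.ceil(coarse_steps(100) / 60))  # steps, windows
print(logit_range(0), logit_range(1))
print(unflatten([10005, 11030, 10007, 11024]))
```

Even steps sample from the first codebook's slice of the vocabulary and odd steps from the second, which is why the final array is de-interleaved and shifted back to codes in the range 0 to CODEBOOK_SIZE - 1.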


def generate_fine(
    x_coarse_gen: np.ndarray,
    history_prompt: Optional[Union[Dict, str]] = None,
    temp: float = 0.5,
    silent: bool = True,
):
    """
    Generate full audio codes from coarse audio codes.
    Args:
         x_coarse_gen: generated coarse codebooks from `generate_coarse`
         history_prompt: history prompt, will be prepended to generated
         temp: generation temperature (1.0 more diverse, 0.0 more conservative)
         silent: disable progress bar
    Returns:
         numpy audio array with fine audio codes

    """
    if history_prompt is not None:
        history_prompt = _load_history_prompt(history_prompt)
        x_fine_history = history_prompt["fine_prompt"]
    else:
        x_fine_history = None
    n_coarse = x_coarse_gen.shape[0]
    # make input arr
    in_arr = np.vstack(
        [
            x_coarse_gen,
            np.zeros((N_FINE_CODEBOOKS - n_coarse, x_coarse_gen.shape[1])) + CODEBOOK_SIZE,
        ]
    ).astype(
        np.int32
    )  # padding
    # prepend history if available (max 512)
    if x_fine_history is not None:
        x_fine_history = x_fine_history.astype(np.int32)
        in_arr = np.hstack([x_fine_history[:, -512:].astype(np.int32), in_arr])
        n_history = x_fine_history[:, -512:].shape[1]
    else:
        n_history = 0
    n_remove_from_end = 0
    # need to pad if too short (since non-causal model)
    if in_arr.shape[1] < 1024:
        n_remove_from_end = 1024 - in_arr.shape[1]
        in_arr = np.hstack(
            [
                in_arr,
                np.zeros((N_FINE_CODEBOOKS, n_remove_from_end), dtype=np.int32) + CODEBOOK_SIZE,
            ]
        )
    n_loops = np.max([0, int(np.ceil((x_coarse_gen.shape[1] - (1024 - n_history)) / 512))]) + 1
    in_arr = in_arr.T
    for n in tqdm.tqdm(range(n_loops), disable=silent):
        start_idx = np.min([n * 512, in_arr.shape[0] - 1024])
        start_fill_idx = np.min([n_history + n * 512, in_arr.shape[0] - 512])
        rel_start_fill_idx = start_fill_idx - start_idx
        in_buffer = in_arr[start_idx : start_idx + 1024, :][None]
        for nn in range(n_coarse, N_FINE_CODEBOOKS):
            logits = ov_fine_model(np.array([nn]).astype(np.int64), in_buffer.astype(np.int64))
            if temp is None:
                relevant_logits = logits[0, rel_start_fill_idx:, :CODEBOOK_SIZE]
                # logits is a NumPy array, so convert it before calling torch.argmax
                codebook_preds = torch.argmax(torch.from_numpy(relevant_logits), -1)
            else:
                relevant_logits = logits[0, :, :CODEBOOK_SIZE] / temp
                probs = F.softmax(torch.from_numpy(relevant_logits), dim=-1)
                codebook_preds = torch.hstack([torch.multinomial(probs[nnn], num_samples=1) for nnn in range(rel_start_fill_idx, 1024)])
            in_buffer[0, rel_start_fill_idx:, nn] = codebook_preds.numpy()
            del logits, codebook_preds
        for nn in range(n_coarse, N_FINE_CODEBOOKS):
            in_arr[start_fill_idx : start_fill_idx + (1024 - rel_start_fill_idx), nn] = in_buffer[0, rel_start_fill_idx:, nn]
        del in_buffer
    gen_fine_arr = in_arr.squeeze().T
    del in_arr
    gen_fine_arr = gen_fine_arr[:, n_history:]
    if n_remove_from_end > 0:
        gen_fine_arr = gen_fine_arr[:, :-n_remove_from_end]
    return gen_fine_arr
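The window arithmetic inside `generate_fine` is easier to follow in isolation. The sketch below reproduces only the index computation from the loop above, assuming the same 1024-position window and 512-position hop. The helper name `fine_window_schedule` is hypothetical and is not part of Bark or of this notebook.

```python
import math

WINDOW = 1024  # context length fed to the fine model, as in the code above
HOP = 512      # step between consecutive windows, as in the code above


def fine_window_schedule(n_generated: int, n_history: int = 0):
    """Return (start_idx, start_fill_idx, rel_start_fill_idx) for each loop step.

    Hypothetical helper: it mirrors the index arithmetic of generate_fine
    without running any model.
    """
    total = n_history + n_generated
    # Inputs shorter than one window are padded up to WINDOW positions.
    padded_total = max(total, WINDOW)
    n_loops = max(0, math.ceil((n_generated - (WINDOW - n_history)) / HOP)) + 1
    schedule = []
    for n in range(n_loops):
        # Where the 1024-wide window starts in the (padded) input.
        start_idx = min(n * HOP, padded_total - WINDOW)
        # First position whose fine codes are written back in this step.
        start_fill_idx = min(n_history + n * HOP, padded_total - HOP)
        schedule.append((start_idx, start_fill_idx, start_fill_idx - start_idx))
    return schedule


for step in fine_window_schedule(1500):
    print(step)
```

With 1500 generated positions and no history, the loop runs twice: the first window starts at position 0, and the second is shifted back to position 476 so that it still spans 1024 positions and ends at the last one.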

Run model inference#

Now it is time to see the model in action. We only need to wrap our models in classes and run the generate_audio function.

Select Inference device#

Select a device from the dropdown list for running inference using OpenVINO.

from notebook_utils import device_widget

device = device_widget()

device
Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')
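Outside Jupyter the dropdown widget is not available, so the device name has to be chosen in code. The following is a minimal sketch of one way to do that. The `pick_device` helper and its preference order are assumptions made for illustration. `available_devices` is a property of `ov.Core`, and the `ImportError` fallback exists only so that the sketch runs without OpenVINO installed.

```python
def pick_device(available, preferred=("GPU", "CPU")):
    """Return the first preferred device that is available, else "AUTO".

    Hypothetical helper, not part of notebook_utils or OpenVINO.
    """
    for name in preferred:
        # Device names may carry an index suffix such as "GPU.0", so match the prefix.
        if any(dev == name or dev.startswith(name + ".") for dev in available):
            return name
    return "AUTO"


try:
    import openvino as ov

    available_devices = ov.Core().available_devices
except ImportError:
    # Assumed fallback so the sketch still runs without OpenVINO.
    available_devices = ["CPU"]

device_name = pick_device(available_devices)
print(device_name)
```

The resulting string could then be passed wherever `device.value` is used below.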
core = ov.Core()

ov_text_model = OVBarkTextEncoder(core, device.value, text_encoder_path0, text_encoder_path1)
ov_coarse_model = OVBarkEncoder(core, device.value, coarse_encoder_path)
ov_fine_model = OVBarkFineEncoder(core, device.value, fine_model_dir)
import time
from bark import SAMPLE_RATE

torch.manual_seed(42)
t0 = time.time()
text = "Hello, my name is Suno. And, uh — and I like banana and apples. [laughs] But I also have other interests such as playing tic tac toe."
audio_array = generate_audio(text)
generation_duration_s = time.time() - t0
audio_duration_s = audio_array.shape[0] / SAMPLE_RATE

print(f"took {generation_duration_s:.0f}s to generate {audio_duration_s:.0f}s of audio")
100%|██████████| 100/100 [00:10<00:00,  9.63it/s]
100%|██████████| 32/32 [00:37<00:00,  1.17s/it]
took 53s to generate 13s of audio
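A common way to summarize such a measurement is the real-time factor, which is generation time divided by audio duration. The sketch below applies it to the rounded figures printed above, so the result is approximate. The function name `real_time_factor` is hypothetical.

```python
def real_time_factor(generation_s: float, audio_s: float) -> float:
    """Seconds of compute spent per second of generated audio."""
    if audio_s <= 0:
        raise ValueError("audio duration must be positive")
    return generation_s / audio_s


# Rounded values taken from the output above.
rtf = real_time_factor(53, 13)
print(f"real-time factor: {rtf:.1f}x")  # prints "real-time factor: 4.1x"
```

A value above 1 means generation is slower than playback. The number depends on the device, the prompt, and the random seed.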
from IPython.display import Audio
from bark import SAMPLE_RATE

Audio(audio_array, rate=SAMPLE_RATE)
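To keep the result outside the notebook, the waveform can also be written to a WAV file. The sketch below uses only NumPy and the standard-library `wave` module. The `save_wav` helper, the output path, and the stand-in sine tone are assumptions for illustration. In the notebook you would pass `audio_array` and `SAMPLE_RATE` instead, and the 24 kHz rate used here is an assumed value for `SAMPLE_RATE`.

```python
import os
import tempfile
import wave

import numpy as np


def save_wav(path, audio, sample_rate):
    """Write a mono float waveform in [-1, 1] as 16-bit PCM (hypothetical helper)."""
    # Clip first so out-of-range samples do not overflow in the integer cast.
    clipped = np.clip(np.asarray(audio, dtype=np.float32), -1.0, 1.0)
    pcm = (clipped * 32767).astype("<i2")  # little-endian int16, as WAV expects
    with wave.open(str(path), "wb") as wav_file:
        wav_file.setnchannels(1)   # mono
        wav_file.setsampwidth(2)   # 2 bytes per sample = 16 bit
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm.tobytes())


# Stand-in signal: one second of a 440 Hz tone at an assumed 24 kHz sample rate.
sample_rate = 24_000
t = np.arange(sample_rate) / sample_rate
out_path = os.path.join(tempfile.gettempdir(), "bark_generation.wav")
save_wav(out_path, 0.3 * np.sin(2 * np.pi * 440 * t), sample_rate)
```

Clipping before the cast is a design choice. Without it, samples slightly outside [-1, 1] would not fit into the 16-bit range.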

Interactive demo#

import numpy as np

from bark import SAMPLE_RATE
from bark.generation import SUPPORTED_LANGS

AVAILABLE_PROMPTS = ["Unconditional", "Announcer"]
PROMPT_LOOKUP = {}
for _, lang in SUPPORTED_LANGS:
    for n in range(10):
        label = f"Speaker {n} ({lang})"
        AVAILABLE_PROMPTS.append(label)
        PROMPT_LOOKUP[label] = f"{lang}_speaker_{n}"
PROMPT_LOOKUP["Unconditional"] = None
PROMPT_LOOKUP["Announcer"] = "announcer"


def gen_tts(text, history_prompt):
    history_prompt = PROMPT_LOOKUP[history_prompt]
    audio_arr = generate_audio(text, history_prompt=history_prompt)
    audio_arr = (audio_arr * 32767).astype(np.int16)
    return (SAMPLE_RATE, audio_arr)
import requests

if not Path("gradio_helper.py").exists():
    r = requests.get(url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/notebooks/bark-text-to-audio/gradio_helper.py")
    # Use a context manager so the file is closed after writing
    with open("gradio_helper.py", "w") as f:
        f.write(r.text)

from gradio_helper import make_demo

demo = make_demo(fn=gen_tts, available_prompts=AVAILABLE_PROMPTS)

try:
    demo.launch(debug=False)
except Exception:
    demo.launch(share=True, debug=False)
# if you are launching remotely, specify server_name and server_port
# demo.launch(server_name='your server name', server_port='server port in int')
# Read more in the docs: https://gradio.app/docs/