Instruction following using Databricks Dolly 2.0 and OpenVINO

This Jupyter notebook can be launched after a local installation only.

Github

The instruction following is one of the cornerstones of the current generation of large language models(LLMs). Reinforcement learning with human preferences (RLHF) and techniques such as InstructGPT has been the core foundation of breakthroughs such as ChatGPT and GPT-4. However, these powerful models remain hidden behind APIs and we know very little about their underlying architecture. Instruction-following models are capable of generating text in response to prompts and are often used for tasks like writing assistance, chatbots, and content generation. Many users now interact with these models regularly and even use them for work but the majority of such models remain closed-source and require massive amounts of computational resources to experiment with.

Dolly 2.0 is the first open-source, instruction-following LLM fine-tuned by Databricks on a transparent and freely available dataset that is also open-sourced to use for commercial purposes. That means Dolly 2.0 is available for commercial applications without the need to pay for API access or share data with third parties. Dolly 2.0 exhibits similar characteristics so ChatGPT despite being much smaller.

In this tutorial, we consider how to run an instruction-following text generation pipeline using Dolly 2.0 and OpenVINO. We will use a pre-trained model from the Hugging Face Transformers library. To simplify the user experience, the Hugging Face Optimum Intel library is used to convert the models to OpenVINO™ IR format.

The tutorial consists of the following steps:

About Dolly 2.0

Dolly 2.0 is an instruction-following large language model trained on the Databricks machine-learning platform that is licensed for commercial use. It is based on Pythia and is trained on ~15k instruction/response fine-tuning records generated by Databricks employees in various capability domains, including brainstorming, classification, closed QA, generation, information extraction, open QA, and summarization. Dolly 2.0 works by processing natural language instructions and generating responses that follow the given instructions. It can be used for a wide range of applications, including closed question-answering, summarization, and generation.

The model training process was inspired by InstructGPT. To train InstructGPT models, the core technique is reinforcement learning from human feedback (RLHF), This technique uses human preferences as a reward signal to fine-tune models, which is important as the safety and alignment problems required to be solved are complex and subjective, and aren’t fully captured by simple automatic metrics. More details about the InstructGPT approach can be found in OpenAI blog post The breakthrough discovered with InstructGPT is that language models don’t need larger and larger training sets. By using human-evaluated question-and-answer training, authors were able to train a better language model using one hundred times fewer parameters than the previous model. Databricks used a similar approach to create a prompt and response dataset called they call databricks-dolly-15k, a corpus of more than 15,000 records generated by thousands of Databricks employees to enable large language models to exhibit the magical interactivity of InstructGPT. More details about the model and dataset can be found in Databricks blog post and repo

Prerequisites

First, we should install the Hugging Face Optimum library accelerated by OpenVINO integration. The Hugging Face Optimum Intel API is a high-level API that enables us to convert and quantize models from the Hugging Face Transformers library to the OpenVINO™ IR format. For more details, refer to the Hugging Face Optimum Intel documentation.

%pip install -q "diffusers>=0.16.1" "transformers>=4.33.0" "openvino>=2023.2.0" "nncf>=2.6.0" onnx gradio --extra-index-url https://download.pytorch.org/whl/cpu
%pip install -q --upgrade "git+https://github.com/huggingface/optimum-intel.git"

select device from dropdown list for running inference using OpenVINO

import ipywidgets as widgets
import openvino as ov

core = ov.Core()

device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value='CPU',
    description='Device:',
    disabled=False,
)

device
Dropdown(description='Device:', options=('CPU', 'GPU', 'AUTO'), value='CPU')

Download and Convert Model

Optimum Intel can be used to load optimized models from the Hugging Face Hub and create pipelines to run an inference with OpenVINO Runtime using Hugging Face APIs. The Optimum Inference models are API compatible with Hugging Face Transformers models. This means we just need to replace AutoModelForXxx class with the corresponding OVModelForXxx class.

Below is an example of the Dolly model

-from transformers import AutoModelForCausalLM
+from optimum.intel.openvino import OVModelForCausalLM
from transformers import AutoTokenizer, pipeline

model_id = "databricks/dolly-v2-3b"
-model = AutoModelForCausalLM.from_pretrained(model_id)
+model = OVModelForCausalLM.from_pretrained(model_id, from_transformers=True)

Model class initialization starts with calling from_pretrained method. When downloading and converting Transformers model, the parameter export=True should be added. For models where size more We can save the converted model for the next usage with the save_pretrained method. Tokenizer class and pipelines API are compatible with Optimum models.

from pathlib import Path
from transformers import AutoTokenizer
from optimum.intel.openvino import OVModelForCausalLM

model_id = "databricks/dolly-v2-3b"
model_path = Path("dolly-v2-3b")

tokenizer = AutoTokenizer.from_pretrained(model_id)

current_device = device.value

ov_config = {'PERFORMANCE_HINT': 'LATENCY', 'NUM_STREAMS': '1', "CACHE_DIR": ""}

if model_path.exists():
    ov_model = OVModelForCausalLM.from_pretrained(model_path, device=current_device, ov_config=ov_config)
else:
    ov_model = OVModelForCausalLM.from_pretrained(model_id, device=current_device, export=True, ov_config=ov_config, load_in_8bit=False)
    ov_model.half()
    ov_model.save_pretrained(model_path)
INFO:nncf:NNCF initialized successfully. Supported frameworks detected: torch, tensorflow, onnx, openvino
No CUDA runtime is found, using CUDA_HOME='/usr/local/cuda'
2023-11-17 13:10:43.359093: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable TF_ENABLE_ONEDNN_OPTS=0.
2023-11-17 13:10:43.398436: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-11-17 13:10:44.026743: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT
Compiling the model to CPU ...

NNCF Weights Compression algorithm compresses weights of a model to INT8. This is an alternative to Quantization algorithm that compresses both weights and activations. Weight compression is effective in optimizing footprint and performance of large models where the size of weights is significantly larger than the size of activations, for example, in Large Language Models (LLMs) such as Dolly 2.0. Additionally, Weight Compression usually leads to almost no accuracy drop.

to_compress = widgets.Checkbox(
    value=True,
    description='INT8 Compression',
    disabled=False,
)
print("Click on checkbox for enabling / disabling weights compression")
to_compress
Click on checkbox for enabling / disabling weights compression
Checkbox(value=True, description='INT8 Compression')
import gc
from optimum.intel import OVQuantizer

compressed_model_path = Path(f'{model_path}_compressed')

def calculate_compression_rate(model_path_ov, model_path_ov_compressed):
    model_size_original = model_path_ov.with_suffix(".bin").stat().st_size / 2 ** 20
    model_size_compressed = model_path_ov_compressed.with_suffix(".bin").stat().st_size / 2 ** 20
    print(f"* Original IR model size: {model_size_original:.2f} MB")
    print(f"* Compressed IR model size: {model_size_compressed:.2f} MB")
    print(f"* Model compression rate: {model_size_original / model_size_compressed:.3f}")

if to_compress.value:
    if not compressed_model_path.exists():
        quantizer = OVQuantizer.from_pretrained(ov_model)
        quantizer.quantize(save_directory=compressed_model_path, weights_only=True)
        del quantizer
        gc.collect()

    calculate_compression_rate(model_path / 'openvino_model.xml', compressed_model_path / 'openvino_model.xml')
    ov_model = OVModelForCausalLM.from_pretrained(compressed_model_path, device=current_device, ov_config=ov_config)
* Original IR model size: 5297.21 MB
* Compressed IR model size: 2657.89 MB
* Model compression rate: 1.993
Compiling the model to CPU ...

Create an instruction-following inference pipeline

The run_generation function accepts user-provided text input, tokenizes it, and runs the generation process. Text generation is an iterative process, where each next token depends on previously generated until a maximum number of tokens or stop generation condition is not reached. To obtain intermediate generation results without waiting until when generation is finished, we will use TextIteratorStreamer, provided as part of HuggingFace Streaming API.

The diagram below illustrates how the instruction-following pipeline works

generation pipeline)

generation pipeline)

As can be seen, on the first iteration, the user provided instructions converted to token ids using a tokenizer, then prepared input provided to the model. The model generates probabilities for all tokens in logits format The way the next token will be selected over predicted probabilities is driven by the selected decoding methodology. You can find more information about the most popular decoding methods in this blog.

There are several parameters that can control text generation quality:

  • Temperature is a parameter used to control the level of creativity in AI-generated text. By adjusting the temperature, you can influence the AI model’s probability distribution, making the text more focused or diverse.
    Consider the following example: The AI model has to complete the sentence “The cat is ____.” with the following token probabilities:
    playing: 0.5
    sleeping: 0.25
    eating: 0.15
    driving: 0.05
    flying: 0.05
    • Low temperature (e.g., 0.2): The AI model becomes more focused and deterministic, choosing tokens with the highest probability, such as “playing.”

    • Medium temperature (e.g., 1.0): The AI model maintains a balance between creativity and focus, selecting tokens based on their probabilities without significant bias, such as “playing,” “sleeping,” or “eating.”

    • High temperature (e.g., 2.0): The AI model becomes more adventurous, increasing the chances of selecting less likely tokens, such as “driving” and “flying.”

  • Top-p, also known as nucleus sampling, is a parameter used to control the range of tokens considered by the AI model based on their cumulative probability. By adjusting the top-p value, you can influence the AI model’s token selection, making it more focused or diverse. Using the same example with the cat, consider the following top_p settings:

    • Low top_p (e.g., 0.5): The AI model considers only tokens with the highest cumulative probability, such as “playing.”

    • Medium top_p (e.g., 0.8): The AI model considers tokens with a higher cumulative probability, such as “playing,” “sleeping,” and “eating.”

    • High top_p (e.g., 1.0): The AI model considers all tokens, including those with lower probabilities, such as “driving” and “flying.”

  • Top-k is another popular sampling strategy. In comparison with Top-P, which chooses from the smallest possible set of words whose cumulative probability exceeds the probability P, in Top-K sampling K most likely next words are filtered and the probability mass is redistributed among only those K next words. In our example with cat, if k=3, then only “playing”, “sleeping” and “eating” will be taken into account as possible next word.

To optimize the generation process and use memory more efficiently, the use_cache=True option is enabled. Since the output side is auto-regressive, an output token hidden state remains the same once computed for every further generation step. Therefore, recomputing it every time you want to generate a new token seems wasteful. With the cache, the model saves the hidden state once it has been computed. The model only computes the one for the most recently generated output token at each time step, re-using the saved ones for hidden tokens. This reduces the generation complexity from O(n^3) to O(n^2) for a transformer model. More details about how it works can be found in this article. With this option, the model gets the previous step’s hidden states (cached attention keys and values) as input and additionally provides hidden states for the current step as output. It means for all next iterations, it is enough to provide only a new token obtained from the previous step and cached key values to get the next token prediction.

The generation cycle repeats until the end of the sequence token is reached or it also can be interrupted when maximum tokens will be generated. As already mentioned before, we can enable printing current generated tokens without waiting until when the whole generation is finished using Streaming API, it adds a new token to the output queue and then prints them when they are ready.

from threading import Thread
from time import perf_counter
from typing import List
import gradio as gr
from transformers import AutoTokenizer, TextIteratorStreamer
import numpy as np

For effective generation, model expects to have input in specific format. The code below prepare template for passing user instruction into model with providing additional context.

INSTRUCTION_KEY = "### Instruction:"
RESPONSE_KEY = "### Response:"
END_KEY = "### End"
INTRO_BLURB = (
    "Below is an instruction that describes a task. Write a response that appropriately completes the request."
)

# This is the prompt that is used for generating responses using an already trained model.  It ends with the response
# key, where the job of the model is to provide the completion that follows it (i.e. the response itself).
PROMPT_FOR_GENERATION_FORMAT = """{intro}

{instruction_key}
{instruction}

{response_key}
""".format(
    intro=INTRO_BLURB,
    instruction_key=INSTRUCTION_KEY,
    instruction="{instruction}",
    response_key=RESPONSE_KEY,
)

Model was retrained to finish generation using special token ### End the code below find its id for using it as generation stop-criteria.

def get_special_token_id(tokenizer: AutoTokenizer, key: str) -> int:
    """
    Gets the token ID for a given string that has been added to the tokenizer as a special token.

    When training, we configure the tokenizer so that the sequences like "### Instruction:" and "### End" are
    treated specially and converted to a single, new token.  This retrieves the token ID each of these keys map to.

    Args:
        tokenizer (PreTrainedTokenizer): the tokenizer
        key (str): the key to convert to a single token

    Raises:
        RuntimeError: if more than one ID was generated

    Returns:
        int: the token ID for the given key
    """
    token_ids = tokenizer.encode(key)
    if len(token_ids) > 1:
        raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}")
    return token_ids[0]

tokenizer_response_key = next((token for token in tokenizer.additional_special_tokens if token.startswith(RESPONSE_KEY)), None)

end_key_token_id = None
if tokenizer_response_key:
    try:
        end_key_token_id = get_special_token_id(tokenizer, END_KEY)
        # Ensure generation stops once it generates "### End"
    except ValueError:
        pass

As it was discussed above, run_generation function is the entry point for starting generation. It gets provided input instruction as parameter and returns model response.

def run_generation(user_text:str, top_p:float, temperature:float, top_k:int, max_new_tokens:int, perf_text:str):
    """
    Text generation function

    Parameters:
      user_text (str): User-provided instruction for a generation.
      top_p (float):  Nucleus sampling. If set to < 1, only the smallest set of most probable tokens with probabilities that add up to top_p or higher are kept for a generation.
      temperature (float): The value used to module the logits distribution.
      top_k (int): The number of highest probability vocabulary tokens to keep for top-k-filtering.
      max_new_tokens (int): Maximum length of generated sequence.
      perf_text (str): Content of text field for printing performance results.
    Returns:
      model_output (str) - model-generated text
      perf_text (str) - updated perf text filed content
    """

    # Prepare input prompt according to model expected template
    prompt_text = PROMPT_FOR_GENERATION_FORMAT.format(instruction=user_text)

    # Tokenize the user text.
    model_inputs = tokenizer(prompt_text, return_tensors="pt")

    # Start generation on a separate thread, so that we don't block the UI. The text is pulled from the streamer
    # in the main thread. Adds timeout to the streamer to handle exceptions in the generation thread.
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        model_inputs,
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        temperature=float(temperature),
        top_k=top_k,
        eos_token_id=end_key_token_id
    )
    t = Thread(target=ov_model.generate, kwargs=generate_kwargs)
    t.start()

    # Pull the generated text from the streamer, and update the model output.
    model_output = ""
    per_token_time = []
    num_tokens = 0
    start = perf_counter()
    for new_text in streamer:
        current_time = perf_counter() - start
        model_output += new_text
        perf_text, num_tokens = estimate_latency(current_time, perf_text, new_text, per_token_time, num_tokens)
        yield model_output, perf_text
        start = perf_counter()
    return model_output, perf_text

For making interactive user interface we will use Gradio library. The code bellow provides useful functions used for communication with UI elements.

def estimate_latency(current_time:float, current_perf_text:str, new_gen_text:str, per_token_time:List[float], num_tokens:int):
    """
    Helper function for performance estimation

    Parameters:
      current_time (float): This step time in seconds.
      current_perf_text (str): Current content of performance UI field.
      new_gen_text (str): New generated text.
      per_token_time (List[float]): history of performance from previous steps.
      num_tokens (int): Total number of generated tokens.

    Returns:
      update for performance text field
      update for a total number of tokens
    """
    num_current_toks = len(tokenizer.encode(new_gen_text))
    num_tokens += num_current_toks
    per_token_time.append(num_current_toks / current_time)
    if len(per_token_time) > 10 and len(per_token_time) % 4 == 0:
        current_bucket = per_token_time[:-10]
        return f"Average generation speed: {np.mean(current_bucket):.2f} tokens/s. Total generated tokens: {num_tokens}", num_tokens
    return current_perf_text, num_tokens

def reset_textbox(instruction:str, response:str, perf:str):
    """
    Helper function for resetting content of all text fields

    Parameters:
      instruction (str): Content of user instruction field.
      response (str): Content of model response field.
      perf (str): Content of performance info filed

    Returns:
      empty string for each placeholder
    """
    return "", "", ""


def select_device(device_str:str, current_text:str = "", progress:gr.Progress = gr.Progress()):
    """
    Helper function for uploading model on the device.

    Parameters:
      device_str (str): Device name.
      current_text (str): Current content of user instruction field (used only for backup purposes, temporally replacing it on the progress bar during model loading).
      progress (gr.Progress): gradio progress tracker
    Returns:
      current_text
    """
    if device_str != ov_model._device:
        ov_model.request = None
        ov_model._device = device_str

        for i in progress.tqdm(range(1), desc=f"Model loading on {device_str}"):
            ov_model.compile()
    return current_text

Run instruction-following pipeline

Now, we are ready to explore model capabilities. This demo provides a simple interface that allows communication with a model using text instruction. Type your instruction into the User instruction field or select one from predefined examples and click on the Submit button to start generation. Additionally, you can modify advanced generation parameters:

  • Device - allows switching inference device. Please note, every time when new device is selected, model will be recompiled and this takes some time.

  • Max New Tokens - maximum size of generated text.

  • Top-p (nucleus sampling) - if set to < 1, only the smallest set of most probable tokens with probabilities that add up to top_p or higher are kept for a generation.

  • Top-k - the number of highest probability vocabulary tokens to keep for top-k-filtering.

  • Temperature - the value used to module the logits distribution.

available_devices = ov.Core().available_devices + ["AUTO"]

examples = [
    "Give me recipe for pizza with pineapple",
    "Write me a tweet about new OpenVINO release",
    "Explain difference between CPU and GPU",
    "Give five ideas for great weekend with family",
    "Do Androids dream of Electric sheep?",
    "Who is Dolly?",
    "Please give me advice how to write resume?",
    "Name 3 advantages to be a cat",
    "Write instructions on how to become a good AI engineer",
    "Write a love letter to my best friend",
]

with gr.Blocks() as demo:
    gr.Markdown(
        "# Instruction following using Databricks Dolly 2.0 and OpenVINO.\n"
        "Provide insturction which describes a task below or select among predefined examples and model writes response that performs requested task."
    )

    with gr.Row():
        with gr.Column(scale=4):
            user_text = gr.Textbox(
                placeholder="Write an email about an alpaca that likes flan",
                label="User instruction"
            )
            model_output = gr.Textbox(label="Model response", interactive=False)
            performance = gr.Textbox(label="Performance", lines=1, interactive=False)
            with gr.Column(scale=1):
                button_clear = gr.Button(value="Clear")
                button_submit = gr.Button(value="Submit")
            gr.Examples(examples, user_text)
        with gr.Column(scale=1):
            device = gr.Dropdown(choices=available_devices, value=current_device, label="Device")
            max_new_tokens = gr.Slider(
                minimum=1, maximum=1000, value=256, step=1, interactive=True, label="Max New Tokens",
            )
            top_p = gr.Slider(
                minimum=0.05, maximum=1.0, value=0.92, step=0.05, interactive=True, label="Top-p (nucleus sampling)",
            )
            top_k = gr.Slider(
                minimum=0, maximum=50, value=0, step=1, interactive=True, label="Top-k",
            )
            temperature = gr.Slider(
                minimum=0.1, maximum=5.0, value=0.8, step=0.1, interactive=True, label="Temperature",
            )

    user_text.submit(run_generation, [user_text, top_p, temperature, top_k, max_new_tokens, performance], [model_output, performance])
    button_submit.click(select_device, [device, user_text], [user_text])
    button_submit.click(run_generation, [user_text, top_p, temperature, top_k, max_new_tokens, performance], [model_output, performance])
    button_clear.click(reset_textbox, [user_text, model_output, performance], [user_text, model_output, performance])
    device.change(select_device, [device, user_text], [user_text])

if __name__ == "__main__":
    try:
        demo.queue().launch(debug=False, height=800)
    except Exception:
        demo.queue().launch(debug=False, share=True, height=800)

# If you are launching remotely, specify server_name and server_port
# EXAMPLE: `demo.launch(server_name='your server name', server_port='server port in int')`
# To learn more please refer to the Gradio docs: https://gradio.app/docs/