Group Threading utilities

group ov_dev_api_threading

Threading API providing task executors for asynchronous operations.

Typedefs

using Task = std::function<void()>

An OpenVINO task executor accepts any copyable callable that takes no parameters and returns nothing as a task. The callable is wrapped into a std::function object.
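As an illustration of the callable requirement, the sketch below stores a lambda, a functor, and a function pointer in the same Task type. The typedef is re-declared locally so the snippet compiles without OpenVINO headers; Increment, do_nothing, and run_sample_tasks are hypothetical names used only for this example.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Mirrors the documented typedef; declared locally for a standalone sketch.
using Task = std::function<void()>;

// Hypothetical functor: copyable, takes no parameters, returns nothing.
struct Increment {
    std::shared_ptr<int> counter;
    void operator()() const { ++*counter; }
};

// Hypothetical free function used as a task.
inline void do_nothing() {}

// Wraps three kinds of callables into Task objects, runs them in order,
// and returns the final counter value.
inline int run_sample_tasks() {
    auto counter = std::make_shared<int>(0);
    std::vector<Task> tasks;
    tasks.emplace_back([counter] { *counter += 10; });  // lambda with a copyable capture
    tasks.emplace_back(Increment{counter});             // functor object
    tasks.emplace_back(&do_nothing);                    // plain function pointer
    for (auto& task : tasks) {
        assert(task);  // a Task built from a callable is non-empty
        task();
    }
    return *counter;
}
```

A move-only capture (for example a std::unique_ptr or std::promise held by value) would not satisfy the copyable requirement, which is why shared ownership through std::shared_ptr is used here.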

template<typename T>
using ThreadLocal = tbb::enumerable_thread_specific<T>

A wrapper class that keeps a separate instance of an object for each thread.

Template Parameters

T – A type of object to keep thread local.
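To show what per-thread storage means in practice, here is a minimal sketch that uses only the standard library. ThreadLocalSketch is a hypothetical stand-in, not the TBB class behind the typedef; it only imitates the idea of a local() accessor that hands each thread its own instance.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a thread-local container (NOT the TBB class).
template <typename T>
class ThreadLocalSketch {
public:
    // Returns the calling thread's own instance, creating it on first use.
    T& local() {
        std::lock_guard<std::mutex> lock(mutex_);
        return slots_[std::this_thread::get_id()];
    }

    // Number of threads that have requested an instance so far.
    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return slots_.size();
    }

private:
    mutable std::mutex mutex_;
    std::map<std::thread::id, T> slots_;  // node-based, so references stay valid
};

// Each worker increments only its own counter, so the values are never shared.
// Returns how many distinct per-thread instances were created.
inline std::size_t count_distinct_slots(int num_threads) {
    assert(num_threads >= 0);
    ThreadLocalSketch<int> counters;
    std::vector<std::thread> workers;
    for (int i = 0; i < num_threads; ++i) {
        workers.emplace_back([&counters] {
            int& mine = counters.local();
            for (int k = 0; k < 100; ++k) {
                ++mine;
            }
        });
    }
    for (auto& worker : workers) {
        worker.join();
    }
    return counters.size();
}
```

The mutex protects only the lookup; once a thread holds the reference to its own slot, it can update the value without further locking.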

class ImmediateExecutor : public ov::threading::ITaskExecutor
#include <immediate_executor.hpp>

Task executor implementation that simply runs tasks in the current thread when the run() method is called.

Public Types

using Ptr = std::shared_ptr<ImmediateExecutor>

A shared pointer to an ImmediateExecutor object.

Public Functions

~ImmediateExecutor() override = default

Destroys the object.

inline virtual void run(Task task) override

Execute ov::Task inside task executor context.

Parameters

task – A task to start
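The behaviour described for ImmediateExecutor can be sketched with simplified stand-ins. TaskExecutorSketch and ImmediateExecutorSketch are hypothetical types that only mirror the shape of the documented run() method; they are not the OpenVINO classes.

```cpp
#include <cassert>
#include <functional>
#include <thread>

using Task = std::function<void()>;

// Hypothetical, simplified stand-in for the executor interface.
struct TaskExecutorSketch {
    virtual ~TaskExecutorSketch() = default;
    virtual void run(Task task) = 0;
};

// Hypothetical stand-in: invokes the task synchronously on the calling thread.
struct ImmediateExecutorSketch : TaskExecutorSketch {
    void run(Task task) override {
        assert(task);  // an empty std::function would throw std::bad_function_call
        task();
    }
};

// Returns true if the task observed the same thread id as the caller.
inline bool immediate_runs_on_caller() {
    ImmediateExecutorSketch executor;
    const auto caller = std::this_thread::get_id();
    bool same_thread = false;
    // Capturing by reference is safe only because run() is synchronous here.
    executor.run([&] { same_thread = (std::this_thread::get_id() == caller); });
    return same_thread;
}
```

Because run() returns only after the task has finished, no future or other synchronization is needed with this kind of executor.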

class CPUStreamsExecutor : public ov::threading::IStreamsExecutor
#include <cpu_streams_executor.hpp>

CPU streams executor implementation. The executor splits the CPU into groups of threads that can be pinned to cores or NUMA nodes. It uses custom threads to pull tasks from a single queue.

Public Types

using Ptr = std::shared_ptr<CPUStreamsExecutor>

A shared pointer to a CPUStreamsExecutor object.

Public Functions

explicit CPUStreamsExecutor(const Config &config)

Constructor.

Parameters

config – Stream executor parameters

~CPUStreamsExecutor() override

A class destructor.

virtual void run(Task task) override

Execute ov::Task inside task executor context.

Parameters

task – A task to start

virtual void execute(Task task) override

Execute the task in the current thread using streams executor configuration and constraints.

Parameters

task – A task to start

virtual int get_stream_id() override

Return the index of current stream.

Returns

The index of the current stream. Throws an exception if not called from a stream thread.

virtual int get_numa_node_id() override

Return the ID of the current NUMA node. Returns 0 when the current stream spans several NUMA nodes.

Returns

The ID of the current NUMA node. Throws an exception if not called from a stream thread.

virtual int get_socket_id() override

Return the ID of the current socket. Returns 0 when the current stream spans several sockets.

Returns

The ID of the current socket. Throws an exception if not called from a stream thread.

virtual void run_sub_stream(Task task, int id) override

Execute ov::Task inside sub stream of task executor context.

Parameters
  • task – A task to start

  • id – Sub stream id
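The "custom threads pull tasks from a single queue" part of the CPUStreamsExecutor description can be pictured with the following sketch. SingleQueueExecutorSketch is a hypothetical class written for this example only; it models just the shared queue and the worker threads, with no pinning, NUMA awareness, stream ids, or sub-streams.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

using Task = std::function<void()>;

// Hypothetical sketch: several worker threads draining one shared queue.
class SingleQueueExecutorSketch {
public:
    explicit SingleQueueExecutorSketch(int num_threads) {
        assert(num_threads > 0);
        for (int i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] { worker_loop(); });
        }
    }

    // Signals shutdown, lets workers drain the queued tasks, then joins them.
    ~SingleQueueExecutorSketch() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (auto& worker : workers_) {
            worker.join();
        }
    }

    // Enqueues the task and returns immediately.
    void run(Task task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void worker_loop() {
        for (;;) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
                if (queue_.empty()) {
                    return;  // shutdown requested and nothing left to run
                }
                task = std::move(queue_.front());
                queue_.pop();
            }
            task();  // run outside the lock so other workers can dequeue
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<Task> queue_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};

// Submits `count` increments and returns the total after the pool is destroyed.
inline int sum_with_pool(int num_threads, int count) {
    std::atomic<int> total{0};
    {
        SingleQueueExecutorSketch pool(num_threads);
        for (int i = 0; i < count; ++i) {
            pool.run([&total] { ++total; });
        }
    }  // the destructor drains the queue and joins the workers
    return total.load();
}
```

In this sketch run() is asynchronous, so the caller needs some form of synchronization (here, the destructor) before reading results.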

interface ExecutorManager
#include <executor_manager.hpp>

Interface for the task execution manager. This is the global point for getting task executor objects by string id. With multiple asynchronous requests, having a single executor per id is necessary to avoid oversubscription. For example, there may be two task executors for the CPU device: one in FPGA, another in OneDNN. Running both in parallel leads to suboptimal CPU usage; it is more efficient to run the corresponding tasks one by one through a single executor.

Public Functions

virtual std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string &id) = 0

Returns the executor for a unique identifier.

Parameters

id – A unique identifier of the device (usually the string representation of TargetDevice)

Returns

A shared pointer to an existing or newly created ITaskExecutor

virtual std::shared_ptr<ov::threading::IStreamsExecutor> get_idle_cpu_streams_executor(const ov::threading::IStreamsExecutor::Config &config) = 0

Returns an idle CPU streams executor.

Parameters

config – Streams executor config

Returns

A shared pointer to the streams executor

virtual void set_property(const ov::AnyMap &properties) = 0

Configures the executor manager.

Parameters

properties – map with configuration

virtual ov::Any get_property(const std::string &name) const = 0

Returns configuration.

Parameters

name – property name

Returns

Property value

virtual void execute_task_by_streams_executor(ov::threading::IStreamsExecutor::Config::PreferredCoreType core_type, ov::threading::Task task) = 0

Creates a temporary executor to execute the given task.

Parameters
  • core_type – cpu core type

  • task – task to be performed
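The idea of obtaining executors by string id, so that requests with the same id share one executor, can be sketched as a small registry. ExecutorRegistrySketch, TaskExecutorSketch, and InlineExecutorSketch are hypothetical names for this example; the real manager interface has more methods and may create different executor types.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>

using Task = std::function<void()>;

// Hypothetical, simplified stand-in for the executor interface.
struct TaskExecutorSketch {
    virtual ~TaskExecutorSketch() = default;
    virtual void run(Task task) = 0;
};

// Hypothetical executor type created by the registry in this sketch.
struct InlineExecutorSketch : TaskExecutorSketch {
    void run(Task task) override { task(); }
};

// Hypothetical registry: one executor per string id, created on first request.
class ExecutorRegistrySketch {
public:
    std::shared_ptr<TaskExecutorSketch> get_executor(const std::string& id) {
        assert(!id.empty());
        std::lock_guard<std::mutex> lock(mutex_);
        auto& slot = executors_[id];
        if (!slot) {
            slot = std::make_shared<InlineExecutorSketch>();
        }
        return slot;  // later calls with the same id get the same object
    }

private:
    std::mutex mutex_;
    std::map<std::string, std::shared_ptr<TaskExecutorSketch>> executors_;
};

// Returns true if equal ids share one executor and different ids do not.
inline bool registry_reuses_executors() {
    ExecutorRegistrySketch registry;
    auto cpu_a = registry.get_executor("CPU");
    auto cpu_b = registry.get_executor("CPU");
    auto other = registry.get_executor("OTHER");
    return cpu_a == cpu_b && cpu_a != other;
}
```

Routing all work for one id through a single shared executor is what keeps two components from each spawning their own full set of threads.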

interface IStreamsExecutor : public virtual ov::threading::ITaskExecutor
#include <istreams_executor.hpp>

Interface for Streams Task Executor. This executor groups worker threads into so-called streams.

CPU

The executor executes all parallel tasks using threads from one stream. With proper pinning settings, this should reduce cache misses for memory-bound workloads.

NUMA

On NUMA hosts, the get_numa_node_id() method can be used to determine the NUMA node of the current stream.

Subclassed by ov::threading::CPUStreamsExecutor

Public Types

enum ThreadBindingType

Defines inference thread binding type.

Values:

enumerator NONE

Don’t bind the inference threads.

enumerator CORES

Bind inference threads to the CPU cores (round-robin)

enumerator NUMA

Bind inference threads to the NUMA nodes (default mode for non-hybrid CPUs on Windows and macOS, where ‘CORES’ is not implemented)

enumerator HYBRID_AWARE

Let the runtime bind the inference threads depending on the cores type (default mode for the hybrid CPUs)

using Ptr = std::shared_ptr<IStreamsExecutor>

A shared pointer to IStreamsExecutor interface

Public Functions

~IStreamsExecutor() override

A virtual destructor.

virtual int get_stream_id() = 0

Return the index of current stream.

Returns

The index of the current stream. Throws an exception if not called from a stream thread.

virtual int get_numa_node_id() = 0

Return the ID of the current NUMA node. Returns 0 when the current stream spans several NUMA nodes.

Returns

The ID of the current NUMA node. Throws an exception if not called from a stream thread.

virtual int get_socket_id() = 0

Return the ID of the current socket. Returns 0 when the current stream spans several sockets.

Returns

The ID of the current socket. Throws an exception if not called from a stream thread.

virtual void execute(Task task) = 0

Execute the task in the current thread using streams executor configuration and constraints.

Parameters

task – A task to start

virtual void run_sub_stream(Task task, int id) = 0

Execute ov::Task inside sub stream of task executor context.

Parameters
  • task – A task to start

  • id – Sub stream id

void run_sub_stream_and_wait(const std::vector<Task> &tasks)

Executes all of the tasks and waits for their completion. The default run_sub_stream_and_wait() implementation uses the pure virtual run_sub_stream() method and higher-level synchronization primitives from the STL. Each task is wrapped into a std::packaged_task, which provides a std::future. The std::packaged_task calls the task and signals the std::future when the task has finished or has thrown an exception. The std::future is then used to wait for task completion and to extract the task exception.

Note

run_sub_stream_and_wait() does not copy or capture tasks!

Parameters

tasks – A vector of tasks to execute

struct Config
#include <istreams_executor.hpp>

Defines IStreamsExecutor configuration.

Public Types

enum class StreamsMode

Defines the sub-stream modes, each indicating the situation of the main stream.

Values:

enumerator SUB_STREAMS_NULL

Do not create sub streams.

enumerator SUB_STREAMS_FOR_SOCKET

Create sub streams for multiple sockets in main stream.

enumerator LATENCY

Latency mode.

enumerator THROUGHPUT

Throughput mode.

Public Functions

inline Config(std::string name = "StreamsExecutor", int streams = 1, int threadsPerStream = 0, ThreadBindingType threadBindingType = ThreadBindingType::NONE, int threadBindingStep = 1, int threadBindingOffset = 0, int threads = 0, PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY, std::vector<std::vector<int>> streamsInfoTable = {}, bool cpuReservation = false)

A constructor with arguments.

Parameters
  • name[in] The executor name

  • streams[in]

  • threadsPerStream[in]

  • threadBindingType[in]

  • threadBindingStep[in]

  • threadBindingOffset[in]

  • threads[in]

  • threadPreferredCoreType[in]

  • streamsInfoTable[in]

  • cpuReservation[in]

void set_property(const ov::AnyMap &properties)

Sets configuration.

Parameters

properties – map of properties

void set_property(const std::string &key, const ov::Any &value)

Sets configuration.

Parameters
  • key – property name

  • value – property value

ov::Any get_property(const std::string &key) const

Return configuration value.

Parameters

key – configuration key

Returns

configuration value wrapped into ov::Any

Public Static Functions

static Config make_default_multi_threaded(const Config &initial)

Creates an appropriate multithreaded configuration, filling unconfigured values of the initial configuration from hardware properties.

Parameters

initial – Initial configuration

Returns

configured values

static Config reserve_cpu_threads(const Config &initial)

Gets and reserves CPU IDs based on the configuration and hardware information. streams_info_table must be present in the configuration.

Parameters

initial – Initial configuration

Returns

configured values

interface ITaskExecutor
#include <itask_executor.hpp>

Interface for Task Executor. OpenVINO uses the ov::ITaskExecutor interface to run all asynchronous internal tasks. Different implementations of task executors can be used for different purposes:

  • To improve cache locality of memory-bound CPU tasks, some executors can limit task affinity and maximum concurrency.

  • An executor with one worker thread can be used to serialize access to an acceleration device.

  • An immediate task executor can be used to satisfy the ov::ITaskExecutor interface requirements while running tasks in the current thread.

Synchronization

It is the responsibility of the ov::ITaskExecutor user to wait for task completion. The standard C++11 way to wait for task completion is to use std::packaged_task or std::promise with std::future. Here is an example of how to use std::promise to wait for task completion and process task exceptions:

    // std::promise is a move-only object, so to satisfy the copyable callable constraint we use std::shared_ptr
    auto promise = std::make_shared<std::promise<void>>();
    // Once the promise is created we can get a std::future to wait for the result
    auto future = promise->get_future();
    // A rather simple task
    ov::threading::Task task = [] {
        std::cout << "Some Output" << std::endl;
    };
    // Create an executor
    ov::threading::ITaskExecutor::Ptr taskExecutor =
        std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{});
    if (taskExecutor == nullptr) {
        // Handle the missing executor here
        return;
    }
    // We capture the task and the promise. When the task is executed in the task executor context
    // we manually call the std::promise::set_value() method
    taskExecutor->run([task, promise] {
        std::exception_ptr currentException;
        try {
            task();
        } catch(...) {
            // If an exception is thrown, store a pointer to the current exception
            currentException = std::current_exception();
        }

        if (nullptr == currentException) {
            promise->set_value();  //  <-- If there are no problems just call std::promise::set_value()
        } else {
            promise->set_exception(currentException);    //  <-- If there is an exception forward it to the std::future object
        }
    });
    // To wait for task completion we call the std::future::wait method
    future.wait();  //  The current thread is blocked here until std::promise::set_value()
                    //  or std::promise::set_exception() is called.

    // If the future stores an exception it is rethrown by the std::future::get method
    try {
        future.get();
    } catch(const std::exception& /*e*/) {
        // ProcessError(e);
    }

Note

Implementations should guarantee thread safety of all methods.

Subclassed by ov::threading::IStreamsExecutor, ov::threading::ImmediateExecutor

Public Types

using Ptr = std::shared_ptr<ITaskExecutor>

A shared pointer to ITaskExecutor interface

Public Functions

virtual ~ITaskExecutor() = default

Destroys the object.

virtual void run(Task task) = 0

Execute ov::Task inside task executor context.

Parameters

task – A task to start

virtual void run_and_wait(const std::vector<Task> &tasks)

Executes all of the tasks and waits for their completion. The default run_and_wait() implementation uses the pure virtual run() method and higher-level synchronization primitives from the STL. Each task is wrapped into a std::packaged_task, which provides a std::future. The std::packaged_task calls the task and signals the std::future when the task has finished or has thrown an exception. The std::future is then used to wait for task completion and to extract the task exception.

Note

run_and_wait() does not copy or capture tasks!

Parameters

tasks – A vector of tasks to execute
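The std::packaged_task and std::future mechanism described for run_and_wait() can be sketched as follows. This is an illustration under assumptions, not the library source: run_and_wait_sketch, RunFn, and demo_run_and_wait are hypothetical names, and the executor is modelled as any callable that schedules a Task.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <future>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

using Task = std::function<void()>;

// Hypothetical executor model: any callable that accepts a Task and schedules it.
using RunFn = std::function<void(Task)>;

// Sketch of a run_and_wait-style helper built on std::packaged_task.
inline void run_and_wait_sketch(const RunFn& run, const std::vector<Task>& tasks) {
    assert(run);
    std::vector<std::packaged_task<void()>> packaged;
    std::vector<std::future<void>> futures;
    packaged.reserve(tasks.size());
    futures.reserve(tasks.size());
    for (const auto& task : tasks) {
        // The task is referenced, not copied: `tasks` outlives this call.
        packaged.emplace_back([&task] { task(); });
        futures.emplace_back(packaged.back().get_future());
    }
    for (auto& p : packaged) {
        // std::packaged_task is move-only, so the executor gets a copyable wrapper.
        run([&p] { p(); });
    }
    // Wait for every task first so none is still running when we leave...
    for (auto& f : futures) {
        f.wait();
    }
    // ...then rethrow the first stored exception, if any.
    for (auto& f : futures) {
        f.get();
    }
}

// Demo with a hypothetical thread-per-task executor: one task succeeds, one throws.
// Returns true if the exception reached the caller and the other task still ran.
inline bool demo_run_and_wait() {
    std::vector<std::thread> threads;
    RunFn run = [&threads](Task t) { threads.emplace_back(std::move(t)); };
    std::atomic<int> done{0};
    std::vector<Task> tasks;
    tasks.push_back([&done] { ++done; });
    tasks.push_back([] { throw std::runtime_error("boom"); });
    bool caught = false;
    try {
        run_and_wait_sketch(run, tasks);
    } catch (const std::runtime_error&) {
        caught = true;
    }
    for (auto& t : threads) {
        t.join();
    }
    return caught && done.load() == 1;
}
```

Splitting wait() and get() into two loops matters: if get() rethrew while other tasks were still running, those tasks would be left referencing destroyed packaged_task objects.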