Group Threading utilities

group ov_dev_api_threading

Threading API providing task executors for asynchronous operations.

Typedefs

using Task = std::function<void()>

An OpenVINO task executor accepts any copyable callable that takes no parameters and returns no value as a task. The callable is wrapped in a std::function object.
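To make the "copyable callable" requirement concrete, here is a minimal standalone sketch. It declares a local `Task` alias identical to the typedef above so that it compiles without OpenVINO; `Increment` and `run_task_kinds` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Local stand-in for ov::threading::Task so the sketch builds without OpenVINO.
using Task = std::function<void()>;

// A copyable function object with no parameters and no return value.
struct Increment {
    int* counter;
    void operator()() const { ++*counter; }
};

// Hypothetical helper: builds tasks from three kinds of callables,
// runs them, and returns the final counter value.
inline int run_task_kinds() {
    int counter = 0;

    Task from_lambda = [&counter] { ++counter; };   // lambda
    Task from_functor = Increment{&counter};        // function object

    // std::function requires a copyable target, so state that should not be
    // copied per task copy is shared through a std::shared_ptr instead.
    auto step = std::make_shared<int>(10);
    Task from_shared = [&counter, step] { counter += *step; };

    // Each std::function now holds a callable target.
    assert(from_lambda && from_functor && from_shared);

    from_lambda();
    from_functor();
    from_shared();
    return counter;  // 1 + 1 + 10
}
```

A lambda that captures a move-only object (for example a std::unique_ptr or std::promise by value) cannot be stored in a `Task`, which is why the synchronization example later on this page holds its std::promise in a std::shared_ptr.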

template<typename T>
using ThreadLocal = tbb::enumerable_thread_specific<T>

A wrapper class that keeps an object thread-local.

Template Parameters:

T – The type of the object to keep thread-local.

class ImmediateExecutor : public ov::threading::ITaskExecutor
#include <immediate_executor.hpp>

Task executor implementation that runs each task in the current thread, within the call to run().

Public Types

using Ptr = std::shared_ptr<ImmediateExecutor>

A shared pointer to an ImmediateExecutor object.

Public Functions

~ImmediateExecutor() override = default

Destroys the object.

inline virtual void run(Task task) override

Execute ov::Task inside task executor context.

Parameters:

task – A task to start
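The behavior described for ImmediateExecutor can be illustrated with a small standalone sketch. `TaskExecutorSketch` and `ImmediateExecutorSketch` are hypothetical stand-ins that mirror the documented interface; they are not the OpenVINO classes.

```cpp
#include <cassert>
#include <functional>
#include <thread>

using Task = std::function<void()>;  // same shape as ov::threading::Task

// Hypothetical stand-in for the ITaskExecutor interface.
struct TaskExecutorSketch {
    virtual ~TaskExecutorSketch() = default;
    virtual void run(Task task) = 0;
};

// Runs the task synchronously, inside the call to run().
struct ImmediateExecutorSketch : TaskExecutorSketch {
    void run(Task task) override {
        assert(task && "task must hold a callable");
        task();
    }
};

// Hypothetical helper: checks that the task observed the caller's thread id.
inline bool runs_on_calling_thread() {
    ImmediateExecutorSketch executor;
    const std::thread::id caller = std::this_thread::get_id();
    std::thread::id seen;
    executor.run([&seen] { seen = std::this_thread::get_id(); });
    return seen == caller;
}
```

Because the task has already finished when run() returns, code written against the generic interface needs no extra synchronization with an executor of this kind.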

class CPUStreamsExecutor : public ov::threading::IStreamsExecutor
#include <cpu_streams_executor.hpp>

CPU Streams executor implementation. The executor splits the CPU into groups of threads that can be pinned to cores or NUMA nodes. It uses custom threads to pull tasks from a single queue.
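The "custom threads pulling tasks from a single queue" pattern can be sketched with the standard library alone. This is an illustrative assumption about the general technique, not the CPUStreamsExecutor source: it omits streams, pinning, and NUMA handling, and `QueueExecutorSketch` and `run_many` are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

using Task = std::function<void()>;

class QueueExecutorSketch {
public:
    explicit QueueExecutorSketch(std::size_t workers) {
        for (std::size_t i = 0; i < workers; ++i) {
            threads_.emplace_back([this] { worker_loop(); });
        }
    }

    // Lets the workers drain the queue, then joins them.
    ~QueueExecutorSketch() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        ready_.notify_all();
        for (auto& thread : threads_) thread.join();
    }

    // Enqueues the task; one of the worker threads will pick it up.
    void run(Task task) {
        assert(task && "task must hold a callable");
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(task));
        }
        ready_.notify_one();
    }

private:
    void worker_loop() {
        for (;;) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
                if (queue_.empty()) return;  // stopping and nothing left to run
                task = std::move(queue_.front());
                queue_.pop();
            }
            task();  // run outside the lock so other workers can proceed
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<Task> queue_;
    bool stopping_ = false;
    std::vector<std::thread> threads_;  // declared last: starts after the state above exists
};

// Hypothetical helper: submits `count` tasks and returns how many ran.
inline int run_many(int count) {
    std::atomic<int> done{0};
    {
        QueueExecutorSketch executor(4);
        for (int i = 0; i < count; ++i) executor.run([&done] { ++done; });
    }  // the destructor drains the queue and joins the workers
    return done.load();
}
```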

Public Types

using Ptr = std::shared_ptr<CPUStreamsExecutor>

A shared pointer to a CPUStreamsExecutor object.

Public Functions

explicit CPUStreamsExecutor(const Config &config)

Constructor.

Parameters:

config – Stream executor parameters

~CPUStreamsExecutor() override

A class destructor.

virtual void run(Task task) override

Execute ov::Task inside task executor context.

Parameters:

task – A task to start

virtual void execute(Task task) override

Execute the task in the current thread using streams executor configuration and constraints.

Parameters:

task – A task to start

virtual int get_stream_id() override

Return the index of the current stream.

Returns:

The index of the current stream. Throws an exception if not called from a stream thread.
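The "throws if not called from a stream thread" contract can be pictured with a thread-local index. The sketch below is an assumption about one common way to implement such a contract, not a description of how OpenVINO does it; every name in the `sketch` namespace is hypothetical.

```cpp
#include <cassert>
#include <stdexcept>
#include <thread>

namespace sketch {

// -1 marks a thread that does not belong to any stream.
thread_local int current_stream_id = -1;

// Mirrors the documented contract: returns the index on a stream thread,
// throws when called from any other thread.
inline int get_stream_id() {
    if (current_stream_id < 0) {
        throw std::runtime_error("not called from a stream thread");
    }
    return current_stream_id;
}

// Starts a thread that registers itself as stream `id` and reports
// what get_stream_id() returns on that thread.
inline int stream_id_seen_by_worker(int id) {
    assert(id >= 0);
    int seen = -1;
    std::thread worker([id, &seen] {
        current_stream_id = id;  // a real executor would set this when the stream starts
        seen = get_stream_id();
    });
    worker.join();
    return seen;
}

// Calls get_stream_id() from a thread that never registered as a stream.
inline bool throws_outside_stream() {
    try {
        get_stream_id();
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}

}  // namespace sketch
```

Callers that may run outside a stream thread should therefore be prepared to catch the exception rather than assume a valid index.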

virtual int get_numa_node_id() override

Return the ID of the current NUMA node. Returns 0 when the current stream spans multiple NUMA nodes.

Returns:

The ID of the current NUMA node. Throws an exception if not called from a stream thread.

virtual int get_socket_id() override

Return the ID of the current socket. Returns 0 when the current stream spans multiple sockets.

Returns:

The ID of the current socket. Throws an exception if not called from a stream thread.

virtual void run_sub_stream(Task task, int id) override

Execute ov::Task inside sub stream of task executor context.

Parameters:
  • task – A task to start

  • id – Sub stream id

interface ExecutorManager
#include <executor_manager.hpp>

Interface for the task execution manager. This is the global point for getting task executor objects by string ID. When multiple asynchronous requests run, sharing uniquely identified executors is necessary to avoid oversubscription. For example, there may be two task executors for the CPU device: one in FPGA, another in OneDNN. Running both in parallel leads to suboptimal CPU usage. It is more efficient to run the corresponding tasks one by one through a single executor.

Public Functions

virtual std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string &id) = 0

Returns the executor for a unique identifier.

Parameters:

id – A unique identifier of the device (usually the string representation of TargetDevice)

Returns:

A shared pointer to an existing or newly created ITaskExecutor
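The "existing or newly created" lookup by string ID can be sketched as a small registry. This is a standalone illustration of the idea under stated assumptions, not the ExecutorManager implementation; `ExecutorSketch` and `ExecutorRegistrySketch` are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>

using Task = std::function<void()>;

// Minimal placeholder executor; a real one would own worker threads.
struct ExecutorSketch {
    void run(Task task) { task(); }
};

class ExecutorRegistrySketch {
public:
    // Returns the executor registered under `id`, creating it on first use,
    // so every caller asking for the same id shares one executor.
    std::shared_ptr<ExecutorSketch> get_executor(const std::string& id) {
        assert(!id.empty());
        std::lock_guard<std::mutex> lock(mutex_);
        std::shared_ptr<ExecutorSketch>& slot = executors_[id];
        if (!slot) {
            slot = std::make_shared<ExecutorSketch>();
        }
        return slot;
    }

private:
    std::mutex mutex_;
    std::map<std::string, std::shared_ptr<ExecutorSketch>> executors_;
};

// Hypothetical helper: the same id yields the same executor,
// a different id yields a different one.
inline bool same_id_shares_executor() {
    ExecutorRegistrySketch registry;
    auto first = registry.get_executor("CPU");
    auto second = registry.get_executor("CPU");
    auto other = registry.get_executor("GPU");
    return first == second && first != other;
}
```

Sharing one executor per ID is what prevents two components from each spawning a full set of worker threads for the same device.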

virtual std::shared_ptr<ov::threading::IStreamsExecutor> get_idle_cpu_streams_executor(const ov::threading::IStreamsExecutor::Config &config) = 0

Returns an idle CPU streams executor.

Parameters:

config – Streams executor config

Returns:

A shared pointer to the streams executor

virtual void set_property(const ov::AnyMap &properties) = 0

Configures the executor manager.

Parameters:

properties – map with configuration

virtual ov::Any get_property(const std::string &name) const = 0

Returns the value of a configuration property.

Parameters:

name – property name

Returns:

Property value

virtual void execute_task_by_streams_executor(ov::hint::SchedulingCoreType core_type, ov::threading::Task task) = 0

Creates a temporary executor to execute the given task.

Parameters:
  • core_type – CPU core type

  • task – task to be performed

interface IStreamsExecutor : public virtual ov::threading::ITaskExecutor
#include <istreams_executor.hpp>

Interface for Streams Task Executor. This executor groups worker threads into so-called streams.

CPU

The executor executes all parallel tasks using threads from one stream. With proper pinning settings, this should reduce cache misses for memory-bound workloads.

NUMA

On NUMA hosts, the get_numa_node_id() method can be used to determine the NUMA node of the current stream.

Subclassed by ov::threading::CPUStreamsExecutor

Public Types

enum ThreadBindingType

Defines the inference thread binding type.

Values:

enumerator NONE

Don’t bind the inference threads.

enumerator CORES

Bind inference threads to the CPU cores (round-robin).

enumerator NUMA

Bind to the NUMA nodes (default mode for non-hybrid CPUs on Windows/macOS, where ‘CORES’ is not implemented).

enumerator HYBRID_AWARE

Let the runtime bind the inference threads depending on the core types (default mode for hybrid CPUs).

using Ptr = std::shared_ptr<IStreamsExecutor>

A shared pointer to the IStreamsExecutor interface

Public Functions

~IStreamsExecutor() override

A virtual destructor.

virtual int get_stream_id() = 0

Return the index of the current stream.

Returns:

The index of the current stream. Throws an exception if not called from a stream thread.

virtual int get_numa_node_id() = 0

Return the ID of the current NUMA node. Returns 0 when the current stream spans multiple NUMA nodes.

Returns:

The ID of the current NUMA node. Throws an exception if not called from a stream thread.

virtual int get_socket_id() = 0

Return the ID of the current socket. Returns 0 when the current stream spans multiple sockets.

Returns:

The ID of the current socket. Throws an exception if not called from a stream thread.

virtual void execute(Task task) = 0

Execute the task in the current thread using streams executor configuration and constraints.

Parameters:

task – A task to start

virtual void run_sub_stream(Task task, int id) = 0

Execute ov::Task inside sub stream of task executor context.

Parameters:
  • task – A task to start

  • id – Sub stream id

void run_sub_stream_and_wait(const std::vector<Task> &tasks)

Executes all of the tasks and waits for their completion. The default run_sub_stream_and_wait() implementation uses the pure virtual run_sub_stream() method and higher-level synchronization primitives from the STL. Each task is wrapped into a std::packaged_task, which provides a std::future. The std::packaged_task calls the task and signals to the std::future that the task has finished or that an exception was thrown from it. The std::future is then used to wait for the task to complete and to extract its exception.

Note

run_sub_stream_and_wait() does not copy or capture tasks!

Parameters:

tasks – A vector of tasks to execute

struct Config
#include <istreams_executor.hpp>

Defines IStreamsExecutor configuration.

Public Types

enum class StreamsMode

Defines the sub-stream modes, each of which indicates how the main stream is set up.

Values:

enumerator SUB_STREAMS_NULL

Do not create sub streams.

enumerator SUB_STREAMS_FOR_SOCKET

Create sub streams for multiple sockets in the main stream.

enumerator LATENCY

Latency mode.

enumerator THROUGHPUT

Throughput mode.

Public Functions

inline Config(std::string name = "StreamsExecutor", int streams = 1, int threads_per_stream = 0, ov::hint::SchedulingCoreType thread_preferred_core_type = ov::hint::SchedulingCoreType::ANY_CORE, bool cpu_reservation = false, bool cpu_pinning = false, std::vector<std::vector<int>> streams_info_table = {})

A constructor with arguments.

Parameters:
  • name[in] – The executor name

  • streams[in]

  • threads_per_stream[in]

  • thread_preferred_core_type[in]

  • cpu_reservation[in]

  • cpu_pinning[in]

  • streams_info_table[in]

void set_property(const ov::AnyMap &properties)

Sets configuration.

Parameters:

properties – map of properties

void set_property(const std::string &key, const ov::Any &value)

Sets configuration.

Parameters:
  • key – property name

  • value – property value

ov::Any get_property(const std::string &key) const

Returns a configuration value.

Parameters:

key – configuration key

Returns:

configuration value wrapped into ov::Any

Public Static Functions

static Config make_default_multi_threaded(const Config &initial)

Creates an appropriate multithreaded configuration, filling the unconfigured values of the initial configuration from hardware properties.

Parameters:

initial – Initial configuration

Returns:

The resulting configuration

interface ITaskExecutor
#include <itask_executor.hpp>

Interface for Task Executor. OpenVINO uses the ov::threading::ITaskExecutor interface to run all asynchronous internal tasks. Different implementations of task executors can be used for different purposes:

  • To improve cache locality of memory-bound CPU tasks, some executors can limit a task’s affinity and maximum concurrency.

  • An executor with one worker thread can be used to serialize access to an acceleration device.

  • An immediate task executor can be used to satisfy the ov::threading::ITaskExecutor interface restrictions while running tasks in the current thread.

Synchronization

It is the responsibility of the ov::threading::ITaskExecutor user to wait for task execution to complete. The standard C++11 way to wait for task completion is to use std::packaged_task or std::promise with std::future. Here is an example of how to use std::promise to wait for task completion and process the task’s exceptions:

    // std::promise is a move-only object, so to satisfy the copyable-callable constraint we hold it in a std::shared_ptr
    auto promise = std::make_shared<std::promise<void>>();
    // Once the promise is created, we can get the std::future used to wait for the result
    auto future = promise->get_future();
    // A rather simple task
    ov::threading::Task task = [] {
        std::cout << "Some Output" << std::endl;
    };
    // Create an executor
    ov::threading::ITaskExecutor::Ptr taskExecutor =
        std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{});
    if (taskExecutor == nullptr) {
        // Handle the error: no executor is available
        return;
    }
    // We capture the task and the promise. When the task is executed in the task executor context,
    // we manually call the std::promise::set_value() method
    taskExecutor->run([task, promise] {
        std::exception_ptr currentException;
        try {
            task();
        } catch (...) {
            // If an exception was thrown, store a pointer to the current exception
            currentException = std::current_exception();
        }

        if (nullptr == currentException) {
            promise->set_value();  //  <-- If there were no problems, just call std::promise::set_value()
        } else {
            promise->set_exception(currentException);    //  <-- If there was an exception, forward it to the std::future object
        }
    });
    // To wait for the task to complete, we call the std::future::wait method
    future.wait();  //  The current thread blocks here until std::promise::set_value()
                    //  or std::promise::set_exception() is called.

    // If the future stores an exception, it is rethrown by the std::future::get method
    try {
        future.get();
    } catch (const std::exception& /*e*/) {
        // ProcessError(e);
    }
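The text above also names std::packaged_task as an option. The following standalone sketch shows that variant under stated assumptions: `InlineExecutorSketch` is a hypothetical executor that runs tasks inline so the example builds without OpenVINO, and `run_and_report` is an illustrative helper, not part of the API.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

using Task = std::function<void()>;

// Hypothetical executor that runs the task in the calling thread.
struct InlineExecutorSketch {
    void run(Task task) { task(); }
};

// Runs `task` through the executor and returns "ok",
// or the message of the exception the task threw.
inline std::string run_and_report(Task task) {
    // std::packaged_task is move-only, so it is shared through a
    // std::shared_ptr to keep the submitted lambda copyable.
    auto packaged = std::make_shared<std::packaged_task<void()>>(std::move(task));
    std::future<void> future = packaged->get_future();
    assert(future.valid());

    InlineExecutorSketch executor;
    // The packaged task stores either the completion or the exception itself,
    // so no manual try/catch around the call is needed.
    executor.run([packaged] { (*packaged)(); });

    try {
        future.get();  // waits for completion and rethrows a stored exception
        return "ok";
    } catch (const std::exception& e) {
        return e.what();
    }
}
```

Compared with the std::promise version, std::packaged_task removes the manual set_value() and set_exception() calls at the cost of wrapping the task once more.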

Note

Implementations should guarantee the thread safety of all methods.

Subclassed by ov::threading::IStreamsExecutor, ov::threading::ImmediateExecutor

Public Types

using Ptr = std::shared_ptr<ITaskExecutor>

A shared pointer to the ITaskExecutor interface

Public Functions

virtual ~ITaskExecutor() = default

Destroys the object.

virtual void run(Task task) = 0

Execute ov::Task inside task executor context.

Parameters:

task – A task to start

virtual void run_and_wait(const std::vector<Task> &tasks)

Executes all of the tasks and waits for their completion. The default run_and_wait() implementation uses the pure virtual run() method and higher-level synchronization primitives from the STL. Each task is wrapped into a std::packaged_task, which provides a std::future. The std::packaged_task calls the task and signals to the std::future that the task has finished or that an exception was thrown from it. The std::future is then used to wait for the task to complete and to extract its exception.

Note

run_and_wait() does not copy or capture tasks!

Parameters:

tasks – A vector of tasks to execute
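The scheme described for run_and_wait() can be sketched as a free function over any executor with a run() method. This is a reconstruction from the description above and not the library source; `run_and_wait_sketch`, `InlineExecutorSketch`, and `sum_via_run_and_wait` are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <vector>

using Task = std::function<void()>;

// Hypothetical executor that runs the task in the calling thread.
struct InlineExecutorSketch {
    void run(Task task) { task(); }
};

template <typename Executor>
void run_and_wait_sketch(Executor& executor, const std::vector<Task>& tasks) {
    std::vector<std::future<void>> futures;
    futures.reserve(tasks.size());

    for (const Task& task : tasks) {
        // The task is referenced, not copied, so `tasks` must stay alive
        // until this function returns.
        auto packaged = std::make_shared<std::packaged_task<void()>>([&task] { task(); });
        futures.push_back(packaged->get_future());
        executor.run([packaged] { (*packaged)(); });
    }
    assert(futures.size() == tasks.size());

    // Wait for every task first, so none is still running when an exception propagates.
    for (auto& future : futures) future.wait();
    // Then rethrow the first stored exception, if any.
    for (auto& future : futures) future.get();
}

// Hypothetical helper: runs two tasks and returns the sum of what they wrote.
inline int sum_via_run_and_wait() {
    int a = 0;
    int b = 0;
    std::vector<Task> tasks = {[&a] { a = 1; }, [&b] { b = 2; }};
    InlineExecutorSketch executor;
    run_and_wait_sketch(executor, tasks);
    return a + b;
}
```

Separating the wait loop from the get loop matters with a multi-threaded executor: throwing on the first failed future while later tasks are still running would leave them holding references to a vector the caller may already be destroying.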