Group Threading utilities

group ov_dev_api_threading

Threading API providing task executors for asynchronous operations.

Typedefs

using Task = std::function<void()>

OpenVINO Task Executor can use any copyable callable without parameters and output as a task. It would be wrapped into std::function object.

template<typename T>
using ThreadLocal = tbb::enumerable_thread_specific<T>

A wrapper class to keep object to be thread local.

Template Parameters

T – A type of object to keep thread local.

class ImmediateExecutor : public ov::threading::ITaskExecutor
#include <immediate_executor.hpp>

Task executor implementation that just run tasks in current thread during calling of run() method.

Public Types

using Ptr = std::shared_ptr<ImmediateExecutor>

A shared pointer to a ImmediateExecutor object.

Public Functions

~ImmediateExecutor() override = default

Destroys the object.

inline virtual void run(Task task) override

Execute ov::Task inside task executor context.

Parameters

task – A task to start

class CPUStreamsExecutor : public ov::threading::IStreamsExecutor
#include <cpu_streams_executor.hpp>

CPU Streams executor implementation. The executor splits the CPU into groups of threads, that can be pinned to cores or NUMA nodes. It uses custom threads to pull tasks from single queue.

Public Types

using Ptr = std::shared_ptr<CPUStreamsExecutor>

A shared pointer to a CPUStreamsExecutor object.

Public Functions

explicit CPUStreamsExecutor(const Config &config)

Constructor.

Parameters

config – Stream executor parameters

~CPUStreamsExecutor() override

A class destructor.

virtual void run(Task task) override

Execute ov::Task inside task executor context.

Parameters

task – A task to start

virtual void execute(Task task) override

Execute the task in the current thread using streams executor configuration and constraints.

Parameters

task – A task to start

virtual int get_stream_id() override

Return the index of current stream.

Returns

An index of current stream. Or throw exceptions if called not from stream thread

virtual int get_numa_node_id() override

Return the id of current NUMA Node Return 0 when current stream cross some NUMA Nodes.

Returns

ID of current NUMA Node, or throws exceptions if called not from stream thread

virtual int get_socket_id() override

Return the id of current socket Return 0 when current stream cross some sockets.

Returns

ID of current socket, or throws exceptions if called not from stream thread

interface ExecutorManager
#include <executor_manager.hpp>

Interface for tasks execution manager. This is global point for getting task executor objects by string id. It’s necessary in multiple asynchronous requests for having unique executors to avoid oversubscription. E.g. There 2 task executors for CPU device: one - in FPGA, another - in OneDNN. Parallel execution both of them leads to not optimal CPU usage. More efficient to run the corresponding tasks one by one via single executor.

Public Functions

virtual std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string &id) = 0

Returns executor by unique identificator.

Parameters

id – An unique identificator of device (Usually string representation of TargetDevice)

Returns

A shared pointer to existing or newly ITaskExecutor

virtual std::shared_ptr<ov::threading::IStreamsExecutor> get_idle_cpu_streams_executor(const ov::threading::IStreamsExecutor::Config &config) = 0

Returns idle cpu streams executor.

Parameters

config – Streams executor config

Returns

pointer to streams executor config

virtual void set_property(const ov::AnyMap &properties) = 0

Allows to configure executor manager.

Parameters

properties – map with configuration

virtual ov::Any get_property(const std::string &name) const = 0

Returns configuration.

Parameters

name – property name

Returns

Property value

virtual void execute_task_by_streams_executor(ov::threading::IStreamsExecutor::Config::PreferredCoreType core_type, ov::threading::Task task) = 0

create a temporary executor to execute the specific task

Parameters
  • core_type – cpu core type

  • task – task to be performed

interface IStreamsExecutor : public virtual ov::threading::ITaskExecutor
#include <istreams_executor.hpp>

Interface for Streams Task Executor. This executor groups worker threads into so-called streams.

CPU

The executor executes all parallel tasks using threads from one stream. With proper pinning settings it should reduce cache misses for memory bound workloads.

NUMA

On NUMA hosts GetNumaNodeId() method can be used to define the NUMA node of current stream

Subclassed by ov::threading::CPUStreamsExecutor

Public Types

enum ThreadBindingType

Defines inference thread binding type.

Values:

enumerator NONE

Don’t bind the inference threads.

enumerator CORES

Bind inference threads to the CPU cores (round-robin)

enumerator NUMA

Bind to the NUMA nodes (default mode for the non-hybrid CPUs on the Win/MacOS, where the ‘CORES’ is not implemeneted)

enumerator HYBRID_AWARE

Let the runtime bind the inference threads depending on the cores type (default mode for the hybrid CPUs)

using Ptr = std::shared_ptr<IStreamsExecutor>

A shared pointer to IStreamsExecutor interface

Public Functions

~IStreamsExecutor() override

A virtual destructor.

virtual int get_stream_id() = 0

Return the index of current stream.

Returns

An index of current stream. Or throw exceptions if called not from stream thread

virtual int get_numa_node_id() = 0

Return the id of current NUMA Node Return 0 when current stream cross some NUMA Nodes.

Returns

ID of current NUMA Node, or throws exceptions if called not from stream thread

virtual int get_socket_id() = 0

Return the id of current socket Return 0 when current stream cross some sockets.

Returns

ID of current socket, or throws exceptions if called not from stream thread

virtual void execute(Task task) = 0

Execute the task in the current thread using streams executor configuration and constraints.

Parameters

task – A task to start

struct Config
#include <istreams_executor.hpp>

Defines IStreamsExecutor configuration.

Public Functions

void set_property(const ov::AnyMap &properties)

Sets configuration.

Parameters

properties – map of properties

void set_property(const std::string &key, const ov::Any &value)

Sets configuration.

Parameters
  • key – property name

  • value – property value

ov::Any get_property(const std::string &key) const

Return configuration value.

Parameters

key – configuration key

Returns

configuration value wrapped into ov::Any

inline Config(std::string name = "StreamsExecutor", int streams = 1, int threadsPerStream = 0, ThreadBindingType threadBindingType = ThreadBindingType::NONE, int threadBindingStep = 1, int threadBindingOffset = 0, int threads = 0, PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY, std::vector<std::vector<int>> streamsInfoTable = {}, bool cpuReservation = false)

A constructor with arguments.

Parameters
  • name[in] The executor name

  • streams[in] Number of streams.

  • threadsPerStream[in] Number of threads per stream that executes ov_parallel calls.

  • threadBindingType[in]

  • threadBindingStep[in]

  • threadBindingOffset[in]

  • threads[in]

  • threadPreferBigCores[in]

void update_executor_config(int stream_nums, int threads_per_stream, PreferredCoreType core_type, bool cpu_pinning)

Modify _streams_info_table and related configuration according to user-specified parameters, bind threads to cpu cores if cpu_pinning is true.

Parameters
  • stream_nums – Number of streams specified by user

  • threads_per_stream – Number of threads per stream specified by user

  • core_type – Cpu type (Big/Little/Any) specified by user

  • cpu_pinning – Whether to bind the threads to cpu cores

void set_config_zero_stream()

Set _streams_info_table and _cpu_reservation in cpu streams executor config when nstreams = 0, that is, only create one thread with TBB.

Public Members

std::string _name

Used by ITT to name executor threads.

int _streams = 1

Number of streams.

int _threadsPerStream = 0

Number of threads per stream that executes ov_parallel calls.

ThreadBindingType _threadBindingType = ThreadBindingType::NONE

Thread binding to hardware resource type. No binding by default

int _threadBindingStep = 1

In case of CORES binding offset type thread binded to cores with defined step

int _threadBindingOffset = 0

In case of CORES binding offset type thread binded to cores starting from offset

int _threads = 0

Number of threads distributed between streams. Reserved. Should not be used.

int _big_core_streams = 0

Number of streams in Performance-core(big core)

int _small_core_streams = 0

Number of streams in Efficient-core(small core)

int _threads_per_stream_big = 0

Threads per stream in big cores.

int _threads_per_stream_small = 0

Threads per stream in small cores.

int _small_core_offset = 0

Calculate small core start offset when binding cpu cores.

bool _enable_hyper_thread = true

enable hyper thread

enum ov::threading::IStreamsExecutor::Config::PreferredCoreType _threadPreferredCoreType = PreferredCoreType::ANY

In case of HYBRID_AWARE hints the TBB to affinitize.

Public Static Functions

static Config make_default_multi_threaded(const Config &initial, const bool fp_intesive = true)

Create appropriate multithreaded configuration filing unconfigured values from initial configuration using hardware properties.

Parameters
  • initial – Inital configuration

  • fp_intesive – additional hint for the the (Hybrid) core-types selection logic whether the executor should be configured for floating point intensive work (as opposite to int8 intensive)

Returns

configured values

interface ITaskExecutor
#include <itask_executor.hpp>

Interface for Task Executor. OpenVINO uses ov::ITaskExecutor interface to run all asynchronous internal tasks. Different implementations of task executors can be used for different purposes:

  • To improve cache locality of memory bound CPU tasks some executors can limit task’s affinity and maximum concurrency.

  • The executor with one worker thread can be used to serialize access to acceleration device.

  • Immediate task executor can be used to satisfy ov::ITaskExecutor interface restrictions but run tasks in current thread.

Synchronization

It is ov::ITaskExecutor user responsibility to wait for task execution completion. The c++11 standard way to wait task completion is to use std::packaged_task or std::promise with std::future. Here is an example of how to use std::promise to wait task completion and process task’s exceptions:

    // std::promise is move only object so to satisfy copy callable constraint we use std::shared_ptr
    auto promise = std::make_shared<std::promise<void>>();
    // When the promise is created we can get std::future to wait the result
    auto future = promise->get_future();
    // Rather simple task
    ov::threading::Task task = [] {
        std::cout << "Some Output" << std::endl;
    };
    // Create an executor
    ov::threading::ITaskExecutor::Ptr taskExecutor =
        std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{});
    if (taskExecutor == nullptr) {
        // ProcessError(e);
        return;
    }
    // We capture the task and the promise. When the task is executed in the task executor context
    // we munually call std::promise::set_value() method
    taskExecutor->run([task, promise] {
        std::exception_ptr currentException;
        try {
            task();
        } catch(...) {
            // If there is some exceptions store the pointer to current exception
            currentException = std::current_exception();
        }

        if (nullptr == currentException) {
            promise->set_value();  //  <-- If there is no problems just call std::promise::set_value()
        } else {
            promise->set_exception(currentException);    //  <-- If there is an exception forward it to std::future object
        }
    });
    // To wait the task completion we call std::future::wait method
    future.wait();  //  The current thread will be blocked here and wait when std::promise::set_value()
                    //  or std::promise::set_exception() method will be called.

    // If the future store the exception it will be rethrown in std::future::get method
    try {
        future.get();
    } catch(std::exception& /*e*/) {
        // ProcessError(e);
    }

Note

Implementation should guaranty thread safety of all methods

Subclassed by ov::threading::IStreamsExecutor, ov::threading::ImmediateExecutor

Public Types

using Ptr = std::shared_ptr<ITaskExecutor>

A shared pointer to ITaskExecutor interface

Public Functions

virtual ~ITaskExecutor() = default

Destroys the object.

virtual void run(Task task) = 0

Execute ov::Task inside task executor context.

Parameters

task – A task to start

virtual void run_and_wait(const std::vector<Task> &tasks)

Execute all of the tasks and waits for its completion. Default run_and_wait() method implementation uses run() pure virtual method and higher level synchronization primitives from STL. The task is wrapped into std::packaged_task which returns std::future. std::packaged_task will call the task and signal to std::future that the task is finished or the exception is thrown from task Than std::future is used to wait for task execution completion and task exception extraction.

Note

run_and_wait() does not copy or capture tasks!

Parameters

tasks – A vector of tasks to execute