Group Threading utilities
- group ov_dev_api_threading
Threading API providing task executors for asynchronous operations.
Typedefs
-
using Task = std::function<void()>
An OpenVINO task executor can use any copyable callable that takes no parameters and returns nothing as a task. The callable is wrapped into an std::function object.
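For illustration, a minimal sketch of wrapping a capturing lambda into a Task (the header name follows the listings on this page; the function name, lambda, and captured value are arbitrary):

#include <itask_executor.hpp>   // header as listed for ITaskExecutor below; assumed to provide ov::threading::Task

#include <iostream>

void make_task_example() {
    int value = 42;
    // Any copyable callable with no parameters and no return value can serve as a task.
    ov::threading::Task task = [value] {
        std::cout << "processing " << value << std::endl;
    };
    task();  // a Task is just an std::function<void()>, so it can also be invoked directly
}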
-
class ImmediateExecutor : public ov::threading::ITaskExecutor
- #include <immediate_executor.hpp>
Task executor implementation that runs tasks in the current thread when the run() method is called.
Public Types
-
using Ptr = std::shared_ptr<ImmediateExecutor>
A shared pointer to an ImmediateExecutor object.
Public Functions
-
~ImmediateExecutor() override = default
Destroys the object.
-
inline virtual void run(Task task) override
Execute ov::Task inside task executor context.
- Parameters
task – A task to start
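As a small illustrative sketch (variable and function names are arbitrary), run() on an ImmediateExecutor executes the task synchronously in the calling thread:

#include <immediate_executor.hpp>   // path as shown above

#include <iostream>
#include <memory>

void immediate_example() {
    auto executor = std::make_shared<ov::threading::ImmediateExecutor>();
    executor->run([] { std::cout << "runs in the caller's thread" << std::endl; });
    // run() returns only after the task has finished, because execution is immediate.
}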
-
class CPUStreamsExecutor : public ov::threading::IStreamsExecutor
- #include <cpu_streams_executor.hpp>
CPU streams executor implementation. The executor splits the CPU into groups of threads that can be pinned to cores or NUMA nodes. It uses custom threads to pull tasks from a single queue.
Public Types
-
using Ptr = std::shared_ptr<CPUStreamsExecutor>
A shared pointer to a CPUStreamsExecutor object.
Public Functions
-
explicit CPUStreamsExecutor(const Config &config)
Constructor.
- Parameters
config – Stream executor parameters
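For example, a minimal sketch (the executor name and stream count are arbitrary) of constructing a CPUStreamsExecutor from a Config and submitting a task:

#include <cpu_streams_executor.hpp>   // path as shown above

#include <iostream>
#include <memory>

void cpu_streams_example() {
    ov::threading::IStreamsExecutor::Config config{"MyCpuExecutor", 2};
    auto executor = std::make_shared<ov::threading::CPUStreamsExecutor>(config);
    executor->run([] {
        std::cout << "executed by one of the executor's stream threads" << std::endl;
    });
}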
-
~CPUStreamsExecutor() override
A class destructor.
-
virtual void run(Task task) override
Execute ov::Task inside task executor context.
- Parameters
task – A task to start
-
virtual void execute(Task task) override
Execute the task in the current thread using streams executor configuration and constraints.
- Parameters
task – A task to start
-
virtual int get_stream_id() override
Return the index of the current stream.
- Returns
The index of the current stream, or throws an exception if not called from a stream thread.
-
virtual int get_numa_node_id() override
Return the ID of the current NUMA node. Returns 0 when the current stream spans multiple NUMA nodes.
- Returns
The ID of the current NUMA node, or throws an exception if not called from a stream thread.
-
virtual int get_socket_id() override
Return the ID of the current socket. Returns 0 when the current stream spans multiple sockets.
- Returns
The ID of the current socket, or throws an exception if not called from a stream thread.
-
interface ExecutorManager
- #include <executor_manager.hpp>
Interface for the task execution manager. This is the global point for getting task executor objects by string ID. It is needed when there are multiple asynchronous requests, so that they share unique executors and avoid oversubscription. E.g. there are two task executors for the CPU device: one in FPGA, another in OneDNN. Executing both in parallel leads to suboptimal CPU usage; it is more efficient to run the corresponding tasks one by one via a single executor.
Public Functions
-
virtual std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string &id) = 0
Returns an executor by its unique identifier.
- Parameters
id – A unique identifier of the device (usually a string representation of the target device)
- Returns
A shared pointer to an existing or newly created ITaskExecutor
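As a hedged sketch, assuming the global manager is obtained through the ov::threading::executor_manager() factory from the dev API (not documented on this page), executors can then be fetched by ID:

#include <executor_manager.hpp>   // path as shown above

void executor_manager_example() {
    auto manager = ov::threading::executor_manager();  // assumed global accessor, see lead-in
    auto cpuExecutor = manager->get_executor("CPU");    // the same ID returns the same executor instance
    cpuExecutor->run([] { /* device-related work */ });
}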
-
virtual std::shared_ptr<ov::threading::IStreamsExecutor> get_idle_cpu_streams_executor(const ov::threading::IStreamsExecutor::Config &config) = 0
Returns an idle CPU streams executor.
- Parameters
config – Streams executor config
- Returns
A shared pointer to a streams executor
-
virtual void set_property(const ov::AnyMap &properties) = 0
Allows the executor manager to be configured.
- Parameters
properties – map with configuration
-
interface IStreamsExecutor : public virtual ov::threading::ITaskExecutor
- #include <istreams_executor.hpp>
Interface for the Streams Task Executor. This executor groups worker threads into so-called streams.
- CPU
The executor runs all parallel tasks using threads from one stream. With proper pinning settings it should reduce cache misses for memory-bound workloads.
- NUMA
On NUMA hosts, the GetNumaNodeId() method can be used to determine the NUMA node of the current stream.
Subclassed by ov::threading::CPUStreamsExecutor
Public Types
-
enum ThreadBindingType
Defines inference thread binding type.
Values:
-
enumerator NONE
Don’t bind the inference threads.
-
enumerator CORES
Bind inference threads to the CPU cores (round-robin)
-
enumerator NUMA
Bind to the NUMA nodes (default mode for non-hybrid CPUs on Windows/macOS, where 'CORES' is not implemented)
-
enumerator HYBRID_AWARE
Let the runtime bind the inference threads depending on the cores type (default mode for the hybrid CPUs)
-
using Ptr = std::shared_ptr<IStreamsExecutor>
A shared pointer to IStreamsExecutor interface
Public Functions
-
~IStreamsExecutor() override
A virtual destructor.
-
virtual int get_stream_id() = 0
Return the index of the current stream.
- Returns
The index of the current stream, or throws an exception if not called from a stream thread.
-
virtual int get_numa_node_id() = 0
Return the ID of the current NUMA node. Returns 0 when the current stream spans multiple NUMA nodes.
- Returns
The ID of the current NUMA node, or throws an exception if not called from a stream thread.
-
virtual int get_socket_id() = 0
Return the ID of the current socket. Returns 0 when the current stream spans multiple sockets.
- Returns
The ID of the current socket, or throws an exception if not called from a stream thread.
-
virtual void execute(Task task) = 0
Execute the task in the current thread using streams executor configuration and constraints.
- Parameters
task – A task to start
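As an illustrative sketch, the stream-related queries above are valid only from a task running on a stream thread; for example, using a CPUStreamsExecutor (construction as in the listing earlier on this page; the function and variable names are arbitrary):

#include <cpu_streams_executor.hpp>   // path as shown above

#include <iostream>
#include <memory>

void stream_introspection_example() {
    auto executor =
        std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{});
    ov::threading::IStreamsExecutor* raw = executor.get();
    ov::threading::Task query = [raw] {
        // Runs on a stream thread, so these queries are expected not to throw here.
        std::cout << "stream " << raw->get_stream_id()
                  << " on NUMA node " << raw->get_numa_node_id() << std::endl;
    };
    executor->run_and_wait({query});  // blocks until the task has executed on a stream thread
}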
-
struct Config
- #include <istreams_executor.hpp>
Defines IStreamsExecutor configuration.
Public Functions
-
void set_property(const ov::AnyMap &properties)
Sets configuration.
- Parameters
properties – map of properties
-
void set_property(const std::string &key, const ov::Any &value)
Sets configuration.
- Parameters
key – property name
value – property value
-
ov::Any get_property(const std::string &key) const
Return configuration value.
- Parameters
key – configuration key
- Returns
configuration value wrapped into ov::Any
-
inline Config(std::string name = "StreamsExecutor", int streams = 1, int threadsPerStream = 0, ThreadBindingType threadBindingType = ThreadBindingType::NONE, int threadBindingStep = 1, int threadBindingOffset = 0, int threads = 0, PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY, std::vector<std::vector<int>> streamsInfoTable = {}, bool cpuReservation = false)
A constructor with arguments.
- Parameters
name – [in] The executor name
streams – [in] Number of streams.
threadsPerStream – [in] Number of threads per stream that executes ov_parallel calls.
threadBindingType – [in]
threadBindingStep – [in]
threadBindingOffset – [in]
threads – [in]
threadPreferBigCores – [in]
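For instance, a minimal sketch (the name and counts are illustrative) that requests four streams with two threads each and NUMA binding, leaving the remaining parameters at their defaults:

ov::threading::IStreamsExecutor::Config config{
    "InferExecutor",  // name used by ITT to label the executor threads
    4,                // streams
    2,                // threadsPerStream
    ov::threading::IStreamsExecutor::ThreadBindingType::NUMA};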
-
void update_executor_config(int stream_nums, int threads_per_stream, PreferredCoreType core_type, bool cpu_pinning)
Modify _streams_info_table and related configuration according to user-specified parameters, and bind threads to CPU cores if cpu_pinning is true.
- Parameters
stream_nums – Number of streams specified by user
threads_per_stream – Number of threads per stream specified by user
core_type – CPU type (Big/Little/Any) specified by the user
cpu_pinning – Whether to bind the threads to CPU cores
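For example, a hedged sketch (the counts are illustrative, continuing from the Config sketch above) requesting four streams of two threads on any core type with the threads pinned to cores:

config.update_executor_config(
    4,     // stream_nums
    2,     // threads_per_stream
    ov::threading::IStreamsExecutor::Config::PreferredCoreType::ANY,  // core_type
    true); // cpu_pinning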
-
void set_config_zero_stream()
Set _streams_info_table and _cpu_reservation in the CPU streams executor config when nstreams = 0, that is, create only one thread with TBB.
Public Members
-
std::string _name
Used by ITT to name executor threads.
-
int _streams = 1
Number of streams.
-
int _threadsPerStream = 0
Number of threads per stream that executes ov_parallel calls.
-
ThreadBindingType _threadBindingType = ThreadBindingType::NONE
Thread binding to hardware resource type. No binding by default
-
int _threadBindingStep = 1
In the case of the CORES binding type, threads are bound to cores with the defined step.
-
int _threadBindingOffset = 0
In the case of the CORES binding type, threads are bound to cores starting from the offset.
-
int _threads = 0
Number of threads distributed between streams. Reserved. Should not be used.
-
int _big_core_streams = 0
Number of streams on Performance cores (big cores).
-
int _small_core_streams = 0
Number of streams on Efficient cores (small cores).
-
int _threads_per_stream_big = 0
Threads per stream in big cores.
-
int _threads_per_stream_small = 0
Threads per stream in small cores.
-
int _small_core_offset = 0
Small-core start offset used when binding CPU cores.
-
bool _enable_hyper_thread = true
Enable hyper-threading.
-
enum ov::threading::IStreamsExecutor::Config::PreferredCoreType _threadPreferredCoreType = PreferredCoreType::ANY
In the case of HYBRID_AWARE, hints TBB which core type to affinitize to.
Public Static Functions
-
static Config make_default_multi_threaded(const Config &initial, const bool fp_intesive = true)
Create an appropriate multi-threaded configuration, filling unconfigured values from the initial configuration using hardware properties.
- Parameters
initial – Initial configuration
fp_intesive – Additional hint for the (hybrid) core-type selection logic: whether the executor should be configured for floating-point intensive work (as opposed to int8-intensive work)
- Returns
configured values
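For example, a minimal sketch (the name is arbitrary) that derives a fully populated configuration from a mostly default one using hardware properties:

auto initial = ov::threading::IStreamsExecutor::Config{"AutoConfigured"};
auto configured = ov::threading::IStreamsExecutor::Config::make_default_multi_threaded(initial);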
-
interface ITaskExecutor
- #include <itask_executor.hpp>
Interface for Task Executor. OpenVINO uses the ov::ITaskExecutor interface to run all asynchronous internal tasks. Different implementations of task executors can be used for different purposes:
- To improve cache locality of memory-bound CPU tasks, some executors can limit a task's affinity and maximum concurrency.
- An executor with one worker thread can be used to serialize access to an acceleration device.
- The immediate task executor can be used to satisfy the ov::ITaskExecutor interface restrictions while running tasks in the current thread.
Synchronization
It is the ov::ITaskExecutor user's responsibility to wait for task execution completion. The C++11 standard way to wait for task completion is to use std::packaged_task or std::promise with std::future. Here is an example of how to use std::promise to wait for task completion and process the task's exceptions:

// std::promise is a move-only object, so to satisfy the copyable callable constraint we use std::shared_ptr
auto promise = std::make_shared<std::promise<void>>();
// When the promise is created we can get a std::future to wait for the result
auto future = promise->get_future();
// Rather simple task
ov::threading::Task task = [] {
    std::cout << "Some Output" << std::endl;
};
// Create an executor
ov::threading::ITaskExecutor::Ptr taskExecutor =
    std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{});
if (taskExecutor == nullptr) {
    // ProcessError(e);
    return;
}
// We capture the task and the promise. When the task is executed in the task executor context
// we manually call the std::promise::set_value() method
taskExecutor->run([task, promise] {
    std::exception_ptr currentException;
    try {
        task();
    } catch (...) {
        // If there is an exception, store the pointer to the current exception
        currentException = std::current_exception();
    }

    if (nullptr == currentException) {
        promise->set_value();  // <-- If there are no problems, just call std::promise::set_value()
    } else {
        promise->set_exception(currentException);  // <-- If there is an exception, forward it to the std::future object
    }
});

// To wait for task completion we call the std::future::wait method
future.wait();  // The current thread will block here until std::promise::set_value()
                // or std::promise::set_exception() is called.

// If the future stores an exception, it will be rethrown in std::future::get
try {
    future.get();
} catch (std::exception& /*e*/) {
    // ProcessError(e);
}
Note
The implementation should guarantee thread safety of all methods.
Subclassed by ov::threading::IStreamsExecutor, ov::threading::ImmediateExecutor
Public Types
-
using Ptr = std::shared_ptr<ITaskExecutor>
A shared pointer to ITaskExecutor interface
Public Functions
-
virtual ~ITaskExecutor() = default
Destroys the object.
-
virtual void run(Task task) = 0
Execute ov::Task inside task executor context.
- Parameters
task – A task to start
-
virtual void run_and_wait(const std::vector<Task> &tasks)
Execute all of the tasks and wait for their completion. The default run_and_wait() implementation uses the pure virtual run() method and higher-level synchronization primitives from the STL. Each task is wrapped into an std::packaged_task which returns an std::future. The std::packaged_task calls the task and signals to the std::future that the task has finished or that an exception was thrown from it. The std::future is then used to wait for task execution completion and to extract any task exception.
Note
run_and_wait() does not copy or capture tasks!
- Parameters
tasks – A vector of tasks to execute
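As a sketch (assuming executor is any ITaskExecutor::Ptr, for example a CPUStreamsExecutor constructed as shown earlier on this page), run_and_wait() blocks until every submitted task has finished:

std::vector<ov::threading::Task> tasks;
for (int i = 0; i < 4; ++i) {
    tasks.emplace_back([i] { std::cout << "task " << i << " done" << std::endl; });
}
executor->run_and_wait(tasks);  // returns only after all four tasks have completed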