11 #include <cpp_interfaces/interface/ie_iinfer_request_internal.hpp>
41 enum InferState {Idle, Busy, Canceled, Stop};
42 using Futures = std::vector<std::shared_future<void>>;
43 using Promise = std::shared_ptr<std::promise<void>>;
44 enum Stage_e : std::uint8_t { executor, task };
47 friend struct DisableCallbackGuard;
48 struct DisableCallbackGuard {
51 std::lock_guard<std::mutex> lock{_this->_mutex};
54 ~DisableCallbackGuard() {
55 std::lock_guard<std::mutex> lock{_this->_mutex};
63 explicit ImmediateStreamsExecutor(
const IStreamsExecutor::Ptr& streamsExecutor) : _streamsExecutor{streamsExecutor} {}
70 _syncRequest->checkBlobs();
71 InferState state = InferState::Idle;
73 std::lock_guard<std::mutex> lock{_mutex};
76 case InferState::Busy :
78 case InferState::Canceled :
80 case InferState::Idle : {
81 _futures.erase(std::remove_if(std::begin(_futures), std::end(_futures),
82 [](
const std::shared_future<void>& future) {
84 return (std::future_status::ready ==
85 future.wait_for(std::chrono::milliseconds {0}));
92 _futures.emplace_back(_promise.get_future().share());
94 case InferState::Stop :
break;
96 _state = InferState::Busy;
98 if (state != InferState::Stop) {
102 _promise.set_exception(std::current_exception());
103 std::lock_guard<std::mutex> lock{_mutex};
104 _state = InferState::Idle;
115 std::lock_guard<std::mutex> lock {_mutex};
117 case InferState::Busy :
119 case InferState::Canceled :
129 using Ptr = std::shared_ptr<AsyncInferRequestThreadSafeDefault>;
143 _syncRequest {request},
144 _requestExecutor {taskExecutor},
145 _callbackExecutor {callbackExecutor},
146 _pipeline {{taskExecutor, [
this] {_syncRequest->InferImpl();}}},
147 _syncPipeline {{std::make_shared<ImmediateExecutor>(), [
this] {_syncRequest->InferImpl();}}} {
148 auto streamsExecutor = std::dynamic_pointer_cast<IStreamsExecutor>(taskExecutor);
149 if (streamsExecutor !=
nullptr) {
150 _syncPipeline = {{std::make_shared<ImmediateStreamsExecutor>(std::move(streamsExecutor)), [
this] {_syncRequest->InferImpl();}}};
168 if (millis_timeout < InferRequest::WaitMode::RESULT_READY) {
170 <<
" Timeout can't be less "
171 << InferRequest::WaitMode::RESULT_READY <<
" for InferRequest::Wait\n";
173 auto status = std::future_status::deferred;
177 std::lock_guard<std::mutex> lock {_mutex};
178 return _futures.empty() ? std::shared_future<void> {} : _futures.back();
181 if (!future.valid()) {
182 return StatusCode::INFER_NOT_STARTED;
185 switch (millis_timeout) {
186 case InferRequest::WaitMode::RESULT_READY: {
188 status = std::future_status::ready;
190 case InferRequest::WaitMode::STATUS_ONLY: {
191 status = future.wait_for(std::chrono::milliseconds {0});
194 status = future.wait_for(std::chrono::milliseconds {millis_timeout});
198 if (std::future_status::ready == status) {
200 return StatusCode::OK;
202 return StatusCode::RESULT_NOT_READY;
207 InferImpl([&] {StartAsync_ThreadUnsafe();});
211 DisableCallbackGuard disableCallbackGuard{
this};
212 InferImpl([&] {Infer_ThreadUnsafe();});
213 Wait(InferRequest::WaitMode::RESULT_READY);
218 return _syncRequest->GetPerformanceCounts();
223 _syncRequest->SetBlob(name, data);
228 _syncRequest->SetBlob(name, data, info);
233 return _syncRequest->GetBlob(name);
237 return _syncRequest->GetPreProcess(name);
242 _syncRequest->SetBatch(batch);
247 _callback = std::move(callback);
250 std::vector<std::shared_ptr<InferenceEngine::IVariableStateInternal>>
QueryState()
override {
252 return _syncRequest->QueryState();
255 void ThrowIfCanceled()
const {
256 std::lock_guard<std::mutex> lock{_mutex};
257 if (_state == InferState::Canceled) {
263 std::lock_guard<std::mutex> lock{_mutex};
264 if (_state == InferState::Busy) {
265 _state = InferState::Canceled;
273 using Stage = std::pair<ITaskExecutor::Ptr, Task>;
287 void RunFirstStage(
const Pipeline::iterator itBeginStage,
const Pipeline::iterator itEndStage,
289 auto& firstStageExecutor = std::get<Stage_e::executor>(*itBeginStage);
290 IE_ASSERT(
nullptr != firstStageExecutor);
291 firstStageExecutor->run(MakeNextStageTask(itBeginStage, itEndStage, std::move(callbackExecutor)));
301 InferState state = InferState::Idle;
303 std::lock_guard<std::mutex> lock{_mutex};
305 if (state != InferState::Stop) {
307 _state = InferState::Stop;
308 futures = std::move(_futures);
311 if (state != InferState::Stop) {
312 for (
auto&& future : futures) {
313 if (future.valid()) {
332 RunFirstStage(_pipeline.begin(), _pipeline.end(), _callbackExecutor);
340 RunFirstStage(_syncPipeline.begin(), _syncPipeline.end(), _syncCallbackExecutor);
347 StartAsync_ThreadUnsafe();
363 Task MakeNextStageTask(
const Pipeline::iterator itStage,
const Pipeline::iterator itEndStage,
365 return std::bind([
this, itStage, itEndStage](
ITaskExecutor::Ptr& callbackExecutor)
mutable {
366 std::exception_ptr currentException =
nullptr;
367 auto& thisStage = *itStage;
368 auto itNextStage = itStage + 1;
370 auto& stageTask = std::get<Stage_e::task>(thisStage);
373 if (itEndStage != itNextStage) {
374 auto& nextStage = *itNextStage;
375 auto& nextStageExecutor = std::get<Stage_e::executor>(nextStage);
377 nextStageExecutor->run(MakeNextStageTask(itNextStage, itEndStage, std::move(callbackExecutor)));
380 currentException = std::current_exception();
383 if ((itEndStage == itNextStage) || (
nullptr != currentException)) {
384 auto lastStageTask = [
this, currentException]()
mutable {
385 auto promise = std::move(_promise);
388 std::lock_guard<std::mutex> lock{_mutex};
389 _state = InferState::Idle;
390 callback = _callback;
394 auto local_callback = std::move(callback);
395 local_callback(currentException);
397 currentException = std::current_exception();
400 if (
nullptr == currentException) {
403 promise.set_exception(currentException);
407 if (
nullptr == callbackExecutor) {
410 callbackExecutor->run(std::move(lastStageTask));
413 }, std::move(callbackExecutor));
416 std::promise<void> _promise;
417 mutable std::mutex _mutex;
419 InferState _state = InferState::Idle;
Base class with default implementation of asynchronous multi staged inference request....
Definition: ie_infer_async_request_thread_safe_default.hpp:40
void InferUsingAsync()
Implements Infer() using StartAsync() and Wait()
Definition: ie_infer_async_request_thread_safe_default.hpp:346
StatusCode Wait(int64_t millis_timeout) override
Waits for completion of all pipeline stages. If the pipeline raises an exception, it will be rethrown here.
Definition: ie_infer_async_request_thread_safe_default.hpp:167
std::map< std::string, InferenceEngineProfileInfo > GetPerformanceCounts() const override
Queries performance measures per layer to get feedback of what is the most time consuming layer....
Definition: ie_infer_async_request_thread_safe_default.hpp:216
void RunFirstStage(const Pipeline::iterator itBeginStage, const Pipeline::iterator itEndStage, const ITaskExecutor::Ptr callbackExecutor={})
Creates and runs the first stage task. If the destructor has not been called, adds a new std::future to the Async...
Definition: ie_infer_async_request_thread_safe_default.hpp:287
std::pair< ITaskExecutor::Ptr, Task > Stage
Each pipeline stage is a Task that is executed by specified ITaskExecutor implementation.
Definition: ie_infer_async_request_thread_safe_default.hpp:273
void SetBlob(const std::string &name, const Blob::Ptr &data) override
Set input/output data to infer.
Definition: ie_infer_async_request_thread_safe_default.hpp:221
std::shared_ptr< AsyncInferRequestThreadSafeDefault > Ptr
A shared pointer to AsyncInferRequestThreadSafeDefault.
Definition: ie_infer_async_request_thread_safe_default.hpp:129
void Cancel() override
Cancels the current inference request execution.
Definition: ie_infer_async_request_thread_safe_default.hpp:262
void Infer() override
Infers specified input(s) in synchronous mode.
Definition: ie_infer_async_request_thread_safe_default.hpp:210
void SetBatch(int batch) override
Sets a new batch size when dynamic batching is enabled in the executable network that created this request.
Definition: ie_infer_async_request_thread_safe_default.hpp:240
ITaskExecutor::Ptr _syncCallbackExecutor
Used to run the post-inference callback in the synchronous pipeline.
Definition: ie_infer_async_request_thread_safe_default.hpp:323
void SetCallback(Callback callback) override
Sets a callback function that will be called on success or failure of the asynchronous request.
Definition: ie_infer_async_request_thread_safe_default.hpp:245
AsyncInferRequestThreadSafeDefault(const IInferRequestInternal::Ptr &request, const ITaskExecutor::Ptr &taskExecutor, const ITaskExecutor::Ptr &callbackExecutor)
Wraps an IInferRequestInternal::Ptr implementation and constructs an AsyncInferRequestThreadSafeDefault...
Definition: ie_infer_async_request_thread_safe_default.hpp:140
Blob::Ptr GetBlob(const std::string &name) override
Get input/output data to infer.
Definition: ie_infer_async_request_thread_safe_default.hpp:231
virtual void StartAsync_ThreadUnsafe()
Starts an asynchronous pipeline (not thread-safe).
Definition: ie_infer_async_request_thread_safe_default.hpp:331
std::vector< Stage > Pipeline
Pipeline is vector of stages.
Definition: ie_infer_async_request_thread_safe_default.hpp:277
void StartAsync() override
Start inference of specified input(s) in asynchronous mode.
Definition: ie_infer_async_request_thread_safe_default.hpp:206
Pipeline _syncPipeline
Synchronous pipeline variable that should be filled by an inherited class.
Definition: ie_infer_async_request_thread_safe_default.hpp:325
void CheckState() const
Throws an exception if the inference request is busy or canceled.
Definition: ie_infer_async_request_thread_safe_default.hpp:114
void SetBlob(const std::string &name, const Blob::Ptr &data, const PreProcessInfo &info) override
Sets pre-process for input data.
Definition: ie_infer_async_request_thread_safe_default.hpp:226
~AsyncInferRequestThreadSafeDefault()
Destroys the object, stops AsyncInferRequestThreadSafeDefault::_pipeline and waits for it to finish.
Definition: ie_infer_async_request_thread_safe_default.hpp:157
ITaskExecutor::Ptr _callbackExecutor
Used to run the post-inference callback in the asynchronous pipeline.
Definition: ie_infer_async_request_thread_safe_default.hpp:322
virtual void Infer_ThreadUnsafe()
Performs inference of the pipeline in synchronous mode.
Definition: ie_infer_async_request_thread_safe_default.hpp:339
std::vector< std::shared_ptr< InferenceEngine::IVariableStateInternal > > QueryState() override
Queries memory states.
Definition: ie_infer_async_request_thread_safe_default.hpp:250
Pipeline _pipeline
Pipeline variable that should be filled by an inherited class.
Definition: ie_infer_async_request_thread_safe_default.hpp:324
void StopAndWait()
Forbids starting new pipelines and waits for all already started pipelines to finish.
Definition: ie_infer_async_request_thread_safe_default.hpp:299
ITaskExecutor::Ptr _requestExecutor
Used to run inference CPU tasks.
Definition: ie_infer_async_request_thread_safe_default.hpp:321
const PreProcessInfo & GetPreProcess(const std::string &name) const override
Gets pre-process for input data.
Definition: ie_infer_async_request_thread_safe_default.hpp:236
std::shared_ptr< Blob > Ptr
An internal API of a synchronous inference request to be implemented by a plugin, which is used in InferR...
Definition: ie_iinfer_request_internal.hpp:28
virtual void InferImpl()
The minimal infer function to be implemented by plugins. It infers specified input(s) in synchronous ...
Callback _callback
A callback.
Definition: ie_iinfer_request_internal.hpp:239
std::shared_ptr< IInferRequestInternal > Ptr
A shared pointer to an IInferRequestInternal interface.
Definition: ie_iinfer_request_internal.hpp:33
std::function< void(std::exception_ptr)> Callback
Alias for callback type.
Definition: ie_iinfer_request_internal.hpp:147
std::shared_ptr< IStreamsExecutor > Ptr
Definition: ie_istreams_executor.hpp:36
Interface for Task Executor. Inference Engine uses InferenceEngine::ITaskExecutor interface to run al...
Definition: ie_itask_executor.hpp:46
std::shared_ptr< ITaskExecutor > Ptr
Definition: ie_itask_executor.hpp:51
std::function< void()> Task
Inference Engine Task Executor can use any copyable callable without parameters and output as a task....
Definition: ie_itask_executor.hpp:25
#define IE_ASSERT(EXPRESSION)
A header file for Inference Engine Streams-based Executor Interface.
A header file for Inference Engine Task Executor Interface.
Inference Engine Plugin API namespace.