11 #include <cpp_interfaces/impl/ie_infer_request_internal.hpp>
12 #include <cpp_interfaces/interface/ie_iinfer_async_request_internal.hpp>
// Lifecycle states of the async request: Idle (ready), Busy (pipeline running),
// Canceled (Cancel() called while Busy), Stop (destruction — no new pipelines).
44 enum InferState {Idle, Busy, Canceled, Stop};
// Shared futures of started pipelines; Wait() reads the last one and
// StopAndWait() drains them all.
45 using Futures = std::vector<std::shared_future<void>>;
// NOTE(review): this alias is not used in the visible fragments — confirm
// against the full header.
46 using Promise = std::shared_ptr<std::promise<void>>;
// Indices into a pipeline Stage (std::pair<ITaskExecutor::Ptr, Task>):
// 'executor' selects the pair's first element, 'task' the second.
47 enum Stage_e : std::uint8_t { executor, task };
50 friend struct DisableCallbackGuard;
51 struct DisableCallbackGuard {
54 std::lock_guard<std::mutex> lock{_this->_mutex};
55 std::swap(_callback, _this->_callback);
57 ~DisableCallbackGuard() {
58 std::lock_guard<std::mutex> lock{_this->_mutex};
59 _this->_callback = _callback;
66 explicit ImmediateStreamsExecutor(
const IStreamsExecutor::Ptr& streamsExecutor) : _streamsExecutor{streamsExecutor} {}
72 void InferImpl(
const F& f) {
73 _syncRequest->checkBlobs();
74 InferState state = InferState::Idle;
76 std::lock_guard<std::mutex> lock{_mutex};
79 case InferState::Busy :
81 case InferState::Canceled :
83 case InferState::Idle : {
84 _futures.erase(std::remove_if(std::begin(_futures), std::end(_futures),
85 [](
const std::shared_future<void>& future) {
87 return (std::future_status::ready ==
88 future.wait_for(std::chrono::milliseconds {0}));
95 _futures.emplace_back(_promise.get_future().share());
97 case InferState::Stop :
break;
99 _state = InferState::Busy;
101 if (state != InferState::Stop) {
105 _promise.set_exception(std::current_exception());
106 std::lock_guard<std::mutex> lock{_mutex};
107 _state = InferState::Idle;
118 std::lock_guard<std::mutex> lock {_mutex};
120 case InferState::Busy :
122 case InferState::Canceled :
132 using Ptr = std::shared_ptr<AsyncInferRequestThreadSafeDefault>;
146 _syncRequest {request},
147 _requestExecutor {taskExecutor},
148 _callbackExecutor {callbackExecutor},
149 _pipeline {{taskExecutor, [
this] {_syncRequest->InferImpl();}}},
150 _syncPipeline {{std::make_shared<ImmediateExecutor>(), [
this] {_syncRequest->InferImpl();}}} {
151 auto streamsExecutor = std::dynamic_pointer_cast<IStreamsExecutor>(taskExecutor);
152 if (streamsExecutor !=
nullptr) {
153 _syncPipeline = {{std::make_shared<ImmediateStreamsExecutor>(std::move(streamsExecutor)), [
this] {_syncRequest->InferImpl();}}};
171 if (millis_timeout < IInferRequest::WaitMode::RESULT_READY) {
173 <<
" Timeout can't be less "
174 << IInferRequest::WaitMode::RESULT_READY <<
" for InferRequest::Wait\n";
176 auto status = std::future_status::deferred;
180 std::lock_guard<std::mutex> lock {_mutex};
181 return _futures.empty() ? std::shared_future<void> {} : _futures.back();
184 if (!future.valid()) {
185 return StatusCode::INFER_NOT_STARTED;
188 switch (millis_timeout) {
189 case IInferRequest::WaitMode::RESULT_READY: {
191 status = std::future_status::ready;
193 case IInferRequest::WaitMode::STATUS_ONLY: {
194 status = future.wait_for(std::chrono::milliseconds {0});
197 status = future.wait_for(std::chrono::milliseconds {millis_timeout});
201 if (std::future_status::ready == status) {
203 return StatusCode::OK;
205 return StatusCode::RESULT_NOT_READY;
210 InferImpl([&] {StartAsync_ThreadUnsafe();});
214 DisableCallbackGuard disableCallbackGuard{
this};
215 InferImpl([&] {Infer_ThreadUnsafe();});
216 Wait(InferenceEngine::IInferRequest::WaitMode::RESULT_READY);
221 return _syncRequest->GetPerformanceCounts();
226 _syncRequest->SetBlob(name, data);
231 _syncRequest->SetBlob(name, data, info);
236 return _syncRequest->GetBlob(name);
240 return _syncRequest->GetPreProcess(name);
245 _syncRequest->SetBatch(batch);
261 _callback = callback;
270 _publicInterface = std::shared_ptr<IInferRequest>(ptr.get(), [](
IInferRequest*) {});
273 std::vector<InferenceEngine::IVariableStateInternal::Ptr>
QueryState()
override {
274 return _syncRequest->QueryState();
277 void ThrowIfCanceled()
const {
278 std::lock_guard<std::mutex> lock{_mutex};
279 if (_state == InferState::Canceled) {
285 std::lock_guard<std::mutex> lock{_mutex};
286 if (_state == InferState::Busy) {
287 _state = InferState::Canceled;
295 using Stage = std::pair<ITaskExecutor::Ptr, Task>;
309 void RunFirstStage(
const Pipeline::iterator itBeginStage,
const Pipeline::iterator itEndStage,
311 auto& firstStageExecutor = std::get<Stage_e::executor>(*itBeginStage);
312 IE_ASSERT(
nullptr != firstStageExecutor);
313 firstStageExecutor->run(MakeNextStageTask(itBeginStage, itEndStage, std::move(callbackExecutor)));
324 InferState state = InferState::Idle;
326 std::lock_guard<std::mutex> lock{_mutex};
328 if (state != InferState::Stop) {
329 _state = InferState::Stop;
330 futures = std::move(_futures);
333 if (state != InferState::Stop) {
334 for (
auto&& future : futures) {
335 if (future.valid()) {
354 RunFirstStage(_pipeline.begin(), _pipeline.end(), _callbackExecutor);
362 RunFirstStage(_syncPipeline.begin(), _syncPipeline.end(), _syncCallbackExecutor);
369 StartAsync_ThreadUnsafe();
385 Task MakeNextStageTask(
const Pipeline::iterator itStage,
const Pipeline::iterator itEndStage,
387 return std::bind([
this, itStage, itEndStage](
ITaskExecutor::Ptr& callbackExecutor)
mutable {
389 std::exception_ptr localCurrentException =
nullptr;
390 auto& thisStage = *itStage;
391 auto itNextStage = itStage + 1;
394 auto& stageTask = std::get<Stage_e::task>(thisStage);
397 if (itEndStage != itNextStage) {
398 auto& nextStage = *itNextStage;
399 auto& nextStageExecutor = std::get<Stage_e::executor>(nextStage);
401 nextStageExecutor->run(MakeNextStageTask(itNextStage, itEndStage, std::move(callbackExecutor)));
403 }
catch (InferenceEngine::details::InferenceEngineException& ie_ex) {
404 requestStatus = ie_ex.hasStatus() ? ie_ex.getStatus() : StatusCode::GENERAL_ERROR;
405 localCurrentException = std::make_exception_ptr(ie_ex);
407 requestStatus = StatusCode::GENERAL_ERROR;
408 localCurrentException = std::current_exception();
411 if ((itEndStage == itNextStage) || (
nullptr != localCurrentException)) {
412 auto lastStageTask = [
this, requestStatus, localCurrentException]()
mutable {
413 auto promise = std::move(_promise);
414 IInferRequest::CompletionCallback callback =
nullptr;
416 std::lock_guard<std::mutex> lock{_mutex};
417 _state = InferState::Idle;
418 callback = _callback;
420 if (
nullptr != callback) {
423 callback(_publicInterface, requestStatus);
425 localCurrentException = std::current_exception();
429 if (
nullptr == localCurrentException) {
432 promise.set_exception(localCurrentException);
436 if (
nullptr == callbackExecutor) {
439 callbackExecutor->run(std::move(lastStageTask));
442 }, std::move(callbackExecutor));
// Arbitrary user data attached to the request via SetUserData()/GetUserData().
445 void* _userData =
nullptr;
// Completion callback invoked at the end of the pipeline; read/written under
// _mutex (temporarily cleared by DisableCallbackGuard during synchronous Infer).
446 IInferRequest::CompletionCallback _callback =
nullptr;
// Public IInferRequest wrapper handed to the callback; constructed with a
// no-op deleter, so this pointer is non-owning.
447 IInferRequest::Ptr _publicInterface;
// Promise fulfilled when the current pipeline completes; its shared_future is
// pushed into _futures before each start.
448 std::promise<void> _promise;
// Guards _state, _callback, _futures and the _promise handoff; mutable so
// const methods (e.g. ThrowIfCanceled) can lock it.
449 mutable std::mutex _mutex;
// Current request state; accessed under _mutex in the visible code.
451 InferState _state = InferState::Idle;
Base class with default implementation of asynchronous multi staged inference request....
Definition: ie_infer_async_request_thread_safe_default.hpp:43
void InferUsingAsync()
Implements Infer() using StartAsync() and Wait()
Definition: ie_infer_async_request_thread_safe_default.hpp:368
void SetUserData(void *data) override
Set arbitrary data for the request.
Definition: ie_infer_async_request_thread_safe_default.hpp:254
StatusCode Wait(int64_t millis_timeout) override
Waits for completion of all pipeline stages If the pipeline raises an exception it will be rethrown h...
Definition: ie_infer_async_request_thread_safe_default.hpp:170
std::map< std::string, InferenceEngineProfileInfo > GetPerformanceCounts() const override
Queries performance measures per layer to get feedback of what is the most time consuming layer....
Definition: ie_infer_async_request_thread_safe_default.hpp:219
void RunFirstStage(const Pipeline::iterator itBeginStage, const Pipeline::iterator itEndStage, const ITaskExecutor::Ptr callbackExecutor={})
Creates and run the first stage task. If destructor was not called add a new std::future to the Async...
Definition: ie_infer_async_request_thread_safe_default.hpp:309
std::pair< ITaskExecutor::Ptr, Task > Stage
Each pipeline stage is a Task that is executed by the specified ITaskExecutor implementation.
Definition: ie_infer_async_request_thread_safe_default.hpp:295
void SetBlob(const std::string &name, const Blob::Ptr &data) override
Set input/output data to infer.
Definition: ie_infer_async_request_thread_safe_default.hpp:224
std::shared_ptr< AsyncInferRequestThreadSafeDefault > Ptr
A shared pointer to AsyncInferRequestThreadSafeDefault.
Definition: ie_infer_async_request_thread_safe_default.hpp:132
void Cancel() override
Cancel current inference request execution.
Definition: ie_infer_async_request_thread_safe_default.hpp:284
void Infer() override
Infers specified input(s) in synchronous mode.
Definition: ie_infer_async_request_thread_safe_default.hpp:213
void SetBatch(int batch) override
Sets new batch size when dynamic batching is enabled in executable network that created this request.
Definition: ie_infer_async_request_thread_safe_default.hpp:243
ITaskExecutor::Ptr _syncCallbackExecutor
Used to run the post-inference callback in the synchronous pipeline.
Definition: ie_infer_async_request_thread_safe_default.hpp:345
void SetCompletionCallback(IInferRequest::CompletionCallback callback) override
Set callback function which will be called on success or failure of asynchronous request.
Definition: ie_infer_async_request_thread_safe_default.hpp:259
void SetPointerToPublicInterface(InferenceEngine::IInferRequest::Ptr ptr)
Sets the pointer to public interface.
Definition: ie_infer_async_request_thread_safe_default.hpp:269
Blob::Ptr GetBlob(const std::string &name) override
Get input/output data to infer.
Definition: ie_infer_async_request_thread_safe_default.hpp:234
virtual void StartAsync_ThreadUnsafe()
Starts an asynchronous pipeline thread unsafe.
Definition: ie_infer_async_request_thread_safe_default.hpp:353
std::vector< Stage > Pipeline
Pipeline is vector of stages.
Definition: ie_infer_async_request_thread_safe_default.hpp:299
std::vector< InferenceEngine::IVariableStateInternal::Ptr > QueryState() override
Queries memory states.
Definition: ie_infer_async_request_thread_safe_default.hpp:273
void StartAsync() override
Start inference of specified input(s) in asynchronous mode.
Definition: ie_infer_async_request_thread_safe_default.hpp:209
Pipeline _syncPipeline
Synchronous pipeline variable that should be filled by the inherited class.
Definition: ie_infer_async_request_thread_safe_default.hpp:347
void CheckState() const
Throws exception if inference request is busy or canceled.
Definition: ie_infer_async_request_thread_safe_default.hpp:117
void SetBlob(const std::string &name, const Blob::Ptr &data, const PreProcessInfo &info) override
Sets pre-process for input data.
Definition: ie_infer_async_request_thread_safe_default.hpp:229
~AsyncInferRequestThreadSafeDefault()
Destroys the object, stops AsyncInferRequestThreadSafeDefault::_pipeline and waits for a finish.
Definition: ie_infer_async_request_thread_safe_default.hpp:160
ITaskExecutor::Ptr _callbackExecutor
Used to run the post-inference callback in the asynchronous pipeline.
Definition: ie_infer_async_request_thread_safe_default.hpp:344
virtual void Infer_ThreadUnsafe()
Performs inference of the pipeline in synchronous mode.
Definition: ie_infer_async_request_thread_safe_default.hpp:361
void GetUserData(void **data) override
Get arbitrary data for the request.
Definition: ie_infer_async_request_thread_safe_default.hpp:248
Pipeline _pipeline
Pipeline variable that should be filled by the inherited class.
Definition: ie_infer_async_request_thread_safe_default.hpp:346
AsyncInferRequestThreadSafeDefault(const InferRequestInternal::Ptr &request, const ITaskExecutor::Ptr &taskExecutor, const ITaskExecutor::Ptr &callbackExecutor)
Wraps a InferRequestInternal::Ptr implementation and constructs a AsyncInferRequestThreadSafeDefault:...
Definition: ie_infer_async_request_thread_safe_default.hpp:143
void StopAndWait()
Forbids pipeline start and waits for all started pipelines to finish.
Definition: ie_infer_async_request_thread_safe_default.hpp:321
ITaskExecutor::Ptr _requestExecutor
Used to run inference CPU tasks.
Definition: ie_infer_async_request_thread_safe_default.hpp:343
const PreProcessInfo & GetPreProcess(const std::string &name) const override
Gets pre-process for input data.
Definition: ie_infer_async_request_thread_safe_default.hpp:239
std::shared_ptr< Blob > Ptr
An internal API of asynchronous inference request to be implemented by plugin, which is used in Infer...
Definition: ie_iinfer_async_request_internal.hpp:22
void(* CompletionCallback)(InferenceEngine::IInferRequest::Ptr context, InferenceEngine::StatusCode code)
std::shared_ptr< IInferRequest > Ptr
std::shared_ptr< IStreamsExecutor > Ptr
Definition: ie_istreams_executor.hpp:36
Interface for Task Executor. Inference Engine uses InferenceEngine::ITaskExecutor interface to run al...
Definition: ie_itask_executor.hpp:46
std::shared_ptr< ITaskExecutor > Ptr
Definition: ie_itask_executor.hpp:51
std::shared_ptr< InferRequestInternal > Ptr
A shared pointer to a InferRequestInternal implementation.
Definition: ie_infer_request_internal.hpp:37
Wrappers from c++ function to c-style one.
#define THROW_IE_EXCEPTION_WITH_STATUS(__status)
Throws an exception along with the status (which is eventually converted to the typed exception)
Definition: exception2status.hpp:19
#define NOT_ALLOCATED_str
Defines the not allocated message.
Definition: exception2status.hpp:142
std::function< void()> Task
Inference Engine Task Executor can use any copyable callable without parameters and output as a task....
Definition: ie_itask_executor.hpp:25
#define THROW_IE_EXCEPTION
#define IE_ASSERT(EXPRESSION)
A header file for Inference Engine Streams-based Executor Interface.
A header file for Inference Engine Task Executor Interface.
Abstraction over platform specific implementations.
Inference Engine Plugin API namespace.
std::exception_ptr & CurrentException()
Provides the reference to static thread_local std::exception_ptr.