ie_parallel.hpp
// Copyright (C) 2018-2020 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//

/**
 * @brief Contains declarations and definitions for sequential and multi-threading implementations.
 *
 * Multi-threading support is implemented in two variants: the Intel(R) Threading Building Blocks (TBB)
 * library and the OpenMP* runtime; a sequential fallback is also provided. To build a particular
 * implementation, use the corresponding identifier: IE_THREAD_TBB, IE_THREAD_TBB_AUTO,
 * IE_THREAD_OMP or IE_THREAD_SEQ.
 *
 * @file ie_parallel.hpp
 */

#pragma once

#include <cstddef>
#include <type_traits>

#define IE_THREAD_TBB 0
#define IE_THREAD_OMP 1
#define IE_THREAD_SEQ 2
#define IE_THREAD_TBB_AUTO 3

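// IE_THREAD selects the threading backend at compile time and is expected to be one of the
// identifiers above. It is normally provided by the build system (for example, a compile
// definition along the lines of -DIE_THREAD=IE_THREAD_TBB; the exact flag and mechanism
// depend on the project's build setup).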
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
#define TBB_PREVIEW_LOCAL_OBSERVER 1
#ifndef TBB_PREVIEW_NUMA_SUPPORT
#define TBB_PREVIEW_NUMA_SUPPORT 1
#endif
#include "tbb/blocked_range.h"
#include "tbb/blocked_range2d.h"
#include "tbb/blocked_range3d.h"
#include "tbb/parallel_for.h"
#include "tbb/parallel_reduce.h"
#include "tbb/parallel_sort.h"
#include "tbb/task_arena.h"
#include "tbb/task_scheduler_observer.h"

inline int parallel_get_max_threads() {
    return tbb::this_task_arena::max_concurrency();
}
inline int parallel_get_num_threads() {
    return parallel_get_max_threads();
}
inline int parallel_get_thread_num() {
    return tbb::this_task_arena::current_thread_index();
}
inline void parallel_set_num_threads(int n) {
    return;
}
inline int parallel_get_env_threads() {
    return 0;
}
#if IE_THREAD == IE_THREAD_TBB
#define PARTITIONING , tbb::static_partitioner()
#else
#define PARTITIONING
#endif
#elif IE_THREAD == IE_THREAD_OMP
#include <omp.h>

#include <algorithm>
#include <cstdlib>
#include <string>

/* MSVC still supports omp 2.0 only */
#if defined(_MSC_VER) && !defined(__INTEL_COMPILER)
#define collapse(x)
#endif // defined(_MSC_VER) && !defined(__INTEL_COMPILER)
inline int parallel_get_max_threads() {
    return omp_get_max_threads();
}
inline int parallel_get_num_threads() {
    return omp_get_num_threads();
}
inline int parallel_get_thread_num() {
    return omp_get_thread_num();
}
inline void parallel_set_num_threads(int n) {
    omp_set_num_threads(n);
}
inline int parallel_get_env_threads() {
    int env_cores = 0;
    if (getenv("OMP_NUM_THREADS") != nullptr) {
        try {
            env_cores = std::stoi(getenv("OMP_NUM_THREADS"));
        } catch (const std::exception&) {
            env_cores = 0;
        }
    }
    return env_cores;
}

#elif IE_THREAD == IE_THREAD_SEQ
#include <algorithm> // NOLINT
inline int parallel_get_env_threads() {
    return 1;
}
inline int parallel_get_max_threads() {
    return 1;
}
inline int parallel_get_num_threads() {
    return 1;
}
inline int parallel_get_thread_num() {
    return 0;
}
inline void parallel_set_num_threads(int n) {
    return;
}
#endif

namespace InferenceEngine {

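/**
 * @brief Runs @p func in parallel: each invocation receives its thread index and the team size,
 *        i.e. func(ithr, nthr). With the TBB backends, nthr == 0 requests the maximum available
 *        concurrency; nthr == 1 runs @p func directly on the calling thread.
 */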
template <typename F>
void parallel_nt(int nthr, const F& func) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    if (nthr == 0) nthr = parallel_get_max_threads();
    if (nthr == 1) {
        func(0, 1);
        return;
    }

    tbb::parallel_for(0, nthr, [&](int ithr) {
        func(ithr, nthr);
    });
#elif IE_THREAD == IE_THREAD_OMP
    if (nthr == 1) {
        func(0, 1);
        return;
    }

#pragma omp parallel num_threads(nthr)
    func(parallel_get_thread_num(), parallel_get_num_threads());
#elif IE_THREAD == IE_THREAD_SEQ
    func(0, 1);
#endif
}

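/**
 * @brief Same contract as parallel_nt(), but forces static partitioning (tbb::static_partitioner)
 *        with the TBB backends so the iteration-to-thread mapping does not rely on work stealing.
 */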
template <typename F>
void parallel_nt_static(int nthr, const F& func) {
#if IE_THREAD == IE_THREAD_SEQ
    const bool serial = true;
#else
    const bool serial = false;
#endif

    if (serial || nthr == 1) {
        func(0, 1);
        return;
    }

    if (nthr == 0) nthr = parallel_get_max_threads();
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    tbb::parallel_for(
        0, nthr,
        [&](int ithr) {
            func(ithr, nthr);
        },
        tbb::static_partitioner {});

#elif IE_THREAD == IE_THREAD_OMP

#pragma omp parallel num_threads(nthr)
    { func(parallel_get_thread_num(), parallel_get_num_threads()); }
#endif
}

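/**
 * @brief Sorts the range [begin, end) using @p comparator; parallelized with tbb::parallel_sort
 *        under the TBB backends and falls back to std::sort otherwise.
 */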
template <typename I, typename F>
void parallel_sort(I begin, I end, const F& comparator) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    tbb::parallel_sort(begin, end, comparator);
#elif IE_THREAD == IE_THREAD_OMP
    // TODO: propose OpenMP version
    std::sort(begin, end, comparator);
#elif IE_THREAD == IE_THREAD_SEQ
    std::sort(begin, end, comparator);
#endif
}

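/**
 * @brief Computes input + func(0) + func(1) + ... + func(D0 - 1) as a parallel reduction.
 *
 * Illustrative usage (a minimal sketch; the names n and data below are hypothetical):
 * @code
 * size_t n = ...;
 * float total = InferenceEngine::parallel_sum(n, 0.f, [&](size_t i) { return data[i]; });
 * @endcode
 */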
template <typename T0, typename R, typename F>
R parallel_sum(const T0& D0, const R& input, const F& func) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    return tbb::parallel_reduce(
        tbb::blocked_range<T0>(0, D0), input,
        [&](const tbb::blocked_range<T0>& r, R init) -> R {
            R sum = init;
            for (T0 dim1 = r.begin(); dim1 < r.end(); ++dim1) sum += func(dim1);
            return sum;
        },
        [](R x, R y) -> R {
            return x + y;
        } PARTITIONING);
#else
    R sum = input;

#ifdef _MSC_VER
    using T0_IT = typename std::make_signed<T0>::type;
#else
    using T0_IT = T0;
#endif

#if IE_THREAD == IE_THREAD_OMP
#pragma omp parallel for reduction(+ : sum) schedule(static)
#endif
    for (T0_IT dim1 = 0; dim1 < static_cast<T0_IT>(D0); dim1++) {
        sum += static_cast<R>(func(dim1));
    }
    return sum;
#endif
}

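/**
 * @brief 2D variant of parallel_sum(): reduces func(d0, d1) over the D0 x D1 iteration space.
 */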
template <typename T0, typename T1, typename R, typename F>
R parallel_sum2d(const T0& D0, const T1& D1, const R& input, const F& func) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    return tbb::parallel_reduce(
        tbb::blocked_range2d<T0, T1>(0, D0, 0, D1), input,
        [&](const tbb::blocked_range2d<T0, T1>& r, R init) -> R {
            R sum = init;
            for (T0 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
                for (T1 dim1 = r.cols().begin(); dim1 < r.cols().end(); dim1++) {
                    sum += func(dim2, dim1);
                }
            }
            return sum;
        },
        [](R x, R y) -> R {
            return x + y;
        } PARTITIONING);
#else
    R sum = input;

#ifdef _MSC_VER
    using T0_IT = typename std::make_signed<T0>::type;
    using T1_IT = typename std::make_signed<T1>::type;
#else
    using T0_IT = T0;
    using T1_IT = T1;
#endif

#if IE_THREAD == IE_THREAD_OMP
#pragma omp parallel for collapse(2) reduction(+ : sum) schedule(static)
#endif
    for (T0_IT dim2 = 0; dim2 < static_cast<T0_IT>(D0); dim2++) {
        for (T1_IT dim1 = 0; dim1 < static_cast<T1_IT>(D1); dim1++) {
            sum += func(dim2, dim1);
        }
    }
    return sum;
#endif
}

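/**
 * @brief 3D variant of parallel_sum(): reduces func(d0, d1, d2) over the D0 x D1 x D2 iteration space.
 */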
template <typename T0, typename T1, typename T2, typename R, typename F>
R parallel_sum3d(const T0& D0, const T1& D1, const T2& D2, const R& input, const F& func) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    return tbb::parallel_reduce(
        tbb::blocked_range3d<T0, T1, T2>(0, D0, 0, D1, 0, D2), input,
        [&](const tbb::blocked_range3d<T0, T1, T2>& r, R init) -> R {
            R sum = init;
            for (T0 dim1 = r.pages().begin(); dim1 < r.pages().end(); dim1++) {
                for (T1 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
                    for (T2 dim3 = r.cols().begin(); dim3 < r.cols().end(); dim3++) {
                        sum += func(dim1, dim2, dim3);
                    }
                }
            }
            return sum;
        },
        [](R x, R y) -> R {
            return x + y;
        } PARTITIONING);
#else
    R sum = input;

#ifdef _MSC_VER
    using T0_IT = typename std::make_signed<T0>::type;
    using T1_IT = typename std::make_signed<T1>::type;
    using T2_IT = typename std::make_signed<T2>::type;
#else
    using T0_IT = T0;
    using T1_IT = T1;
    using T2_IT = T2;
#endif

#if IE_THREAD == IE_THREAD_OMP
#pragma omp parallel for collapse(3) reduction(+ : sum) schedule(static)
#endif
    for (T0_IT dim1 = 0; dim1 < static_cast<T0_IT>(D0); dim1++) {
        for (T1_IT dim2 = 0; dim2 < static_cast<T1_IT>(D1); dim2++) {
            for (T2_IT dim3 = 0; dim3 < static_cast<T2_IT>(D2); dim3++) {
                sum += func(dim1, dim2, dim3);
            }
        }
    }
    return sum;
#endif
}

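/**
 * @brief Decomposes a flat (row-major) work index into per-dimension coordinates.
 *        parallel_it_init(start, d0, D0, d1, D1, ...) fills d0, d1, ... such that the last
 *        dimension varies fastest and returns the remaining quotient.
 */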
template <typename T>
inline T parallel_it_init(T start) {
    return start;
}
template <typename T, typename Q, typename R, typename... Args>
inline T parallel_it_init(T start, Q& x, const R& X, Args&&... tuple) {
    start = parallel_it_init(start, static_cast<Args>(tuple)...);
    x = start % X;
    return start / X;
}

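/**
 * @brief Advances the multi-dimensional coordinates produced by parallel_it_init() by one step,
 *        carrying from the last (fastest-varying) dimension toward the first.
 */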
inline bool parallel_it_step() {
    return true;
}
template <typename Q, typename R, typename... Args>
inline bool parallel_it_step(Q& x, const R& X, Args&&... tuple) {
    if (parallel_it_step(static_cast<Args>(tuple)...)) {
        x = (x + 1) % X;
        return x == 0;
    }
    return false;
}

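/**
 * @brief Computes the half-open range [n_start, n_end) of the @p n work items assigned to thread
 *        @p tid out of a team of @p team threads, keeping chunk sizes balanced within one item.
 *        For example, n = 10 and team = 4 yields per-thread ranges of sizes 3, 3, 2 and 2.
 */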
template <typename T, typename Q>
inline void splitter(const T& n, const Q& team, const Q& tid, T& n_start, T& n_end) {
    if (team <= 1 || n == 0) {
        n_start = 0;
        n_end = n;
    } else {
        T n1 = (n + (T)team - 1) / (T)team;
        T n2 = n1 - 1;
        T T1 = n - n2 * (T)team;
        n_end = (T)tid < T1 ? n1 : n2;
        n_start = (T)tid <= T1 ? tid * n1 : T1 * n1 + ((T)tid - T1) * n2;
    }

    n_end += n_start;
}

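/**
 * @brief Executes func(d0) for this thread's share of the D0 iterations, as computed by splitter().
 */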
template <typename T0, typename F>
void for_1d(const int& ithr, const int& nthr, const T0& D0, const F& func) {
    T0 d0 {0}, end {0};
    splitter(D0, nthr, ithr, d0, end);
    for (; d0 < end; ++d0) func(d0);
}

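/**
 * @brief Parallel loop over a single dimension: func(d0) is called once for every d0 in [0, D0).
 *
 * Illustrative usage (a minimal sketch; count, dst, src and scale are hypothetical):
 * @code
 * InferenceEngine::parallel_for(count, [&](size_t i) {
 *     dst[i] = src[i] * scale;
 * });
 * @endcode
 */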
template <typename T0, typename F>
void parallel_for(const T0& D0, const F& func) {
#if IE_THREAD == IE_THREAD_TBB
    auto work_amount = static_cast<size_t>(D0);
    int nthr = parallel_get_max_threads();
    if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
    if (nthr == 1) {
        for_1d(0, 1, D0, func);
    } else {
        tbb::parallel_for(
            0, nthr,
            [&](int ithr) {
                for_1d(ithr, nthr, D0, func);
            },
            tbb::static_partitioner());
    }
#elif IE_THREAD == IE_THREAD_TBB_AUTO
    const int nthr = parallel_get_max_threads();
    tbb::parallel_for(0, nthr, [&](int ithr) {
        for_1d(ithr, nthr, D0, func);
    });
#elif IE_THREAD == IE_THREAD_OMP
#pragma omp parallel
    for_1d(parallel_get_thread_num(), parallel_get_num_threads(), D0, func);
#elif IE_THREAD == IE_THREAD_SEQ
    for_1d(0, 1, D0, func);
#endif
}

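/**
 * @brief Executes func(d0, d1) for this thread's share of the collapsed D0 * D1 iteration space.
 */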
template <typename T0, typename T1, typename F>
void for_2d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const F& func) {
    const size_t work_amount = (size_t)D0 * D1;
    if (work_amount == 0) return;
    size_t start {0}, end {0};
    splitter(work_amount, nthr, ithr, start, end);

    T0 d0 {0};
    T1 d1 {0};
    parallel_it_init(start, d0, D0, d1, D1);
    for (size_t iwork = start; iwork < end; ++iwork) {
        func(d0, d1);
        parallel_it_step(d0, D0, d1, D1);
    }
}

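/**
 * @brief Parallel loop over two dimensions: func(d0, d1) is called for every pair in [0, D0) x [0, D1).
 */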
template <typename T0, typename T1, typename F>
void parallel_for2d(const T0& D0, const T1& D1, const F& func) {
#if IE_THREAD == IE_THREAD_TBB
    auto work_amount = static_cast<size_t>(D0 * D1);
    int nthr = parallel_get_max_threads();
    if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
    if (nthr == 1) {
        for_2d(0, 1, D0, D1, func);
    } else {
        tbb::parallel_for(
            0, nthr,
            [&](int ithr) {
                for_2d(ithr, nthr, D0, D1, func);
            },
            tbb::static_partitioner());
    }
#elif IE_THREAD == IE_THREAD_TBB_AUTO
    const int nthr = parallel_get_max_threads();
    tbb::parallel_for(0, nthr, [&](int ithr) {
        for_2d(ithr, nthr, D0, D1, func);
    });
#elif IE_THREAD == IE_THREAD_OMP
#pragma omp parallel
    for_2d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, func);
#elif IE_THREAD == IE_THREAD_SEQ
    for_2d(0, 1, D0, D1, func);
#endif
}

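// The 3D, 4D and 5D helpers below follow the same pattern as for_2d()/parallel_for2d(): the
// iteration space is collapsed into a single work amount, split across threads with splitter(),
// and walked with parallel_it_init()/parallel_it_step().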
template <typename T0, typename T1, typename T2, typename F>
void for_3d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const T2& D2, const F& func) {
    const size_t work_amount = (size_t)D0 * D1 * D2;
    if (work_amount == 0) return;
    size_t start {0}, end {0};
    splitter(work_amount, nthr, ithr, start, end);

    T0 d0 {0};
    T1 d1 {0};
    T2 d2 {0};
    parallel_it_init(start, d0, D0, d1, D1, d2, D2);
    for (size_t iwork = start; iwork < end; ++iwork) {
        func(d0, d1, d2);
        parallel_it_step(d0, D0, d1, D1, d2, D2);
    }
}

template <typename T0, typename T1, typename T2, typename F>
void parallel_for3d(const T0& D0, const T1& D1, const T2& D2, const F& func) {
#if IE_THREAD == IE_THREAD_TBB
    auto work_amount = static_cast<size_t>(D0 * D1 * D2);
    int nthr = parallel_get_max_threads();
    if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
    if (nthr == 1) {
        for_3d(0, 1, D0, D1, D2, func);
    } else {
        tbb::parallel_for(
            0, nthr,
            [&](int ithr) {
                for_3d(ithr, nthr, D0, D1, D2, func);
            },
            tbb::static_partitioner());
    }
#elif IE_THREAD == IE_THREAD_TBB_AUTO
    const int nthr = parallel_get_max_threads();
    tbb::parallel_for(0, nthr, [&](int ithr) {
        for_3d(ithr, nthr, D0, D1, D2, func);
    });
#elif IE_THREAD == IE_THREAD_OMP
#pragma omp parallel
    for_3d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, func);
#elif IE_THREAD == IE_THREAD_SEQ
    for_3d(0, 1, D0, D1, D2, func);
#endif
}

template <typename T0, typename T1, typename T2, typename T3, typename F>
void for_4d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const T2& D2, const T3& D3, const F& func) {
    const size_t work_amount = (size_t)D0 * D1 * D2 * D3;
    if (work_amount == 0) return;
    size_t start {0}, end {0};
    splitter(work_amount, nthr, ithr, start, end);

    T0 d0 {0};
    T1 d1 {0};
    T2 d2 {0};
    T3 d3 {0};
    parallel_it_init(start, d0, D0, d1, D1, d2, D2, d3, D3);
    for (size_t iwork = start; iwork < end; ++iwork) {
        func(d0, d1, d2, d3);
        parallel_it_step(d0, D0, d1, D1, d2, D2, d3, D3);
    }
}

template <typename T0, typename T1, typename T2, typename T3, typename F>
void parallel_for4d(const T0& D0, const T1& D1, const T2& D2, const T3& D3, const F& func) {
#if IE_THREAD == IE_THREAD_TBB
    auto work_amount = static_cast<size_t>(D0 * D1 * D2 * D3);
    int nthr = parallel_get_max_threads();
    if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
    if (nthr == 1) {
        for_4d(0, 1, D0, D1, D2, D3, func);
    } else {
        tbb::parallel_for(
            0, nthr,
            [&](int ithr) {
                for_4d(ithr, nthr, D0, D1, D2, D3, func);
            },
            tbb::static_partitioner());
    }
#elif IE_THREAD == IE_THREAD_TBB_AUTO
    const int nthr = parallel_get_max_threads();
    tbb::parallel_for(0, nthr, [&](int ithr) {
        for_4d(ithr, nthr, D0, D1, D2, D3, func);
    });
#elif IE_THREAD == IE_THREAD_OMP
#pragma omp parallel
    for_4d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, D3, func);
#elif IE_THREAD == IE_THREAD_SEQ
    for_4d(0, 1, D0, D1, D2, D3, func);
#endif
}

template <typename T0, typename T1, typename T2, typename T3, typename T4, typename F>
void for_5d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const T2& D2, const T3& D3, const T4& D4,
            const F& func) {
    const size_t work_amount = (size_t)D0 * D1 * D2 * D3 * D4;
    if (work_amount == 0) return;
    size_t start {0}, end {0};
    splitter(work_amount, nthr, ithr, start, end);

    T0 d0 {0};
    T1 d1 {0};
    T2 d2 {0};
    T3 d3 {0};
    T4 d4 {0};
    parallel_it_init(start, d0, D0, d1, D1, d2, D2, d3, D3, d4, D4);
    for (size_t iwork = start; iwork < end; ++iwork) {
        func(d0, d1, d2, d3, d4);
        parallel_it_step(d0, D0, d1, D1, d2, D2, d3, D3, d4, D4);
    }
}

template <typename T0, typename T1, typename T2, typename T3, typename T4, typename F>
void parallel_for5d(const T0& D0, const T1& D1, const T2& D2, const T3& D3, const T4& D4, const F& func) {
#if IE_THREAD == IE_THREAD_TBB
    auto work_amount = static_cast<size_t>(D0 * D1 * D2 * D3 * D4);
    int nthr = parallel_get_max_threads();
    if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
    if (nthr == 1) {
        for_5d(0, 1, D0, D1, D2, D3, D4, func);
    } else {
        tbb::parallel_for(
            0, nthr,
            [&](int ithr) {
                for_5d(ithr, nthr, D0, D1, D2, D3, D4, func);
            },
            tbb::static_partitioner());
    }
#elif IE_THREAD == IE_THREAD_TBB_AUTO
    const int nthr = parallel_get_max_threads();
    tbb::parallel_for(0, nthr, [&](int ithr) {
        for_5d(ithr, nthr, D0, D1, D2, D3, D4, func);
    });
#elif IE_THREAD == IE_THREAD_OMP
#pragma omp parallel
    for_5d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, D3, D4, func);
#elif IE_THREAD == IE_THREAD_SEQ
    for_5d(0, 1, D0, D1, D2, D3, D4, func);
#endif
}

}  // namespace InferenceEngine