// ie_parallel.hpp — Inference Engine parallelism abstraction header (Doxygen page title residue).
1 // Copyright (C) 2018-2019 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 
5 /**
6  * @brief Contains declarations and definitions for sequential and multi-threading implementations.
7  * Multi-threading support is implemented in two variants: using the Threading Building Blocks library and OpenMP*
8  * product. To build a particular implementation, use the corresponding identifier: IE_THREAD_TBB, IE_THREAD_TBB_AUTO,
9  * IE_THREAD_OMP or IE_THREAD_SEQ.
10  * @file ie_parallel.hpp
11  */
12 
13 #pragma once
14 
15 #include <cstddef>
16 
17 #define IE_THREAD_TBB 0
18 #define IE_THREAD_OMP 1
19 #define IE_THREAD_SEQ 2
20 #define IE_THREAD_TBB_AUTO 3
21 
22 #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
23 #define TBB_PREVIEW_LOCAL_OBSERVER 1
24 #include "tbb/blocked_range.h"
25 #include "tbb/blocked_range2d.h"
26 #include "tbb/blocked_range3d.h"
27 #include "tbb/parallel_for.h"
28 #include "tbb/parallel_reduce.h"
29 #include "tbb/parallel_sort.h"
30 #include "tbb/task_arena.h"
31 #include "tbb/task_scheduler_observer.h"
32 
// Returns the maximum concurrency of the current TBB task arena,
// i.e. the upper bound on threads a parallel region may use.
inline int parallel_get_max_threads() {
    return tbb::this_task_arena::max_concurrency();
}
// Under TBB the team size is not observable per-region, so this simply
// reports the arena's maximum concurrency (same as parallel_get_max_threads).
inline int parallel_get_num_threads() {
    return parallel_get_max_threads();
}
// Returns the calling thread's index within the current TBB task arena.
// NOTE(review): outside an arena TBB may report tbb::task_arena::not_initialized
// (a negative value) — callers should only use this inside a parallel region.
inline int parallel_get_thread_num() {
    return tbb::this_task_arena::current_thread_index();
}
// No-op under TBB: the scheduler owns the thread count; `n` is intentionally ignored.
inline void parallel_set_num_threads(int n) {
    return;
}
// TBB flavor has no environment-variable override; 0 means "no explicit setting".
inline int parallel_get_env_threads() {
    return 0;
}
48 #if IE_THREAD == IE_THREAD_TBB
49 #define PARTITIONING , tbb::static_partitioner()
50 #else
51 #define PARTITIONING
52 #endif
53 #elif IE_THREAD == IE_THREAD_OMP
54 #include <omp.h>
55 
56 #include <algorithm>
57 #include <cstdlib>
58 #include <string>
59 
60 /* MSVC still supports omp 2.0 only */
61 #if defined(_MSC_VER) && !defined(__INTEL_COMPILER)
62 #define collapse(x)
63 #endif // defined(_MSC_VER) && !defined(__INTEL_COMPILER)
// Returns the maximum number of threads OpenMP would use for a parallel region.
inline int parallel_get_max_threads() {
    return omp_get_max_threads();
}
// Returns the size of the current OpenMP team (1 when called outside a parallel region).
inline int parallel_get_num_threads() {
    return omp_get_num_threads();
}
// Returns the calling thread's index within the current OpenMP team (0 for the master).
inline int parallel_get_thread_num() {
    return omp_get_thread_num();
}
// Sets the default team size for subsequent OpenMP parallel regions.
inline void parallel_set_num_threads(int n) {
    omp_set_num_threads(n);
}
// Reads the OMP_NUM_THREADS environment variable.
// @return the parsed thread count, or 0 when the variable is unset or does not
//         start with a valid integer (callers treat 0 as "no explicit setting").
inline int parallel_get_env_threads() {
    int env_cores = 0;
    // Fetch once and reuse: the previous code called getenv() twice, which is
    // redundant and racy if the environment mutates between the two calls.
    const char* env_value = getenv("OMP_NUM_THREADS");
    if (env_value != nullptr) {
        try {
            env_cores = std::stoi(env_value);
        } catch (const std::exception&) {
            // Non-numeric / out-of-range values fall back to "unset".
            env_cores = 0;
        }
    }
    return env_cores;
}
87 
88 #elif IE_THREAD == IE_THREAD_SEQ
89 #include <algorithm> // NOLINT
// Sequential build: exactly one thread, regardless of the environment.
inline int parallel_get_env_threads() {
    return 1;
}
// Sequential build: the "team" is always a single thread.
inline int parallel_get_max_threads() {
    return 1;
}
// Sequential build: the current team size is always 1.
inline int parallel_get_num_threads() {
    return 1;
}
// Sequential build: the only thread always has index 0.
inline int parallel_get_thread_num() {
    return 0;
}
// No-op in the sequential build; `n` is intentionally ignored.
inline void parallel_set_num_threads(int n) {
    return;
}
105 #endif
106 
107 namespace InferenceEngine {
108 
109 template <typename F>
110 void parallel_nt(int nthr, const F& func) {
111 #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
112  if (nthr == 0) nthr = parallel_get_max_threads();
113  if (nthr == 1) {
114  func(0, 1);
115  return;
116  }
117 
118  tbb::parallel_for(0, nthr, [&](int ithr) {
119  func(ithr, nthr);
120  });
121 #elif IE_THREAD == IE_THREAD_OMP
122  if (nthr == 1) {
123  func(0, 1);
124  return;
125  }
126 
127 #pragma omp parallel num_threads(nthr)
128  func(parallel_get_thread_num(), parallel_get_num_threads());
129 #elif IE_THREAD == IE_THREAD_SEQ
130  func(0, 1);
131 #endif
132 }
133 
// Runs func(ithr, nthr) on a statically partitioned team of `nthr` threads.
// nthr == 0 means "use all available threads"; nthr == 1 (or a sequential
// build) invokes `func(0, 1)` inline on the calling thread.
template <typename F>
void parallel_nt_static(int nthr, const F& func) {
#if IE_THREAD == IE_THREAD_SEQ
    const bool serial = true;
#else
    const bool serial = false;
#endif

    if (serial || nthr == 1) {
        // Serial fast path: no scheduler involvement.
        func(0, 1);
        return;
    }

    if (nthr == 0) nthr = parallel_get_max_threads();
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    // static_partitioner keeps the iteration->thread mapping deterministic.
    tbb::parallel_for(
        0, nthr,
        [&](int ithr) {
            func(ithr, nthr);
        },
        tbb::static_partitioner {});

#elif IE_THREAD == IE_THREAD_OMP

#pragma omp parallel num_threads(nthr)
    { func(parallel_get_thread_num(), parallel_get_num_threads()); }
#endif
}
162 
// Sorts the range [begin, end) with `comparator`: parallel merge-style sort
// under TBB, std::sort otherwise.
template <typename I, typename F>
void parallel_sort(I begin, I end, const F& comparator) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    tbb::parallel_sort(begin, end, comparator);
#elif IE_THREAD == IE_THREAD_OMP
    // TODO: propose OpenMP version
    std::sort(begin, end, comparator);
#elif IE_THREAD == IE_THREAD_SEQ
    std::sort(begin, end, comparator);
#endif
}
174 
// Returns input + sum of func(dim1) for dim1 in [0, D0).
// R must be addable; with parallel backends the combination order is not
// fixed, so non-associative types (e.g. floating point) may vary slightly.
template <typename T0, typename R, typename F>
R parallel_sum(const T0& D0, const R& input, const F& func) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    return tbb::parallel_reduce(
        tbb::blocked_range<T0>(0, D0), input,
        [&](const tbb::blocked_range<T0>& r, R init) -> R {
            R sum = init;
            for (T0 dim1 = r.begin(); dim1 < r.end(); ++dim1) sum += func(dim1);
            return sum;
        },
        [](R x, R y) -> R {
            return x + y;
        } PARTITIONING);
#else
    R sum = input;

#ifdef _MSC_VER
    // MSVC's OpenMP 2.0 requires a signed loop induction variable.
    using T0_IT = typename std::make_signed<T0>::type;
#else
    using T0_IT = T0;
#endif

#if IE_THREAD == IE_THREAD_OMP
#pragma omp parallel for reduction(+ : sum) schedule(static)
#endif
    for (T0_IT dim1 = 0; dim1 < static_cast<T0_IT>(D0); dim1++) {
        sum += static_cast<R>(func(dim1));
    }
    return sum;
#endif
}
206 
207 template <typename T0, typename T1, typename R, typename F>
208 R parallel_sum2d(const T0& D0, const T1& D1, const R& input, const F& func) {
209 #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
210  return tbb::parallel_reduce(
211  tbb::blocked_range2d<T0, T1>(0, D0, 0, D1), input,
212  [&](const tbb::blocked_range2d<T0, T1>& r, R init) -> R {
213  R sum = init;
214  for (T0 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
215  for (T1 dim1 = r.cols().begin(); dim1 < r.cols().end(); dim1++) {
216  sum += func(dim2, dim1);
217  }
218  }
219  return sum;
220  },
221  [](R x, R y) -> R {
222  return x + y;
223  } PARTITIONING);
224 #else
225  R sum = input;
226 
227 #ifdef _MSC_VER
228  using T0_IT = typename std::make_signed<T0>::type;
229  using T1_IT = typename std::make_signed<T1>::type;
230 #else
231  using T0_IT = T0;
232  using T1_IT = T1;
233 #endif
234 
235 #if IE_THREAD == IE_THREAD_OMP
236 #pragma omp parallel for collapse(2) reduction(+ : sum) schedule(static)
237 #endif
238  for (T0_IT dim2 = 0; dim2 < D0; dim2++) {
239  for (T1_IT dim1 = 0; dim1 < D1; dim1++) {
240  sum += func(dim2, dim1);
241  }
242  }
243  return sum;
244 #endif
245 }
// Returns input + sum of func(dim1, dim2, dim3) over [0, D0) x [0, D1) x [0, D2).
// With parallel backends the combination order is not fixed, so
// non-associative types (e.g. floating point) may vary slightly.
template <typename T0, typename T1, typename T2, typename R, typename F>
R parallel_sum3d(const T0& D0, const T1& D1, const T2& D2, const R& input, const F& func) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    return tbb::parallel_reduce(
        tbb::blocked_range3d<T0, T1, T2>(0, D0, 0, D1, 0, D2), input,
        [&](const tbb::blocked_range3d<T0, T1, T2>& r, R init) -> R {
            R sum = init;
            for (T0 dim1 = r.pages().begin(); dim1 < r.pages().end(); dim1++) {
                for (T1 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
                    for (T2 dim3 = r.cols().begin(); dim3 < r.cols().end(); dim3++) {
                        sum += func(dim1, dim2, dim3);
                    }
                }
            }
            return sum;
        },
        [](R x, R y) -> R {
            return x + y;
        } PARTITIONING);
#else
    R sum = input;

#ifdef _MSC_VER
    // MSVC's OpenMP 2.0 requires signed loop induction variables.
    using T0_IT = typename std::make_signed<T0>::type;
    using T1_IT = typename std::make_signed<T1>::type;
    using T2_IT = typename std::make_signed<T2>::type;
#else
    using T0_IT = T0;
    using T1_IT = T1;
    using T2_IT = T2;
#endif

#if IE_THREAD == IE_THREAD_OMP
#pragma omp parallel for collapse(3) reduction(+ : sum) schedule(static)
#endif
    for (T0_IT dim1 = 0; dim1 < static_cast<T0_IT>(D0); dim1++) {
        for (T1_IT dim2 = 0; dim2 < static_cast<T1_IT>(D1); dim2++) {
            for (T2_IT dim3 = 0; dim3 < static_cast<T2_IT>(D2); dim3++) {
                sum += func(dim1, dim2, dim3);
            }
        }
    }
    return sum;
#endif
}
291 
// Decomposes a linear index into per-dimension coordinates.
// Call as parallel_it_init(start, d0, D0, d1, D1, ...): the last dimension
// varies fastest. Each x receives start's coordinate along X; the return
// value is whatever remains after dividing out all listed dimensions.
template <typename T>
inline T parallel_it_init(T start) {
    // Base case: nothing left to peel off.
    return start;
}
template <typename T, typename Q, typename R, typename... Args>
inline T parallel_it_init(T start, Q& x, const R& X, Args&&... rest) {
    // Strip the trailing (fastest-varying) dimensions first, then take this
    // dimension's coordinate from what remains.
    const T reduced = parallel_it_init(start, static_cast<Args>(rest)...);
    x = reduced % X;
    return reduced / X;
}
302 
// Advances a multi-dimensional iterator by one step, last dimension fastest.
// Call as parallel_it_step(d0, D0, d1, D1, ...). Returns true when the whole
// coordinate tuple wrapped around to all-zeros (carry out of the first dim).
inline bool parallel_it_step() {
    // Base case: an empty tail always "carries" into its neighbor.
    return true;
}
template <typename Q, typename R, typename... Args>
inline bool parallel_it_step(Q& x, const R& X, Args&&... rest) {
    const bool carry_in = parallel_it_step(static_cast<Args>(rest)...);
    if (!carry_in) return false;
    // The faster dimensions wrapped: bump this coordinate and report whether
    // it wrapped too.
    x = (x + 1) % X;
    return x == 0;
}
314 
// Computes thread `tid`'s half-open slice [n_start, n_end) of `n` work items
// split across `team` threads. The first few threads get ceil(n/team) items,
// the remainder get one fewer, so slice sizes differ by at most one.
template <typename T, typename Q>
inline void splitter(const T& n, const Q& team, const Q& tid, T& n_start, T& n_end) {
    if (team <= 1 || n == 0) {
        // Degenerate case: the single (or only meaningful) thread takes everything.
        n_start = 0;
        n_end = n;
    } else {
        const T big_chunk = (n + (T)team - 1) / (T)team;  // ceil(n / team)
        const T small_chunk = big_chunk - 1;
        // Number of threads that receive the larger chunk.
        const T num_big = n - small_chunk * (T)team;
        n_end = (T)tid < num_big ? big_chunk : small_chunk;
        n_start = (T)tid <= num_big ? tid * big_chunk : num_big * big_chunk + ((T)tid - num_big) * small_chunk;
    }

    // Convert the per-thread length into an absolute end offset.
    n_end += n_start;
}
330 
// Executes func(i) for this thread's contiguous share of the range [0, D0),
// as assigned by splitter() for (ithr, nthr).
template <typename T0, typename F>
void for_1d(const int& ithr, const int& nthr, const T0& D0, const F& func) {
    T0 first {0}, last {0};
    splitter(D0, nthr, ithr, first, last);
    while (first < last) {
        func(first);
        ++first;
    }
}
337 
// Splits the 1D iteration space [0, D0) across threads and invokes func(i)
// for every index; the backend depends on the IE_THREAD build flavor.
template <typename T0, typename F>
void parallel_for(const T0& D0, const F& func) {
#if IE_THREAD == IE_THREAD_TBB
    auto work_amount = static_cast<size_t>(D0);
    int nthr = parallel_get_max_threads();
    // Never spawn more workers than there are iterations.
    if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
    if (nthr == 1) {
        // Serial fast path: avoid TBB dispatch overhead.
        for_1d(0, 1, D0, func);
    } else {
        tbb::parallel_for(
            0, nthr,
            [&](int ithr) {
                for_1d(ithr, nthr, D0, func);
            },
            tbb::static_partitioner());
    }
#elif IE_THREAD == IE_THREAD_TBB_AUTO
    const int nthr = parallel_get_max_threads();
    tbb::parallel_for(0, nthr, [&](int ithr) {
        for_1d(ithr, nthr, D0, func);
    });
#elif IE_THREAD == IE_THREAD_OMP
#pragma omp parallel
    for_1d(parallel_get_thread_num(), parallel_get_num_threads(), D0, func);
#elif IE_THREAD == IE_THREAD_SEQ
    for_1d(0, 1, D0, func);
#endif
}
366 
367 template <typename T0, typename T1, typename F>
368 void for_2d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const F& func) {
369  const size_t work_amount = (size_t)D0 * D1;
370  if (work_amount == 0) return;
371  size_t start {0}, end {0};
372  splitter(work_amount, nthr, ithr, start, end);
373 
374  T0 d0 {0};
375  T1 d1 {0};
376  parallel_it_init(start, d0, D0, d1, D1);
377  for (size_t iwork = start; iwork < end; ++iwork) {
378  func(d0, d1);
379  parallel_it_step(d0, D0, d1, D1);
380  }
381 }
382 
383 template <typename T0, typename T1, typename F>
384 void parallel_for2d(const T0& D0, const T1& D1, const F& func) {
385 #if IE_THREAD == IE_THREAD_TBB
386  auto work_amount = static_cast<size_t>(D0 * D1);
387  int nthr = parallel_get_max_threads();
388  if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
389  if (nthr == 1) {
390  for_2d(0, 1, D0, D1, func);
391  } else {
392  tbb::parallel_for(
393  0, nthr,
394  [&](int ithr) {
395  for_2d(ithr, nthr, D0, D1, func);
396  },
397  tbb::static_partitioner());
398  }
399 #elif IE_THREAD == IE_THREAD_TBB_AUTO
400  const int nthr = parallel_get_max_threads();
401  tbb::parallel_for(0, nthr, [&](int ithr) {
402  for_2d(ithr, nthr, D0, D1, func);
403  });
404 #elif IE_THREAD == IE_THREAD_OMP
405 #pragma omp parallel
406  for_2d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, func);
407 #elif IE_THREAD == IE_THREAD_SEQ
408  for_2d(0, 1, D0, D1, func);
409 #endif
410 }
411 
412 template <typename T0, typename T1, typename T2, typename F>
413 void for_3d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const T2& D2, const F& func) {
414  const size_t work_amount = (size_t)D0 * D1 * D2;
415  if (work_amount == 0) return;
416  size_t start {0}, end {0};
417  splitter(work_amount, nthr, ithr, start, end);
418 
419  T0 d0 {0};
420  T1 d1 {0};
421  T2 d2 {0};
422  parallel_it_init(start, d0, D0, d1, D1, d2, D2);
423  for (size_t iwork = start; iwork < end; ++iwork) {
424  func(d0, d1, d2);
425  parallel_it_step(d0, D0, d1, D1, d2, D2);
426  }
427 }
428 
429 template <typename T0, typename T1, typename T2, typename F>
430 void parallel_for3d(const T0& D0, const T1& D1, const T2& D2, const F& func) {
431 #if IE_THREAD == IE_THREAD_TBB
432  auto work_amount = static_cast<size_t>(D0 * D1 * D2);
433  int nthr = parallel_get_max_threads();
434  if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
435  if (nthr == 1) {
436  for_3d(0, 1, D0, D1, D2, func);
437  } else {
438  tbb::parallel_for(
439  0, nthr,
440  [&](int ithr) {
441  for_3d(ithr, nthr, D0, D1, D2, func);
442  },
443  tbb::static_partitioner());
444  }
445 #elif IE_THREAD == IE_THREAD_TBB_AUTO
446  const int nthr = parallel_get_max_threads();
447  tbb::parallel_for(0, nthr, [&](int ithr) {
448  for_3d(ithr, nthr, D0, D1, D2, func);
449  });
450 #elif IE_THREAD == IE_THREAD_OMP
451 #pragma omp parallel
452  for_3d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, func);
453 #elif IE_THREAD == IE_THREAD_SEQ
454  for_3d(0, 1, D0, D1, D2, func);
455 #endif
456 }
457 
458 template <typename T0, typename T1, typename T2, typename T3, typename F>
459 void for_4d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const T2& D2, const T3& D3, const F& func) {
460  const size_t work_amount = (size_t)D0 * D1 * D2 * D3;
461  if (work_amount == 0) return;
462  size_t start {0}, end {0};
463  splitter(work_amount, nthr, ithr, start, end);
464 
465  T0 d0 {0};
466  T1 d1 {0};
467  T2 d2 {0};
468  T3 d3 {0};
469  parallel_it_init(start, d0, D0, d1, D1, d2, D2, d3, D3);
470  for (size_t iwork = start; iwork < end; ++iwork) {
471  func(d0, d1, d2, d3);
472  parallel_it_step(d0, D0, d1, D1, d2, D2, d3, D3);
473  }
474 }
475 
476 template <typename T0, typename T1, typename T2, typename T3, typename F>
477 void parallel_for4d(const T0& D0, const T1& D1, const T2& D2, const T3& D3, const F& func) {
478 #if IE_THREAD == IE_THREAD_TBB
479  auto work_amount = static_cast<size_t>(D0 * D1 * D2 * D3);
480  int nthr = parallel_get_max_threads();
481  if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
482  if (nthr == 1) {
483  for_4d(0, 1, D0, D1, D2, D3, func);
484  } else {
485  tbb::parallel_for(
486  0, nthr,
487  [&](int ithr) {
488  for_4d(ithr, nthr, D0, D1, D2, D3, func);
489  },
490  tbb::static_partitioner());
491  }
492 #elif IE_THREAD == IE_THREAD_TBB_AUTO
493  const int nthr = parallel_get_max_threads();
494  tbb::parallel_for(0, nthr, [&](int ithr) {
495  for_4d(ithr, nthr, D0, D1, D2, D3, func);
496  });
497 #elif IE_THREAD == IE_THREAD_OMP
498 #pragma omp parallel
499  for_4d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, D3, func);
500 #elif IE_THREAD == IE_THREAD_SEQ
501  for_4d(0, 1, D0, D1, D2, D3, func);
502 #endif
503 }
504 
505 template <typename T0, typename T1, typename T2, typename T3, typename T4, typename F>
506 void for_5d(const int& ithr, const int& nthr, const T0& D0, const T1& D1, const T2& D2, const T3& D3, const T4& D4,
507  const F& func) {
508  const size_t work_amount = (size_t)D0 * D1 * D2 * D3 * D4;
509  if (work_amount == 0) return;
510  size_t start {0}, end {0};
511  splitter(work_amount, nthr, ithr, start, end);
512 
513  T0 d0 {0};
514  T1 d1 {0};
515  T2 d2 {0};
516  T3 d3 {0};
517  T4 d4 {0};
518  parallel_it_init(start, d0, D0, d1, D1, d2, D2, d3, D3, d4, D4);
519  for (size_t iwork = start; iwork < end; ++iwork) {
520  func(d0, d1, d2, d3, d4);
521  parallel_it_step(d0, D0, d1, D1, d2, D2, d3, D3, d4, D4);
522  }
523 }
524 
525 template <typename T0, typename T1, typename T2, typename T3, typename T4, typename F>
526 void parallel_for5d(const T0& D0, const T1& D1, const T2& D2, const T3& D3, const T4& D4, const F& func) {
527 #if IE_THREAD == IE_THREAD_TBB
528  auto work_amount = static_cast<size_t>(D0 * D1 * D2 * D3 * D4);
529  int nthr = parallel_get_max_threads();
530  if (static_cast<size_t>(nthr) > work_amount) nthr = static_cast<int>(work_amount);
531  if (nthr == 1) {
532  for_5d(0, 1, D0, D1, D2, D3, D4, func);
533  } else {
534  tbb::parallel_for(
535  0, nthr,
536  [&](int ithr) {
537  for_5d(ithr, nthr, D0, D1, D2, D3, D4, func);
538  },
539  tbb::static_partitioner());
540  }
541 #elif IE_THREAD == IE_THREAD_TBB_AUTO
542  const int nthr = parallel_get_max_threads();
543  tbb::parallel_for(0, nthr, [&](int ithr) {
544  for_5d(ithr, nthr, D0, D1, D2, D3, D4, func);
545  });
546 #elif IE_THREAD == IE_THREAD_OMP
547 #pragma omp parallel
548  for_5d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, D3, D4, func);
549 #elif IE_THREAD == IE_THREAD_SEQ
550  for_5d(0, 1, D0, D1, D2, D3, D4, func);
551 #endif
552 }
553 
554 } // namespace InferenceEngine
// Doxygen cross-reference residue from the HTML listing: "Inference Engine API",
// definition located at ie_argmax_layer.hpp:11.