ie_parallel.hpp
Go to the documentation of this file.
1 // Copyright (C) 2018-2019 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 
5 /**
6  * @brief Contains declarations and definitions for sequential and multi-threading implementations.
7  * Multi-threading support is implemented in two variants: using the Threading Building Blocks library and OpenMP* product.
8  * To build a particular implementation, use the corresponding identifier: IE_THREAD_TBB, IE_THREAD_TBB_AUTO, IE_THREAD_OMP or IE_THREAD_SEQ.
9  * @file ie_parallel.hpp
10  */
11 
#pragma once

#include <cstddef>

// Identifiers for the available threading backends. The build system selects
// exactly one of them by defining IE_THREAD to the matching value.
#define IE_THREAD_TBB 0
#define IE_THREAD_OMP 1
#define IE_THREAD_SEQ 2
#define IE_THREAD_TBB_AUTO 3

#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
// Enables the (preview) local task_scheduler_observer API pulled in below.
#define TBB_PREVIEW_LOCAL_OBSERVER 1
#include "tbb/task_scheduler_observer.h"
#include "tbb/parallel_for.h"
#include "tbb/task_arena.h"

#include "tbb/parallel_sort.h"
#include "tbb/parallel_reduce.h"
#include "tbb/blocked_range.h"
#include "tbb/blocked_range2d.h"
#include "tbb/blocked_range3d.h"

// Backend-neutral threading queries, implemented on top of the current
// task arena. Note that with TBB "num threads" is the arena concurrency,
// not the size of a live team as it is with OpenMP.
inline int parallel_get_max_threads() { return tbb::this_task_arena::max_concurrency(); }
inline int parallel_get_num_threads() { return parallel_get_max_threads(); }
inline int parallel_get_thread_num() { return tbb::this_task_arena::current_thread_index(); }
// TBB sizes its own thread pool; requests to change it are intentionally ignored.
inline void parallel_set_num_threads(int n) { return; }
// 0 means "no thread count requested through the environment".
inline int parallel_get_env_threads() { return 0; }
#if IE_THREAD == IE_THREAD_TBB
// Plain TBB builds append a static partitioner to parallel_reduce calls
// (see the parallel_sum* helpers); TBB_AUTO lets the scheduler decide.
 #define PARTITIONING , tbb::static_partitioner()
#else
 #define PARTITIONING
#endif
#elif IE_THREAD == IE_THREAD_OMP
#include <algorithm>
#include <cstdlib>
#include <string>
#include <omp.h>


/* MSVC still supports omp 2.0 only */
#if defined(_MSC_VER) && !defined(__INTEL_COMPILER)
// OpenMP 2.0 has no collapse clause, so it is macro-erased on MSVC and the
// pragmas below degrade to parallelizing only the outermost loop.
# define collapse(x)
#endif // defined(_MSC_VER) && !defined(__INTEL_COMPILER)
inline int parallel_get_max_threads() { return omp_get_max_threads(); }
inline int parallel_get_num_threads() { return omp_get_num_threads(); }
inline int parallel_get_thread_num() { return omp_get_thread_num(); }
inline void parallel_set_num_threads(int n) { omp_set_num_threads(n); }
// Reads OMP_NUM_THREADS; returns 0 when the variable is unset or unparsable.
inline int parallel_get_env_threads() {
    int env_cores = 0;
    if (getenv("OMP_NUM_THREADS") != nullptr) {
        try {
            env_cores = std::stoi(getenv("OMP_NUM_THREADS"));
        } catch (const std::exception&) {
            env_cores = 0;
        }
    }
    return env_cores;
}

#elif IE_THREAD == IE_THREAD_SEQ
#include <algorithm>  // NOLINT
// Sequential build: a fixed "team" of one thread with index 0.
inline int parallel_get_env_threads() { return 1; }
inline int parallel_get_max_threads() { return 1; }
inline int parallel_get_num_threads() { return 1; }
inline int parallel_get_thread_num() { return 0; }
inline void parallel_set_num_threads(int n) { return; }
#endif
78 
79 
80 namespace InferenceEngine {
81 
/**
 * @brief Runs func(ithr, nthr) once per thread on a team of nthr threads.
 * @param nthr Requested team size; 0 means "use all available threads"
 *             (TBB branches only — see note below), 1 runs inline.
 * @param func Callable taking (thread_index, team_size).
 */
template <typename F>
void parallel_nt(int nthr, const F &func) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    // 0 requests the backend's full concurrency.
    if (nthr == 0) nthr = parallel_get_max_threads();
    if (nthr == 1) {
        // Degenerate team: run inline with no scheduling overhead.
        func(0, 1);
        return;
    }

    tbb::parallel_for(0, nthr, [&](int ithr) {
        func(ithr, nthr);
    });
#elif IE_THREAD == IE_THREAD_OMP
    // NOTE(review): unlike the TBB branch, nthr == 0 is not normalized here,
    // and num_threads(0) is not valid OpenMP — confirm callers never pass 0
    // in OMP builds.
    if (nthr == 1) {
        func(0, 1);
        return;
    }

# pragma omp parallel num_threads(nthr)
    func(parallel_get_thread_num(), parallel_get_num_threads());
#elif IE_THREAD == IE_THREAD_SEQ
    // Sequential build: single invocation as thread 0 of a team of 1.
    func(0, 1);
#endif
}
106 
/**
 * @brief Like parallel_nt, but pins iterations to threads with a static
 * partitioner in the TBB branches. The SEQ build always runs serially.
 * @param nthr Requested team size; 0 means "use all available threads",
 *             1 (or a SEQ build) runs inline.
 * @param func Callable taking (thread_index, team_size).
 */
template <typename F>
void parallel_nt_static(int nthr, const F &func) {
#if IE_THREAD == IE_THREAD_SEQ
    const bool serial = true;
#else
    const bool serial = false;
#endif

    if (serial || nthr == 1) {
        // Run inline with no scheduling overhead.
        func(0, 1);
        return;
    }

    // 0 requests the backend's full concurrency.
    if (nthr == 0) nthr = parallel_get_max_threads();
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    tbb::parallel_for(0, nthr, [&](int ithr) {
        func(ithr, nthr);
    }
    , tbb::static_partitioner{});

#elif IE_THREAD == IE_THREAD_OMP

# pragma omp parallel num_threads(nthr)
    {
        func(parallel_get_thread_num(), parallel_get_num_threads());
    }
#endif
}
135 
/**
 * @brief Sorts the range [begin, end) with the given comparator — in
 * parallel under TBB, falling back to std::sort otherwise.
 * @param begin Iterator to the first element.
 * @param end   Past-the-end iterator.
 * @param comparator Strict-weak-ordering predicate.
 */
template <typename I, typename F>
void parallel_sort(I begin, I end, const F &comparator) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    tbb::parallel_sort(begin, end, comparator);
#elif IE_THREAD == IE_THREAD_OMP
    // TODO: propose OpenMP version
    std::sort(begin, end, comparator);
#elif IE_THREAD == IE_THREAD_SEQ
    std::sort(begin, end, comparator);
#endif
}
147 
/**
 * @brief Computes input + the sum of func(i) over i in [0, D0), in
 * parallel where the backend allows it.
 * @param D0    Extent of the iteration range.
 * @param input Initial value the reduction starts from.
 * @param func  Callable evaluated per index; must be safe to call
 *              concurrently in parallel builds.
 * @return The accumulated sum.
 */
template <typename T0, typename R, typename F>
R parallel_sum(const T0 &D0, const R &input, const F &func) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    // Each chunk accumulates locally; partial results are joined with +.
    return tbb::parallel_reduce(
        tbb::blocked_range<T0>(0, D0), input,
        [&](const tbb::blocked_range<T0>& range, R acc)->R {
            R partial = acc;
            for (T0 idx = range.begin(); idx < range.end(); ++idx) {
                partial += func(idx);
            }
            return partial;
        },
        [](R lhs, R rhs)->R {
            return lhs + rhs;
        } PARTITIONING);
#else
    R sum = input;

#ifdef _MSC_VER
    // MSVC's OpenMP 2.0 requires a signed loop variable.
    using T0_IT = typename std::make_signed<T0>::type;
#else
    using T0_IT = T0;
#endif

#if IE_THREAD == IE_THREAD_OMP
    #pragma omp parallel for reduction(+ : sum) schedule(static)
#endif
    for (T0_IT i = 0; i < static_cast<T0_IT>(D0); ++i) {
        sum += static_cast<R>(func(i));
    }
    return sum;
#endif
}
180 
/**
 * @brief Computes input + the sum of func(d0, d1) over the 2-D range
 * [0, D0) x [0, D1), in parallel where the backend allows it.
 * @param D0    Outer dimension extent.
 * @param D1    Inner dimension extent.
 * @param input Initial value the reduction starts from.
 * @param func  Callable evaluated for every (d0, d1) pair; must be safe to
 *              call concurrently in parallel builds.
 * @return The accumulated sum.
 */
template <typename T0, typename T1, typename R, typename F>
R parallel_sum2d(const T0 &D0, const T1 &D1, const R &input, const F &func) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    return tbb::parallel_reduce(
        tbb::blocked_range2d<T0, T1>(0, D0, 0, D1), input,
        [&](const tbb::blocked_range2d<T0, T1>& r, R init)->R {
            R sum = init;
            for (T0 dim2 = r.rows().begin(); dim2 < r.rows().end(); dim2++) {
                for (T1 dim1 = r.cols().begin(); dim1 < r.cols().end(); dim1++) {
                    sum += func(dim2, dim1);
                }
            }
            return sum;
        },
        [](R x, R y)->R {
            return x + y;
        } PARTITIONING);
#else
    R sum = input;

#ifdef _MSC_VER
    // MSVC's OpenMP 2.0 requires signed loop variables.
    using T0_IT = typename std::make_signed<T0>::type;
    using T1_IT = typename std::make_signed<T1>::type;
#else
    using T0_IT = T0;
    using T1_IT = T1;
#endif

#if IE_THREAD == IE_THREAD_OMP
    #pragma omp parallel for collapse(2) reduction(+ : sum) schedule(static)
#endif
    // Cast the bounds to the iteration types (as parallel_sum and
    // parallel_sum3d already do) so the comparison is not signed-vs-unsigned
    // when MSVC maps unsigned extents to signed loop variables.
    for (T0_IT dim2 = 0; dim2 < static_cast<T0_IT>(D0); dim2++) {
        for (T1_IT dim1 = 0; dim1 < static_cast<T1_IT>(D1); dim1++) {
            sum += func(dim2, dim1);
        }
    }
    return sum;
#endif
}
/**
 * @brief Computes input + the sum of func(d0, d1, d2) over the 3-D range
 * [0, D0) x [0, D1) x [0, D2), in parallel where the backend allows it.
 * @param D0    Page (outermost) dimension extent.
 * @param D1    Row dimension extent.
 * @param D2    Column (innermost) dimension extent.
 * @param input Initial value the reduction starts from.
 * @param func  Callable evaluated for every (d0, d1, d2) triple; must be
 *              safe to call concurrently in parallel builds.
 * @return The accumulated sum.
 */
template <typename T0, typename T1, typename T2, typename R, typename F>
R parallel_sum3d(const T0 &D0, const T1 &D1, const T2 &D2, const R &input, const F &func) {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
    // Each chunk accumulates locally; partial results are joined with +.
    return tbb::parallel_reduce(
        tbb::blocked_range3d<T0, T1, T2>(0, D0, 0, D1, 0, D2), input,
        [&](const tbb::blocked_range3d<T0, T1, T2>& range, R acc)->R {
            R partial = acc;
            for (T0 page = range.pages().begin(); page < range.pages().end(); page++) {
                for (T1 row = range.rows().begin(); row < range.rows().end(); row++) {
                    for (T2 col = range.cols().begin(); col < range.cols().end(); col++) {
                        partial += func(page, row, col);
                    }
                }
            }
            return partial;
        },
        [](R lhs, R rhs)->R {
            return lhs + rhs;
        } PARTITIONING);
#else
    R sum = input;

#ifdef _MSC_VER
    // MSVC's OpenMP 2.0 requires signed loop variables.
    using T0_IT = typename std::make_signed<T0>::type;
    using T1_IT = typename std::make_signed<T1>::type;
    using T2_IT = typename std::make_signed<T2>::type;
#else
    using T0_IT = T0;
    using T1_IT = T1;
    using T2_IT = T2;
#endif

#if IE_THREAD == IE_THREAD_OMP
    #pragma omp parallel for collapse(3) reduction(+ : sum) schedule(static)
#endif
    for (T0_IT i = 0; i < static_cast<T0_IT>(D0); i++) {
        for (T1_IT j = 0; j < static_cast<T1_IT>(D1); j++) {
            for (T2_IT k = 0; k < static_cast<T2_IT>(D2); k++) {
                sum += func(i, j, k);
            }
        }
    }
    return sum;
#endif
}
265 
/// @brief Base case: no more (index, extent) pairs left to initialize.
template<typename T>
inline T parallel_it_init(T start) { return start; }
/// @brief Decomposes the linear work index `start` into per-dimension
/// indices. Pairs are given outermost-first; the recursion peels the
/// trailing (innermost) pairs first, so the last index varies fastest.
/// @return The part of `start` still to be distributed to outer dimensions.
template<typename T, typename Q, typename R, typename... Args>
inline T parallel_it_init(T start, Q &x, const R &X, Args &&... tuple) {
    const T remainder = parallel_it_init(start, static_cast<Args>(tuple)...);
    x = remainder % X;
    return remainder / X;
}
274 
/// @brief Base case: always reports that the (empty) inner index wrapped.
inline bool parallel_it_step() { return true; }
/// @brief Advances a multi-dimensional index by one position, odometer
/// style: the innermost index increments first and carries into outer
/// indices when it wraps to zero.
/// @return true when this index wrapped (i.e. carry propagates outward).
template<typename Q, typename R, typename... Args>
inline bool parallel_it_step(Q &x, const R &X, Args &&... tuple) {
    const bool inner_wrapped = parallel_it_step(static_cast<Args>(tuple)...);
    if (!inner_wrapped) {
        return false;
    }
    x = (x + 1) % X;
    return x == 0;
}
284 
/**
 * @brief Computes the half-open slice [n_start, n_end) of n work items owned
 * by thread `tid` in a team of `team` threads. The first threads receive
 * ceil(n / team) items and the remainder floor(n / team), so any two slices
 * differ in size by at most one.
 * @param n       Total number of work items.
 * @param team    Team size.
 * @param tid     Zero-based thread index within the team.
 * @param n_start [out] First item owned by this thread.
 * @param n_end   [out] One past the last item owned by this thread.
 */
template <typename T, typename Q>
inline void splitter(const T &n, const Q &team, const Q &tid, T &n_start, T &n_end) {
    if (team <= 1 || n == 0) {
        // Trivial split: a single thread owns the whole range.
        n_start = 0;
        n_end = n;
    } else {
        const T big_chunk = (n + (T)team - 1) / (T)team;  // ceil(n / team)
        const T small_chunk = big_chunk - 1;
        const T num_big = n - small_chunk * (T)team;      // threads that get a big chunk
        n_end = (T)tid < num_big ? big_chunk : small_chunk;
        n_start = (T)tid <= num_big ? tid * big_chunk
                                    : num_big * big_chunk + ((T)tid - num_big) * small_chunk;
    }

    // Convert (start, length) into a half-open [start, end) pair.
    n_end += n_start;
}
300 
301 
/// @brief Executes func(i) for this thread's share of the range [0, D0),
/// as computed by splitter for the (ithr, nthr) pair.
template <typename T0, typename F>
void for_1d(const int &ithr, const int &nthr, const T0 &D0, const F &func) {
    T0 chunk_begin{ 0 }, chunk_end{ 0 };
    splitter(D0, nthr, ithr, chunk_begin, chunk_end);
    while (chunk_begin < chunk_end) {
        func(chunk_begin);
        ++chunk_begin;
    }
}
308 
/**
 * @brief Runs func(i) for every i in [0, D0), splitting the range across
 * the threading backend's workers.
 * @param D0   Extent of the iteration range.
 * @param func Callable taking the current index; must be safe to call
 *             concurrently in parallel builds.
 */
template <typename T0, typename F>
void parallel_for(const T0 &D0, const F &func) {
#if IE_THREAD == IE_THREAD_TBB
    auto work_amount = static_cast<size_t>(D0);
    int nthr = parallel_get_max_threads();
    // Never use more threads than there are work items.
    if (static_cast<size_t>(nthr) > work_amount)
        nthr = static_cast<int>(work_amount);
    if (nthr == 1) {
        // Single worker: run inline without scheduler involvement.
        for_1d(0, 1, D0, func);
    } else {
        tbb::parallel_for(0, nthr, [&](int ithr) {
            for_1d(ithr, nthr, D0, func);
        }, tbb::static_partitioner());
    }
#elif IE_THREAD == IE_THREAD_TBB_AUTO
    // TBB_AUTO leaves chunking to the scheduler's default partitioner.
    const int nthr = parallel_get_max_threads();
    tbb::parallel_for(0, nthr, [&](int ithr) {
        for_1d(ithr, nthr, D0, func);
    });
#elif IE_THREAD == IE_THREAD_OMP
# pragma omp parallel
    for_1d(parallel_get_thread_num(), parallel_get_num_threads(), D0, func);
#elif IE_THREAD == IE_THREAD_SEQ
    for_1d(0, 1, D0, func);
#endif
}
335 
336 
337 template <typename T0, typename T1, typename F>
338 void for_2d(const int &ithr, const int &nthr, const T0 &D0, const T1 &D1, const F &func) {
339  const size_t work_amount = (size_t)D0 * D1;
340  if (work_amount == 0) return;
341  size_t start{ 0 }, end{ 0 };
342  splitter(work_amount, nthr, ithr, start, end);
343 
344  T0 d0{ 0 }; T1 d1{ 0 };
345  parallel_it_init(start, d0, D0, d1, D1);
346  for (size_t iwork = start; iwork < end; ++iwork) {
347  func(d0, d1);
348  parallel_it_step(d0, D0, d1, D1);
349  }
350 }
351 
352 template <typename T0, typename T1, typename F>
353 void parallel_for2d(const T0 &D0, const T1 &D1, const F &func) {
354 #if IE_THREAD == IE_THREAD_TBB
355  auto work_amount = static_cast<size_t>(D0 * D1);
356  int nthr = parallel_get_max_threads();
357  if (static_cast<size_t>(nthr) > work_amount)
358  nthr = static_cast<int>(work_amount);
359  if (nthr == 1) {
360  for_2d(0, 1, D0, D1, func);
361  } else {
362  tbb::parallel_for(0, nthr, [&](int ithr) {
363  for_2d(ithr, nthr, D0, D1, func);
364  }, tbb::static_partitioner());
365  }
366 #elif IE_THREAD == IE_THREAD_TBB_AUTO
367  const int nthr = parallel_get_max_threads();
368  tbb::parallel_for(0, nthr, [&](int ithr) {
369  for_2d(ithr, nthr, D0, D1, func);
370  });
371 #elif IE_THREAD == IE_THREAD_OMP
372 # pragma omp parallel
373  for_2d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, func);
374 #elif IE_THREAD == IE_THREAD_SEQ
375  for_2d(0, 1, D0, D1, func);
376 #endif
377 }
378 
379 
380 template <typename T0, typename T1, typename T2, typename F>
381 void for_3d(const int &ithr, const int &nthr, const T0 &D0, const T1 &D1,
382  const T2 &D2, const F &func) {
383  const size_t work_amount = (size_t)D0 * D1 * D2;
384  if (work_amount == 0) return;
385  size_t start{ 0 }, end{ 0 };
386  splitter(work_amount, nthr, ithr, start, end);
387 
388  T0 d0{ 0 }; T1 d1{ 0 }; T2 d2{ 0 };
389  parallel_it_init(start, d0, D0, d1, D1, d2, D2);
390  for (size_t iwork = start; iwork < end; ++iwork) {
391  func(d0, d1, d2);
392  parallel_it_step(d0, D0, d1, D1, d2, D2);
393  }
394 }
395 
396 template <typename T0, typename T1, typename T2, typename F>
397 void parallel_for3d(const T0 &D0, const T1 &D1, const T2 &D2, const F &func) {
398 #if IE_THREAD == IE_THREAD_TBB
399  auto work_amount = static_cast<size_t>(D0 * D1 * D2);
400  int nthr = parallel_get_max_threads();
401  if (static_cast<size_t>(nthr) > work_amount)
402  nthr = static_cast<int>(work_amount);
403  if (nthr == 1) {
404  for_3d(0, 1, D0, D1, D2, func);
405  } else {
406  tbb::parallel_for(0, nthr, [&](int ithr) {
407  for_3d(ithr, nthr, D0, D1, D2, func);
408  }, tbb::static_partitioner());
409  }
410 #elif IE_THREAD == IE_THREAD_TBB_AUTO
411  const int nthr = parallel_get_max_threads();
412  tbb::parallel_for(0, nthr, [&](int ithr) {
413  for_3d(ithr, nthr, D0, D1, D2, func);
414  });
415 #elif IE_THREAD == IE_THREAD_OMP
416 # pragma omp parallel
417  for_3d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, func);
418 #elif IE_THREAD == IE_THREAD_SEQ
419  for_3d(0, 1, D0, D1, D2, func);
420 #endif
421 }
422 
423 template <typename T0, typename T1, typename T2, typename T3, typename F>
424 void for_4d(const int &ithr, const int &nthr, const T0 &D0, const T1 &D1,
425  const T2 &D2, const T3 &D3, const F &func) {
426  const size_t work_amount = (size_t)D0 * D1 * D2 * D3;
427  if (work_amount == 0) return;
428  size_t start{ 0 }, end{ 0 };
429  splitter(work_amount, nthr, ithr, start, end);
430 
431  T0 d0{ 0 }; T1 d1{ 0 }; T2 d2{ 0 }; T3 d3{ 0 };
432  parallel_it_init(start, d0, D0, d1, D1, d2, D2, d3, D3);
433  for (size_t iwork = start; iwork < end; ++iwork) {
434  func(d0, d1, d2, d3);
435  parallel_it_step(d0, D0, d1, D1, d2, D2, d3, D3);
436  }
437 }
438 
439 template <typename T0, typename T1, typename T2, typename T3, typename F>
440 void parallel_for4d(const T0 &D0, const T1 &D1, const T2 &D2, const T3 &D3, const F &func) {
441 #if IE_THREAD == IE_THREAD_TBB
442  auto work_amount = static_cast<size_t>(D0 * D1 * D2 * D3);
443  int nthr = parallel_get_max_threads();
444  if (static_cast<size_t>(nthr) > work_amount)
445  nthr = static_cast<int>(work_amount);
446  if (nthr == 1) {
447  for_4d(0, 1, D0, D1, D2, D3, func);
448  } else {
449  tbb::parallel_for(0, nthr, [&](int ithr) {
450  for_4d(ithr, nthr, D0, D1, D2, D3, func);
451  }, tbb::static_partitioner());
452  }
453 #elif IE_THREAD == IE_THREAD_TBB_AUTO
454  const int nthr = parallel_get_max_threads();
455  tbb::parallel_for(0, nthr, [&](int ithr) {
456  for_4d(ithr, nthr, D0, D1, D2, D3, func);
457  });
458 #elif IE_THREAD == IE_THREAD_OMP
459 # pragma omp parallel
460  for_4d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, D3, func);
461 #elif IE_THREAD == IE_THREAD_SEQ
462  for_4d(0, 1, D0, D1, D2, D3, func);
463 #endif
464 }
465 
466 template <typename T0, typename T1, typename T2, typename T3, typename T4, typename F>
467 void for_5d(const int &ithr, const int &nthr, const T0 &D0, const T1 &D1,
468  const T2 &D2, const T3 &D3, const T4 &D4, const F &func) {
469  const size_t work_amount = (size_t)D0 * D1 * D2 * D3 * D4;
470  if (work_amount == 0) return;
471  size_t start{ 0 }, end{ 0 };
472  splitter(work_amount, nthr, ithr, start, end);
473 
474  T0 d0{ 0 }; T1 d1{ 0 }; T2 d2{ 0 }; T3 d3{ 0 }; T4 d4{ 0 };
475  parallel_it_init(start, d0, D0, d1, D1, d2, D2, d3, D3, d4, D4);
476  for (size_t iwork = start; iwork < end; ++iwork) {
477  func(d0, d1, d2, d3, d4);
478  parallel_it_step(d0, D0, d1, D1, d2, D2, d3, D3, d4, D4);
479  }
480 }
481 
482 template <typename T0, typename T1, typename T2, typename T3, typename T4, typename F>
483 void parallel_for5d(const T0 &D0, const T1 &D1, const T2 &D2, const T3 &D3,
484  const T4 &D4, const F &func) {
485 #if IE_THREAD == IE_THREAD_TBB
486  auto work_amount = static_cast<size_t>(D0 * D1 * D2 * D3 * D4);
487  int nthr = parallel_get_max_threads();
488  if (static_cast<size_t>(nthr) > work_amount)
489  nthr = static_cast<int>(work_amount);
490  if (nthr == 1) {
491  for_5d(0, 1, D0, D1, D2, D3, D4, func);
492  } else {
493  tbb::parallel_for(0, nthr, [&](int ithr) {
494  for_5d(ithr, nthr, D0, D1, D2, D3, D4, func);
495  }, tbb::static_partitioner());
496  }
497 #elif IE_THREAD == IE_THREAD_TBB_AUTO
498  const int nthr = parallel_get_max_threads();
499  tbb::parallel_for(0, nthr, [&](int ithr) {
500  for_5d(ithr, nthr, D0, D1, D2, D3, D4, func);
501  });
502 #elif IE_THREAD == IE_THREAD_OMP
503 # pragma omp parallel
504  for_5d(parallel_get_thread_num(), parallel_get_num_threads(), D0, D1, D2, D3, D4, func);
505 #elif IE_THREAD == IE_THREAD_SEQ
506  for_5d(0, 1, D0, D1, D2, D3, D4, func);
507 #endif
508 }
509 
510 } // namespace InferenceEngine
511 
Inference Engine API.
Definition: ie_argmax_layer.hpp:11