Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
flow_graph.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_flow_graph_H
18#define __TBB_flow_graph_H
19
20#define __TBB_flow_graph_H_include_area
22
23#include "tbb_stddef.h"
24#include "atomic.h"
25#include "spin_mutex.h"
26#include "null_mutex.h"
27#include "spin_rw_mutex.h"
28#include "null_rw_mutex.h"
29#include "task.h"
31#include "tbb_exception.h"
35#include "tbb_profiling.h"
36#include "task_arena.h"
37
38#if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
39 #if __INTEL_COMPILER
40 // Disabled warning "routine is both inline and noinline"
41 #pragma warning (push)
42 #pragma warning( disable: 2196 )
43 #endif
44 #define __TBB_NOINLINE_SYM __attribute__((noinline))
45#else
46 #define __TBB_NOINLINE_SYM
47#endif
48
49#if __TBB_PREVIEW_ASYNC_MSG
50#include <vector> // std::vector in internal::async_storage
51#include <memory> // std::shared_ptr in async_msg
52#endif
53
54#if __TBB_PREVIEW_STREAMING_NODE
55// For streaming_node
56#include <array> // std::array
57#include <unordered_map> // std::unordered_map
58#include <type_traits> // std::decay, std::true_type, std::false_type
59#endif // __TBB_PREVIEW_STREAMING_NODE
60
61#if TBB_DEPRECATED_FLOW_ENQUEUE
62#define FLOW_SPAWN(a) tbb::task::enqueue((a))
63#else
64#define FLOW_SPAWN(a) tbb::task::spawn((a))
65#endif
66
67#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
68#define __TBB_DEFAULT_NODE_ALLOCATOR(T) cache_aligned_allocator<T>
69#else
70#define __TBB_DEFAULT_NODE_ALLOCATOR(T) null_type
71#endif
72
73// use the VC10 or gcc version of tuple if it is available.
74#if __TBB_CPP11_TUPLE_PRESENT
75 #include <tuple>
76namespace tbb {
77 namespace flow {
78 using std::tuple;
79 using std::tuple_size;
80 using std::tuple_element;
81 using std::get;
82 }
83}
84#else
85 #include "compat/tuple"
86#endif
87
88#include<list>
89#include<queue>
90
101namespace tbb {
102namespace flow {
103
//! Named concurrency levels: `unlimited` has the value 0 and `serial` the value 1.
enum concurrency {
    unlimited = 0,
    serial = 1
};
106
107namespace interface11 {
108
110struct null_type {};
111
114
116template< typename T > class sender;
117template< typename T > class receiver;
119
120template< typename T, typename U > class limiter_node; // needed for resetting decrementer
121
122template< typename R, typename B > class run_and_put_task;
123
124namespace internal {
125
126template<typename T, typename M> class successor_cache;
127template<typename T, typename M> class broadcast_cache;
128template<typename T, typename M> class round_robin_cache;
129template<typename T, typename M> class predecessor_cache;
130template<typename T, typename M> class reservable_predecessor_cache;
131
132#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
133namespace order {
134struct following;
135struct preceding;
136}
137template<typename Order, typename... Args> struct node_set;
138#endif
139
140#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
141// Holder of edges both for caches and for those nodes which do not have predecessor caches.
142// C == receiver< ... > or sender< ... >, depending.
143template<typename C>
144class edge_container {
145
146public:
147 typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
148
149 void add_edge(C &s) {
150 built_edges.push_back(&s);
151 }
152
153 void delete_edge(C &s) {
154 for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
155 if (*i == &s) {
156 (void)built_edges.erase(i);
157 return; // only remove one predecessor per request
158 }
159 }
160 }
161
162 void copy_edges(edge_list_type &v) {
163 v = built_edges;
164 }
165
166 size_t edge_count() {
167 return (size_t)(built_edges.size());
168 }
169
170 void clear() {
171 built_edges.clear();
172 }
173
174 // methods remove the statement from all predecessors/successors liste in the edge
175 // container.
176 template< typename S > void sender_extract(S &s);
177 template< typename R > void receiver_extract(R &r);
178
179private:
180 edge_list_type built_edges;
181}; // class edge_container
182#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
183
184} // namespace internal
185
186} // namespace interfaceX
187} // namespace flow
188} // namespace tbb
189
192
193namespace tbb {
194namespace flow {
195namespace interface11 {
196
197// enqueue left task if necessary. Returns the non-enqueued task if there is one.
198static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
199 // if no RHS task, don't change left.
200 if (right == NULL) return left;
201 // right != NULL
202 if (left == NULL) return right;
203 if (left == SUCCESSFULLY_ENQUEUED) return right;
204 // left contains a task
205 if (right != SUCCESSFULLY_ENQUEUED) {
206 // both are valid tasks
208 return right;
209 }
210 return left;
211}
212
213#if __TBB_PREVIEW_ASYNC_MSG
214
215template < typename T > class __TBB_DEPRECATED async_msg;
216
217namespace internal {
218
219template < typename T > class async_storage;
220
221template< typename T, typename = void >
223 typedef async_msg<T> async_type;
224 typedef T filtered_type;
225
226 static const bool is_async_type = false;
227
228 static const void* to_void_ptr(const T& t) {
229 return static_cast<const void*>(&t);
230 }
231
232 static void* to_void_ptr(T& t) {
233 return static_cast<void*>(&t);
234 }
235
236 static const T& from_void_ptr(const void* p) {
237 return *static_cast<const T*>(p);
238 }
239
240 static T& from_void_ptr(void* p) {
241 return *static_cast<T*>(p);
242 }
243
244 static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
245 if (is_async) {
246 // This (T) is NOT async and incoming 'A<X> t' IS async
247 // Get data from async_msg
248 const async_msg<filtered_type>& msg = async_helpers< async_msg<filtered_type> >::from_void_ptr(p);
249 task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
250 // finalize() must be called after subscribe() because set() can be called in finalize()
251 // and 'this_recv' client must be subscribed by this moment
252 msg.finalize();
253 return new_task;
254 }
255 else {
256 // Incoming 't' is NOT async
257 return this_recv->try_put_task(from_void_ptr(p));
258 }
259 }
260};
261
262template< typename T >
263struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
264 typedef T async_type;
265 typedef typename T::async_msg_data_type filtered_type;
266
267 static const bool is_async_type = true;
268
269 // Receiver-classes use const interfaces
270 static const void* to_void_ptr(const T& t) {
271 return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
272 }
273
274 static void* to_void_ptr(T& t) {
275 return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
276 }
277
278 // Sender-classes use non-const interfaces
279 static const T& from_void_ptr(const void* p) {
280 return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
281 }
282
283 static T& from_void_ptr(void* p) {
284 return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
285 }
286
287 // Used in receiver<T> class
288 static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
289 if (is_async) {
290 // Both are async
291 return this_recv->try_put_task(from_void_ptr(p));
292 }
293 else {
294 // This (T) is async and incoming 'X t' is NOT async
295 // Create async_msg for X
297 const T msg(t);
298 return this_recv->try_put_task(msg);
299 }
300 }
301};
302
303class untyped_receiver;
304
306 template< typename, typename > friend class internal::predecessor_cache;
307 template< typename, typename > friend class internal::reservable_predecessor_cache;
308public:
311
312 virtual ~untyped_sender() {}
313
314 // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
315
316 // TODO: Prevent untyped successor registration
317
319 virtual bool register_successor( successor_type &r ) = 0;
320
322 virtual bool remove_successor( successor_type &r ) = 0;
323
325 virtual bool try_release( ) { return false; }
326
328 virtual bool try_consume( ) { return false; }
329
330#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
332 typedef internal::edge_container<successor_type> built_successors_type;
333 typedef built_successors_type::edge_list_type successor_list_type;
334 virtual built_successors_type &built_successors() = 0;
335 virtual void internal_add_built_successor( successor_type & ) = 0;
336 virtual void internal_delete_built_successor( successor_type & ) = 0;
337 virtual void copy_successors( successor_list_type &) = 0;
338 virtual size_t successor_count() = 0;
339#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
340protected:
342 template< typename X >
343 bool try_get( X &t ) {
345 }
346
348 template< typename X >
349 bool try_reserve( X &t ) {
351 }
352
353 virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
354 virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
355};
356
358 template< typename, typename > friend class run_and_put_task;
359
360 template< typename, typename > friend class internal::broadcast_cache;
361 template< typename, typename > friend class internal::round_robin_cache;
362 template< typename, typename > friend class internal::successor_cache;
363
364#if __TBB_PREVIEW_OPENCL_NODE
365 template< typename, typename > friend class proxy_dependency_receiver;
366#endif /* __TBB_PREVIEW_OPENCL_NODE */
367public:
370
372 virtual ~untyped_receiver() {}
373
375 template<typename X>
376 bool try_put(const X& t) {
377 task *res = try_put_task(t);
378 if (!res) return false;
380 return true;
381 }
382
383 // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
384
385 // TODO: Prevent untyped predecessor registration
386
388 virtual bool register_predecessor( predecessor_type & ) { return false; }
389
391 virtual bool remove_predecessor( predecessor_type & ) { return false; }
392
393#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
394 typedef internal::edge_container<predecessor_type> built_predecessors_type;
395 typedef built_predecessors_type::edge_list_type predecessor_list_type;
396 virtual built_predecessors_type &built_predecessors() = 0;
397 virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
398 virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
399 virtual void copy_predecessors( predecessor_list_type & ) = 0;
400 virtual size_t predecessor_count() = 0;
401#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
402protected:
403 template<typename X>
404 task *try_put_task(const X& t) {
406 }
407
408 virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
409
410 virtual graph& graph_reference() const = 0;
411
412 // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
413
416
417 virtual bool is_continue_receiver() { return false; }
418};
419
420} // namespace internal
421
423template< typename T >
425public:
428
430
432 virtual bool try_get( T & ) { return false; }
433
435 virtual bool try_reserve( T & ) { return false; }
436
437protected:
438 virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
439 // Both async OR both are NOT async
442 }
443 // Else: this (T) is async OR incoming 't' is async
444 __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
445 return false;
446 }
447
448 virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
449 // Both async OR both are NOT async
452 }
453 // Else: this (T) is async OR incoming 't' is async
454 __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
455 return false;
456 }
457}; // class sender<T>
458
460template< typename T >
462 template< typename > friend class internal::async_storage;
463 template< typename, typename > friend struct internal::async_helpers;
464public:
467
469
473 }
474
477 }
478
479protected:
480 virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
482 }
483
485 virtual task *try_put_task(const T& t) = 0;
486
487}; // class receiver<T>
488
489#else // __TBB_PREVIEW_ASYNC_MSG
490
492template< typename T >
493class sender {
494public:
497
500
501 virtual ~sender() {}
502
503 // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
504
507
509 __TBB_DEPRECATED virtual bool remove_successor( successor_type &r ) = 0;
510
512 virtual bool try_get( T & ) { return false; }
513
515 virtual bool try_reserve( T & ) { return false; }
516
518 virtual bool try_release( ) { return false; }
519
521 virtual bool try_consume( ) { return false; }
522
523#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
525 __TBB_DEPRECATED typedef typename internal::edge_container<successor_type> built_successors_type;
526 __TBB_DEPRECATED typedef typename built_successors_type::edge_list_type successor_list_type;
527 __TBB_DEPRECATED virtual built_successors_type &built_successors() = 0;
528 __TBB_DEPRECATED virtual void internal_add_built_successor( successor_type & ) = 0;
529 __TBB_DEPRECATED virtual void internal_delete_built_successor( successor_type & ) = 0;
530 __TBB_DEPRECATED virtual void copy_successors( successor_list_type &) = 0;
531 __TBB_DEPRECATED virtual size_t successor_count() = 0;
532#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
533}; // class sender<T>
534
536template< typename T >
537class receiver {
538public:
541
543 __TBB_DEPRECATED typedef sender<T> predecessor_type;
544
546 virtual ~receiver() {}
547
549 bool try_put( const T& t ) {
550 task *res = try_put_task(t);
551 if (!res) return false;
553 return true;
554 }
555
557protected:
558 template< typename R, typename B > friend class run_and_put_task;
559 template< typename X, typename Y > friend class internal::broadcast_cache;
560 template< typename X, typename Y > friend class internal::round_robin_cache;
561 virtual task *try_put_task(const T& t) = 0;
562 virtual graph& graph_reference() const = 0;
563public:
564 // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
565
567 __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }
568
570 __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }
571
572#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
573 __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
574 __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
575 __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors() = 0;
576 __TBB_DEPRECATED virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
577 __TBB_DEPRECATED virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
578 __TBB_DEPRECATED virtual void copy_predecessors( predecessor_list_type & ) = 0;
579 __TBB_DEPRECATED virtual size_t predecessor_count() = 0;
580#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
581
582protected:
584 virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
585
586 template<typename TT, typename M> friend class internal::successor_cache;
587 virtual bool is_continue_receiver() { return false; }
588
589#if __TBB_PREVIEW_OPENCL_NODE
590 template< typename, typename > friend class proxy_dependency_receiver;
591#endif /* __TBB_PREVIEW_OPENCL_NODE */
592}; // class receiver<T>
593
594#endif // __TBB_PREVIEW_ASYNC_MSG
595
597
598class continue_receiver : public receiver< continue_msg > {
599public:
600
603
606
609 __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
610 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
612 __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
613 }
614
619 __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
620 }
621
626 return true;
627 }
628
630
636 return true;
637 }
638
639#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
640 __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
641 __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
642 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
643
644 __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
646 my_built_predecessors.add_edge( s );
647 }
648
649 __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
651 my_built_predecessors.delete_edge(s);
652 }
653
654 __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
656 my_built_predecessors.copy_edges(v);
657 }
658
659 __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
661 return my_built_predecessors.edge_count();
662 }
663
664#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
665
666protected:
667 template< typename R, typename B > friend class run_and_put_task;
668 template<typename X, typename Y> friend class internal::broadcast_cache;
669 template<typename X, typename Y> friend class internal::round_robin_cache;
670 // execute body is supposed to be too small to create a task for.
672 {
676 else
678 }
679 task * res = execute();
680 return res? res : SUCCESSFULLY_ENQUEUED;
681 }
682
683#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
684 // continue_receiver must contain its own built_predecessors because it does
685 // not have a node_cache.
686 built_predecessors_type my_built_predecessors;
687#endif
693 // the friend declaration in the base class did not eliminate the "protected class"
694 // error in gcc 4.1.2
695 template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
696
699 if (f & rf_clear_edges) {
700#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
701 my_built_predecessors.clear();
702#endif
704 }
705 }
706
708
710 virtual task * execute() = 0;
711 template<typename TT, typename M> friend class internal::successor_cache;
712 bool is_continue_receiver() __TBB_override { return true; }
713
714}; // class continue_receiver
715
716} // interfaceX
717
718#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
//! Obtains the key of a message by calling its key() member function.
/** K is the key type requested by the caller; the result of t.key() is converted
    to K by the return statement. */
template <typename K, typename T>
K key_from_message( const T &t )
{
    return t.key();
}
723#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
724
725 using interface11::sender;
726 using interface11::receiver;
727 using interface11::continue_receiver;
728} // flow
729} // tbb
730
733
734namespace tbb {
735namespace flow {
736namespace interface11 {
737
741#if __TBB_PREVIEW_ASYNC_MSG
743#endif
745
746template <typename C, typename N>
747graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
748{
749 if (begin) current_node = my_graph->my_nodes;
750 //else it is an end iterator by default
751}
752
753template <typename C, typename N>
755 __TBB_ASSERT(current_node, "graph_iterator at end");
756 return *operator->();
757}
758
759template <typename C, typename N>
761 return current_node;
762}
763
764template <typename C, typename N>
766 if (current_node) current_node = current_node->next;
767}
768
769} // namespace interfaceX
770
771namespace interface10 {
773inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
775 own_context = true;
776 cancelled = false;
777 caught_exception = false;
778 my_context = new task_group_context(tbb::internal::FLOW_TASKS);
782 my_is_active = true;
783}
784
785inline graph::graph(task_group_context& use_this_context) :
786 my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
788 own_context = false;
789 cancelled = false;
790 caught_exception = false;
794 my_is_active = true;
795}
796
798 wait_for_all();
800 tbb::task::destroy(*my_root_task);
801 if (own_context) delete my_context;
802 delete my_task_arena;
803}
804
805inline void graph::reserve_wait() {
806 if (my_root_task) {
809 }
810}
811
812inline void graph::release_wait() {
813 if (my_root_task) {
816 }
817}
818
820 n->next = NULL;
821 {
823 n->prev = my_nodes_last;
825 my_nodes_last = n;
826 if (!my_nodes) my_nodes = n;
827 }
828}
829
831 {
833 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
834 if (n->prev) n->prev->next = n->next;
835 if (n->next) n->next->prev = n->prev;
836 if (my_nodes_last == n) my_nodes_last = n->prev;
837 if (my_nodes == n) my_nodes = n->next;
838 }
839 n->prev = n->next = NULL;
840}
841
843 // reset context
845
847 cancelled = false;
848 caught_exception = false;
849 // reset all the nodes comprising the graph
850 for(iterator ii = begin(); ii != end(); ++ii) {
852 my_p->reset_node(f);
853 }
854 // Reattach the arena. Might be useful to run the graph in a particular task_arena
855 // while not limiting graph lifetime to a single task_arena::execute() call.
856 prepare_task_arena( /*reinit=*/true );
858 // now spawn the tasks necessary to start the graph
859 for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
861 }
862 my_reset_task_list.clear();
863}
864
865inline graph::iterator graph::begin() { return iterator(this, true); }
866
867inline graph::iterator graph::end() { return iterator(this, false); }
868
869inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
870
871inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
872
873inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
874
875inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
876
877#if TBB_PREVIEW_FLOW_GRAPH_TRACE
878inline void graph::set_name(const char *name) {
880}
881#endif
882
883} // namespace interface10
884
885namespace interface11 {
886
887inline graph_node::graph_node(graph& g) : my_graph(g) {
889}
890
892 my_graph.remove_node(this);
893}
894
896
897#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
898using internal::node_set;
899#endif
900
902template < typename Output >
903class input_node : public graph_node, public sender< Output > {
904public:
906 typedef Output output_type;
907
910
911 //Source node has no input type
913
914#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
915 typedef typename sender<output_type>::built_successors_type built_successors_type;
916 typedef typename sender<output_type>::successor_list_type successor_list_type;
917#endif
918
920 template< typename Body >
922 : graph_node(g), my_active(false),
923 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
924 my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
925 my_reserved(false), my_has_cached_item(false)
926 {
927 my_successors.set_owner(this);
928 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
929 static_cast<sender<output_type> *>(this), this->my_body );
930 }
931
932#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
933 template <typename Body, typename... Successors>
934 input_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
935 : input_node(successors.graph_reference(), body) {
936 make_edges(*this, successors);
937 }
938#endif
939
942 graph_node(src.my_graph), sender<Output>(),
943 my_active(false),
944 my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
945 my_reserved(false), my_has_cached_item(false)
946 {
947 my_successors.set_owner(this);
948 tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
949 static_cast<sender<output_type> *>(this), this->my_body );
950 }
951
953 ~input_node() { delete my_body; delete my_init_body; }
954
955#if TBB_PREVIEW_FLOW_GRAPH_TRACE
956 void set_name( const char *name ) __TBB_override {
958 }
959#endif
960
964 my_successors.register_successor(r);
965 if ( my_active )
966 spawn_put();
967 return true;
968 }
969
973 my_successors.remove_successor(r);
974 return true;
975 }
976
977#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
978
979 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
980
981 void internal_add_built_successor( successor_type &r) __TBB_override {
983 my_successors.internal_add_built_successor(r);
984 }
985
986 void internal_delete_built_successor( successor_type &r) __TBB_override {
988 my_successors.internal_delete_built_successor(r);
989 }
990
991 size_t successor_count() __TBB_override {
993 return my_successors.successor_count();
994 }
995
996 void copy_successors(successor_list_type &v) __TBB_override {
998 my_successors.copy_successors(v);
999 }
1000#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1001
1005 if ( my_reserved )
1006 return false;
1007
1008 if ( my_has_cached_item ) {
1009 v = my_cached_item;
1010 my_has_cached_item = false;
1011 return true;
1012 }
1013 // we've been asked to provide an item, but we have none. enqueue a task to
1014 // provide one.
1015 if ( my_active )
1016 spawn_put();
1017 return false;
1018 }
1019
1023 if ( my_reserved ) {
1024 return false;
1025 }
1026
1027 if ( my_has_cached_item ) {
1028 v = my_cached_item;
1029 my_reserved = true;
1030 return true;
1031 } else {
1032 return false;
1033 }
1034 }
1035
1037
1040 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1041 my_reserved = false;
1042 if(!my_successors.empty())
1043 spawn_put();
1044 return true;
1045 }
1046
1050 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1051 my_reserved = false;
1052 my_has_cached_item = false;
1053 if ( !my_successors.empty() ) {
1054 spawn_put();
1055 }
1056 return true;
1057 }
1058
1060 void activate() {
1062 my_active = true;
1063 if (!my_successors.empty())
1064 spawn_put();
1065 }
1066
1067 template<typename Body>
1070 return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1071 }
1072
1073#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1074 void extract( ) __TBB_override {
1075 my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1076 my_active = false;
1077 my_reserved = false;
1079 }
1080#endif
1081
1082protected:
1083
1086 my_active = false;
1087 my_reserved = false;
1088 my_has_cached_item = false;
1089
1090 if(f & rf_clear_edges) my_successors.clear();
1091 if(f & rf_reset_bodies) {
1093 delete my_body;
1094 my_body = tmp;
1095 }
1096 }
1097
1098private:
1107
1108 // used by apply_body_bypass, can invoke body of node.
1111 if ( my_reserved ) {
1112 return false;
1113 }
1114 if ( !my_has_cached_item ) {
1116 bool r = (*my_body)(my_cached_item);
1118 if (r) {
1119 my_has_cached_item = true;
1120 }
1121 }
1122 if ( my_has_cached_item ) {
1123 v = my_cached_item;
1124 my_reserved = true;
1125 return true;
1126 } else {
1127 return false;
1128 }
1129 }
1130
1132 return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1134 }
1135
1137 void spawn_put( ) {
1140 }
1141 }
1142
1146 output_type v;
1147 if ( !try_reserve_apply_body(v) )
1148 return NULL;
1149
1150 task *last_task = my_successors.try_put_task(v);
1151 if ( last_task )
1152 try_consume();
1153 else
1154 try_release();
1155 return last_task;
1156 }
1157}; // class input_node
1158
1159#if TBB_USE_SOURCE_NODE_AS_ALIAS
1160template < typename Output >
1161class source_node : public input_node <Output> {
1162public:
1164 template< typename Body >
1165 __TBB_NOINLINE_SYM source_node( graph &g, Body body )
1166 : input_node<Output>(g, body)
1167 {
1168 }
1169
1170#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1171 template <typename Body, typename... Successors>
1172 source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
1173 : input_node<Output>(successors, body) {
1174 }
1175#endif
1176};
1177#else // TBB_USE_SOURCE_NODE_AS_ALIAS
1179template < typename Output > class
1180__TBB_DEPRECATED_MSG("TBB Warning: tbb::flow::source_node is deprecated, use tbb::flow::input_node." )
1181source_node : public graph_node, public sender< Output > {
1182public:
1184 typedef Output output_type;
1185
1188
1189 //Source node has no input type
1191
1192#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1193 typedef typename sender<output_type>::built_successors_type built_successors_type;
1194 typedef typename sender<output_type>::successor_list_type successor_list_type;
1195#endif
1196
1198 template< typename Body >
1199 __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
1200 : graph_node(g), my_active(is_active), init_my_active(is_active),
1201 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
1202 my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
1203 my_reserved(false), my_has_cached_item(false)
1204 {
1205 my_successors.set_owner(this);
1206 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1207 static_cast<sender<output_type> *>(this), this->my_body );
1208 }
1209
1210#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1211 template <typename Body, typename... Successors>
1212 source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
1213 : source_node(successors.graph_reference(), body, is_active) {
1214 make_edges(*this, successors);
1215 }
1216#endif
1217
1220 graph_node(src.my_graph), sender<Output>(),
1221 my_active(src.init_my_active),
1222 init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
1223 my_reserved(false), my_has_cached_item(false)
1224 {
1225 my_successors.set_owner(this);
1226 tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1227 static_cast<sender<output_type> *>(this), this->my_body );
1228 }
1229
1231 ~source_node() { delete my_body; delete my_init_body; }
1232
1233#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1234 void set_name( const char *name ) __TBB_override {
1236 }
1237#endif
1238
1241 spin_mutex::scoped_lock lock(my_mutex);
1242 my_successors.register_successor(r);
1243 if ( my_active )
1244 spawn_put();
1245 return true;
1246 }
1247
1250 spin_mutex::scoped_lock lock(my_mutex);
1251 my_successors.remove_successor(r);
1252 return true;
1253 }
1254
1255#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1256
1257 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1258
1259 void internal_add_built_successor( successor_type &r) __TBB_override {
1260 spin_mutex::scoped_lock lock(my_mutex);
1261 my_successors.internal_add_built_successor(r);
1262 }
1263
1264 void internal_delete_built_successor( successor_type &r) __TBB_override {
1265 spin_mutex::scoped_lock lock(my_mutex);
1266 my_successors.internal_delete_built_successor(r);
1267 }
1268
1269 size_t successor_count() __TBB_override {
1270 spin_mutex::scoped_lock lock(my_mutex);
1271 return my_successors.successor_count();
1272 }
1273
1274 void copy_successors(successor_list_type &v) __TBB_override {
1275 spin_mutex::scoped_lock l(my_mutex);
1276 my_successors.copy_successors(v);
1277 }
1278#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1279
// Body of the non-reserving item request (signature line not shown in this listing; `v` receives the item).
// Under my_mutex:
//   - returns false if the cached item is currently reserved;
//   - if an item is cached, copies it to v, marks the cache empty and returns true;
//   - otherwise calls spawn_put() and returns false.
1282 spin_mutex::scoped_lock lock(my_mutex);
1283 if ( my_reserved )
1284 return false;
1285
1286 if ( my_has_cached_item ) {
1287 v = my_cached_item;
1288 my_has_cached_item = false;
1289 return true;
1290 }
1291 // we've been asked to provide an item, but we have none. enqueue a task to
1292 // provide one.
1293 spawn_put();
1294 return false;
1295 }
1296
// Body of the reservation request (signature line not shown in this listing; `v` receives the item).
// Under my_mutex: fails if a reservation is already held or nothing is cached; otherwise copies the
// cached item to v, sets my_reserved and returns true. The cached item stays in place (my_has_cached_item
// is not cleared here). Unlike the non-reserving request above, this never calls spawn_put().
1299 spin_mutex::scoped_lock lock(my_mutex);
1300 if ( my_reserved ) {
1301 return false;
1302 }
1303
1304 if ( my_has_cached_item ) {
1305 v = my_cached_item;
1306 my_reserved = true;
1307 return true;
1308 } else {
1309 return false;
1310 }
1311 }
1312
1314
// Body of the reservation-release method (signature line not shown in this listing).
// Under my_mutex: asserts that a reservation on a cached item exists, drops the reservation but keeps
// the cached item, and calls spawn_put() if there is at least one successor. Always returns true.
1316 spin_mutex::scoped_lock lock(my_mutex);
1317 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1318 my_reserved = false;
1319 if(!my_successors.empty())
1320 spawn_put();
1321 return true;
1322 }
1323
// Body of the reservation-consume method (signature line not shown in this listing).
// Under my_mutex: asserts that a reservation on a cached item exists, then clears both the reservation
// and the cached-item flag, and calls spawn_put() if there is at least one successor. Always returns true.
1326 spin_mutex::scoped_lock lock(my_mutex);
1327 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1328 my_reserved = false;
1329 my_has_cached_item = false;
1330 if ( !my_successors.empty() ) {
1331 spawn_put();
1332 }
1333 return true;
1334 }
1335
// Marks the node active and, if it already has at least one successor, calls spawn_put().
// Both steps happen under my_mutex.
1337 void activate() {
1338 spin_mutex::scoped_lock lock(my_mutex);
1339 my_active = true;
1340 if (!my_successors.empty())
1341 spawn_put();
1342 }
1343
// Accessor for the user body held in my_body (the signature line, and hence the return type,
// is not shown in this listing).
// Body must be the exact type wrapped by the stored source_body_leaf: the dynamic_cast is on a
// reference, so a mismatch throws std::bad_cast rather than yielding null.
1344 template<typename Body>
1346 internal::source_body<output_type> &body_ref = *this->my_body;
1347 return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1348 }
1349
1350#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Detaches this node from its built successors and returns it to its initial state:
// activation goes back to init_my_active, and any reservation or cached item is dropped.
// NOTE(review): unlike the neighbouring methods, this does not take my_mutex -- confirm that
// extract() is only called while the graph is quiescent.
1351 void extract( ) __TBB_override {
1352 my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1353 my_active = init_my_active;
1354 my_reserved = false;
1355 if(my_has_cached_item) my_has_cached_item = false;
1356 }
1357#endif
1358
1359protected:
1360
// Body of the node-reset method (signature line not shown in this listing; `f` is a set of reset flags).
// Restores the initial activation state and drops any reservation or cached item.
1363 my_active = init_my_active;
1364 my_reserved =false;
1365 if(my_has_cached_item) {
1366 my_has_cached_item = false;
1367 }
// rf_clear_edges: forget all successors.
1368 if(f & rf_clear_edges) my_successors.clear();
// rf_reset_bodies: replace the working body with a fresh clone of the saved initial body.
// The clone is created before the old body is deleted, so my_body is untouched if clone() throws.
1369 if(f & rf_reset_bodies) {
1370 internal::source_body<output_type> *tmp = my_init_body->clone();
1371 delete my_body;
1372 my_body = tmp;
1373 }
// A node that is active after the reset hands a put task to the graph's reset list rather than spawning it here.
1374 if(my_active)
1375 internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
1376 }
1377
1378private:
1388
// Reserve-or-generate helper (signature line not shown in this listing; `v` receives the item).
// Under my_mutex: fails if a reservation is already held. If nothing is cached, the user body is run
// to fill my_cached_item, and the cache is marked valid only when the body returns true.
// On success the item is copied to v and reserved. Note the user body executes while my_mutex is held.
1389 // used by apply_body_bypass, can invoke body of node.
1391 spin_mutex::scoped_lock lock(my_mutex);
1392 if ( my_reserved ) {
1393 return false;
1394 }
1395 if ( !my_has_cached_item ) {
1397 bool r = (*my_body)(my_cached_item);
1398 tbb::internal::fgt_end_body( my_body );
1399 if (r) {
1400 my_has_cached_item = true;
1401 }
1402 }
1403 if ( my_has_cached_item ) {
1404 v = my_cached_item;
1405 my_reserved = true;
1406 return true;
1407 } else {
1408 return false;
1409 }
1410 }
1411
1412 // when resetting, and if the source_node was created with my_active == true, then
1413 // when we reset the node we must store a task to run the node, and spawn it only
1414 // after the reset is complete and is_active() is again true. This is why we don't
1415 // test for is_active() here.
1417 return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1419 }
1420
// Creates a put task and spawns it in the graph's arena, but only while the graph is active;
// when the graph is inactive this is a no-op (no task is even created).
1422 void spawn_put( ) {
1423 if(internal::is_graph_active(this->my_graph)) {
1424 internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1425 }
1426 }
1427
1428 friend class internal::source_task_bypass< source_node< output_type > >;
// Body of the task entry point (signature line not shown in this listing).
// Reserves an item (running the user body if needed), offers it to the successors, then either
// consumes the reservation (a successor accepted: non-null task returned) or releases it (nobody accepted).
// Returns the task produced by the successors, or NULL when no item could be reserved or none accepted.
1431 output_type v;
1432 if ( !try_reserve_apply_body(v) )
1433 return NULL;
1434
1435 task *last_task = my_successors.try_put_task(v);
1436 if ( last_task )
1437 try_consume();
1438 else
1439 try_release();
1440 return last_task;
1441 }
1442}; // class source_node
1443#endif // TBB_USE_SOURCE_NODE_AS_ALIAS
1444
1446template<typename Input, typename Output = continue_msg, typename Policy = queueing,
1447 typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1449 : public graph_node
1450#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1451 , public internal::function_input< Input, Output, Policy, Allocator >
1452#else
1453 , public internal::function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
1454#endif
1455 , public internal::function_output<Output> {
1456
1457#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1458 typedef Allocator internals_allocator;
1459#else
1461
1464 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1465 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1466 );
1467#endif
1468
1469public:
1470 typedef Input input_type;
1471 typedef Output output_type;
1477#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1478 typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1479 typedef typename fOutput_type::successor_list_type successor_list_type;
1480#endif
1482
1484 // input_queue_type is allocated here, but destroyed in the function_input_base.
1485 // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1486 // be done in one place. This would be an interface-breaking change.
1487 template< typename Body >
1490 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1491#else
1493#endif
1495 fOutput_type(g) {
1496 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1497 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1498 }
1499
1500#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
// Convenience overload: accepts a priority without an explicit policy object and delegates to the
// main constructor, passing a default-constructed Policy.
1501 template <typename Body>
1502 function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
1503 : function_node(g, concurrency, body, Policy(), priority) {}
1504#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1505
1506#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1507 template <typename Body, typename... Args>
1508 function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
1511 make_edges_in_order(nodes, *this);
1512 }
1513
1514#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
// node_set overload taking a priority but no explicit policy object; delegates to the node_set
// constructor with a default-constructed Policy.
1515 template <typename Body, typename... Args>
1516 function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
1517 : function_node(nodes, concurrency, body, Policy(), priority) {}
1518#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1519#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1520
// Initializer list and body of the constructor that copies from `src` (signature line not shown
// in this listing). The input side is copy-constructed from src; the output side is built fresh
// from src's graph only.
1523 graph_node(src.my_graph),
1524 input_impl_type(src),
1525 fOutput_type(src.my_graph) {
// Registers the node through the fgt_* hook (presumably flow-graph tracing -- confirm).
1526 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1527 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1528 }
1529
1530#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1531 void set_name( const char *name ) __TBB_override {
1533 }
1534#endif
1535
1536#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Detaches this node from both its built predecessors and its built successors.
1537 void extract( ) __TBB_override {
1538 my_predecessors.built_predecessors().receiver_extract(*this);
1539 successors().built_successors().sender_extract(*this);
1540 }
1541#endif
1542
1543protected:
1544 template< typename R, typename B > friend class run_and_put_task;
1545 template<typename X, typename Y> friend class internal::broadcast_cache;
1546 template<typename X, typename Y> friend class internal::round_robin_cache;
1548
1550
// Tail of the node-reset method (its signature and at least one statement are not shown in this
// listing; `f` is a set of reset flags).
// With rf_clear_edges set, the successor list is cleared; the assertions then check that the
// successors are empty whenever edges were cleared, and that the predecessors are empty in all cases.
1553 // TODO: use clear() instead.
1554 if(f & rf_clear_edges) {
1555 successors().clear();
1557 }
1558 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1559 __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1560 }
1561
1562}; // class function_node
1563
1565// Output is a tuple of output types.
1566template<typename Input, typename Output, typename Policy = queueing,
1567 typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1569 public graph_node,
1571 <
1572 Input,
1573 typename internal::wrap_tuple_elements<
1574 tbb::flow::tuple_size<Output>::value, // #elements in tuple
1575 internal::multifunction_output, // wrap this around each element
1576 Output // the tuple providing the types
1577 >::type,
1578 Policy,
1579#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1580 Allocator
1581#else
1582 cache_aligned_allocator<Input>
1583#endif
1584 > {
1585#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1586 typedef Allocator internals_allocator;
1587#else
1589
1592 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1593 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1594 );
1595#endif
1596
1597protected:
1599public:
1600 typedef Input input_type;
1606private:
1608public:
1609 template<typename Body>
1611 graph &g, size_t concurrency,
1613 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1614#else
1616#endif
1618 tbb::internal::fgt_multioutput_node_with_body<N>(
1619 CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1620 &this->my_graph, static_cast<receiver<input_type> *>(this),
1621 this->output_ports(), this->my_body
1622 );
1623 }
1624
1625#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1626 template <typename Body>
1628 : multifunction_node(g, concurrency, body, Policy(), priority) {}
1629#endif // TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1630
1631#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1632 template <typename Body, typename... Args>
1633 __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
1636 make_edges_in_order(nodes, *this);
1637 }
1638
1639#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
// node_set overload taking a priority but no explicit policy object; delegates to the node_set
// constructor with a default-constructed Policy.
1640 template <typename Body, typename... Args>
1641 __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
1642 : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
1643#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1644#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1645
// Initializer list and body of the constructor that copies from `other` (signature line not shown
// in this listing). The node attaches to other's graph and copy-constructs its input side from other.
1647 graph_node(other.my_graph), input_impl_type(other) {
// Registers the node, its output ports and its body through the fgt_* hook (presumably tracing -- confirm).
1648 tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1649 &this->my_graph, static_cast<receiver<input_type> *>(this),
1650 this->output_ports(), this->my_body );
1651 }
1652
1653#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1654 void set_name( const char *name ) __TBB_override {
1656 }
1657#endif
1658
1659#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Detaches this node from its built predecessors, then lets the input implementation run its own extract().
1660 void extract( ) __TBB_override {
1661 my_predecessors.built_predecessors().receiver_extract(*this);
1662 input_impl_type::extract();
1663 }
1664#endif
1665 // all the guts are in multifunction_input...
1666protected:
1668}; // multifunction_node
1669
1671// successors. The node has unlimited concurrency, so it does not reject inputs.
1672template<typename TupleType, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(TupleType)>
1673class split_node : public graph_node, public receiver<TupleType> {
1676public:
1677 typedef TupleType input_type;
1678#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1679 typedef Allocator allocator_type;
1680#else
1683 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1684 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
1685 );
1686#endif
1687#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1689 typedef typename base_type::predecessor_list_type predecessor_list_type;
1690 typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1691 typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1692#endif
1693
1694 typedef typename internal::wrap_tuple_elements<
1695 N, // #elements in tuple
1696 internal::multifunction_output, // wrap this around each element
1697 TupleType // the tuple providing the types
1699
1701 : graph_node(g),
1703 {
1704 tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1705 static_cast<receiver<input_type> *>(this), this->output_ports());
1706 }
1707
1708#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Builds the node in the graph that `nodes` refers to (via the graph constructor), then connects it
// to the nodes of the set with make_edges_in_order.
1709 template <typename... Args>
1710 __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1711 make_edges_in_order(nodes, *this);
1712 }
1713#endif
1714
1716 : graph_node(other.my_graph), base_type(other),
1718 {
1719 tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1720 static_cast<receiver<input_type> *>(this), this->output_ports());
1721 }
1722
1723#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1724 void set_name( const char *name ) __TBB_override {
1726 }
1727#endif
1728
1730
1731protected:
// Receives a tuple `t`. The statement that forwards it and produces the return value is not shown
// in this listing, so only the rationale in the original comments below is available here.
1732 task *try_put_task(const TupleType& t) __TBB_override {
1733 // Sending split messages in parallel is not justified, as overheads would prevail.
1734 // Also, we do not have successors here. So we just tell the task returned here is successful.
1736 }
1738 if (f & rf_clear_edges)
1740
1742 }
1745 return my_graph;
1746 }
1747#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1748private:
// Intentionally empty: this node performs no work on extract().
1749 void extract() __TBB_override {}
1750
// No-op: the predecessor argument is ignored and nothing is recorded.
1752 void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1753
// No-op: the predecessor argument is ignored; there is no record to delete from.
1755 void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1756
// Always reports zero predecessors, consistent with the no-op add/delete methods of this node.
1757 size_t predecessor_count() __TBB_override { return 0; }
1758
// No-op: the output list is left untouched.
1759 void copy_predecessors(predecessor_list_type&) __TBB_override {}
1760
// Returns the node's built-predecessors member (spelled my_predessors in this class).
1761 built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1762
1764 built_predecessors_type my_predessors;
1765#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1766
1767private:
1769};
1770
1772template <typename Output, typename Policy = internal::Policy<void> >
1773class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1774 public internal::function_output<Output> {
1775public:
1777 typedef Output output_type;
1780 typedef typename input_impl_type::predecessor_type predecessor_type;
1782
1784 template <typename Body >
1786 graph &g,
1788 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1789#else
1791#endif
1792 ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
1793 fOutput_type(g) {
1794 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1795
1796 static_cast<receiver<input_type> *>(this),
1797 static_cast<sender<output_type> *>(this), this->my_body );
1798 }
1799
1800#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
// Convenience overload: accepts a priority without an explicit policy object and delegates to the
// main constructor, passing a default-constructed Policy.
1801 template <typename Body>
1802 continue_node( graph& g, Body body, node_priority_t priority )
1803 : continue_node(g, body, Policy(), priority) {}
1804#endif
1805
1806#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1807 template <typename Body, typename... Args>
1808 continue_node( const node_set<Args...>& nodes, Body body,
1810 : continue_node(nodes.graph_reference(), body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority) ) {
1811 make_edges_in_order(nodes, *this);
1812 }
1813#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
// node_set overload taking a priority but no explicit policy object; delegates to the node_set
// constructor with a default-constructed Policy.
1814 template <typename Body, typename... Args>
1815 continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
1816 : continue_node(nodes, body, Policy(), priority) {}
1817#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1818#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1819
1821 template <typename Body >
1823 graph &g, int number_of_predecessors,
1825 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1826#else
1828#endif
1829 ) : graph_node(g)
1830 , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1831 fOutput_type(g) {
1832 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1833 static_cast<receiver<input_type> *>(this),
1834 static_cast<sender<output_type> *>(this), this->my_body );
1835 }
1836
1837#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
// Overload with an explicit predecessor count and a priority but no policy object; delegates to the
// main constructor with a default-constructed Policy.
1838 template <typename Body>
1839 continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
1840 : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
1841#endif
1842
1843#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// node_set overload with an explicit predecessor count: builds the node in the graph that `nodes`
// refers to (forwarding the count, body and the optional policy/priority arguments), then connects
// it to the nodes of the set with make_edges_in_order.
1844 template <typename Body, typename... Args>
1845 continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1846 Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1847 : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1848 make_edges_in_order(nodes, *this);
1849 }
1850
1851#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
// node_set overload with an explicit predecessor count and a priority but no policy object;
// delegates to the node_set constructor with a default-constructed Policy.
1852 template <typename Body, typename... Args>
1853 continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1854 Body body, node_priority_t priority )
1855 : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
1856#endif
1857#endif
1858
// Tail of the constructor that copies from `src` (its signature and the first part of the initializer
// list are not shown in this listing). The output side is built fresh from src's graph only.
1862 internal::function_output<Output>(src.my_graph) {
// Registers the node through the fgt_* hook (presumably flow-graph tracing -- confirm).
1863 tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1864 static_cast<receiver<input_type> *>(this),
1865 static_cast<sender<output_type> *>(this), this->my_body );
1866 }
1867
1868#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1869 void set_name( const char *name ) __TBB_override {
1871 }
1872#endif
1873
1874#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Detaches this node from its built predecessors (held by the input implementation) and from its
// built successors.
1875 void extract() __TBB_override {
1876 input_impl_type::my_built_predecessors.receiver_extract(*this);
1877 successors().built_successors().sender_extract(*this);
1878 }
1879#endif
1880
1881protected:
1882 template< typename R, typename B > friend class run_and_put_task;
1883 template<typename X, typename Y> friend class internal::broadcast_cache;
1884 template<typename X, typename Y> friend class internal::round_robin_cache;
1885 using input_impl_type::try_put_task;
1887
// Tail of the node-reset method (its signature and any earlier statements are not shown in this
// listing; `f` is a set of reset flags). With rf_clear_edges set, the successor list is cleared and
// then asserted empty.
1890 if(f & rf_clear_edges)successors().clear();
1891 __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1892 }
1893}; // continue_node
1894
1896template <typename T>
1897class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1898public:
1899 typedef T input_type;
1900 typedef T output_type;
1903#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1904 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1905 typedef typename sender<output_type>::successor_list_type successor_list_type;
1906#endif
1907private:
1909#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1910 internal::edge_container<predecessor_type> my_built_predecessors;
1911 spin_mutex pred_mutex; // serialize accesses on edge_container
1912#endif
1913public:
1914
1916 my_successors.set_owner( this );
1917 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1918 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1919 }
1920
1921#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Builds the node in the graph that `nodes` refers to (via the graph constructor), then connects it
// to the nodes of the set with make_edges_in_order.
1922 template <typename... Args>
1923 broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1924 make_edges_in_order(nodes, *this);
1925 }
1926#endif
1927
// The signature line is not shown in this listing. Only src's graph is taken over: the receiver and
// sender bases are default-constructed, so the copy starts with no edges of its own.
1928 // Copy constructor
1930 graph_node(src.my_graph), receiver<T>(), sender<T>()
1931 {
1932 my_successors.set_owner( this );
// Registers the node through the fgt_* hook (presumably flow-graph tracing -- confirm).
1933 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1934 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1935 }
1936
1937#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1938 void set_name( const char *name ) __TBB_override {
1940 }
1941#endif
1942
// Body of the successor-registration method (signature line not shown in this listing).
// Records `r` in my_successors and always returns true. No lock is taken at this level.
1945 my_successors.register_successor( r );
1946 return true;
1947 }
1948
// Body of the successor-removal method (signature line not shown in this listing).
// Removes `r` from my_successors and always returns true. No lock is taken at this level.
1951 my_successors.remove_successor( r );
1952 return true;
1953 }
1954
1955#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1956 typedef typename sender<T>::built_successors_type built_successors_type;
1957
// Returns a reference to the built-successors container kept by my_successors.
1958 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1959
// Forwards to my_successors to record `r` as a built successor.
1960 void internal_add_built_successor(successor_type &r) __TBB_override {
1961 my_successors.internal_add_built_successor(r);
1962 }
1963
// Forwards to my_successors to remove `r` from the built-successor record.
1964 void internal_delete_built_successor(successor_type &r) __TBB_override {
1965 my_successors.internal_delete_built_successor(r);
1966 }
1967
// Returns the successor count reported by my_successors.
1968 size_t successor_count() __TBB_override {
1969 return my_successors.successor_count();
1970 }
1971
// Asks my_successors to copy its successor list into `v`.
1972 void copy_successors(successor_list_type &v) __TBB_override {
1973 my_successors.copy_successors(v);
1974 }
1975
1976 typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1977
// Returns a reference to the node's built-predecessor edge container. pred_mutex is not taken here.
1978 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1979
// Adds `p` to the built-predecessor edge container, under pred_mutex.
1980 void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1981 spin_mutex::scoped_lock l(pred_mutex);
1982 my_built_predecessors.add_edge(p);
1983 }
1984
// Removes `p` from the built-predecessor edge container, under pred_mutex.
1985 void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1986 spin_mutex::scoped_lock l(pred_mutex);
1987 my_built_predecessors.delete_edge(p);
1988 }
1989
// Returns the number of recorded built-predecessor edges, read under pred_mutex.
1990 size_t predecessor_count() __TBB_override {
1991 spin_mutex::scoped_lock l(pred_mutex);
1992 return my_built_predecessors.edge_count();
1993 }
1994
// Copies the recorded built-predecessor edges into `v`, under pred_mutex.
1995 void copy_predecessors(predecessor_list_type &v) __TBB_override {
1996 spin_mutex::scoped_lock l(pred_mutex);
1997 my_built_predecessors.copy_edges(v);
1998 }
1999
// Detaches this node from its built predecessors and its built successors.
// NOTE(review): my_built_predecessors is touched here without pred_mutex, unlike the accessors
// above -- confirm extract() is only called while the graph is quiescent.
2000 void extract() __TBB_override {
2001 my_built_predecessors.receiver_extract(*this);
2002 my_successors.built_successors().sender_extract(*this);
2003 }
2004#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2005
2006protected:
2007 template< typename R, typename B > friend class run_and_put_task;
2008 template<typename X, typename Y> friend class internal::broadcast_cache;
2009 template<typename X, typename Y> friend class internal::round_robin_cache;
// Body of the put method (signature line not shown in this listing; `t` is the incoming item).
// Offers t to the successors. If they produce no task, SUCCESSFULLY_ENQUEUED is returned instead,
// so this method never returns a null pointer.
2012 task *new_task = my_successors.try_put_task(t);
2013 if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
2014 return new_task;
2015 }
2016
2018 return my_graph;
2019 }
2020
2022
// Body of the node-reset method (signature line not shown in this listing; `f` is a set of reset flags).
// Only rf_clear_edges has an effect: it clears the successors and, when the deprecated extraction
// interface is compiled in, the recorded built predecessors too.
2024 if (f&rf_clear_edges) {
2025 my_successors.clear();
2026#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2027 my_built_predecessors.clear();
2028#endif
2029 }
2030 __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
2031 }
2032}; // broadcast_node
2033
2035template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2037 : public graph_node
2038#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2039 , public internal::reservable_item_buffer< T, Allocator >
2040#else
2041 , public internal::reservable_item_buffer< T, cache_aligned_allocator<T> >
2042#endif
2043 , public receiver<T>, public sender<T> {
2044#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2045 typedef Allocator internals_allocator;
2046#else
2048#endif
2049public:
2050 typedef T input_type;
2051 typedef T output_type;
2055#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2056 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2057 typedef typename sender<output_type>::successor_list_type successor_list_type;
2058#endif
2059#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2062 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2063 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2064 );
2065#endif
2066
2067protected:
2068 typedef size_t size_type;
2070
2071#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2072 internal::edge_container<predecessor_type> my_built_predecessors;
2073#endif
2074
2076
2078#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2079 , add_blt_succ, del_blt_succ,
2080 add_blt_pred, del_blt_pred,
2081 blt_succ_cnt, blt_pred_cnt,
2082 blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
2083#endif
2085
// One request record handed to the node's aggregator. `type` holds an op_type code stored as a char;
// `ltask` carries a task back to the caller. Several member declarations (including `elem`, and the
// `r` and `p` pointers used elsewhere in this class) are not shown in this listing.
2086 // implements the aggregator_operation concept
2087 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
2088 public:
2089 char type;
2090#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2091 task * ltask;
// Members of this union share storage; each operation kind uses only the one it needs.
2092 union {
2096 size_t cnt_val;
2097 successor_list_type *svec;
2098 predecessor_list_type *pvec;
2099 };
2100#else
2104#endif
// Operation that carries an item: elem points at the caller's `e` with constness cast away.
2105 buffer_operation(const T& e, op_type t) : type(char(t))
2106
// The two branches list the same initializers in a different order; presumably this mirrors the
// member declaration order of each configuration (declarations not fully visible here).
2107#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2108 , ltask(NULL), elem(const_cast<T*>(&e))
2109#else
2110 , elem(const_cast<T*>(&e)) , ltask(NULL)
2111#endif
2112 {}
// Operation without an item: only the op code and a null ltask are set here.
2113 buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
2114 };
2115
2120
// Virtual entry point for a batch of queued operations; forwards to handle_operations_impl with
// `this` as the derived object, so this class's own order() hook is the one called.
2121 virtual void handle_operations(buffer_operation *op_list) {
2122 handle_operations_impl(op_list, this);
2123 }
2124
// Processes a batch of queued operations and, if any of them calls for it, creates one forwarding task.
//   op_list - singly linked list (through ->next) of pending buffer_operation records.
//   derived - the calling node; asserted to be this same object. Its order() hook runs after the batch.
// NOTE(review): source line 2160 is absent from this listing, which is why the closing braces near
// the end look unbalanced; the missing line presumably opens a nested condition.
2125 template<typename derived_type>
2126 void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
2127 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2128
2129 buffer_operation *tmp = NULL;
2130 bool try_forwarding = false;
// Dispatch each record to its handler. `next` is read before the handler runs.
2131 while (op_list) {
2132 tmp = op_list;
2133 op_list = op_list->next;
2134 switch (tmp->type) {
2135 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
2136 case rem_succ: internal_rem_succ(tmp); break;
2137 case req_item: internal_pop(tmp); break;
2138 case res_item: internal_reserve(tmp); break;
2139 case rel_res: internal_release(tmp); try_forwarding = true; break;
2140 case con_res: internal_consume(tmp); try_forwarding = true; break;
// NOTE(review): put_item assigns try_forwarding rather than OR-ing into it, so a false return from
// internal_push would discard a forwarding request set by an earlier record in the same batch.
2141 case put_item: try_forwarding = internal_push(tmp); break;
2142 case try_fwd_task: internal_forward_task(tmp); break;
2143#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2144 // edge recording
2145 case add_blt_succ: internal_add_built_succ(tmp); break;
2146 case del_blt_succ: internal_del_built_succ(tmp); break;
2147 case add_blt_pred: internal_add_built_pred(tmp); break;
2148 case del_blt_pred: internal_del_built_pred(tmp); break;
2149 case blt_succ_cnt: internal_succ_cnt(tmp); break;
2150 case blt_pred_cnt: internal_pred_cnt(tmp); break;
2151 case blt_succ_cpy: internal_copy_succs(tmp); break;
2152 case blt_pred_cpy: internal_copy_preds(tmp); break;
2153#endif
2154 }
2155 }
2156
2157 derived->order();
2158
// At most one forwarding task is outstanding: forwarder_busy is set here and cleared by the
// forwarding handler when it has nothing more to do.
2159 if (try_forwarding && !forwarder_busy) {
2161 forwarder_busy = true;
2162 task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
2163 forward_task_bypass<class_type>(*this);
2164 // tmp should point to the last item handled by the aggregator. This is the operation
2165 // the handling thread enqueued. So modifying that record will be okay.
2166 // workaround for icc bug
2167 tbb::task *z = tmp->ltask;
2168 graph &g = this->my_graph;
2169 tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
2170 }
2171 }
2172 } // handle_operations
2173
2175 return op_data.ltask;
2176 }
2177
2179 task *ft = grab_forwarding_task(op_data);
2180 if(ft) {
2182 return true;
2183 }
2184 return false;
2185 }
2186
// Repeatedly submits a forwarding operation to the aggregator for as long as it reports SUCCEEDED,
// merging the task returned by each round into one result with combine_tasks.
// Returns the combined task, or NULL if no round produced one.
// (The declaration of op_data is on a line not shown in this listing.)
2188 virtual task *forward_task() {
2190 task *last_task = NULL;
2191 do {
// Reset the record before each submission so that the status and task of the previous round are not reused.
2192 op_data.status = internal::WAIT;
2193 op_data.ltask = NULL;
2194 my_aggregator.execute(&op_data);
2195
2196 // workaround for icc bug
2197 tbb::task *xtask = op_data.ltask;
2198 graph& g = this->my_graph;
2199 last_task = combine_tasks(g, last_task, xtask);
2200 } while (op_data.status ==internal::SUCCEEDED);
2201 return last_task;
2202 }
2203
2206 my_successors.register_successor(*(op->r));
2208 }
2209
2212 my_successors.remove_successor(*(op->r));
2214 }
2215
2216#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2217 typedef typename sender<T>::built_successors_type built_successors_type;
2218
// Returns a reference to the built-successors container kept by my_successors (no aggregator round-trip).
2219 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2220
// Handler for add_blt_succ: records the successor that op->r points to as a built successor.
// (One statement of this body is not shown in this listing.)
2221 virtual void internal_add_built_succ(buffer_operation *op) {
2222 my_successors.internal_add_built_successor(*(op->r));
2224 }
2225
// Handler for del_blt_succ: removes the successor that op->r points to from the built-successor record.
// (One statement of this body is not shown in this listing.)
2226 virtual void internal_del_built_succ(buffer_operation *op) {
2227 my_successors.internal_delete_built_successor(*(op->r));
2229 }
2230
2231 typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
2232
// Returns a reference to the node's built-predecessor edge container (no aggregator round-trip).
2233 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
2234
// Handler for add_blt_pred: adds the predecessor that op->p points to to the built-predecessor edges.
// (One statement of this body is not shown in this listing.)
2235 virtual void internal_add_built_pred(buffer_operation *op) {
2236 my_built_predecessors.add_edge(*(op->p));
2238 }
2239
// Handler for del_blt_pred: removes the predecessor that op->p points to from the built-predecessor edges.
// (One statement of this body is not shown in this listing.)
2240 virtual void internal_del_built_pred(buffer_operation *op) {
2241 my_built_predecessors.delete_edge(*(op->p));
2243 }
2244
// Handler for blt_succ_cnt: stores the current successor count in op->cnt_val for the caller to read.
// (One statement of this body is not shown in this listing.)
2245 virtual void internal_succ_cnt(buffer_operation *op) {
2246 op->cnt_val = my_successors.successor_count();
2248 }
2249
// Handler for blt_pred_cnt: stores the built-predecessor edge count in op->cnt_val for the caller to read.
// (One statement of this body is not shown in this listing.)
2250 virtual void internal_pred_cnt(buffer_operation *op) {
2251 op->cnt_val = my_built_predecessors.edge_count();
2253 }
2254
// Handler for blt_succ_cpy: copies the successor list into the caller's container at op->svec.
// (One statement of this body is not shown in this listing.)
2255 virtual void internal_copy_succs(buffer_operation *op) {
2256 my_successors.copy_successors(*(op->svec));
2258 }
2259
// Handler for blt_pred_cpy: copies the built-predecessor edges into the caller's container at op->pvec.
// (One statement of this body is not shown in this listing.)
2260 virtual void internal_copy_preds(buffer_operation *op) {
2261 my_built_predecessors.copy_edges(*(op->pvec));
2263 }
2264
2265#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2266
2267private:
// Ordering hook called by handle_operations_impl after each batch; this class does nothing here.
2268 void order() {}
2269
2271 return this->my_item_valid(this->my_tail - 1);
2272 }
2273
// Offers the item at the back of the buffer to the successors. If a task comes back, it is merged
// into `last_task` (in/out) and the back item is destroyed; otherwise the buffer is left unchanged.
2274 void try_put_and_add_task(task*& last_task) {
2275 task *new_task = my_successors.try_put_task(this->back());
2276 if (new_task) {
2277 // workaround for icc bug
2278 graph& g = this->my_graph;
2279 last_task = combine_tasks(g, last_task, new_task);
2280 this->destroy_back();
2281 }
2282 }
2283
2284protected:
2288 }
2289
// Handler for a forwarding operation: tries to hand buffered items to the successors.
//   op      - the operation record; op->ltask receives the combined task of this round.
//   derived - the calling node; asserted to be this same object. It supplies is_item_valid() and
//             try_put_and_add_task().
// Three statements of this body are not shown in this listing (presumably the status stores).
2290 template<typename derived_type>
2291 void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
2292 __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2293
// Nothing can be forwarded while an item is reserved or no valid item is available: release the
// forwarder and return.
2294 if (this->my_reserved || !derived->is_item_valid()) {
2296 this->forwarder_busy = false;
2297 return;
2298 }
2299 // Try forwarding, giving each successor a chance
2300 task * last_task = NULL;
2301 size_type counter = my_successors.size();
2302 for (; counter > 0 && derived->is_item_valid(); --counter)
2303 derived->try_put_and_add_task(last_task);
2304
2305 op->ltask = last_task; // return task
// forwarder_busy stays set only when a task was produced and every attempt counted by `counter`
// was used (counter reached zero); in every other case the forwarder is released.
2306 if (last_task && !counter) {
2308 }
2309 else {
2311 forwarder_busy = false;
2312 }
2313 }
2314
2316 this->push_back(*(op->elem));
2318 return true;
2319 }
2320
// Handler for req_item: pops the back item of the buffer into *op->elem.
// The bodies of both branches are not shown in this listing (presumably they record success or
// failure in the operation's status -- confirm).
2321 virtual void internal_pop(buffer_operation *op) {
2322 if(this->pop_back(*(op->elem))) {
2324 }
2325 else {
2327 }
2328 }
2329
2331 if(this->reserve_front(*(op->elem))) {
2333 }
2334 else {
2336 }
2337 }
2338
2340 this->consume_front();
2342 }
2343
2345 this->release_front();
2347 }
2348
2349public:
2353 sender<T>(), forwarder_busy(false)
2354 {
2355 my_successors.set_owner(this);
2356 my_aggregator.initialize_handler(handler_type(this));
2357 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2358 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2359 }
2360
2361#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Builds the node in the graph that `nodes` refers to (via the graph constructor), then connects it
// to the nodes of the set with make_edges_in_order.
2362 template <typename... Args>
2363 buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
2364 make_edges_in_order(nodes, *this);
2365 }
2366#endif
2367
// Tail of a constructor whose signature and first initializers are not shown in this listing.
// The receiver and sender bases are default-constructed and the forwarder starts idle; the node then
// takes ownership of its successor cache and installs itself as the aggregator's handler.
2371 receiver<T>(), sender<T>(), forwarder_busy(false)
2372 {
2373 my_successors.set_owner(this);
2374 my_aggregator.initialize_handler(handler_type(this));
// Registers the node through the fgt_* hook (presumably flow-graph tracing -- confirm).
2375 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2376 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2377 }
2378
2379#if TBB_PREVIEW_FLOW_GRAPH_TRACE
2380 void set_name( const char *name ) __TBB_override {
2382 }
2383#endif
2384
2385 //
2386 // message sender implementation
2387 //
2388
2390
// Body of the successor-registration method (signature line not shown in this listing).
// Submits a reg_succ operation for `r` through the aggregator, then enqueues any forwarding task the
// operation handed back. Always returns true.
2392 buffer_operation op_data(reg_succ);
2393 op_data.r = &r;
2394 my_aggregator.execute(&op_data);
2395 (void)enqueue_forwarding_task(op_data);
2396 return true;
2397 }
2398
2399#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Submits an add_blt_succ operation for `r` through the aggregator; the matching handler does the bookkeeping.
2400 void internal_add_built_successor( successor_type &r) __TBB_override {
2401 buffer_operation op_data(add_blt_succ);
2402 op_data.r = &r;
2403 my_aggregator.execute(&op_data);
2404 }
2405
// Submits a del_blt_succ operation for `r` through the aggregator; the matching handler does the bookkeeping.
2406 void internal_delete_built_successor( successor_type &r) __TBB_override {
2407 buffer_operation op_data(del_blt_succ);
2408 op_data.r = &r;
2409 my_aggregator.execute(&op_data);
2410 }
2411
// Submits an add_blt_pred operation for predecessor p to my_aggregator.
// No forwarding task is requested afterwards.
2412 void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
2413 buffer_operation op_data(add_blt_pred);
// predecessors travel in the `p` field of the operation, successors in `r`
2414 op_data.p = &p;
2415 my_aggregator.execute(&op_data);
2416 }
2417
// Submits a del_blt_pred operation for predecessor p to my_aggregator.
// No forwarding task is requested afterwards.
2418 void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
2419 buffer_operation op_data(del_blt_pred);
// predecessors travel in the `p` field of the operation
2420 op_data.p = &p;
2421 my_aggregator.execute(&op_data);
2422 }
2423
// Runs a blt_pred_cnt operation through my_aggregator and returns the value the
// operation handler left in op_data.cnt_val.
2424 size_t predecessor_count() __TBB_override {
2425 buffer_operation op_data(blt_pred_cnt);
2426 my_aggregator.execute(&op_data);
// cnt_val is the output field of the operation, read once execute() has returned
2427 return op_data.cnt_val;
2428 }
2429
// Runs a blt_succ_cnt operation through my_aggregator and returns the value the
// operation handler left in op_data.cnt_val.
2430 size_t successor_count() __TBB_override {
2431 buffer_operation op_data(blt_succ_cnt);
2432 my_aggregator.execute(&op_data);
// cnt_val is the output field of the operation, read once execute() has returned
2433 return op_data.cnt_val;
2434 }
2435
// Runs a blt_pred_cpy operation through my_aggregator with v as the destination
// list (passed by address in op_data.pvec). How v is filled is decided by the
// operation handler, which is not part of this method.
2436 void copy_predecessors( predecessor_list_type &v ) __TBB_override {
2437 buffer_operation op_data(blt_pred_cpy);
2438 op_data.pvec = &v;
2439 my_aggregator.execute(&op_data);
2440 }
2441
// Runs a blt_succ_cpy operation through my_aggregator with v as the destination
// list (passed by address in op_data.svec). How v is filled is decided by the
// operation handler, which is not part of this method.
2442 void copy_successors( successor_list_type &v ) __TBB_override {
2443 buffer_operation op_data(blt_succ_cpy);
2444 op_data.svec = &v;
2445 my_aggregator.execute(&op_data);
2446 }
2447
2448#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2449
2451
2454 r.remove_predecessor(*this);
2455 buffer_operation op_data(rem_succ);
2456 op_data.r = &r;
2457 my_aggregator.execute(&op_data);
2458 // even though this operation does not cause a forward, if we are the handler, and
2459 // a forward is scheduled, we may be the first to reach this point after the aggregator,
2460 // and so should check for the task.
2461 (void)enqueue_forwarding_task(op_data);
2462 return true;
2463 }
2464
2466
2469 buffer_operation op_data(req_item);
2470 op_data.elem = &v;
2471 my_aggregator.execute(&op_data);
2472 (void)enqueue_forwarding_task(op_data);
2473 return (op_data.status==internal::SUCCEEDED);
2474 }
2475
2477
2480 buffer_operation op_data(res_item);
2481 op_data.elem = &v;
2482 my_aggregator.execute(&op_data);
2483 (void)enqueue_forwarding_task(op_data);
2484 return (op_data.status==internal::SUCCEEDED);
2485 }
2486
2488
2490 buffer_operation op_data(rel_res);
2491 my_aggregator.execute(&op_data);
2492 (void)enqueue_forwarding_task(op_data);
2493 return true;
2494 }
2495
2497
2499 buffer_operation op_data(con_res);
2500 my_aggregator.execute(&op_data);
2501 (void)enqueue_forwarding_task(op_data);
2502 return true;
2503 }
2504
2505protected:
2506
2507 template< typename R, typename B > friend class run_and_put_task;
2508 template<typename X, typename Y> friend class internal::broadcast_cache;
2509 template<typename X, typename Y> friend class internal::round_robin_cache;
2512 buffer_operation op_data(t, put_item);
2513 my_aggregator.execute(&op_data);
2514 task *ft = grab_forwarding_task(op_data);
2515 // sequencer_nodes can return failure (if an item has been previously inserted)
2516 // We have to spawn the returned task if our own operation fails.
2517
2518 if(ft && op_data.status ==internal::FAILED) {
2519 // we haven't succeeded queueing the item, but for some reason the
2520 // call returned a task (if another request resulted in a successful
2521 // forward this could happen.) Queue the task and reset the pointer.
2523 }
2524 else if(!ft && op_data.status ==internal::SUCCEEDED) {
2526 }
2527 return ft;
2528 }
2529
2531 return my_graph;
2532 }
2533
2535
2536#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2537public:
// Extraction entry point (deprecated extraction API): calls receiver_extract(*this)
// on the built-predecessor record and sender_extract(*this) on the built-successor
// record of my_successors.
// NOTE(review): presumably this detaches the node from its neighbours on both sides;
// the helpers are defined elsewhere - confirm. Buffered items are not touched here.
2538 void extract() __TBB_override {
2539 my_built_predecessors.receiver_extract(*this);
2540 my_successors.built_successors().sender_extract(*this);
2541 }
2542#endif
2543
2544protected:
2547 // TODO: just clear structures
2548 if (f&rf_clear_edges) {
2549 my_successors.clear();
2550#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2551 my_built_predecessors.clear();
2552#endif
2553 }
2554 forwarder_busy = false;
2555 }
2556}; // buffer_node
2557
2559template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2560class queue_node : public buffer_node<T, Allocator> {
2561#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2564 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2565 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2566 );
2567#endif
2568protected:
2573
2574private:
2575 template<typename, typename> friend class buffer_node;
2576
2578 return this->my_item_valid(this->my_head);
2579 }
2580
// Offers the front item to the successors.
// If a task comes back, it is merged into last_task with combine_tasks() and the
// front item is destroyed. If no task comes back, the item stays in the queue and
// last_task is left untouched.
// last_task is an in/out parameter.
2581 void try_put_and_add_task(task*& last_task) {
2582 task *new_task = this->my_successors.try_put_task(this->front());
2583 if (new_task) {
2584 // workaround for icc bug
2585 graph& graph_ref = this->graph_reference();
2586 last_task = combine_tasks(graph_ref, last_task, new_task);
// the item is removed only after a successor has produced a task for it
2587 this->destroy_front();
2588 }
2589 }
2590
2591protected:
// Handles a forwarding operation by delegating to internal_forward_task_impl().
// NOTE(review): `this` is passed as the second argument, presumably so the shared
// implementation can call this class's own helpers (buffer_node is a friend) - confirm.
2592 void internal_forward_task(queue_operation *op) __TBB_override {
2593 this->internal_forward_task_impl(op, this);
2594 }
2595
2597 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2599 }
2600 else {
2601 this->pop_front(*(op->elem));
2603 }
2604 }
2606 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2608 }
2609 else {
2610 this->reserve_front(*(op->elem));
2612 }
2613 }
2615 this->consume_front();
2617 }
2618
2619public:
2620 typedef T input_type;
2621 typedef T output_type;
2624
2627 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2628 static_cast<receiver<input_type> *>(this),
2629 static_cast<sender<output_type> *>(this) );
2630 }
2631
2632#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Node-set constructor (preview feature): delegates to the constructor that accepts
// nodes.graph_reference(), then calls make_edges_in_order(nodes, *this) to wire this
// node to the members of the set.
2633 template <typename... Args>
2634 queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
2635 make_edges_in_order(nodes, *this);
2636 }
2637#endif
2638
2641 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2642 static_cast<receiver<input_type> *>(this),
2643 static_cast<sender<output_type> *>(this) );
2644 }
2645
2646#if TBB_PREVIEW_FLOW_GRAPH_TRACE
2647 void set_name( const char *name ) __TBB_override {
2649 }
2650#endif
2651
2652protected:
2655 }
2656}; // queue_node
2657
2659template< typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2660class sequencer_node : public queue_node<T, Allocator> {
2662 // my_sequencer should be a benign function and must be callable
2663 // from a parallel context. Does this mean it needn't be reset?
2664public:
2665#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2668 "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2669 "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2670 );
2671#endif
2672 typedef T input_type;
2673 typedef T output_type;
2676
// Constructs a sequencer_node in graph g.
// The sequencer s is wrapped in a heap-allocated
// internal::function_body_leaf<T, size_t, Sequencer> stored in my_sequencer; this class
// later invokes (*my_sequencer)(item) to obtain the item's tag.
// The node is then reported through tbb::internal::fgt_node as FLOW_SEQUENCER_NODE,
// with its receiver and sender sub-objects.
// NOTE(review): my_sequencer is allocated with a raw `new`; its release is not visible
// in this part of the file - confirm it is deleted in the destructor.
2678 template< typename Sequencer >
2679 __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, Allocator>(g),
2680 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2681 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2682 static_cast<receiver<input_type> *>(this),
2683 static_cast<sender<output_type> *>(this) );
2684 }
2685
2686#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Node-set constructor (preview feature): delegates to the (graph, sequencer)
// constructor using nodes.graph_reference() and s, then calls
// make_edges_in_order(nodes, *this) to wire this node to the members of the set.
2687 template <typename Sequencer, typename... Args>
2688 sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
2689 : sequencer_node(nodes.graph_reference(), s) {
2690 make_edges_in_order(nodes, *this);
2691 }
2692#endif
2693
2696 my_sequencer( src.my_sequencer->clone() ) {
2697 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2698 static_cast<receiver<input_type> *>(this),
2699 static_cast<sender<output_type> *>(this) );
2700 }
2701
2704
2705#if TBB_PREVIEW_FLOW_GRAPH_TRACE
2706 void set_name( const char *name ) __TBB_override {
2708 }
2709#endif
2710
2711protected:
2714
2715private:
2717 size_type tag = (*my_sequencer)(*(op->elem));
2718#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2719 if (tag < this->my_head) {
2720 // have already emitted a message with this tag
2722 return false;
2723 }
2724#endif
2725 // cannot modify this->my_tail now; the buffer would be inconsistent.
2726 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2727
2728 if (this->size(new_tail) > this->capacity()) {
2729 this->grow_my_array(this->size(new_tail));
2730 }
2731 this->my_tail = new_tail;
2732
2733 const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2734 __TBB_store_with_release(op->status, res);
2735 return res ==internal::SUCCEEDED;
2736 }
2737}; // sequencer_node
2738
2740template<typename T, typename Compare = std::less<T>, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T)>
2741class priority_queue_node : public buffer_node<T, Allocator> {
2742public:
// Deprecation diagnostic for the Allocator template parameter. The text names the
// same macro that guards this check (TBB_DEPRECATED_FLOW_NODE_ALLOCATOR), so that
// following the advice in the message actually silences it.
2743#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2746 "Allocator template parameter for flow graph nodes is deprecated and will be removed in the future. "
2747 "To temporary enable the deprecated interface specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR."
2748 );
2749#endif
2750 typedef T input_type;
2751 typedef T output_type;
2756
// Constructs an empty priority_queue_node in graph g.
// comp is copied into the `compare` member (defaulting to a default-constructed
// Compare), and `mark` starts at 0. Elsewhere in this class `mark` bounds the part of
// the buffer that heapify() has already arranged.
// The node is then reported through tbb::internal::fgt_node as
// FLOW_PRIORITY_QUEUE_NODE, with its receiver and sender sub-objects.
2758 __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
2759 : buffer_node<T, Allocator>(g), compare(comp), mark(0) {
2760 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2761 static_cast<receiver<input_type> *>(this),
2762 static_cast<sender<output_type> *>(this) );
2763 }
2764
2765#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Node-set constructor (preview feature): delegates to the (graph, comparator)
// constructor using nodes.graph_reference() and comp, then calls
// make_edges_in_order(nodes, *this) to wire this node to the members of the set.
2766 template <typename... Args>
2767 priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
2768 : priority_queue_node(nodes.graph_reference(), comp) {
2769 make_edges_in_order(nodes, *this);
2770 }
2771#endif
2772
// Copy construction: the base is built from src, and the comparator is copied from
// src so that the new node orders items exactly like the node it was copied from.
// Leaving `compare` out of this list default-constructed it, which silently dropped
// any state of a comparator passed to the original node and required Compare to be
// default-constructible. `mark` starts at 0, as in the graph constructor.
2775 : buffer_node<T, Allocator>(src), compare(src.compare), mark(0)
2776 {
// report the new node as FLOW_PRIORITY_QUEUE_NODE with its receiver/sender sub-objects
2777 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2778 static_cast<receiver<input_type> *>(this),
2779 static_cast<sender<output_type> *>(this) );
2780 }
2781
2782#if TBB_PREVIEW_FLOW_GRAPH_TRACE
2783 void set_name( const char *name ) __TBB_override {
2785 }
2786#endif
2787
2788protected:
2789
2791 mark = 0;
2793 }
2794
2798
2801 this->internal_forward_task_impl(op, this);
2802 }
2803
2805 this->handle_operations_impl(op_list, this);
2806 }
2807
2809 prio_push(*(op->elem));
2811 return true;
2812 }
2813
2815 // if empty or already reserved, don't pop
2816 if ( this->my_reserved == true || this->my_tail == 0 ) {
2818 return;
2819 }
2820
2821 *(op->elem) = prio();
2823 prio_pop();
2824
2825 }
2826
2827 // pops the highest-priority item, saves copy
2829 if (this->my_reserved == true || this->my_tail == 0) {
2831 return;
2832 }
2833 this->my_reserved = true;
2834 *(op->elem) = prio();
2835 reserved_item = *(op->elem);
2837 prio_pop();
2838 }
2839
2842 this->my_reserved = false;
2844 }
2845
2846 void internal_release(prio_operation *op) __TBB_override {
2849 this->my_reserved = false;
2851 }
2852
2853private:
2854 template<typename, typename> friend class buffer_node;
2855
// Brings the whole buffer into heap order: when elements exist beyond `mark`
// (mark < my_tail), heapify() is run. Afterwards mark is asserted to equal my_tail.
2856 void order() {
2857 if (mark < this->my_tail) heapify();
2858 __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2859 }
2860
2862 return this->my_tail > 0;
2863 }
2864
// Offers the current highest-priority item, prio(), to the successors.
// If a task comes back, it is merged into last_task with combine_tasks() and the
// item is removed with prio_pop(). If no task comes back, the item stays buffered
// and last_task is left untouched.
// last_task is an in/out parameter.
2865 void try_put_and_add_task(task*& last_task) {
2866 task * new_task = this->my_successors.try_put_task(this->prio());
2867 if (new_task) {
2868 // workaround for icc bug
2869 graph& graph_ref = this->graph_reference();
2870 last_task = combine_tasks(graph_ref, last_task, new_task);
// the item is removed only after a successor has produced a task for it
2871 prio_pop();
2872 }
2873 }
2874
2875private:
2876 Compare compare;
2878
2880
2881 // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2883 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2884 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2885 }
2886
2887 // prio_push: checks that the item will fit, expand array if necessary, put at end
// The new element is appended at index my_tail and my_tail is advanced. `mark` is not
// changed, so the element stays outside the heapified part of the buffer until
// heapify() runs. The closing assertion states exactly that: mark < my_tail.
2888 void prio_push(const T &src) {
2889 if ( this->my_tail >= this->my_array_size )
2890 this->grow_my_array( this->my_tail + 1 );
// result of place_item is deliberately ignored
2891 (void) this->place_item(this->my_tail, src);
2892 ++(this->my_tail);
2893 __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2894 }
2895
2896 // prio_pop: deletes highest priority item from the array, and if it is item
2897 // 0, move last item to 0 and reheap. If it is the last (not yet heapified) element,
2898 // just destroy it and decrement tail; mark is left unchanged in that case.
// Assumes the array has already been tested for emptiness; no failure.
// The item is not returned: callers in this class read it through prio() first.
2899 void prio_pop() {
2900 if (prio_use_tail()) {
2901 // there are newly pushed elements; last one higher than top
2902 // nothing is copied here - the tail element is simply destroyed
2903 this->destroy_item(this->my_tail-1);
2904 --(this->my_tail);
2905 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2906 return;
2907 }
// otherwise the highest-priority item is the heap root at index 0
2908 this->destroy_item(0);
2909 if(this->my_tail > 1) {
2910 // push the last element down heap
2911 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2912 this->move_item(0,this->my_tail - 1);
2913 }
2914 --(this->my_tail);
// keep mark within the shrunken buffer when the heap covered every element
2915 if(mark > this->my_tail) --mark;
2916 if (this->my_tail > 1) // don't reheap for heap of size 1
2917 reheap();
2918 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2919 }
2920
// Returns a reference to the current highest-priority item: the last element when
// prio_use_tail() reports that it outranks the heap root, otherwise element 0.
// The buffer must not be empty; no emptiness check is made here.
2921 const T& prio() {
2922 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2923 }
2924
2925 // turn array into heap
// Extends the heapified part of the buffer to cover every element in [0, my_tail):
// each element at index `mark` is lifted towards the root while its parent compares
// lower, and `mark` ends equal to my_tail.
// Requires input_type to be default-constructible (see `to_place` below).
2926 void heapify() {
2927 if(this->my_tail == 0) {
2928 mark = 0;
2929 return;
2930 }
// a single element at index 0 is already a heap, so start from index 1
2931 if (!mark) mark = 1;
2932 for (; mark<this->my_tail; ++mark) { // for each unheaped element
2933 size_type cur_pos = mark;
2934 input_type to_place;
// take the element out of slot `mark`; it is put back with place_item() below
2935 this->fetch_item(mark,to_place);
2936 do { // push to_place up the heap
2937 size_type parent = (cur_pos-1)>>1;
// stop once the parent does not compare below to_place
2938 if (!compare(this->get_my_item(parent), to_place))
2939 break;
// otherwise the parent moves down into the vacated slot
2940 this->move_item(cur_pos, parent);
2941 cur_pos = parent;
2942 } while( cur_pos );
2943 (void) this->place_item(cur_pos, to_place);
2944 }
2945 }
2946
2947 // otherwise heapified array with new root element; rearrange to heap
// Sifts the element at index 0 down through the heapified part [0, mark): at every
// level it is swapped with its higher-priority child until that child compares below
// it or no child lies inside the heapified part.
2948 void reheap() {
2949 size_type cur_pos=0, child=1;
2950 while (child < mark) {
2951 size_type target = child;
// pick the right child when it exists inside the heap and outranks the left one
2952 if (child+1<mark &&
2953 compare(this->get_my_item(child),
2954 this->get_my_item(child+1)))
2955 ++target;
2956 // target now has the higher priority child
// heap order holds once the chosen child compares below the current element
2957 if (compare(this->get_my_item(target),
2958 this->get_my_item(cur_pos)))
2959 break;
2960 // swap
2961 this->swap_items(cur_pos, target);
2962 cur_pos = target;
// left child of the new position
2963 child = (cur_pos<<1)+1;
2964 }
2965 }
2966}; // priority_queue_node
2967
2968} // interfaceX
2969
2970namespace interface11 {
2971
2973
2976template< typename T, typename DecrementType=continue_msg >
2977class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2978public:
2979 typedef T input_type;
2980 typedef T output_type;
2983#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2984 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2985 typedef typename sender<output_type>::built_successors_type built_successors_type;
2986 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2987 typedef typename sender<output_type>::successor_list_type successor_list_type;
2988#endif
2989 //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2990
2991private:
2993 size_t my_count; //number of successful puts
2994 size_t my_tries; //number of active put attempts
2998 __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
2999
3001
3002 // Let decrementer call decrement_counter()
3003 friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
3004
// Tells whether another item may be forwarded right now: the successful puts plus
// the puts still in flight (my_count + my_tries) are below my_threshold, and the node
// has at least one predecessor and at least one successor.
3005 bool check_conditions() { // always called under lock
3006 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
3007 }
3008
3009 // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
3011 input_type v;
3012 task *rval = NULL;
3013 bool reserved = false;
3014 {
3016 if ( check_conditions() )
3017 ++my_tries;
3018 else
3019 return NULL;
3020 }
3021
3022 //SUCCESS
3023 // if we can reserve and can put, we consume the reservation
3024 // we increment the count and decrement the tries
3025 if ( (my_predecessors.try_reserve(v)) == true ){
3026 reserved=true;
3027 if ( (rval = my_successors.try_put_task(v)) != NULL ){
3028 {
3030 ++my_count;
3031 --my_tries;
3032 my_predecessors.try_consume();
3033 if ( check_conditions() ) {
3034 if ( internal::is_graph_active(this->my_graph) ) {
3035 task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3038 }
3039 }
3040 }
3041 return rval;
3042 }
3043 }
3044 //FAILURE
3045 //if we can't reserve, we decrement the tries
3046 //if we can reserve but can't put, we decrement the tries and release the reservation
3047 {
3049 --my_tries;
3050 if (reserved) my_predecessors.try_release();
3051 if ( check_conditions() ) {
3052 if ( internal::is_graph_active(this->my_graph) ) {
3053 task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3055 __TBB_ASSERT(!rval, "Have two tasks to handle");
3056 return rtask;
3057 }
3058 }
3059 return rval;
3060 }
3061 }
3062
// Not used by limiter_node: reaching this function is a programming error and trips
// the assertion (when assertions are enabled); otherwise it simply returns.
3063 void forward() {
3064 __TBB_ASSERT(false, "Should never be called");
3065 return;
3066 }
3067
// Applies a request from the decrement port to the put counter, then tries to forward.
// delta > 0 lowers my_count (clamped at 0 when delta exceeds it); delta < 0 raises it.
// Returns whatever forward_task() returns.
3068 task* decrement_counter( long long delta ) {
3069 {
3071 if( delta > 0 && size_t(delta) > my_count )
3072 my_count = 0;
// For a negative delta it is the magnitude that has to be compared with the remaining
// headroom. size_t(delta) wraps to a huge value and made this test true for virtually
// every negative delta; size_t(0) - size_t(delta) yields |delta| and, unlike
// size_t(-delta), has no signed overflow for the most negative value.
3073 else if( delta < 0 && size_t(0) - size_t(delta) > my_threshold - my_count )
3075 else
// unsigned wrap-around turns this into an addition of |delta| when delta is negative
3076 my_count -= size_t(delta); // absolute value of delta is sufficiently small
3077 }
3078 return forward_task();
3079 }
3080
3081 void initialize() {
3082 my_predecessors.set_owner(this);
3083 my_successors.set_owner(this);
3084 decrement.set_owner(this);
3086 CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
3087 static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
3088 static_cast<sender<output_type> *>(this)
3089 );
3090 }
3091public:
3094
3095#if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
3097 "Deprecated interface of the limiter node can be used only in conjunction "
3098 "with continue_msg as the type of DecrementType template parameter." );
3099#endif // Check for incompatible interface
3100
3103 __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
3104 : graph_node(g), my_threshold(threshold), my_count(0),
3106 my_tries(0), decrement(),
3107 init_decrement_predecessors(num_decrement_predecessors),
3108 decrement(num_decrement_predecessors)) {
3109 initialize();
3110 }
3111
3112#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Node-set constructor (preview feature): delegates to the (graph, threshold)
// constructor using nodes.graph_reference(), then calls
// make_edges_in_order(nodes, *this) to wire this node to the members of the set.
3113 template <typename... Args>
3114 limiter_node(const node_set<Args...>& nodes, size_t threshold)
3115 : limiter_node(nodes.graph_reference(), threshold) {
3116 make_edges_in_order(nodes, *this);
3117 }
3118#endif
3119
3122 graph_node(src.my_graph), receiver<T>(), sender<T>(),
3125 my_tries(0), decrement(),
3126 init_decrement_predecessors(src.init_decrement_predecessors),
3127 decrement(src.init_decrement_predecessors)) {
3128 initialize();
3129 }
3130
3131#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3132 void set_name( const char *name ) __TBB_override {
3134 }
3135#endif
3136
3140 bool was_empty = my_successors.empty();
3141 my_successors.register_successor(r);
3142 //spawn a forward task if this is the only successor
3143 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
3144 if ( internal::is_graph_active(this->my_graph) ) {
3145 task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3148 }
3149 }
3150 return true;
3151 }
3152
3154
3156 r.remove_predecessor(*this);
3157 my_successors.remove_successor(r);
3158 return true;
3159 }
3160
3161#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Deprecated extraction API, successor side. Every member below is a thin forwarder
// to the matching member of my_successors (or my_predecessors for built_predecessors);
// the limiter's counters are not touched.

// Accessors for the built-edge records kept by the successor and predecessor caches.
3162 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
3163 built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
3164
// Forwards src to my_successors.internal_add_built_successor().
3165 void internal_add_built_successor(successor_type &src) __TBB_override {
3166 my_successors.internal_add_built_successor(src);
3167 }
3168
// Forwards src to my_successors.internal_delete_built_successor().
3169 void internal_delete_built_successor(successor_type &src) __TBB_override {
3170 my_successors.internal_delete_built_successor(src);
3171 }
3172
// Returns my_successors.successor_count().
3173 size_t successor_count() __TBB_override { return my_successors.successor_count(); }
3174
// Lets my_successors fill v through its copy_successors().
3175 void copy_successors(successor_list_type &v) __TBB_override {
3176 my_successors.copy_successors(v);
3177 }
3178
// Deprecated extraction API, predecessor side. Every member below is a thin forwarder
// to the matching member of my_predecessors; the limiter's counters are not touched.

// Forwards src to my_predecessors.internal_add_built_predecessor().
3179 void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
3180 my_predecessors.internal_add_built_predecessor(src);
3181 }
3182
// Forwards src to my_predecessors.internal_delete_built_predecessor().
3183 void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
3184 my_predecessors.internal_delete_built_predecessor(src);
3185 }
3186
// Returns my_predecessors.predecessor_count().
3187 size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
3188
// Lets my_predecessors fill v through its copy_predecessors().
3189 void copy_predecessors(predecessor_list_type &v) __TBB_override {
3190 my_predecessors.copy_predecessors(v);
3191 }
3192
// Extraction entry point (deprecated extraction API): resets my_count to 0, then calls
// sender_extract / receiver_extract on the built-edge records of this node's
// successors and predecessors, and receiver_extract on the built predecessors of the
// `decrement` port.
// NOTE(review): my_tries is not reset here - presumably no put can be in flight while
// a node is being extracted; confirm.
3193 void extract() __TBB_override {
3194 my_count = 0;
3195 my_successors.built_successors().sender_extract(*this);
3196 my_predecessors.built_predecessors().receiver_extract(*this);
3197 decrement.built_predecessors().receiver_extract(decrement);
3198 }
3199#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3200
3204 my_predecessors.add( src );
3205 if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
3206 task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3209 }
3210 return true;
3211 }
3212
3215 my_predecessors.remove( src );
3216 return true;
3217 }
3218
3219protected:
3220
3221 template< typename R, typename B > friend class run_and_put_task;
3222 template<typename X, typename Y> friend class internal::broadcast_cache;
3223 template<typename X, typename Y> friend class internal::round_robin_cache;
3226 {
3228 if ( my_count + my_tries >= my_threshold )
3229 return NULL;
3230 else
3231 ++my_tries;
3232 }
3233
3234 task * rtask = my_successors.try_put_task(t);
3235
3236 if ( !rtask ) { // try_put_task failed.
3238 --my_tries;
3240 rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3242 }
3243 }
3244 else {
3246 ++my_count;
3247 --my_tries;
3248 }
3249 return rtask;
3250 }
3251
3253
3255 __TBB_ASSERT(false,NULL); // should never be called
3256 }
3257
3259 my_count = 0;
3260 if(f & rf_clear_edges) {
3261 my_predecessors.clear();
3262 my_successors.clear();
3263 }
3264 else
3265 {
3266 my_predecessors.reset( );
3267 }
3268 decrement.reset_receiver(f);
3269 }
3270}; // limiter_node
3271
3273
3279
3280template<typename OutputTuple, typename JP=queueing> class join_node;
3281
3282template<typename OutputTuple>
3283class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
3284private:
3287public:
3288 typedef OutputTuple output_type;
3291 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3292 this->input_ports(), static_cast< sender< output_type > *>(this) );
3293 }
3294
3295#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Node-set constructor (preview feature) of the reserving join_node: delegates to the
// constructor that accepts nodes.graph_reference(), then calls
// make_edges_in_order(nodes, *this). The unnamed `reserving` parameter is not used in
// the body.
// NOTE(review): presumably it only serves to name the policy at the call site - confirm.
3296 template <typename... Args>
3297 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
3298 make_edges_in_order(nodes, *this);
3299 }
3300#endif
3301
3303 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3304 this->input_ports(), static_cast< sender< output_type > *>(this) );
3305 }
3306
3307#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3308 void set_name( const char *name ) __TBB_override {
3310 }
3311#endif
3312
3313};
3314
3315template<typename OutputTuple>
3316class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
3317private:
3320public:
3321 typedef OutputTuple output_type;
3324 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3325 this->input_ports(), static_cast< sender< output_type > *>(this) );
3326 }
3327
3328#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Node-set constructor (preview feature) of the queueing join_node: delegates to the
// constructor that accepts nodes.graph_reference(), then calls
// make_edges_in_order(nodes, *this). The unnamed `queueing` parameter is not used in
// the body.
// NOTE(review): presumably it only serves to name the policy at the call site - confirm.
3329 template <typename... Args>
3330 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
3331 make_edges_in_order(nodes, *this);
3332 }
3333#endif
3334
3336 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3337 this->input_ports(), static_cast< sender< output_type > *>(this) );
3338 }
3339
3340#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3341 void set_name( const char *name ) __TBB_override {
3343 }
3344#endif
3345
3346};
3347
3348// template for key_matching join_node
3349// tag_matching join_node is a specialization of key_matching, and is source-compatible.
3350template<typename OutputTuple, typename K, typename KHash>
3351class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
3352 key_matching_port, OutputTuple, key_matching<K,KHash> > {
3353private:
3356public:
3357 typedef OutputTuple output_type;
3359
3360#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
3362
3363#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Node-set constructor (preview feature) of the key_matching join_node, available only
// together with message-based key matching: delegates to the constructor that accepts
// nodes.graph_reference(), then calls make_edges_in_order(nodes, *this). The unnamed
// key_matching<K, KHash> parameter is not used in the body.
3364 template <typename... Args>
3365 join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
3366 : join_node(nodes.graph_reference()) {
3367 make_edges_in_order(nodes, *this);
3368 }
3369#endif
3370
3371#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
3372
// key_matching join_node constructors taking two to five body arguments.
// Each overload forwards g and the bodies, in order, to unfolded_type, and then calls
// tbb::internal::fgt_multiinput_node<N> with FLOW_JOIN_NODE_TAG_MATCHING, the graph,
// the input ports and this node's sender sub-object.
// NOTE(review): the bodies are presumably the per-port key extractors (one per input
// port); they are accepted by value - confirm against unfolded_type.

// two bodies
3373 template<typename __TBB_B0, typename __TBB_B1>
3374 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
3375 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3376 this->input_ports(), static_cast< sender< output_type > *>(this) );
3377 }
// three bodies
3378 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
3379 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
3380 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3381 this->input_ports(), static_cast< sender< output_type > *>(this) );
3382 }
// four bodies
3383 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
3384 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
3385 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3386 this->input_ports(), static_cast< sender< output_type > *>(this) );
3387 }
// five bodies
3388 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
3389 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
3390 unfolded_type(g, b0, b1, b2, b3, b4) {
3391 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3392 this->input_ports(), static_cast< sender< output_type > *>(this) );
3393 }
// key_matching join_node constructors taking six to ten body arguments. Each overload
// is compiled only when __TBB_VARIADIC_MAX is at least its number of bodies.
// Like the shorter overloads, each forwards g and the bodies, in order, to
// unfolded_type and then calls tbb::internal::fgt_multiinput_node<N> with
// FLOW_JOIN_NODE_TAG_MATCHING, the graph, the input ports and the sender sub-object.
3394#if __TBB_VARIADIC_MAX >= 6
// six bodies
3395 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3396 typename __TBB_B5>
3397 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
3398 unfolded_type(g, b0, b1, b2, b3, b4, b5) {
3399 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3400 this->input_ports(), static_cast< sender< output_type > *>(this) );
3401 }
3402#endif
3403#if __TBB_VARIADIC_MAX >= 7
// seven bodies
3404 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3405 typename __TBB_B5, typename __TBB_B6>
3406 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
3407 unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
3408 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3409 this->input_ports(), static_cast< sender< output_type > *>(this) );
3410 }
3411#endif
3412#if __TBB_VARIADIC_MAX >= 8
// eight bodies
3413 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3414 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
3415 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3416 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
3417 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3418 this->input_ports(), static_cast< sender< output_type > *>(this) );
3419 }
3420#endif
3421#if __TBB_VARIADIC_MAX >= 9
// nine bodies
3422 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3423 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
3424 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3425 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
3426 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3427 this->input_ports(), static_cast< sender< output_type > *>(this) );
3428 }
3429#endif
3430#if __TBB_VARIADIC_MAX >= 10
// ten bodies
3431 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3432 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
3433 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3434 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
3435 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3436 this->input_ports(), static_cast< sender< output_type > *>(this) );
3437 }
3438#endif
3439
3440#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Node-set constructor (preview feature) of the key_matching join_node with explicit
// bodies: delegates to the (graph, bodies...) constructor using
// nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
// The bodies are accepted by value and expanded in the order given.
3441 template <typename... Args, typename... Bodies>
3442 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
3443 : join_node(nodes.graph_reference(), bodies...) {
3444 make_edges_in_order(nodes, *this);
3445 }
3446#endif
3447
3449 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3450 this->input_ports(), static_cast< sender< output_type > *>(this) );
3451 }
3452
3453#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3454 void set_name( const char *name ) __TBB_override {
3456 }
3457#endif
3458
3459};
3460
3461// indexer node
3463
3464// TODO: Implement interface with variadic template or tuple
3465template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3466 typename T4=null_type, typename T5=null_type, typename T6=null_type,
3467 typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3468
3469//indexer node specializations
// indexer_node specialization used when exactly one input type (T0) is supplied.
// It derives from internal::unfolded_indexer_node<tuple<T0> > and passes N (= 1) as the template
// argument of fgt_multiinput_node in its constructors.
// NOTE(review): the embedded listing numbers skip 3476-3478, 3491 and 3498, so the signature of the
// first constructor, the copy-constructor signature and the set_name body are missing from this extract.
3470template<typename T0>
3471class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
3472private:
3473 static const int N = 1;
3474public:
3475 typedef tuple<T0> InputTuple;
     // Body of a constructor whose signature is not visible (presumably indexer_node(graph&) - confirm):
     // reports this node to fgt_multiinput_node<N> with the FLOW_INDEXER_NODE tag, its graph, its
     // input ports and itself viewed as sender<output_type>.
3479 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3480 this->input_ports(), static_cast< sender< output_type > *>(this) );
3481 }
3482
3483#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to indexer_node(nodes.graph_reference()) and
     // then calls make_edges_in_order(nodes, *this).
3484 template <typename... Args>
3485 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3486 make_edges_in_order(nodes, *this);
3487 }
3488#endif
3489
3490 // Copy constructor
     // Only the body is visible; it issues the same fgt_multiinput_node<N> call as the body above.
3492 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3493 this->input_ports(), static_cast< sender< output_type > *>(this) );
3494 }
3495
3496#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
3497 void set_name( const char *name ) __TBB_override {
3499 }
3500#endif
3501};
3502
// indexer_node specialization for two input types.
// It derives from internal::unfolded_indexer_node<tuple<T0, T1> > and passes N (= 2) as the template
// argument of fgt_multiinput_node in its constructors.
// NOTE(review): the embedded listing numbers skip 3509-3511, 3524 and 3531, so the signature of the
// first constructor, the copy-constructor signature and the set_name body are missing from this extract.
3503template<typename T0, typename T1>
3504class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
3505private:
3506 static const int N = 2;
3507public:
3508 typedef tuple<T0, T1> InputTuple;
     // Body of a constructor whose signature is not visible (presumably indexer_node(graph&) - confirm):
     // reports this node to fgt_multiinput_node<N> with the FLOW_INDEXER_NODE tag, its graph, its
     // input ports and itself viewed as sender<output_type>.
3512 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3513 this->input_ports(), static_cast< sender< output_type > *>(this) );
3514 }
3515
3516#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to indexer_node(nodes.graph_reference()) and
     // then calls make_edges_in_order(nodes, *this).
3517 template <typename... Args>
3518 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3519 make_edges_in_order(nodes, *this);
3520 }
3521#endif
3522
3523 // Copy constructor
     // Only the body is visible; it issues the same fgt_multiinput_node<N> call as the body above.
3525 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3526 this->input_ports(), static_cast< sender< output_type > *>(this) );
3527 }
3528
3529#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
3530 void set_name( const char *name ) __TBB_override {
3532 }
3533#endif
3534};
3535
// indexer_node specialization for three input types.
// It derives from internal::unfolded_indexer_node<tuple<T0, T1, T2> > and passes N (= 3) as the
// template argument of fgt_multiinput_node in its constructors.
// NOTE(review): the embedded listing numbers skip 3542-3544, 3557 and 3564, so the signature of the
// first constructor, the copy-constructor signature and the set_name body are missing from this extract.
3536template<typename T0, typename T1, typename T2>
3537class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
3538private:
3539 static const int N = 3;
3540public:
3541 typedef tuple<T0, T1, T2> InputTuple;
     // Body of a constructor whose signature is not visible (presumably indexer_node(graph&) - confirm):
     // reports this node to fgt_multiinput_node<N> with the FLOW_INDEXER_NODE tag, its graph, its
     // input ports and itself viewed as sender<output_type>.
3545 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3546 this->input_ports(), static_cast< sender< output_type > *>(this) );
3547 }
3548
3549#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to indexer_node(nodes.graph_reference()) and
     // then calls make_edges_in_order(nodes, *this).
3550 template <typename... Args>
3551 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3552 make_edges_in_order(nodes, *this);
3553 }
3554#endif
3555
3556 // Copy constructor
     // Only the body is visible; it issues the same fgt_multiinput_node<N> call as the body above.
3558 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3559 this->input_ports(), static_cast< sender< output_type > *>(this) );
3560 }
3561
3562#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
3563 void set_name( const char *name ) __TBB_override {
3565 }
3566#endif
3567};
3568
// indexer_node specialization for four input types.
// It derives from internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > and passes N (= 4) as the
// template argument of fgt_multiinput_node in its constructors.
// NOTE(review): the embedded listing numbers skip 3575-3577, 3590 and 3597, so the signature of the
// first constructor, the copy-constructor signature and the set_name body are missing from this extract.
3569template<typename T0, typename T1, typename T2, typename T3>
3570class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
3571private:
3572 static const int N = 4;
3573public:
3574 typedef tuple<T0, T1, T2, T3> InputTuple;
     // Body of a constructor whose signature is not visible (presumably indexer_node(graph&) - confirm):
     // reports this node to fgt_multiinput_node<N> with the FLOW_INDEXER_NODE tag, its graph, its
     // input ports and itself viewed as sender<output_type>.
3578 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3579 this->input_ports(), static_cast< sender< output_type > *>(this) );
3580 }
3581
3582#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to indexer_node(nodes.graph_reference()) and
     // then calls make_edges_in_order(nodes, *this).
3583 template <typename... Args>
3584 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3585 make_edges_in_order(nodes, *this);
3586 }
3587#endif
3588
3589 // Copy constructor
     // Only the body is visible; it issues the same fgt_multiinput_node<N> call as the body above.
3591 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3592 this->input_ports(), static_cast< sender< output_type > *>(this) );
3593 }
3594
3595#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
3596 void set_name( const char *name ) __TBB_override {
3598 }
3599#endif
3600};
3601
// indexer_node specialization for five input types.
// It derives from internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > and passes N (= 5) as
// the template argument of fgt_multiinput_node in its constructors.
// NOTE(review): the embedded listing numbers skip 3608-3610, 3623 and 3630, so the signature of the
// first constructor, the copy-constructor signature and the set_name body are missing from this extract.
3602template<typename T0, typename T1, typename T2, typename T3, typename T4>
3603class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3604private:
3605 static const int N = 5;
3606public:
3607 typedef tuple<T0, T1, T2, T3, T4> InputTuple;
     // Body of a constructor whose signature is not visible (presumably indexer_node(graph&) - confirm):
     // reports this node to fgt_multiinput_node<N> with the FLOW_INDEXER_NODE tag, its graph, its
     // input ports and itself viewed as sender<output_type>.
3611 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3612 this->input_ports(), static_cast< sender< output_type > *>(this) );
3613 }
3614
3615#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to indexer_node(nodes.graph_reference()) and
     // then calls make_edges_in_order(nodes, *this).
3616 template <typename... Args>
3617 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3618 make_edges_in_order(nodes, *this);
3619 }
3620#endif
3621
3622 // Copy constructor
     // Only the body is visible; it issues the same fgt_multiinput_node<N> call as the body above.
3624 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3625 this->input_ports(), static_cast< sender< output_type > *>(this) );
3626 }
3627
3628#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
3629 void set_name( const char *name ) __TBB_override {
3631 }
3632#endif
3633};
3634
// indexer_node specialization for six input types; compiled only when __TBB_VARIADIC_MAX >= 6.
// It derives from internal::unfolded_indexer_node<tuple<T0, ..., T5> > and passes N (= 6) as the
// template argument of fgt_multiinput_node in its constructors.
// NOTE(review): the embedded listing numbers skip 3642-3644, 3657 and 3664, so the signature of the
// first constructor, the copy-constructor signature and the set_name body are missing from this extract.
3635#if __TBB_VARIADIC_MAX >= 6
3636template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3637class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3638private:
3639 static const int N = 6;
3640public:
3641 typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
     // Body of a constructor whose signature is not visible (presumably indexer_node(graph&) - confirm):
     // reports this node to fgt_multiinput_node<N> with the FLOW_INDEXER_NODE tag, its graph, its
     // input ports and itself viewed as sender<output_type>.
3645 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3646 this->input_ports(), static_cast< sender< output_type > *>(this) );
3647 }
3648
3649#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to indexer_node(nodes.graph_reference()) and
     // then calls make_edges_in_order(nodes, *this).
3650 template <typename... Args>
3651 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3652 make_edges_in_order(nodes, *this);
3653 }
3654#endif
3655
3656 // Copy constructor
     // Only the body is visible; it issues the same fgt_multiinput_node<N> call as the body above.
3658 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3659 this->input_ports(), static_cast< sender< output_type > *>(this) );
3660 }
3661
3662#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
3663 void set_name( const char *name ) __TBB_override {
3665 }
3666#endif
3667};
3668#endif //variadic max 6
3669
// indexer_node specialization for seven input types; compiled only when __TBB_VARIADIC_MAX >= 7.
// It derives from internal::unfolded_indexer_node<tuple<T0, ..., T6> > and passes N (= 7) as the
// template argument of fgt_multiinput_node in its constructors.
// NOTE(review): the embedded listing numbers skip 3678-3680, 3693 and 3700, so the signature of the
// first constructor, the copy-constructor signature and the set_name body are missing from this extract.
3670#if __TBB_VARIADIC_MAX >= 7
3671template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3672 typename T6>
3673class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3674private:
3675 static const int N = 7;
3676public:
3677 typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
     // Body of a constructor whose signature is not visible (presumably indexer_node(graph&) - confirm):
     // reports this node to fgt_multiinput_node<N> with the FLOW_INDEXER_NODE tag, its graph, its
     // input ports and itself viewed as sender<output_type>.
3681 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3682 this->input_ports(), static_cast< sender< output_type > *>(this) );
3683 }
3684
3685#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to indexer_node(nodes.graph_reference()) and
     // then calls make_edges_in_order(nodes, *this).
3686 template <typename... Args>
3687 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3688 make_edges_in_order(nodes, *this);
3689 }
3690#endif
3691
3692 // Copy constructor
     // Only the body is visible; it issues the same fgt_multiinput_node<N> call as the body above.
3694 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3695 this->input_ports(), static_cast< sender< output_type > *>(this) );
3696 }
3697
3698#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
3699 void set_name( const char *name ) __TBB_override {
3701 }
3702#endif
3703};
3704#endif //variadic max 7
3705
// indexer_node specialization for eight input types; compiled only when __TBB_VARIADIC_MAX >= 8.
// It derives from internal::unfolded_indexer_node<tuple<T0, ..., T7> > and passes N (= 8) as the
// template argument of fgt_multiinput_node in its constructors.
// NOTE(review): the embedded listing numbers skip 3714-3716 and 3736, so the signature of the first
// constructor and the set_name body are missing from this extract.
3706#if __TBB_VARIADIC_MAX >= 8
3707template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3708 typename T6, typename T7>
3709class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3710private:
3711 static const int N = 8;
3712public:
3713 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
     // Body of a constructor whose signature is not visible (presumably indexer_node(graph&) - confirm):
     // reports this node to fgt_multiinput_node<N> with the FLOW_INDEXER_NODE tag, its graph, its
     // input ports and itself viewed as sender<output_type>.
3717 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3718 this->input_ports(), static_cast< sender< output_type > *>(this) );
3719 }
3720
3721#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to indexer_node(nodes.graph_reference()) and
     // then calls make_edges_in_order(nodes, *this).
3722 template <typename... Args>
3723 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3724 make_edges_in_order(nodes, *this);
3725 }
3726#endif
3727
3728 // Copy constructor
     // Copy-constructs the unfolded_type base from `other`, then reports the new node to
     // fgt_multiinput_node<N> exactly as the constructor body above does.
3729 indexer_node( const indexer_node& other ) : unfolded_type(other) {
3730 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3731 this->input_ports(), static_cast< sender< output_type > *>(this) );
3732 }
3733
3734#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
3735 void set_name( const char *name ) __TBB_override {
3737 }
3738#endif
3739};
3740#endif //variadic max 8
3741
// indexer_node specialization for nine input types; compiled only when __TBB_VARIADIC_MAX >= 9.
// It derives from internal::unfolded_indexer_node<tuple<T0, ..., T8> > and passes N (= 9) as the
// template argument of fgt_multiinput_node in its constructors.
// NOTE(review): the embedded listing numbers skip 3750-3752, 3765 and 3772, so the signature of the
// first constructor, the copy-constructor signature and the set_name body are missing from this extract.
3742#if __TBB_VARIADIC_MAX >= 9
3743template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3744 typename T6, typename T7, typename T8>
3745class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3746private:
3747 static const int N = 9;
3748public:
3749 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
     // Body of a constructor whose signature is not visible (presumably indexer_node(graph&) - confirm):
     // reports this node to fgt_multiinput_node<N> with the FLOW_INDEXER_NODE tag, its graph, its
     // input ports and itself viewed as sender<output_type>.
3753 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3754 this->input_ports(), static_cast< sender< output_type > *>(this) );
3755 }
3756
3757#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to indexer_node(nodes.graph_reference()) and
     // then calls make_edges_in_order(nodes, *this).
3758 template <typename... Args>
3759 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3760 make_edges_in_order(nodes, *this);
3761 }
3762#endif
3763
3764 // Copy constructor
     // Only the body is visible; it issues the same fgt_multiinput_node<N> call as the body above.
3766 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3767 this->input_ports(), static_cast< sender< output_type > *>(this) );
3768 }
3769
3770#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
3771 void set_name( const char *name ) __TBB_override {
3773 }
3774#endif
3775};
3776#endif //variadic max 9
3777
// Definition of the primary indexer_node template (all ten type parameters supplied); compiled only
// when __TBB_VARIADIC_MAX >= 10. It derives from internal::unfolded_indexer_node<tuple<T0, ..., T9> >
// and passes N (= 10) as the template argument of fgt_multiinput_node in its constructors.
// NOTE(review): the embedded listing numbers skip 3786-3788, 3801 and 3808, so the signature of the
// first constructor, the copy-constructor signature and the set_name body are missing from this extract.
3778#if __TBB_VARIADIC_MAX >= 10
3779template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3780 typename T6, typename T7, typename T8, typename T9>
3781class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3782private:
3783 static const int N = 10;
3784public:
3785 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
     // Body of a constructor whose signature is not visible (presumably indexer_node(graph&) - confirm):
     // reports this node to fgt_multiinput_node<N> with the FLOW_INDEXER_NODE tag, its graph, its
     // input ports and itself viewed as sender<output_type>.
3789 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3790 this->input_ports(), static_cast< sender< output_type > *>(this) );
3791 }
3792
3793#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to indexer_node(nodes.graph_reference()) and
     // then calls make_edges_in_order(nodes, *this).
3794 template <typename... Args>
3795 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3796 make_edges_in_order(nodes, *this);
3797 }
3798#endif
3799
3800 // Copy constructor
     // Only the body is visible; it issues the same fgt_multiinput_node<N> call as the body above.
3802 tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3803 this->input_ports(), static_cast< sender< output_type > *>(this) );
3804 }
3805
3806#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
3807 void set_name( const char *name ) __TBB_override {
3809 }
3810#endif
3811};
3812#endif //variadic max 10
3813
// Helper that connects predecessor p to successor s. The function header is selected by
// __TBB_PREVIEW_ASYNC_MSG; without it the signature is internal_make_edge(sender<T>&, receiver<T>&).
// NOTE(review): listing lines 3815 (the header used under __TBB_PREVIEW_ASYNC_MSG) and 3825 (a
// statement after register_successor) are missing from this extract.
3814#if __TBB_PREVIEW_ASYNC_MSG
3816#else
3817template< typename T >
3818inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3819#endif
3820#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
 // With the deprecated extraction API enabled, each endpoint is also told about the other.
3821 s.internal_add_built_predecessor(p);
3822 p.internal_add_built_successor(s);
3823#endif
 // Register s as a successor of p.
3824 p.register_successor( s );
3826}
3827
// make_edge overload for a sender<T> / receiver<T> pair with the same message type T.
// NOTE(review): listing line 3831 (the function body) is missing from this extract; presumably it
// forwards to internal_make_edge - confirm against the full source.
3829template< typename T >
3830inline void make_edge( sender<T> &p, receiver<T> &s ) {
3832}
3833
3834#if __TBB_PREVIEW_ASYNC_MSG
// Additional make_edge overloads compiled only under __TBB_PREVIEW_ASYNC_MSG.
// NOTE(review): listing lines 3836-3837, 3839, 3843-3844 and 3848-3849 are missing from this
// extract, so the remaining template parameters of the first overload, every function body and the
// signatures of the second and third overloads cannot be read here.
3835template< typename TS, typename TR,
3838inline void make_edge( TS &p, TR &s ) {
3840}
3841
3842template< typename T >
3845}
3846
3847template< typename T >
3850}
3851
3852#endif // __TBB_PREVIEW_ASYNC_MSG
3853
3854#if __TBB_FLOW_GRAPH_CPP11_FEATURES
3855//Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
// The two unnamed, defaulted template parameters restrict this overload to types T that declare
// output_ports_type and types V that declare input_ports_type; for other types substitution fails
// and the overload is not considered. The call is forwarded to make_edge on the two port-0 objects.
3856template< typename T, typename V,
3857 typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3858inline void make_edge( T& output, V& input) {
3859 make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3860}
3861
3862//Makes an edge from port 0 of a multi-output predecessor to a receiver.
// The unnamed, defaulted template parameter restricts this overload to types T that declare
// output_ports_type. The call is forwarded to make_edge on output's port 0 and the receiver.
3863template< typename T, typename R,
3864 typename = typename T::output_ports_type >
3865inline void make_edge( T& output, receiver<R>& input) {
3866 make_edge(get<0>(output.output_ports()), input);
3867}
3868
3869//Makes an edge from a sender to port 0 of a multi-input successor.
// The unnamed, defaulted template parameter restricts this overload to types V that declare
// input_ports_type. The call is forwarded to make_edge on the sender and input's port 0.
3870template< typename S, typename V,
3871 typename = typename V::input_ports_type >
3872inline void make_edge( sender<S>& output, V& input) {
3873 make_edge(output, get<0>(input.input_ports()));
3874}
3875#endif
3876
// Helper that disconnects successor s from predecessor p. The function header is selected by
// __TBB_PREVIEW_ASYNC_MSG; without it the signature is internal_remove_edge(sender<T>&, receiver<T>&).
// NOTE(review): listing lines 3878 (the header used under __TBB_PREVIEW_ASYNC_MSG) and 3889 (a
// statement before the closing brace) are missing from this extract.
3877#if __TBB_PREVIEW_ASYNC_MSG
3879#else
3880template< typename T >
3881inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3882#endif
 // Remove s from p's successors first.
3883 p.remove_successor( s );
3884#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3885 // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
 // With the deprecated extraction API enabled, also drop the edge from both endpoints' built-edge records.
3886 p.internal_delete_built_successor(s);
3887 s.internal_delete_built_predecessor(p);
3888#endif
3890}
3891
// NOTE(review): only the template header and the closing brace of this definition are visible;
// listing lines 3894-3895 (signature and body) are missing from this extract. By position it is
// presumably remove_edge(sender<T>&, receiver<T>&) - confirm against the full source.
3893template< typename T >
3896}
3897
3898#if __TBB_PREVIEW_ASYNC_MSG
// Additional remove_edge overloads compiled only under __TBB_PREVIEW_ASYNC_MSG.
// NOTE(review): listing lines 3900-3901, 3903, 3907-3908 and 3912-3913 are missing from this
// extract, so the remaining template parameters of the first overload, every function body and the
// signatures of the second and third overloads cannot be read here.
3899template< typename TS, typename TR,
3902inline void remove_edge( TS &p, TR &s ) {
3904}
3905
3906template< typename T >
3909}
3910
3911template< typename T >
3914}
3915#endif // __TBB_PREVIEW_ASYNC_MSG
3916
3917#if __TBB_FLOW_GRAPH_CPP11_FEATURES
3918//Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
// The two unnamed, defaulted template parameters restrict this overload to types T that declare
// output_ports_type and types V that declare input_ports_type. The call is forwarded to remove_edge
// on the two port-0 objects.
3919template< typename T, typename V,
3920 typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3921inline void remove_edge( T& output, V& input) {
3922 remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3923}
3924
3925//Removes an edge between port 0 of a multi-output predecessor and a receiver.
// The unnamed, defaulted template parameter restricts this overload to types T that declare
// output_ports_type. The call is forwarded to remove_edge on output's port 0 and the receiver.
3926template< typename T, typename R,
3927 typename = typename T::output_ports_type >
3928inline void remove_edge( T& output, receiver<R>& input) {
3929 remove_edge(get<0>(output.output_ports()), input);
3930}
3931//Removes an edge between a sender and port 0 of a multi-input successor.
// The unnamed, defaulted template parameter restricts this overload to types V that declare
// input_ports_type. The call is forwarded to remove_edge on the sender and input's port 0.
3932template< typename S, typename V,
3933 typename = typename V::input_ports_type >
3934inline void remove_edge( sender<S>& output, V& input) {
3935 remove_edge(output, get<0>(input.input_ports()));
3936}
3937#endif
3938
3939#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Out-of-class definition of edge_container<C>::sender_extract: calls remove_edge(s, **i) for every
// entry of built_edges, i.e. with s in the predecessor position and each stored edge as successor.
3940template<typename C >
3941template< typename S >
3942void internal::edge_container<C>::sender_extract( S &s ) {
 // Iterate over a copy of the list rather than built_edges itself.
 // NOTE(review): presumably because remove_edge ends up deleting entries from built_edges
 // (internal_remove_edge calls internal_delete_built_successor/predecessor) - confirm.
3943 edge_list_type e = built_edges;
3944 for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3945 remove_edge(s, **i);
3946 }
3947}
3948
// Out-of-class definition of edge_container<C>::receiver_extract: calls remove_edge(**i, r) for every
// entry of built_edges, i.e. with each stored edge in the predecessor position and r as successor.
3949template<typename C >
3950template< typename R >
3951void internal::edge_container<C>::receiver_extract( R &r ) {
 // Iterate over a copy of the list rather than built_edges itself.
 // NOTE(review): presumably because remove_edge ends up deleting entries from built_edges
 // (internal_remove_edge calls internal_delete_built_successor/predecessor) - confirm.
3952 edge_list_type e = built_edges;
3953 for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3954 remove_edge(**i, r);
3955 }
3956}
3957#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3958
// Returns the result of n.copy_function_object<Body>() by value.
// Body does not appear in the parameter list, so it cannot be deduced and has to be given
// explicitly by the caller (copy_body<MyBody>(node)); Node is deduced from the argument.
3960template< typename Body, typename Node >
3961Body copy_body( Node &n ) {
3962 return n.template copy_function_object<Body>();
3963}
3964
3965#if __TBB_FLOW_GRAPH_CPP11_FEATURES
3966
3967//composite_node
// Primary template: declared only. The partial specializations that follow cover the cases
// (inputs and outputs), (inputs only) and (outputs only), each expressed with tbb::flow::tuple.
3968template< typename InputTuple, typename OutputTuple > class composite_node;
3969
// composite_node specialization for a node that exposes both input and output ports.
// input_ports_type is a tuple of receiver<InputTypes>& and output_ports_type a tuple of
// sender<OutputTypes>&. Both tuples are held through std::unique_ptr members, which are only
// assigned in set_external_ports(); the accessor bodies below assert that they are non-null.
// NOTE(review): the embedded listing numbers skip 3985, 3991, 3994, 4006-4007, 4018, 4022 and 4027,
// so those lines (among them the non-trace constructor signature, the tail of set_external_ports
// and the signatures of the two port accessors) are missing from this extract.
3970template< typename... InputTypes, typename... OutputTypes>
3971class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3972
3973public:
3974 typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3975 typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3976
3977private:
3978 std::unique_ptr<input_ports_type> my_input_ports;
3979 std::unique_ptr<output_ports_type> my_output_ports;
3980
     // Port counts taken directly from the sizes of the two parameter packs.
3981 static const size_t NUM_INPUTS = sizeof...(InputTypes);
3982 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3983
3984protected:
3986
3987public:
3988#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Trace-enabled constructor: additionally accepts type_name (default "composite_node"); the
     // statement that uses it is on the missing listing line 3991.
3989 composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3990 tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3992 }
3993#else
     // Non-trace constructor body (signature on the missing line 3994): same fgt call as above.
3995 tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3996 }
3997#endif
3998
     // Stores the externally visible ports: each argument is forwarded into a freshly allocated
     // input_ports_type / output_ports_type, replacing whatever the unique_ptr held before.
     // NOTE(review): both sides of each static assertion are derived from the class's own template
     // arguments (NUM_INPUTS is sizeof...(InputTypes) and input_ports_type has one element per
     // InputTypes; likewise for outputs), so the assertions do not inspect T1/T2 - confirm whether
     // the sizes of the passed tuples were meant to be checked instead.
3999 template<typename T1, typename T2>
4000 void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
4001 __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4002 __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4003 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
4004 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
4005
4008 }
4009
     // Both helpers forward to internal::add_nodes_impl; they differ only in the boolean flag
     // (true for add_visible_nodes, false for add_nodes).
4010 template< typename... NodeTypes >
4011 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4012
4013 template< typename... NodeTypes >
4014 void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4015
4016#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(); its body is on the missing listing line 4018.
4017 void set_name( const char *name ) __TBB_override {
4019 }
4020#endif
4021
     // Accessor body (signature on the missing line 4022): asserts that the input ports were set,
     // then returns the stored tuple.
4023 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4024 return *my_input_ports;
4025 }
4026
     // Accessor body (signature on the missing line 4027): asserts that the output ports were set,
     // then returns the stored tuple.
4028 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4029 return *my_output_ports;
4030 }
4031
4032#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
     // extract() is not supported for composite_node: the override only trips an assertion.
4033 void extract() __TBB_override {
4034 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4035 }
4036#endif
4037}; // class composite_node
4038
4039//composite_node with only input ports
// Specialization selected when the output tuple is empty. input_ports_type is a tuple of
// receiver<InputTypes>&, held through a std::unique_ptr that is only assigned in
// set_external_ports(); the accessor body below asserts that it is non-null.
// NOTE(review): the embedded listing numbers skip 4050, 4055-4056, 4059-4060, 4081 and 4085, so the
// bodies of both constructors, the non-trace constructor signature, the set_name body and the
// accessor signature are missing from this extract.
4040template< typename... InputTypes>
4041class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
4042public:
4043 typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
4044
4045private:
4046 std::unique_ptr<input_ports_type> my_input_ports;
     // Port count taken directly from the size of the parameter pack.
4047 static const size_t NUM_INPUTS = sizeof...(InputTypes);
4048
4049protected:
4051
4052public:
4053#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Trace-enabled constructor: additionally accepts type_name (default "composite_node").
4054 composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4057 }
4058#else
4061 }
4062#endif
4063
     // Stores the externally visible input ports in a freshly allocated input_ports_type and then
     // passes the same argument to fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port.
     // NOTE(review): both sides of the static assertion are derived from InputTypes, so it does not
     // inspect T - confirm whether the size of the passed tuple was meant to be checked.
     // NOTE(review): input_ports_tuple is forwarded twice; this presumably relies on the argument
     // being a tuple of references, which is unchanged by a move - confirm.
4064 template<typename T>
4065 void set_external_ports(T&& input_ports_tuple) {
4066 __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4067
4068 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
4069
4070 tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
4071 }
4072
     // Both helpers forward to internal::add_nodes_impl; they differ only in the boolean flag
     // (true for add_visible_nodes, false for add_nodes).
4073 template< typename... NodeTypes >
4074 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4075
4076 template< typename... NodeTypes >
4077 void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4078
4079#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(); its body is on the missing listing line 4081.
4080 void set_name( const char *name ) __TBB_override {
4082 }
4083#endif
4084
     // Accessor body (signature on the missing line 4085): asserts that the input ports were set,
     // then returns the stored tuple.
4086 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4087 return *my_input_ports;
4088 }
4089
4090#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
     // extract() is not supported for composite_node: the override only trips an assertion.
4091 void extract() __TBB_override {
4092 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4093 }
4094#endif
4095
4096}; // class composite_node
4097
4098//composite_nodes with only output_ports
// Specialization selected when the input tuple is empty. output_ports_type is a tuple of
// sender<OutputTypes>&, held through a std::unique_ptr that is only assigned in
// set_external_ports(); the accessor body below asserts that it is non-null.
// NOTE(review): the embedded listing numbers skip 4109, 4114-4115, 4118-4119, 4140 and 4144, so the
// bodies of both constructors, the non-trace constructor signature, the set_name body and the
// accessor signature are missing from this extract.
4099template<typename... OutputTypes>
4100class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
4101public:
4102 typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
4103
4104private:
4105 std::unique_ptr<output_ports_type> my_output_ports;
     // Port count taken directly from the size of the parameter pack.
4106 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
4107
4108protected:
4110
4111public:
4112#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Trace-enabled constructor: additionally accepts type_name (default "composite_node").
4113 __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4116 }
4117#else
4120 }
4121#endif
4122
     // Stores the externally visible output ports in a freshly allocated output_ports_type and then
     // passes the same argument to fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port.
     // NOTE(review): both sides of the static assertion are derived from OutputTypes, so it does not
     // inspect T - confirm whether the size of the passed tuple was meant to be checked.
     // NOTE(review): output_ports_tuple is forwarded twice; this presumably relies on the argument
     // being a tuple of references, which is unchanged by a move - confirm.
4123 template<typename T>
4124 void set_external_ports(T&& output_ports_tuple) {
4125 __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4126
4127 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
4128
4129 tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
4130 }
4131
     // Both helpers forward to internal::add_nodes_impl; they differ only in the boolean flag
     // (true for add_visible_nodes, false for add_nodes).
4132 template<typename... NodeTypes >
4133 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4134
4135 template<typename... NodeTypes >
4136 void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4137
4138#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(); its body is on the missing listing line 4140.
4139 void set_name( const char *name ) __TBB_override {
4141 }
4142#endif
4143
     // Accessor body (signature on the missing line 4144): asserts that the output ports were set,
     // then returns the stored tuple.
4145 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4146 return *my_output_ports;
4147 }
4148
4149#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
     // extract() is not supported for composite_node: the override only trips an assertion.
4150 void extract() __TBB_override {
4151 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4152 }
4153#endif
4154
4155}; // class composite_node
4156
4157#endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
4158
4159namespace internal {
4160
// Class template parameterized on the gateway type; it exposes gateway_type and a setter that
// replaces the stored gateway pointer.
// NOTE(review): listing lines 4162 (the class header), 4166 and 4172 are missing from this extract.
// The class is presumably async_body_base (the base named by async_body below) and the missing
// lines presumably hold its constructor and the my_gateway member - confirm against the full source.
4161template<typename Gateway>
4163public:
4164 typedef Gateway gateway_type;
4165
     // Re-points this body at a different gateway object.
4167 void set_gateway(gateway_type *gateway) {
4168 my_gateway = gateway;
4169 }
4170
4171protected:
4173};
4174
// Adapter that wraps a user Body so it can be invoked with an (Input, Ports) pair: the call
// operator ignores the ports argument and invokes the stored body with the input and the gateway
// inherited from async_body_base<Gateway>.
// NOTE(review): listing lines 4178 and 4191 are missing from this extract; presumably they hold
// the base_type typedef and the my_body member used below - confirm.
4175template<typename Input, typename Ports, typename Gateway, typename Body>
4176class async_body: public async_body_base<Gateway> {
4177public:
4179 typedef Gateway gateway_type;
4180
     // Passes the gateway pointer to the base and keeps a copy of the body.
4181 async_body(const Body &body, gateway_type *gateway)
4182 : base_type(gateway), my_body(body) { }
4183
     // The Ports parameter is unnamed and unused; the body receives the gateway instead.
4184 void operator()( const Input &v, Ports & ) {
4185 my_body(v, *this->my_gateway);
4186 }
4187
     // Returns a copy of the wrapped body.
4188 Body get_body() { return my_body; }
4189
4190private:
4192};
4193
4194} // namespace internal
4195
4196} // namespace interfaceX
4197namespace interface11 {
4198
// async_node (name taken from the constructors below; the class header on listing line 4203 is
// missing from this extract). It derives from multifunction_node<Input, tuple<Output>, Policy,
// Allocator> and from sender<Output>; the sender interface is implemented by forwarding to output
// port 0 of the multifunction_node base.
// NOTE(review): this extract skips many listing lines (4199, 4203, 4207-4208, 4213-4214, 4219-4224,
// 4227-4229, 4232, 4239, 4242, 4247, 4252, 4258, 4264, 4268, 4277, 4283, 4285-4286, 4288, 4309, 4311,
// 4332, 4338, 4344-4345, 4349-4350, 4355-4357, 4364, 4390); several typedefs, member declarations
// and function signatures are therefore not visible and are not described here.
4200template < typename Input, typename Output,
4201 typename Policy = queueing_lightweight,
4202 typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input) >
4204 : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output >
4205{
     // NOTE(review): the assertion message below reads "will removed" and "To temporary enable", and
     // names TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR while the guard tests
     // TBB_DEPRECATED_FLOW_NODE_ALLOCATOR - confirm the wording and macro name against the TBB
     // configuration headers before changing the string.
4206#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
4209 "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
4210 "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
4211 );
4212#endif
4215
4216public:
4217 typedef Input input_type;
4218 typedef Output output_type;
4225
4226private:
     // try_put_functor (struct header not visible): stores pointers to an output port and to the
     // value, and its call operator records the result of port->try_put(*value).
4230 // TODO: pass value by copy since we do not want to block asynchronous thread.
4231 const Output *value;
4233 try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
4234 void operator()() {
4235 result = port->try_put(*value);
4236 }
4237 };
4238
     // receiver_gateway_impl (class header not visible): the gateway object handed to the user body.
     // It keeps a pointer back to the owning async_node.
4240 public:
4241 receiver_gateway_impl(async_node* node): my_node(node) {}
     // Body of a method whose signature is not visible: notifies fgt_async_reserve, then calls
     // reserve_wait() on the owning node's graph.
4243 tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4244 my_node->my_graph.reserve_wait();
4245 }
4246
     // Body of a method whose signature is not visible: calls release_wait() on the owning node's
     // graph, then notifies fgt_async_commit.
4248 my_node->my_graph.release_wait();
4249 tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4250 }
4251
     // Forwards the value to the owning node's try_put_impl.
4253 bool try_put(const Output &i) __TBB_override {
4254 return my_node->try_put_impl(i);
4255 }
4256
4257 private:
4259 } my_gateway;
4260
4261 //The substitute of 'this' for member construction, to prevent compiler warnings
4262 async_node* self() { return this; }
4263
     // Offers `i` to the successors of output port 0: gather_successful_try_puts collects the tasks
     // produced by the puts, each collected task is handed to enqueue_in_graph_arena, and the
     // function returns whether at least one put succeeded.
4265 bool try_put_impl(const Output &i) {
4266 internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
4267 internal::broadcast_cache<output_type>& port_successors = port_0.successors();
4269 task_list tasks;
4270 bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
4271 __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
4272 "Return status is inconsistent with the method operation." );
4273
4274 while( !tasks.empty() ) {
4275 internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
4276 }
4278 return is_at_least_one_put_successful;
4279 }
4280
4281public:
     // Main constructor (parts of its parameter list are on missing lines). The user body is wrapped
     // in internal::async_body together with the address of my_gateway and passed to the base;
     // my_gateway itself is then constructed with self().
     // NOTE(review): the base is initialized before the my_gateway member, so only the address of
     // my_gateway is available at that point - presumably the base merely stores it; confirm.
4282 template<typename Body>
4284 graph &g, size_t concurrency,
4287#else
4289#endif
4290 ) : base_type(
4291 g, concurrency,
4292 internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
4293 (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
4294 tbb::internal::fgt_multioutput_node_with_body<1>(
4295 CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4296 &this->my_graph, static_cast<receiver<input_type> *>(this),
4297 this->output_ports(), this->my_body
4298 );
4299 }
4300
4301#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
     // Convenience overload without an explicit policy argument: delegates with Policy().
4302 template <typename Body, typename... Args>
4303 __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
4304 : async_node(g, concurrency, body, Policy(), priority) {}
4305#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4306
4307#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to the graph-based constructor using
     // nodes.graph_reference(), then calls make_edges_in_order(nodes, *this).
4308 template <typename Body, typename... Args>
4310 const node_set<Args...>& nodes, size_t concurrency, Body body,
4312 ) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
4313 make_edges_in_order(nodes, *this);
4314 }
4315
4316#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
     // node_set overload without an explicit policy argument: delegates with Policy().
4317 template <typename Body, typename... Args>
4318 __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
4319 : async_node(nodes, concurrency, body, Policy(), priority) {}
4320#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4321#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4322
     // Copy constructor: after copying the base, both the working body and the initial body are
     // re-pointed at this node's own my_gateway via set_gateway.
4323 __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
4324 static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
4325 static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
4326
4327 tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4328 &this->my_graph, static_cast<receiver<input_type> *>(this),
4329 this->output_ports(), this->my_body );
4330 }
4331
     // Accessor body (signature on the missing line 4332): returns my_gateway.
4333 return my_gateway;
4334 }
4335
4336#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(); its body is on the missing listing line 4338.
4337 void set_name( const char *name ) __TBB_override {
4339 }
4340#endif
4341
4342 // Define sender< Output >
4343
     // Body of a method whose signature is not visible: forwards register_successor to output port 0.
4346 return internal::output_port<0>(*this).register_successor(r);
4347 }
4348
     // Body of a method whose signature is not visible: forwards remove_successor to output port 0.
4351 return internal::output_port<0>(*this).remove_successor(r);
4352 }
4353
     // Template member whose signature is not visible: casts the stored multifunction body to the
     // leaf type holding an async_body, copies that async_body and returns its wrapped user body.
4354 template<typename Body>
4358 mfn_body_type &body_ref = *this->my_body;
4359 async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
4360 return ab.get_body();
4361 }
4362
4363#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
     // Deprecated extraction API of sender<Output>: every override forwards to output port 0.
4365 typedef typename internal::edge_container<successor_type> built_successors_type;
4366 typedef typename built_successors_type::edge_list_type successor_list_type;
4367 built_successors_type &built_successors() __TBB_override {
4368 return internal::output_port<0>(*this).built_successors();
4369 }
4370
4371 void internal_add_built_successor( successor_type &r ) __TBB_override {
4372 internal::output_port<0>(*this).internal_add_built_successor(r);
4373 }
4374
4375 void internal_delete_built_successor( successor_type &r ) __TBB_override {
4376 internal::output_port<0>(*this).internal_delete_built_successor(r);
4377 }
4378
4379 void copy_successors( successor_list_type &l ) __TBB_override {
4380 internal::output_port<0>(*this).copy_successors(l);
4381 }
4382
4383 size_t successor_count() __TBB_override {
4384 return internal::output_port<0>(*this).successor_count();
4385 }
4386#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4387
4388protected:
4389
     // Body of a method whose signature is not visible: delegates to base_type::reset_node(f).
4391 base_type::reset_node(f);
4392 }
4393};
4394
4395#if __TBB_PREVIEW_STREAMING_NODE
4397#endif // __TBB_PREVIEW_STREAMING_NODE
4398
4400
// overwrite_node<T>: a graph_node that is both a receiver<T> and a sender<T>, so its input and
// output message types are the same T.
// NOTE(review): listing lines 4406-4407 are missing from this extract; the rest of the class
// continues past this header.
4401template< typename T >
4402class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
4403public:
4404 typedef T input_type;
4405 typedef T output_type;
4408#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
     // Container types re-exported from the receiver/sender bases for the deprecated extraction API.
4409 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
4410 typedef typename sender<output_type>::built_successors_type built_successors_type;
4411 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
4412 typedef typename sender<output_type>::successor_list_type successor_list_type;
4413#endif
4414
// Constructs the node in graph g with no valid buffered value (my_buffer_is_valid is false),
// makes this node the owner of my_successors, and reports the node to fgt_node with the
// FLOW_OVERWRITE_NODE tag as both receiver<input_type> and sender<output_type>.
4415 __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
4416 my_successors.set_owner( this );
4417 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4418 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4419 }
4420
4421#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
     // Preview constructor from a node_set: delegates to overwrite_node(nodes.graph_reference())
     // and then calls make_edges_in_order(nodes, *this).
4422 template <typename... Args>
4423 overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
4424 make_edges_in_order(nodes, *this);
4425 }
4426#endif
4427
// Initializer list and body of a constructor taking `src` (its signature, listing lines 4428-4429,
// is missing from this extract; presumably the copy constructor - confirm).
// The new node joins src's graph but starts with my_buffer_is_valid set to false; my_buffer does not
// appear in the initializer list, so src's buffered value is not copied here.
4430 graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
4431 {
4432 my_successors.set_owner( this );
4433 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4434 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4435 }
4436
4438
4439#if TBB_PREVIEW_FLOW_GRAPH_TRACE
     // Override of set_name(), compiled only when TBB_PREVIEW_FLOW_GRAPH_TRACE is set.
     // NOTE(review): listing line 4441 (the statement inside the body) is missing from this extract.
4440 void set_name( const char *name ) __TBB_override {
4442 }
4443#endif
4444
// Body of a method taking successor `s` (its signature, listing line 4445, is missing from this
// extract; presumably register_successor - confirm). Runs under my_mutex and always returns true.
// If a valid value is buffered and the graph is active, the value is offered to s right away;
// otherwise s is simply added to my_successors.
4446 spin_mutex::scoped_lock l( my_mutex );
4447 if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
4448 // We have a valid value that must be forwarded immediately.
4449 bool ret = s.try_put( my_buffer );
4450 if ( ret ) {
4451 // We add the successor that accepted our put
4452 my_successors.register_successor( s );
4453 } else {
4454 // In case of reservation a race between the moment of reservation and register_successor can appear,
4455 // because failed reserve does not mean that register_successor is not ready to put a message immediately.
4456 // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
4457 // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
     // The task is allocated as an additional child of the graph's root task and spawned in the
     // graph's arena; in this branch s is not added to my_successors by this call.
4458 task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
4459 register_predecessor_task( *this, s );
4460 internal::spawn_in_graph_arena( my_graph, *rtask );
4461 }
4462 } else {
4463 // No valid value yet, just add as successor
4464 my_successors.register_successor( s );
4465 }
4466 return true;
4467 }
4468
// Body of a method taking successor `s` (its signature, listing line 4469, is missing from this
// extract; presumably remove_successor - confirm). Removes s from my_successors under my_mutex and
// always returns true.
4470 spin_mutex::scoped_lock l( my_mutex );
4471 my_successors.remove_successor(s);
4472 return true;
4473 }
4474
4475#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4476 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
4477 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
4478
4479 void internal_add_built_successor( successor_type &s) __TBB_override {
4480 spin_mutex::scoped_lock l( my_mutex );
4481 my_successors.internal_add_built_successor(s);
4482 }
4483
4484 void internal_delete_built_successor( successor_type &s) __TBB_override {
4485 spin_mutex::scoped_lock l( my_mutex );
4486 my_successors.internal_delete_built_successor(s);
4487 }
4488
4489 size_t successor_count() __TBB_override {
4490 spin_mutex::scoped_lock l( my_mutex );
4491 return my_successors.successor_count();
4492 }
4493
4494 void copy_successors(successor_list_type &v) __TBB_override { // Passes the caller-supplied list v to my_successors.copy_successors(), which fills it.
4495 spin_mutex::scoped_lock l( my_mutex ); // the copy is made while my_mutex is held
4496 my_successors.copy_successors(v);
4497 }
4498
4499 void internal_add_built_predecessor( predecessor_type &p) __TBB_override { // Adds p to my_built_predecessors via add_edge().
4500 spin_mutex::scoped_lock l( my_mutex ); // scoped lock on my_mutex for the duration of the call
4501 my_built_predecessors.add_edge(p);
4502 }
4503
4504 void internal_delete_built_predecessor( predecessor_type &p) __TBB_override { // Removes p from my_built_predecessors via delete_edge().
4505 spin_mutex::scoped_lock l( my_mutex ); // scoped lock on my_mutex for the duration of the call
4506 my_built_predecessors.delete_edge(p);
4507 }
4508
4509 size_t predecessor_count() __TBB_override { // Returns the value reported by my_built_predecessors.edge_count().
4510 spin_mutex::scoped_lock l( my_mutex ); // the count is read while my_mutex is held
4511 return my_built_predecessors.edge_count();
4512 }
4513
4514 void copy_predecessors( predecessor_list_type &v ) __TBB_override { // Passes the caller-supplied list v to my_built_predecessors.copy_edges(), which fills it.
4515 spin_mutex::scoped_lock l( my_mutex ); // the copy is made while my_mutex is held
4516 my_built_predecessors.copy_edges(v);
4517 }
4518
4519 void extract() __TBB_override { // Marks the buffer invalid, then calls sender_extract/receiver_extract on the built-edge containers.
4520 my_buffer_is_valid = false; // NOTE(review): written without taking my_mutex, unlike clear() — confirm callers serialize extract()
4521 built_successors().sender_extract(*this); // this node is passed as the sender
4522 built_predecessors().receiver_extract(*this); // this node is passed as the receiver
4523 }
4524
4525#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4526
4528 spin_mutex::scoped_lock l( my_mutex );
4529 if ( my_buffer_is_valid ) {
4530 v = my_buffer;
4531 return true;
4532 }
4533 return false;
4534 }
4535
4538 return try_get(v);
4539 }
4540
4542 bool try_release() __TBB_override { return true; } // Unconditionally returns true; no state is touched.
4543
4545 bool try_consume() __TBB_override { return true; } // Unconditionally returns true; the buffered value is left as it is.
4546
4547 bool is_valid() { // Returns the current value of my_buffer_is_valid.
4548 spin_mutex::scoped_lock l( my_mutex ); // the flag is read while my_mutex is held
4549 return my_buffer_is_valid;
4550 }
4551
4552 void clear() { // Marks the buffer as invalid; my_buffer itself is not modified here.
4553 spin_mutex::scoped_lock l( my_mutex ); // the flag is written while my_mutex is held
4554 my_buffer_is_valid = false;
4555 }
4556
4557protected:
4558
4559 template< typename R, typename B > friend class run_and_put_task;
4560 template<typename X, typename Y> friend class internal::broadcast_cache;
4561 template<typename X, typename Y> friend class internal::round_robin_cache;
4564 return try_put_task_impl(v);
4565 }
4566
4568 my_buffer = v;
4569 my_buffer_is_valid = true;
4570 task * rtask = my_successors.try_put_task(v);
4571 if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
4572 return rtask;
4573 }
4574
4576 return my_graph;
4577 }
4578
4581
4583 o(owner), s(succ) {};
4584
4586 if (!s.register_predecessor(o)) {
4587 o.register_successor(s);
4588 }
4589 return NULL;
4590 }
4591
4594 };
4595
4598#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4599 internal::edge_container<predecessor_type> my_built_predecessors;
4600#endif
4604
4606 my_buffer_is_valid = false;
4607 if (f&rf_clear_edges) {
4608 my_successors.clear();
4609 }
4610 }
4611}; // overwrite_node
4612
4613template< typename T >
4615public:
4616 typedef T input_type;
4617 typedef T output_type;
4621
4624 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4625 static_cast<receiver<input_type> *>(this),
4626 static_cast<sender<output_type> *>(this) );
4627 }
4628
4629#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4630 template <typename... Args> // constructor from a node_set; sits inside the __TBB_PREVIEW_FLOW_GRAPH_NODE_SET guard
4631 write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) { // delegates to the constructor that takes the graph obtained from nodes.graph_reference()
4632 make_edges_in_order(nodes, *this); // presumably connects this node to the nodes in the set; the exact ordering rule lives in make_edges_in_order (not shown here) — TODO confirm
4633 }
4634#endif
4635
4638 tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4639 static_cast<receiver<input_type> *>(this),
4640 static_cast<sender<output_type> *>(this) );
4641 }
4642
4643#if TBB_PREVIEW_FLOW_GRAPH_TRACE
4644 void set_name( const char *name ) __TBB_override {
4646 }
4647#endif
4648
4649protected:
4650 template< typename R, typename B > friend class run_and_put_task;
4651 template<typename X, typename Y> friend class internal::broadcast_cache;
4652 template<typename X, typename Y> friend class internal::round_robin_cache;
4654 spin_mutex::scoped_lock l( this->my_mutex );
4655 return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
4656 }
4657};
4658
4659} // interfaceX
4660
4665
4666 using interface11::graph;
4667 using interface11::graph_node;
4668 using interface11::continue_msg;
4669 using interface11::source_node;
4670 using interface11::input_node;
4671 using interface11::function_node;
4672 using interface11::multifunction_node;
4673 using interface11::split_node;
4675 using interface11::indexer_node;
4676 using interface11::internal::tagged_msg;
4679 using interface11::continue_node;
4680 using interface11::overwrite_node;
4681 using interface11::write_once_node;
4682 using interface11::broadcast_node;
4683 using interface11::buffer_node;
4684 using interface11::queue_node;
4685 using interface11::sequencer_node;
4686 using interface11::priority_queue_node;
4687 using interface11::limiter_node;
4688 using namespace interface11::internal::graph_policy_namespace;
4689 using interface11::join_node;
4695#if __TBB_FLOW_GRAPH_CPP11_FEATURES
4696 using interface11::composite_node;
4697#endif
4698 using interface11::async_node;
4699#if __TBB_PREVIEW_ASYNC_MSG
4701#endif
4702#if __TBB_PREVIEW_STREAMING_NODE
4705#endif // __TBB_PREVIEW_STREAMING_NODE
4706#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4709#endif
4710
4711#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4712 using interface11::internal::follows;
4713 using interface11::internal::precedes;
4714 using interface11::internal::make_node_set;
4715 using interface11::internal::make_edges;
4716#endif
4717
4718} // flow
4719} // tbb
4720
4721// Include deduction guides for node classes
4723
4724#undef __TBB_PFG_RESET_ARG
4725#undef __TBB_COMMA
4726#undef __TBB_DEFAULT_NODE_ALLOCATOR
4727
4729#undef __TBB_flow_graph_H_include_area
4730
4731#if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
4732 #undef __TBB_NOINLINE_SYM
4733#endif
4734
4735#endif // __TBB_flow_graph_H
__TBB_DEPRECATED internal::port_ref_impl< N1, N2 > port_ref()
class __TBB_DEPRECATED streaming_node
#define CODEPTR()
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
#define __TBB_DEPRECATED_LIMITER_ARG2(arg1, arg2)
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
#define __TBB_DEPRECATED_LIMITER_ARG4(arg1, arg2, arg3, arg4)
#define __TBB_DEPRECATED_LIMITER_EXPR(expr)
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_override
Definition: tbb_stddef.h:240
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
#define __TBB_CPP11_PRESENT
Definition: tbb_config.h:149
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
#define __TBB_NOINLINE_SYM
Definition: flow_graph.h:46
#define __TBB_DEFAULT_NODE_ALLOCATOR(T)
Definition: flow_graph.h:70
void const char const char int ITT_FORMAT __itt_group_sync s
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void const char const char int ITT_FORMAT __itt_group_sync p
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
STL namespace.
The graph class.
class __TBB_DEPRECATED_MSG("tbb::tbb_hash is deprecated, use std::hash") tbb_hash
static void fgt_async_try_put_end(void *, void *)
static void fgt_async_reserve(void *, void *)
static void fgt_multioutput_node_desc(const NodeType *, const char *)
static void fgt_async_try_put_begin(void *, void *)
static void fgt_begin_body(void *)
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
static void fgt_node(void *, string_index, void *, void *)
static void fgt_reserve_wait(void *)
static void fgt_graph(void *)
static void fgt_node_with_body(void *, string_index, void *, void *, void *)
static void fgt_release_wait(void *)
static void fgt_async_commit(void *, void *)
static void fgt_composite(void *, void *, void *)
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
static void fgt_multiinput_multioutput_node(void *, string_index, void *, void *)
static void fgt_remove_edge(void *, void *)
static void fgt_end_body(void *)
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
static void fgt_node_desc(const NodeType *, const char *)
static void fgt_make_edge(void *, void *)
static void fgt_graph_desc(void *, const char *)
K key_from_message(const T &t)
Definition: flow_graph.h:720
concurrency
An enumeration that provides the two most common concurrency levels: unlimited and serial.
Definition: flow_graph.h:105
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3878
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3815
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3830
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:198
class __TBB_DEPRECATED async_msg
Definition: flow_graph.h:215
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3894
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
Definition: flow_graph.h:3961
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
void deactivate_graph(tbb::flow::interface10::graph &g)
void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
bool is_graph_active(tbb::flow::interface10::graph &g)
void add_task_to_graph_reset_list(tbb::flow::interface10::graph &g, tbb::task *tp)
void activate_graph(tbb::flow::interface10::graph &g)
void add_nodes_impl(CompositeType *, bool)
tbb::internal::uint64_t tag_value
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
const V & cast_to(T const &t)
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
bool is_a(T const &t)
interface11::internal::Policy< queueing, lightweight > queueing_lightweight
static const node_priority_t no_priority
unsigned int node_priority_t
static tbb::task *const SUCCESSFULLY_ENQUEUED
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
An empty class used for messages that mean "I'm done".
Definition: flow_graph.h:113
Forward declaration section.
Definition: flow_graph.h:424
__TBB_DEPRECATED typedef T output_type
The output type of this sender.
Definition: flow_graph.h:427
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:432
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:438
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:435
__TBB_DEPRECATED typedef internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:429
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:448
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:461
__TBB_DEPRECATED typedef T input_type
The input type of this receiver.
Definition: flow_graph.h:466
__TBB_DEPRECATED typedef internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:468
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
Definition: flow_graph.h:480
bool try_put(const typename internal::async_helpers< T >::async_type &t)
Definition: flow_graph.h:475
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
Definition: flow_graph.h:471
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
Forwards messages only if the threshold has not been reached.
Definition: flow_graph.h:2977
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3155
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
Definition: flow_graph.h:3138
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
Definition: flow_graph.h:3225
limiter_node(graph &g, __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
Constructor.
Definition: flow_graph.h:3102
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
Definition: flow_graph.h:3202
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:2997
limiter_node(const limiter_node &src)
Copy constructor.
Definition: flow_graph.h:3121
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:3254
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
Definition: flow_graph.h:3214
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:3252
internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
The internal receiver< DecrementType > that decrements the count.
Definition: flow_graph.h:3093
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:2995
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2981
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:3258
task * decrement_counter(long long delta)
Definition: flow_graph.h:3068
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2982
static const void * to_void_ptr(const T &t)
Definition: flow_graph.h:228
static const T & from_void_ptr(const void *p)
Definition: flow_graph.h:236
static task * try_put_task_wrapper_impl(receiver< T > *const this_recv, const void *p, bool is_async)
Definition: flow_graph.h:244
virtual bool try_get_wrapper(void *p, bool is_async)=0
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:328
bool try_reserve(X &t)
Reserves an item in the sender.
Definition: flow_graph.h:349
untyped_receiver successor_type
The successor type for this node.
Definition: flow_graph.h:310
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:325
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
bool try_get(X &t)
Request an item from the sender.
Definition: flow_graph.h:343
virtual bool try_reserve_wrapper(void *p, bool is_async)=0
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:388
bool try_put(const X &t)
Put an item to the receiver.
Definition: flow_graph.h:376
virtual void reset_receiver(reset_flags f=rf_reset_protocol)=0
put receiver back in initial state
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:391
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:369
virtual task * try_put_task_wrapper(const void *p, bool is_async)=0
Base class for receivers of completion messages.
Definition: flow_graph.h:598
task * try_put_task(const input_type &) __TBB_override
Definition: flow_graph.h:671
__TBB_DEPRECATED typedef continue_msg input_type
The input type.
Definition: flow_graph.h:602
__TBB_DEPRECATED bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
Definition: flow_graph.h:623
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:697
__TBB_DEPRECATED continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority))
Constructor.
Definition: flow_graph.h:608
bool is_continue_receiver() __TBB_override
Definition: flow_graph.h:712
__TBB_DEPRECATED continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:616
__TBB_DEPRECATED typedef receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:605
__TBB_DEPRECATED bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
Definition: flow_graph.h:633
virtual task * execute()=0
Does whatever should happen when the threshold is reached.
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:903
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1038
internal::source_body< output_type > * my_body
Definition: flow_graph.h:1101
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:971
void spawn_put()
Spawns a task that applies the body.
Definition: flow_graph.h:1137
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1109
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:962
__TBB_NOINLINE_SYM input_node(const input_node &src)
Copy constructor.
Definition: flow_graph.h:941
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1021
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1060
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1103
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
Definition: flow_graph.h:1085
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:909
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:906
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1048
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:1003
__TBB_NOINLINE_SYM input_node(graph &g, Body body)
Constructor for a node with a successor.
Definition: flow_graph.h:921
internal::source_body< output_type > * my_init_body
Definition: flow_graph.h:1102
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1145
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:1181
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1325
__TBB_NOINLINE_SYM source_node(const source_node &src)
Copy constructor.
Definition: flow_graph.h:1219
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:1249
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1390
__TBB_NOINLINE_SYM source_node(graph &g, Body body, bool is_active=true)
Constructor for a node with a successor.
Definition: flow_graph.h:1199
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1337
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:1240
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:1184
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
Definition: flow_graph.h:1362
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1430
internal::source_body< output_type > * my_init_body
Definition: flow_graph.h:1383
internal::source_body< output_type > * my_body
Definition: flow_graph.h:1382
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:1187
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1384
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1315
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:1281
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1298
Implements a function node that supports Input -> Output.
Definition: flow_graph.h:1455
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1549
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1476
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1474
internal::function_input_queue< input_type, internals_allocator > input_queue_type
Definition: flow_graph.h:1473
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1475
cache_aligned_allocator< Input > internals_allocator
Definition: flow_graph.h:1460
__TBB_NOINLINE_SYM function_node(const function_node &src)
Copy constructor.
Definition: flow_graph.h:1522
internal::function_input< input_type, output_type, Policy, internals_allocator > input_impl_type
Definition: flow_graph.h:1472
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
__TBB_NOINLINE_SYM function_node(graph &g, size_t concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority=tbb::flow::internal::no_priority))
Constructor.
Definition: flow_graph.h:1488
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1551
implements a function node that supports Input -> (set of outputs)
Definition: flow_graph.h:1584
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1667
__TBB_NOINLINE_SYM multifunction_node(const multifunction_node &other)
Definition: flow_graph.h:1646
__TBB_NOINLINE_SYM multifunction_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:1610
internal::multifunction_input< input_type, output_ports_type, Policy, internals_allocator > input_impl_type
Definition: flow_graph.h:1604
internal::function_input_queue< input_type, internals_allocator > input_queue_type
Definition: flow_graph.h:1605
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
Definition: flow_graph.h:1602
cache_aligned_allocator< Input > internals_allocator
Definition: flow_graph.h:1588
split_node: accepts a tuple as input, forwards each element of the tuple to its successors.
Definition: flow_graph.h:1673
output_ports_type & output_ports()
Definition: flow_graph.h:1729
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:1732
__TBB_NOINLINE_SYM split_node(graph &g)
Definition: flow_graph.h:1700
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
Definition: flow_graph.h:1698
receiver< TupleType > base_type
Definition: flow_graph.h:1675
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:1744
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1743
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1737
__TBB_NOINLINE_SYM split_node(const split_node &other)
Definition: flow_graph.h:1715
Implements an executable node that supports continue_msg -> Output.
Definition: flow_graph.h:1774
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1886
internal::continue_input< Output, Policy > input_impl_type
Definition: flow_graph.h:1778
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1779
__TBB_NOINLINE_SYM continue_node(const continue_node &src)
Copy constructor.
Definition: flow_graph.h:1860
__TBB_NOINLINE_SYM continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1785
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1888
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1780
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1781
__TBB_NOINLINE_SYM continue_node(graph &g, int number_of_predecessors,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1822
Forwards messages of type T to all successors.
Definition: flow_graph.h:1897
__TBB_NOINLINE_SYM broadcast_node(graph &g)
Definition: flow_graph.h:1915
internal::broadcast_cache< input_type > my_successors
Definition: flow_graph.h:1908
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2021
__TBB_NOINLINE_SYM broadcast_node(const broadcast_node &src)
Definition: flow_graph.h:1929
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2017
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1902
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2023
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1901
task * try_put_task(const T &t) __TBB_override
build a task to run the successor if possible. Default is old behavior.
Definition: flow_graph.h:2011
bool register_successor(successor_type &r) __TBB_override
Adds a successor.
Definition: flow_graph.h:1944
bool remove_successor(successor_type &r) __TBB_override
Removes r as a successor.
Definition: flow_graph.h:1950
Forwards messages in arbitrary order.
Definition: flow_graph.h:2043
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:2498
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:2479
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2053
virtual bool internal_push(buffer_operation *op)
Definition: flow_graph.h:2315
__TBB_NOINLINE_SYM buffer_node(graph &g)
Constructor.
Definition: flow_graph.h:2351
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2274
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2052
void handle_operations_impl(buffer_operation *op_list, derived_type *derived)
Definition: flow_graph.h:2126
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:2489
virtual void internal_rem_succ(buffer_operation *op)
Remove successor.
Definition: flow_graph.h:2211
internal::round_robin_cache< T, null_rw_mutex > my_successors
Definition: flow_graph.h:2069
__TBB_NOINLINE_SYM buffer_node(const buffer_node &src)
Copy constructor.
Definition: flow_graph.h:2369
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
Definition: flow_graph.h:2468
virtual void internal_consume(buffer_operation *op)
Definition: flow_graph.h:2339
virtual void internal_pop(buffer_operation *op)
Definition: flow_graph.h:2321
virtual void internal_reserve(buffer_operation *op)
Definition: flow_graph.h:2330
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2530
internal::aggregator< handler_type, buffer_operation > my_aggregator
Definition: flow_graph.h:2119
internal::aggregating_functor< class_type, buffer_operation > handler_type
Definition: flow_graph.h:2117
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2534
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
buffer_node< T, Allocator > class_type
Definition: flow_graph.h:2054
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2545
bool register_successor(successor_type &r) __TBB_override
Adds a new successor.
Definition: flow_graph.h:2391
bool enqueue_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:2178
task * try_put_task(const T &t) __TBB_override
Receives an item; returns a task * if possible.
Definition: flow_graph.h:2511
cache_aligned_allocator< T > internals_allocator
Definition: flow_graph.h:2047
void internal_forward_task_impl(buffer_operation *op, derived_type *derived)
Definition: flow_graph.h:2291
virtual void handle_operations(buffer_operation *op_list)
Definition: flow_graph.h:2121
virtual void internal_forward_task(buffer_operation *op)
Tries to forward valid items to successors.
Definition: flow_graph.h:2286
task * grab_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:2174
virtual void internal_reg_succ(buffer_operation *op)
Register successor.
Definition: flow_graph.h:2205
bool remove_successor(successor_type &r) __TBB_override
Removes a successor.
Definition: flow_graph.h:2453
virtual void internal_release(buffer_operation *op)
Definition: flow_graph.h:2344
virtual task * forward_task()
This is executed by an enqueued task, the "forwarder".
Definition: flow_graph.h:2188
Forwards messages in FIFO order.
Definition: flow_graph.h:2560
__TBB_NOINLINE_SYM queue_node(const queue_node &src)
Copy constructor.
Definition: flow_graph.h:2640
__TBB_NOINLINE_SYM queue_node(graph &g)
Constructor.
Definition: flow_graph.h:2626
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2581
base_type::size_type size_type
Definition: flow_graph.h:2570
void internal_pop(queue_operation *op) __TBB_override
Definition: flow_graph.h:2596
void internal_consume(queue_operation *op) __TBB_override
Definition: flow_graph.h:2614
buffer_node< T, Allocator > base_type
Definition: flow_graph.h:2569
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2622
void internal_forward_task(queue_operation *op) __TBB_override
Definition: flow_graph.h:2592
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2653
void internal_reserve(queue_operation *op) __TBB_override
Definition: flow_graph.h:2605
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
base_type::buffer_operation queue_operation
Definition: flow_graph.h:2571
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2623
Forwards messages in sequence order.
Definition: flow_graph.h:2660
buffer_node< T, Allocator >::size_type size_type
Definition: flow_graph.h:2712
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2674
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
bool internal_push(sequencer_operation *op) __TBB_override
Definition: flow_graph.h:2716
__TBB_NOINLINE_SYM sequencer_node(graph &g, const Sequencer &s)
Constructor.
Definition: flow_graph.h:2679
buffer_node< T, Allocator >::buffer_operation sequencer_operation
Definition: flow_graph.h:2713
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2675
internal::function_body< T, size_t > * my_sequencer
Definition: flow_graph.h:2661
__TBB_NOINLINE_SYM sequencer_node(const sequencer_node &src)
Copy constructor.
Definition: flow_graph.h:2695
Forwards messages in priority order.
Definition: flow_graph.h:2741
bool internal_push(prio_operation *op) __TBB_override
Definition: flow_graph.h:2808
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2865
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2800
void internal_pop(prio_operation *op) __TBB_override
Definition: flow_graph.h:2814
buffer_node< T, Allocator >::item_type item_type
Definition: flow_graph.h:2796
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. " "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR.")
buffer_node< T, Allocator >::buffer_operation prio_operation
Definition: flow_graph.h:2797
buffer_node< T, Allocator >::size_type size_type
Definition: flow_graph.h:2795
buffer_node< T, Allocator > base_type
Definition: flow_graph.h:2752
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2754
void handle_operations(prio_operation *op_list) __TBB_override
Definition: flow_graph.h:2804
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2755
__TBB_NOINLINE_SYM priority_queue_node(const priority_queue_node &src)
Copy constructor.
Definition: flow_graph.h:2774
void internal_reserve(prio_operation *op) __TBB_override
Definition: flow_graph.h:2828
void internal_consume(prio_operation *op) __TBB_override
Definition: flow_graph.h:2840
__TBB_NOINLINE_SYM priority_queue_node(graph &g, const Compare &comp=Compare())
Constructor.
Definition: flow_graph.h:2758
void internal_release(prio_operation *op) __TBB_override
Definition: flow_graph.h:2846
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2790
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:3302
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
Definition: flow_graph.h:3286
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:3335
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
Definition: flow_graph.h:3319
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
Definition: flow_graph.h:3355
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
Definition: flow_graph.h:3379
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1)
Definition: flow_graph.h:3374
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
Definition: flow_graph.h:3406
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
Definition: flow_graph.h:3424
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
Definition: flow_graph.h:3433
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
Definition: flow_graph.h:3389
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
Definition: flow_graph.h:3397
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
Definition: flow_graph.h:3384
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
Definition: flow_graph.h:3415
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3788
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
Definition: flow_graph.h:3785
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3787
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
Definition: flow_graph.h:3786
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3801
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3491
internal::tagged_msg< size_t, T0 > output_type
Definition: flow_graph.h:3476
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3478
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3477
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3510
internal::tagged_msg< size_t, T0, T1 > output_type
Definition: flow_graph.h:3509
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3511
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3524
internal::tagged_msg< size_t, T0, T1, T2 > output_type
Definition: flow_graph.h:3542
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3543
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3557
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3576
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3590
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
Definition: flow_graph.h:3575
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3623
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
Definition: flow_graph.h:3608
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3609
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3657
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3643
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
Definition: flow_graph.h:3642
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3693
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
Definition: flow_graph.h:3678
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3679
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3715
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
Definition: flow_graph.h:3714
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3765
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3751
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
Definition: flow_graph.h:3750
async_body(const Body &body, gateway_type *gateway)
Definition: flow_graph.h:4181
async_body_base< Gateway > base_type
Definition: flow_graph.h:4178
void operator()(const Input &v, Ports &)
Definition: flow_graph.h:4184
Implements async node.
Definition: flow_graph.h:4205
receiver_type::predecessor_type predecessor_type
Definition: flow_graph.h:4220
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:4265
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
Definition: flow_graph.h:4213
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. " "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR.")
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
Definition: flow_graph.h:4214
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4350
base_type::output_ports_type output_ports_type
Definition: flow_graph.h:4224
internal::async_body_base< gateway_type > async_body_base_type
Definition: flow_graph.h:4223
__TBB_NOINLINE_SYM async_node(const async_node &other)
Definition: flow_graph.h:4323
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4221
receiver< input_type > receiver_type
Definition: flow_graph.h:4219
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4345
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4390
__TBB_NOINLINE_SYM async_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:4283
receiver_gateway< output_type > gateway_type
Definition: flow_graph.h:4222
internal::multifunction_output< Output > output_port_type
Definition: flow_graph.h:4228
try_put_functor(output_port_type &p, const Output &v)
Definition: flow_graph.h:4233
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
Definition: flow_graph.h:4242
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
Definition: flow_graph.h:4247
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:4253
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
Definition: flow_graph.h:4527
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
Definition: flow_graph.h:4597
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4562
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4406
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4445
__TBB_NOINLINE_SYM overwrite_node(const overwrite_node &src)
Copy constructor; doesn't take anything from src; default won't work.
Definition: flow_graph.h:4429
bool try_release() __TBB_override
Releases the reserved item.
Definition: flow_graph.h:4542
__TBB_NOINLINE_SYM overwrite_node(graph &g)
Definition: flow_graph.h:4415
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:4603
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:4575
task * try_put_task_impl(const input_type &v)
Definition: flow_graph.h:4567
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4407
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:4537
bool try_consume() __TBB_override
Consumes the reserved item.
Definition: flow_graph.h:4545
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4605
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4469
Breaks an infinite loop between the node reservation and register_successor call.
Definition: flow_graph.h:4580
register_predecessor_task(predecessor_type &owner, successor_type &succ)
Definition: flow_graph.h:4582
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
Definition: flow_graph.h:4585
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4620
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4653
__TBB_NOINLINE_SYM write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
Definition: flow_graph.h:4637
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4619
__TBB_NOINLINE_SYM write_once_node(graph &g)
Constructor.
Definition: flow_graph.h:4623
uintptr_t status
Zero value means "wait" status; all other values are "user" specified values and are defined in the scope of the class that uses "status".
field of type K being used for matching.
virtual source_body * clone()=0
The leaf for source_body.
function_body that takes an Input and a set of output ports
leaf for multifunction. OutputSet can be a std::tuple or a vector.
A task that calls a node's forward_task function.
A task that calls a node's apply_body_bypass function with no input.
An abstract cache of successors.
A cache of successors that are broadcast to.
A cache of successors that are put in a round-robin fashion.
Base class for tasks generated by graph nodes.
void register_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:819
tbb::flow::interface11::graph_iterator< const graph, const tbb::flow::interface11::graph_node > const_iterator
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:873
tbb::task_group_context * my_context
tbb::flow::interface11::graph_node * my_nodes
void reset(tbb::flow::interface11::reset_flags f=tbb::flow::interface11::rf_reset_protocol)
Definition: flow_graph.h:842
iterator end()
end iterator
Definition: flow_graph.h:867
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:773
const_iterator cend() const
end const iterator
Definition: flow_graph.h:875
void remove_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:830
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:805
tbb::flow::interface11::graph_iterator< graph, tbb::flow::interface11::graph_node > iterator
tbb::task * root_task()
Returns the root task of the graph.
void wait_for_all()
Wait until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
void prepare_task_arena(bool reinit=false)
tbb::flow::interface11::graph_node * my_nodes_last
iterator begin()
start iterator
Definition: flow_graph.h:865
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:812
~graph()
Destroys the graph.
Definition: flow_graph.h:797
The base of all graph nodes.
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
void grow_my_array(size_t minimum_size)
Grows the internal array.
item_buffer with reservable front-end. NOTE: if reserving, do not complete the operation with pop_front(); use consume_front() instead.
The two-phase join port.
unfolded_join_node: passes input_ports_type to join_node_base. We build the input port type by wrapping the elements of OutputTuple in the port type PT.
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
Implements methods for a function node that takes a type Input as input and sends a type Output to its successors.
void reset_function_input(reset_flags f)
Implements methods for a function node that takes a type Input as input.
static task * emit_this(graph &g, const T &t, P &p)
Implements methods for an executable node that takes continue_msg as input.
void reset_receiver(reset_flags f) __TBB_override
Implements methods for both executable and function nodes that put Output to their successors.
broadcast_cache_type & successors()
broadcast_cache_type my_successors
sender< output_type >::successor_type successor_type
bool try_put(const output_type &i)
Enables one or the other code branches.
Detects whether two given types are the same.
A lock that occupies a single byte.
Definition: spin_mutex.h:39
friend class scoped_lock
Definition: spin_mutex.h:179
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
Used to form groups of tasks.
Definition: task.h:358
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
Base class for user-defined tasks.
Definition: task.h:615
int decrement_ref_count()
Atomically decrement reference count and return its new value.
Definition: task.h:788
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:663
void set_ref_count(int count)
Set reference count.
Definition: task.h:761
void increment_ref_count()
Atomically increment reference count.
Definition: task.h:771
task that does nothing. Useful for synchronization.
Definition: task.h:1042
A list of children.
Definition: task.h:1074
task & pop_front()
Pop the front task from the list.
Definition: task.h:1109
bool empty() const
True if list is empty; false otherwise.
Definition: task.h:1088
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
Class for determining type of std::allocator<T>::value_type.
Definition: tbb_stddef.h:471

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.