/* Copyright (c) 2020-2021 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #if __INTEL_COMPILER && _MSC_VER #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated #endif #include "conformance_flowgraph.h" //! \file conformance_join_node.cpp //! \brief Test for [flow_graph.join_node] specification using input_msg = conformance::message; using my_input_tuple = std::tuple; std::vector get_values( conformance::test_push_receiver& rr ) { std::vector messages; int val = 0; for(my_input_tuple tmp(0, 0.f, input_msg(0)); rr.try_get(tmp); ++val) { messages.push_back(tmp); } return messages; } #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT void test_deduction_guides() { using namespace tbb::flow; graph g; using tuple_type = std::tuple; broadcast_node b1(g), b2(g), b3(g); broadcast_node b4(g); join_node j0(g); #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET join_node j1(follows(b1, b2, b3)); static_assert(std::is_same_v>); join_node j2(follows(b1, b2, b3), reserving()); static_assert(std::is_same_v>); join_node j3(precedes(b4)); static_assert(std::is_same_v>); join_node j4(precedes(b4), reserving()); static_assert(std::is_same_v>); #endif join_node j5(j0); static_assert(std::is_same_v>); } #endif //! The node that is constructed has a reference to the same graph object as src. //! The list of predecessors, messages in the input ports, and successors are not copied. //! \brief \ref interface TEST_CASE("join_node copy constructor"){ oneapi::tbb::flow::graph g; oneapi::tbb::flow::continue_node node0( g, [](oneapi::tbb::flow::continue_msg) { return 1; } ); oneapi::tbb::flow::join_node> node1(g); conformance::test_push_receiver> node2(g); conformance::test_push_receiver> node3(g); oneapi::tbb::flow::make_edge(node0, oneapi::tbb::flow::input_port<0>(node1)); oneapi::tbb::flow::make_edge(node1, node2); oneapi::tbb::flow::join_node> node_copy(node1); oneapi::tbb::flow::make_edge(node_copy, node3); oneapi::tbb::flow::input_port<0>(node_copy).try_put(1); g.wait_for_all(); auto values = conformance::get_values(node3); CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && values.size() == 1), "Copied node doesn`t copy successor"); node0.try_put(oneapi::tbb::flow::continue_msg()); g.wait_for_all(); CHECK_MESSAGE((conformance::get_values(node2).size() == 1 && conformance::get_values(node3).size() == 0), "Copied node doesn`t copy predecessor"); oneapi::tbb::flow::remove_edge(node1, node2); oneapi::tbb::flow::input_port<0>(node1).try_put(1); g.wait_for_all(); oneapi::tbb::flow::join_node> node_copy2(node1); oneapi::tbb::flow::make_edge(node_copy2, node3); oneapi::tbb::flow::input_port<0>(node_copy2).try_put(2); g.wait_for_all(); CHECK_MESSAGE((std::get<0>(conformance::get_values(node3)[0]) == 2), "Copied node doesn`t copy messages in the input ports"); } //! Test inheritance relations //! \brief \ref interface TEST_CASE("join_node inheritance"){ CHECK_MESSAGE((std::is_base_of>::value), "join_node should be derived from graph_node"); CHECK_MESSAGE((std::is_base_of, oneapi::tbb::flow::join_node>::value), "join_node should be derived from sender"); } //! Test join_node behavior and broadcast property //! \brief \ref requirement TEST_CASE("join_node queueing policy and broadcast property") { oneapi::tbb::flow::graph g; oneapi::tbb::flow::function_node f1( g, oneapi::tbb::flow::unlimited, [](const int &i) { return i; } ); oneapi::tbb::flow::function_node f2( g, oneapi::tbb::flow::unlimited, [](const float &f) { return f; } ); oneapi::tbb::flow::continue_node c1( g, [](oneapi::tbb::flow::continue_msg) { return input_msg(1); } ); oneapi::tbb::flow::join_node testing_node(g); conformance::test_push_receiver q_node(g); std::atomic number{1}; oneapi::tbb::flow::function_node f3( g, oneapi::tbb::flow::unlimited, [&]( const my_input_tuple &t ) { CHECK_MESSAGE((std::get<0>(t) == number), "Messages must be in first-in first-out order" ); CHECK_MESSAGE((std::get<1>(t) == static_cast(number) + 0.5f), "Messages must be in first-in first-out order" ); CHECK_MESSAGE((std::get<2>(t) == 1), "Messages must be in first-in first-out order" ); ++number; return t; } ); oneapi::tbb::flow::make_edge(f1, oneapi::tbb::flow::input_port<0>(testing_node)); oneapi::tbb::flow::make_edge(f2, oneapi::tbb::flow::input_port<1>(testing_node)); oneapi::tbb::flow::make_edge(c1, oneapi::tbb::flow::input_port<2>(testing_node)); make_edge(testing_node, f3); make_edge(f3, q_node); f1.try_put(1); g.wait_for_all(); CHECK_MESSAGE((get_values(q_node).size() == 0), "join_node must broadcast when there is at least one message at each input port"); f1.try_put(2); f2.try_put(1.5f); g.wait_for_all(); CHECK_MESSAGE((get_values(q_node).size() == 0), "join_node must broadcast when there is at least one message at each input port"); f1.try_put(3); f2.try_put(2.5f); c1.try_put(oneapi::tbb::flow::continue_msg()); g.wait_for_all(); CHECK_MESSAGE((get_values(q_node).size() == 1), "join_node must broadcast when there is at least one message at each input port"); f2.try_put(3.5f); c1.try_put(oneapi::tbb::flow::continue_msg()); g.wait_for_all(); CHECK_MESSAGE((get_values(q_node).size() == 1), "If at least one successor accepts the tuple, the head of each input port’s queue is removed"); c1.try_put(oneapi::tbb::flow::continue_msg()); g.wait_for_all(); CHECK_MESSAGE((get_values(q_node).size() == 1), "If at least one successor accepts the tuple, the head of each input port’s queue is removed"); c1.try_put(oneapi::tbb::flow::continue_msg()); g.wait_for_all(); CHECK_MESSAGE((get_values(q_node).size() == 0), "join_node must broadcast when there is at least one message at each input port"); oneapi::tbb::flow::remove_edge(testing_node, f3); f1.try_put(1); f2.try_put(1); c1.try_put(oneapi::tbb::flow::continue_msg()); g.wait_for_all(); my_input_tuple tmp(0, 0.f, input_msg(0)); CHECK_MESSAGE((testing_node.try_get(tmp)), "If no one successor accepts the tuple the messages\ must remain in their respective input port queues"); CHECK_MESSAGE((tmp == my_input_tuple(1, 1.f, input_msg(1))), "If no one successor accepts the tuple\ the messages must remain in their respective input port queues"); } //! Test join_node behavior //! \brief \ref requirement TEST_CASE("join_node reserving policy") { conformance::test_with_reserving_join_node_class>(); } template struct MyHash{ std::size_t hash(const KeyType &k) const { return k * 2000 + 3; } bool equal(const KeyType &k1, const KeyType &k2) const{ return hash(k1) == hash(k2); } }; //! Test join_node behavior //! \brief \ref requirement TEST_CASE("join_node key_matching policy"){ oneapi::tbb::flow::graph g; auto body1 = [](const oneapi::tbb::flow::continue_msg &) -> int { return 1; }; auto body2 = [](const float &val) -> int { return static_cast(val); }; oneapi::tbb::flow::join_node, oneapi::tbb::flow::key_matching>> testing_node(g, body1, body2); oneapi::tbb::flow::input_port<0>(testing_node).try_put(oneapi::tbb::flow::continue_msg()); oneapi::tbb::flow::input_port<1>(testing_node).try_put(1.3f); g.wait_for_all(); std::tuple tmp; CHECK_MESSAGE((testing_node.try_get(tmp)), "Mapped keys should match.\ If no successor accepts the tuple, it is must been saved and will be forwarded on a subsequent try_get"); CHECK_MESSAGE((!testing_node.try_get(tmp)), "Message should not exist after item is consumed"); } //! Test join_node behavior //! \brief \ref requirement TEST_CASE("join_node tag_matching policy"){ oneapi::tbb::flow::graph g; auto body1 = [](const oneapi::tbb::flow::continue_msg &) -> oneapi::tbb::flow::tag_value { return 1; }; auto body2 = [](const float &val) -> oneapi::tbb::flow::tag_value { return static_cast(val); }; oneapi::tbb::flow::join_node, oneapi::tbb::flow::tag_matching> testing_node(g, body1, body2); oneapi::tbb::flow::input_port<0>(testing_node).try_put(oneapi::tbb::flow::continue_msg()); oneapi::tbb::flow::input_port<1>(testing_node).try_put(1.3f); g.wait_for_all(); std::tuple tmp; CHECK_MESSAGE((testing_node.try_get(tmp) == true), "Mapped keys should match"); } #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT //! Test deduction guides //! \brief \ref requirement TEST_CASE("Deduction guides test"){ test_deduction_guides(); } #endif //! Test join_node input_ports() returns a tuple of input ports. //! \brief \ref interface \ref requirement TEST_CASE("join_node output_ports") { oneapi::tbb::flow::graph g; oneapi::tbb::flow::join_node> node(g); CHECK_MESSAGE((std::is_same>::input_ports_type&, decltype(node.input_ports())>::value), "join_node input_ports should returns a tuple of input ports"); }