// Copyright (C) 2005-2006 The Trustees of Indiana University. // Use, modification and distribution is subject to the Boost Software // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) // Authors: Douglas Gregor // Andrew Lumsdaine // // Implements redistribution of vertices for a distributed adjacency // list. This file should not be included by users. It will be // included by the distributed adjacency list header. // #ifndef BOOST_GRAPH_USE_MPI #error "Parallel BGL files should not be included unless has been included" #endif #include namespace boost { namespace detail { namespace parallel { /* This structure contains a (vertex or edge) descriptor that is being moved from one processor to another. It contains the properties for that descriptor (if any). */ template struct redistributed_descriptor : maybe_store_property { typedef maybe_store_property inherited; redistributed_descriptor() { } redistributed_descriptor(const Descriptor& v, const DescriptorProperty& p) : inherited(p), descriptor(v) { } Descriptor descriptor; private: friend class boost::serialization::access; template void serialize(Archiver& ar, unsigned int /*version*/) { ar & boost::serialization::base_object(*this) & unsafe_serialize(descriptor); } }; /* Predicate that returns true if the target has migrated. */ template struct target_migrated_t { typedef typename graph_traits::vertex_descriptor Vertex; typedef typename graph_traits::edge_descriptor Edge; target_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g) : vertex_to_processor(vertex_to_processor), g(g) { } bool operator()(Edge e) const { typedef global_descriptor DVertex; processor_id_type owner = get(edge_target_processor_id, g, e); return get(vertex_to_processor, DVertex(owner, target(e, g))) != owner; } private: VertexProcessorMap vertex_to_processor; const Graph& g; }; template inline target_migrated_t target_migrated(VertexProcessorMap vertex_to_processor, const Graph& g) { return target_migrated_t(vertex_to_processor, g); } /* Predicate that returns true if the source of an in-edge has migrated. */ template struct source_migrated_t { typedef typename graph_traits::vertex_descriptor Vertex; typedef typename graph_traits::edge_descriptor Edge; source_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g) : vertex_to_processor(vertex_to_processor), g(g) { } bool operator()(stored_in_edge e) const { return get(vertex_to_processor, DVertex(e.source_processor, source(e.e, g))) != e.source_processor; } private: VertexProcessorMap vertex_to_processor; const Graph& g; }; template inline source_migrated_t source_migrated(VertexProcessorMap vertex_to_processor, const Graph& g) { return source_migrated_t(vertex_to_processor, g); } /* Predicate that returns true if the target has migrated. */ template struct source_or_target_migrated_t { typedef typename graph_traits::edge_descriptor Edge; source_or_target_migrated_t(VertexProcessorMap vertex_to_processor, const Graph& g) : vertex_to_processor(vertex_to_processor), g(g) { } bool operator()(Edge e) const { return get(vertex_to_processor, source(e, g)) != source(e, g).owner || get(vertex_to_processor, target(e, g)) != target(e, g).owner; } private: VertexProcessorMap vertex_to_processor; const Graph& g; }; template inline source_or_target_migrated_t source_or_target_migrated(VertexProcessorMap vertex_to_processor, const Graph& g) { typedef source_or_target_migrated_t result_type; return result_type(vertex_to_processor, g); } } } // end of namespace detail::parallel template template void PBGL_DISTRIB_ADJLIST_TYPE ::request_in_neighbors(vertex_descriptor v, VertexProcessorMap vertex_to_processor, bidirectionalS) { BGL_FORALL_INEDGES_T(v, e, *this, graph_type) request(vertex_to_processor, source(e, *this)); } template template void PBGL_DISTRIB_ADJLIST_TYPE ::remove_migrated_in_edges(vertex_descriptor v, VertexProcessorMap vertex_to_processor, bidirectionalS) { graph_detail::erase_if(get(vertex_in_edges, base())[v.local], source_migrated(vertex_to_processor, base())); } template template void PBGL_DISTRIB_ADJLIST_TYPE ::redistribute(VertexProcessorMap vertex_to_processor) { using boost::parallel::inplace_all_to_all; // When we have stable descriptors, we only move those descriptors // that actually need to be moved. Otherwise, we essentially have to // regenerate the entire graph. const bool has_stable_descriptors = is_same::value || is_same::value || is_same::value; typedef detail::parallel::redistributed_descriptor redistributed_vertex; typedef detail::parallel::redistributed_descriptor redistributed_edge; vertex_iterator vi, vi_end; edge_iterator ei, ei_end; process_group_type pg = process_group(); // Initial synchronization makes sure that we have all of our ducks // in a row. We don't want any outstanding add/remove messages // coming in mid-redistribution! synchronize(process_group_); // We cannot cope with eviction of ghost cells vertex_to_processor.set_max_ghost_cells(0); process_id_type p = num_processes(pg); // Send vertices and edges to the processor where they will // actually reside. This requires O(|V| + |E|) communication std::vector > redistributed_vertices(p); std::vector > redistributed_edges(p); // Build the sets of relocated vertices for each process and then do // an all-to-all transfer. for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; ++vi) { if (!has_stable_descriptors || get(vertex_to_processor, *vi) != vi->owner) { redistributed_vertices[get(vertex_to_processor, *vi)] .push_back(redistributed_vertex(*vi, get(vertex_all_t(), base(), vi->local))); } // When our descriptors are stable, we need to determine which // adjacent descriptors are stable to determine which edges will // be removed. if (has_stable_descriptors) { BGL_FORALL_OUTEDGES_T(*vi, e, *this, graph_type) request(vertex_to_processor, target(e, *this)); request_in_neighbors(*vi, vertex_to_processor, directed_selector()); } } inplace_all_to_all(pg, redistributed_vertices); // If we have stable descriptors, we need to know where our neighbor // vertices are moving. if (has_stable_descriptors) synchronize(vertex_to_processor); // Build the sets of relocated edges for each process and then do // an all-to-all transfer. for (boost::tie(ei, ei_end) = edges(*this); ei != ei_end; ++ei) { vertex_descriptor src = source(*ei, *this); vertex_descriptor tgt = target(*ei, *this); if (!has_stable_descriptors || get(vertex_to_processor, src) != src.owner || get(vertex_to_processor, tgt) != tgt.owner) redistributed_edges[get(vertex_to_processor, source(*ei, *this))] .push_back(redistributed_edge(*ei, split_edge_property(get(edge_all_t(), base(), ei->local)))); } inplace_all_to_all(pg, redistributed_edges); // A mapping from old vertex descriptors to new vertex // descriptors. This is an STL map partly because I'm too lazy to // build a real property map (which is hard in the general case) but // also because it won't try to look in the graph itself, because // the keys are all vertex descriptors that have been invalidated. std::map old_to_new_vertex_map; if (has_stable_descriptors) { // Clear out all vertices and edges that will have moved. There // are several stages to this. // First, eliminate all outgoing edges from the (local) vertices // that have been moved or whose targets have been moved. BGL_FORALL_VERTICES_T(v, *this, graph_type) { if (get(vertex_to_processor, v) != v.owner) { clear_out_edges(v.local, base()); clear_in_edges_local(v, directed_selector()); } else { remove_out_edge_if(v.local, target_migrated(vertex_to_processor, base()), base()); remove_migrated_in_edges(v, vertex_to_processor, directed_selector()); } } // Next, eliminate locally-stored edges that have migrated (for // undirected graphs). graph_detail::erase_if(local_edges_, source_or_target_migrated(vertex_to_processor, *this)); // Eliminate vertices that have migrated for (boost::tie(vi, vi_end) = vertices(*this); vi != vi_end; /* in loop */) { if (get(vertex_to_processor, *vi) != vi->owner) remove_vertex((*vi++).local, base()); else { // Add the identity relation for vertices that have not migrated old_to_new_vertex_map[*vi] = *vi; ++vi; } } } else { // Clear out the local graph: the entire graph is in transit clear(); } // Add the new vertices to the graph. When we do so, update the old // -> new vertex mapping both locally and for the owner of the "old" // vertex. { typedef std::pair mapping_pair; std::vector > mappings(p); for (process_id_type src = 0; src < p; ++src) { for (typename std::vector::iterator vi = redistributed_vertices[src].begin(); vi != redistributed_vertices[src].end(); ++vi) { vertex_descriptor new_vertex = add_vertex(vi->get_property(), *this); old_to_new_vertex_map[vi->descriptor] = new_vertex; mappings[vi->descriptor.owner].push_back(mapping_pair(vi->descriptor, new_vertex)); } redistributed_vertices[src].clear(); } inplace_all_to_all(pg, mappings); // Add the mappings we were sent into the old->new map. for (process_id_type src = 0; src < p; ++src) old_to_new_vertex_map.insert(mappings[src].begin(), mappings[src].end()); } // Get old->new vertex mappings for all of the vertices we need to // know about. // TBD: An optimization here might involve sending the // request-response pairs without an explicit request step (for // bidirectional and undirected graphs). However, it may not matter // all that much given the cost of redistribution. { std::vector > vertex_map_requests(p); std::vector > vertex_map_responses(p); // We need to know about all of the vertices incident on edges // that have been relocated to this processor. Tell each processor // what each other processor needs to know. for (process_id_type src = 0; src < p; ++src) for (typename std::vector::iterator ei = redistributed_edges[src].begin(); ei != redistributed_edges[src].end(); ++ei) { vertex_descriptor need_vertex = target(ei->descriptor, *this); if (old_to_new_vertex_map.find(need_vertex) == old_to_new_vertex_map.end()) { old_to_new_vertex_map[need_vertex] = need_vertex; vertex_map_requests[need_vertex.owner].push_back(need_vertex); } } inplace_all_to_all(pg, vertex_map_requests, vertex_map_responses); // Process the requests made for vertices we own. Then perform yet // another all-to-all swap. This one matches the requests we've // made to the responses we were given. for (process_id_type src = 0; src < p; ++src) for (typename std::vector::iterator vi = vertex_map_responses[src].begin(); vi != vertex_map_responses[src].end(); ++vi) *vi = old_to_new_vertex_map[*vi]; inplace_all_to_all(pg, vertex_map_responses); // Matching the requests to the responses, update the old->new // vertex map for all of the vertices we will need to know. for (process_id_type src = 0; src < p; ++src) { typedef typename std::vector::size_type size_type; for (size_type i = 0; i < vertex_map_requests[src].size(); ++i) { old_to_new_vertex_map[vertex_map_requests[src][i]] = vertex_map_responses[src][i]; } } } // Add edges to the graph by mapping the source and target. for (process_id_type src = 0; src < p; ++src) { for (typename std::vector::iterator ei = redistributed_edges[src].begin(); ei != redistributed_edges[src].end(); ++ei) { add_edge(old_to_new_vertex_map[source(ei->descriptor, *this)], old_to_new_vertex_map[target(ei->descriptor, *this)], ei->get_property(), *this); } redistributed_edges[src].clear(); } // Be sure that edge-addition messages are received now, completing // the graph. synchronize(process_group_); this->distribution().clear(); detail::parallel::maybe_initialize_vertex_indices(vertices(base()), get(vertex_index, base())); } } // end namespace boost