diff --git a/src/ag_gen/ag_gen.cpp b/src/ag_gen/ag_gen.cpp index 94f5b3c..4041873 100755 --- a/src/ag_gen/ag_gen.cpp +++ b/src/ag_gen/ag_gen.cpp @@ -1280,17 +1280,20 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd task_zero(instance, localFrontier, mem_threshold); } - //Don't sync with db node - //work_comm.barrier(); - if (world.rank() > 1){ + //Start Fresh each iteration localFrontier.clear(); + std::vector().swap(instance.factbases); + std::vector().swap(instance.edges); + std::vector().swap(instance.factbase_items); + //Check for finished signal if(world.iprobe(0, 99)){ std::cout << "Node " << world.rank() << " received finalize signal" << std::endl; break; } + //We get a new state and need to do work if (world.iprobe(0, 1)){ NetworkState current_state; world.recv(0, 1, current_state); @@ -1449,8 +1452,8 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd { mpi::request ns_req = world.isend(w, 5, new_state); mpi::request fact_req = world.isend(w, 6, fact); - ns_req.wait(); - fact_req.wait(); + //ns_req.wait(); + //fact_req.wait(); } } } @@ -1516,59 +1519,38 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd } } } + //Let root node we finished, but only if we have done work since our last message if(send_msg == 1){ - - //if(localFrontier.size() != 0){ - // world.isend(0, 2, 2); - // } - // else{ - // world.isend(0, 2, 1); - //} send_msg = 0; std::cout << "Node " << world.rank() << " finished subgraphing work." << std::endl; if(localFrontier.size() != 0){ mpi::request lf_req = world.isend(0, 3, localFrontier); std::cout << "Node " << world.rank() << " has sent a localFrontier of size " << localFrontier.size() << std::endl; - lf_req.wait(); - std::cout << "Node " << world.rank() << " has finished sending localFrontier" << std::endl; + //lf_req.wait(); + //std::cout << "Node " << world.rank() << " has finished sending localFrontier" << std::endl; } //Send new states and edges, then clear worker instance - int fb_send = 0; - int ed_send = 0; - if(instance.factbases.size() != 0){ - fb_send = 1; - } - if(instance.edges.size() != 0){ - ed_send = 1; - } - if(fb_send == 1){ - - std::cout << "Node " << world.rank() << " preparing to send new factbases" << std::endl; - mpi::request if_req = world.isend(0, 10, instance.factbases); - if_req.wait(); - std::vector().swap(instance.factbases); - std::cout << "Node " << world.rank() << " finished sending new factbases" << std::endl; - } - if(ed_send == 1){ - mpi::request ie_req = world.isend(0, 11, instance.edges); - ie_req.wait(); - std::vector().swap(instance.edges); - std::cout << "Node " << world.rank() << " preparing to send new edges" << std::endl; - std::cout << "Node " << world.rank() << " finished sending new edges" << std::endl; - } - std::vector().swap(instance.factbase_items); + if(instance.factbases.size() != 0){ + std::cout << "Node " << world.rank() << " preparing to send new factbases" << std::endl; + mpi::request if_req = world.isend(0, 10, instance.factbases); + //if_req.wait(); + //std::cout << "Node " << world.rank() << " finished sending new factbases" << std::endl; + } + if(instance.edges.size() != 0){ + std::cout << "Node " << world.rank() << " preparing to send new edges" << std::endl; + mpi::request ie_req = world.isend(0, 11, instance.edges); + //ie_req.wait(); + //std::cout << "Node " << world.rank() << " finished sending new edges" << std::endl; + } - if(localFrontier.size() != 0){ - world.isend(0, 2, 2); - } - else{ - world.isend(0, 2, 1); - } + //Finish Signal + world.isend(0, 2, 1); } //Check for new fact and new state that caused an update in the hash table and facts + //Note: Had difficulties using while with probes. Sloppy solution at time of writing: if and breaks. //while(world.iprobe(mpi::any_source, 5) || world.iprobe(mpi::any_source, 6)){ while(true){ if(!world.iprobe(mpi::any_source, 5) && !world.iprobe(mpi::any_source, 6)){ @@ -1576,7 +1558,7 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd } NetworkState update_state; Quality update_fact; - std::cout << "Node " << world.rank() << " receiving new hash state and fact" << std::endl; + std::cout << "Node " << world.rank() << " receiving new hash state and fact" << std::endl; world.recv(mpi::any_source, 5, update_state); world.recv(mpi::any_source, 6, update_fact); //Update @@ -1628,8 +1610,9 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd //World Rank = 0 else{ std::map deque_marker; - std::map frontier_recv; int finish_count = 0; + std::vector node_factbases; + std::vector node_edges; //Receive states and edges when nodes finish //while(finish_count != world.size() -2){ std::cout << "Waiting for " << msg_sends << " total messages." << std::endl; @@ -1639,18 +1622,18 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd if(!world.iprobe(mpi::any_source, 10) && !world.iprobe(mpi::any_source, 11)){ break; } - std::vector node_factbases; - std::vector node_edges; - if(world.iprobe(mpi::any_source, 10)){ + + if(world.iprobe(mpi::any_source, 10)){ world.recv(mpi::any_source, 10, node_factbases); - - } - if(world.iprobe(mpi::any_source, 11)){ + } + if(world.iprobe(mpi::any_source, 11)){ world.recv(mpi::any_source, 11, node_edges); - } - std::cout << "Merging states. Starting with: " << instance.factbases.size() << " states." << std::endl; - state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world); - std::cout << "Finished merging. Now have: " << instance.factbases.size() << " states." << std::endl; + } + if(node_factbases.size() != 0 && node_edges.size() != 0){ + std::cout << "Merging states. Starting with: " << instance.factbases.size() << " states." << std::endl; + state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world); + std::cout << "Finished merging. Now have: " << instance.factbases.size() << " states." << std::endl; + } } //Nodes finish @@ -1659,12 +1642,6 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd if(world.iprobe(w, 2)){ world.recv(w, 2, dummy_finish); finish_count++; - //if(dummy_finish == 2){ - // frontier_recv[w] = 1; - //} - //else{ - // frontier_recv[w] = 0; - //} std::cout << "Root received subgraph finish message from node " << w << std::endl; } } @@ -1697,23 +1674,44 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd } */ } - std::cout << "Root received all signals from nodes" << std::endl; - if(world.iprobe(mpi::any_source, 10) || world.iprobe(mpi::any_source, 11)){ - std::vector node_factbases; - std::vector node_edges; - if(world.iprobe(mpi::any_source, 10)){ - world.recv(mpi::any_source, 10, node_factbases); - } - if(world.iprobe(mpi::any_source, 11)){ - world.recv(mpi::any_source, 11, node_edges); - } - std::cout << "Merging states. Starting with: " << instance.factbases.size() << " states." << std::endl; - state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world); - std::cout << "Finished merging. Now have: " << instance.factbases.size() << " states." << std::endl; + + //Check for Hash Table updates + while(true){ + if(!world.iprobe(mpi::any_source, 5) && !world.iprobe(mpi::any_source, 6)){ + break; + } + NetworkState update_state; + Quality update_fact; + + world.recv(mpi::any_source, 5, update_state); + world.recv(mpi::any_source, 6, update_fact); + //Update + instance.facts.hash_table[update_state.compound_assign(update_fact)]=instance.facts.size(); + instance.facts.length++; + instance.facts.str_vector.push_back(update_state.compound_assign(update_fact)); } - //Receive node frontiers and merge them into root frontier + + //Check for factbases or nodes, and add them to our network + while(true){ + if(world.iprobe(mpi::any_source, 10)){ + world.recv(mpi::any_source, 10, node_factbases); + } + if(world.iprobe(mpi::any_source, 11)){ + world.recv(mpi::any_source, 11, node_edges); + } + if(node_factbases.size() != 0 || node_edges.size() != 0){ + std::cout << "Merging states. Starting with: " << instance.factbases.size() << " states." << std::endl; + state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world); + std::cout << "Finished merging. Now have: " << instance.factbases.size() << " states." << std::endl; + } + else{ + break; + } + } + + //Receive node frontiers and merge them into root frontier for(int w = 2; w < world.size(); w++){ //if(world.iprobe(w, 3)){ if(world.iprobe(w, 3)){ @@ -1785,7 +1783,7 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd std::cout << "Preparing the send to World Rank " << w << std::endl; mpi::request state_req = world.isend(w, 1, send_state); localFrontier.erase(localFrontier.begin()+deque_access); - state_req.wait(); + //state_req.wait(); msg_sends++; std::cout << "Root sent new states to World Rank " << w << std::endl; }