Adjusting syncing

This commit is contained in:
Noah L. Schrick 2022-02-08 17:59:07 -06:00
parent a7c3e9e0c2
commit 97e4ba4647

View File

@ -1280,17 +1280,20 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
task_zero(instance, localFrontier, mem_threshold); task_zero(instance, localFrontier, mem_threshold);
} }
//Don't sync with db node
//work_comm.barrier();
if (world.rank() > 1){ if (world.rank() > 1){
//Start Fresh each iteration
localFrontier.clear(); localFrontier.clear();
std::vector<Factbase>().swap(instance.factbases);
std::vector<Edge>().swap(instance.edges);
std::vector<FactbaseItems>().swap(instance.factbase_items);
//Check for finished signal //Check for finished signal
if(world.iprobe(0, 99)){ if(world.iprobe(0, 99)){
std::cout << "Node " << world.rank() << " received finalize signal" << std::endl; std::cout << "Node " << world.rank() << " received finalize signal" << std::endl;
break; break;
} }
//We get a new state and need to do work
if (world.iprobe(0, 1)){ if (world.iprobe(0, 1)){
NetworkState current_state; NetworkState current_state;
world.recv(0, 1, current_state); world.recv(0, 1, current_state);
@ -1449,8 +1452,8 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
{ {
mpi::request ns_req = world.isend(w, 5, new_state); mpi::request ns_req = world.isend(w, 5, new_state);
mpi::request fact_req = world.isend(w, 6, fact); mpi::request fact_req = world.isend(w, 6, fact);
ns_req.wait(); //ns_req.wait();
fact_req.wait(); //fact_req.wait();
} }
} }
} }
@ -1516,59 +1519,38 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
} }
} }
} }
//Let root node we finished, but only if we have done work since our last message //Let root node we finished, but only if we have done work since our last message
if(send_msg == 1){ if(send_msg == 1){
//if(localFrontier.size() != 0){
// world.isend(0, 2, 2);
// }
// else{
// world.isend(0, 2, 1);
//}
send_msg = 0; send_msg = 0;
std::cout << "Node " << world.rank() << " finished subgraphing work." << std::endl; std::cout << "Node " << world.rank() << " finished subgraphing work." << std::endl;
if(localFrontier.size() != 0){ if(localFrontier.size() != 0){
mpi::request lf_req = world.isend(0, 3, localFrontier); mpi::request lf_req = world.isend(0, 3, localFrontier);
std::cout << "Node " << world.rank() << " has sent a localFrontier of size " << localFrontier.size() << std::endl; std::cout << "Node " << world.rank() << " has sent a localFrontier of size " << localFrontier.size() << std::endl;
lf_req.wait(); //lf_req.wait();
std::cout << "Node " << world.rank() << " has finished sending localFrontier" << std::endl; //std::cout << "Node " << world.rank() << " has finished sending localFrontier" << std::endl;
} }
//Send new states and edges, then clear worker instance //Send new states and edges, then clear worker instance
int fb_send = 0; if(instance.factbases.size() != 0){
int ed_send = 0; std::cout << "Node " << world.rank() << " preparing to send new factbases" << std::endl;
if(instance.factbases.size() != 0){ mpi::request if_req = world.isend(0, 10, instance.factbases);
fb_send = 1; //if_req.wait();
} //std::cout << "Node " << world.rank() << " finished sending new factbases" << std::endl;
if(instance.edges.size() != 0){ }
ed_send = 1; if(instance.edges.size() != 0){
} std::cout << "Node " << world.rank() << " preparing to send new edges" << std::endl;
if(fb_send == 1){ mpi::request ie_req = world.isend(0, 11, instance.edges);
//ie_req.wait();
std::cout << "Node " << world.rank() << " preparing to send new factbases" << std::endl; //std::cout << "Node " << world.rank() << " finished sending new edges" << std::endl;
mpi::request if_req = world.isend(0, 10, instance.factbases); }
if_req.wait();
std::vector<Factbase>().swap(instance.factbases);
std::cout << "Node " << world.rank() << " finished sending new factbases" << std::endl;
}
if(ed_send == 1){
mpi::request ie_req = world.isend(0, 11, instance.edges);
ie_req.wait();
std::vector<Edge>().swap(instance.edges);
std::cout << "Node " << world.rank() << " preparing to send new edges" << std::endl;
std::cout << "Node " << world.rank() << " finished sending new edges" << std::endl;
}
std::vector<FactbaseItems>().swap(instance.factbase_items);
if(localFrontier.size() != 0){ //Finish Signal
world.isend(0, 2, 2); world.isend(0, 2, 1);
}
else{
world.isend(0, 2, 1);
}
} }
//Check for new fact and new state that caused an update in the hash table and facts //Check for new fact and new state that caused an update in the hash table and facts
//Note: Had difficulties using while with probes. Sloppy solution at time of writing: if and breaks.
//while(world.iprobe(mpi::any_source, 5) || world.iprobe(mpi::any_source, 6)){ //while(world.iprobe(mpi::any_source, 5) || world.iprobe(mpi::any_source, 6)){
while(true){ while(true){
if(!world.iprobe(mpi::any_source, 5) && !world.iprobe(mpi::any_source, 6)){ if(!world.iprobe(mpi::any_source, 5) && !world.iprobe(mpi::any_source, 6)){
@ -1576,7 +1558,7 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
} }
NetworkState update_state; NetworkState update_state;
Quality update_fact; Quality update_fact;
std::cout << "Node " << world.rank() << " receiving new hash state and fact" << std::endl; std::cout << "Node " << world.rank() << " receiving new hash state and fact" << std::endl;
world.recv(mpi::any_source, 5, update_state); world.recv(mpi::any_source, 5, update_state);
world.recv(mpi::any_source, 6, update_fact); world.recv(mpi::any_source, 6, update_fact);
//Update //Update
@ -1628,8 +1610,9 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
//World Rank = 0 //World Rank = 0
else{ else{
std::map<int, int> deque_marker; std::map<int, int> deque_marker;
std::map<int, int> frontier_recv;
int finish_count = 0; int finish_count = 0;
std::vector<Factbase> node_factbases;
std::vector<Edge> node_edges;
//Receive states and edges when nodes finish //Receive states and edges when nodes finish
//while(finish_count != world.size() -2){ //while(finish_count != world.size() -2){
std::cout << "Waiting for " << msg_sends << " total messages." << std::endl; std::cout << "Waiting for " << msg_sends << " total messages." << std::endl;
@ -1639,18 +1622,18 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
if(!world.iprobe(mpi::any_source, 10) && !world.iprobe(mpi::any_source, 11)){ if(!world.iprobe(mpi::any_source, 10) && !world.iprobe(mpi::any_source, 11)){
break; break;
} }
std::vector<Factbase> node_factbases;
std::vector<Edge> node_edges; if(world.iprobe(mpi::any_source, 10)){
if(world.iprobe(mpi::any_source, 10)){
world.recv(mpi::any_source, 10, node_factbases); world.recv(mpi::any_source, 10, node_factbases);
}
} if(world.iprobe(mpi::any_source, 11)){
if(world.iprobe(mpi::any_source, 11)){
world.recv(mpi::any_source, 11, node_edges); world.recv(mpi::any_source, 11, node_edges);
} }
std::cout << "Merging states. Starting with: " << instance.factbases.size() << " states." << std::endl; if(node_factbases.size() != 0 && node_edges.size() != 0){
state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world); std::cout << "Merging states. Starting with: " << instance.factbases.size() << " states." << std::endl;
std::cout << "Finished merging. Now have: " << instance.factbases.size() << " states." << std::endl; state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world);
std::cout << "Finished merging. Now have: " << instance.factbases.size() << " states." << std::endl;
}
} }
//Nodes finish //Nodes finish
@ -1659,12 +1642,6 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
if(world.iprobe(w, 2)){ if(world.iprobe(w, 2)){
world.recv(w, 2, dummy_finish); world.recv(w, 2, dummy_finish);
finish_count++; finish_count++;
//if(dummy_finish == 2){
// frontier_recv[w] = 1;
//}
//else{
// frontier_recv[w] = 0;
//}
std::cout << "Root received subgraph finish message from node " << w << std::endl; std::cout << "Root received subgraph finish message from node " << w << std::endl;
} }
} }
@ -1697,23 +1674,44 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
} }
*/ */
} }
std::cout << "Root received all signals from nodes" << std::endl; std::cout << "Root received all signals from nodes" << std::endl;
if(world.iprobe(mpi::any_source, 10) || world.iprobe(mpi::any_source, 11)){
std::vector<Factbase> node_factbases; //Check for Hash Table updates
std::vector<Edge> node_edges; while(true){
if(world.iprobe(mpi::any_source, 10)){ if(!world.iprobe(mpi::any_source, 5) && !world.iprobe(mpi::any_source, 6)){
world.recv(mpi::any_source, 10, node_factbases); break;
} }
if(world.iprobe(mpi::any_source, 11)){ NetworkState update_state;
world.recv(mpi::any_source, 11, node_edges); Quality update_fact;
}
std::cout << "Merging states. Starting with: " << instance.factbases.size() << " states." << std::endl; world.recv(mpi::any_source, 5, update_state);
state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world); world.recv(mpi::any_source, 6, update_fact);
std::cout << "Finished merging. Now have: " << instance.factbases.size() << " states." << std::endl; //Update
instance.facts.hash_table[update_state.compound_assign(update_fact)]=instance.facts.size();
instance.facts.length++;
instance.facts.str_vector.push_back(update_state.compound_assign(update_fact));
} }
//Receive node frontiers and merge them into root frontier
//Check for factbases or nodes, and add them to our network
while(true){
if(world.iprobe(mpi::any_source, 10)){
world.recv(mpi::any_source, 10, node_factbases);
}
if(world.iprobe(mpi::any_source, 11)){
world.recv(mpi::any_source, 11, node_edges);
}
if(node_factbases.size() != 0 || node_edges.size() != 0){
std::cout << "Merging states. Starting with: " << instance.factbases.size() << " states." << std::endl;
state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world);
std::cout << "Finished merging. Now have: " << instance.factbases.size() << " states." << std::endl;
}
else{
break;
}
}
//Receive node frontiers and merge them into root frontier
for(int w = 2; w < world.size(); w++){ for(int w = 2; w < world.size(); w++){
//if(world.iprobe(w, 3)){ //if(world.iprobe(w, 3)){
if(world.iprobe(w, 3)){ if(world.iprobe(w, 3)){
@ -1785,7 +1783,7 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
std::cout << "Preparing the send to World Rank " << w << std::endl; std::cout << "Preparing the send to World Rank " << w << std::endl;
mpi::request state_req = world.isend(w, 1, send_state); mpi::request state_req = world.isend(w, 1, send_state);
localFrontier.erase(localFrontier.begin()+deque_access); localFrontier.erase(localFrontier.begin()+deque_access);
state_req.wait(); //state_req.wait();
msg_sends++; msg_sends++;
std::cout << "Root sent new states to World Rank " << w << std::endl; std::cout << "Root sent new states to World Rank " << w << std::endl;
} }