Adjusting Task 2 completion messages

This commit is contained in:
Noah L. Schrick 2022-01-29 22:02:08 -06:00
parent 6a6893fc21
commit 9bbb9baae2
2 changed files with 67 additions and 46 deletions

Binary file not shown.

View File

@ -367,6 +367,25 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
#pragma omp cancel for
}
//Task 1 Node Allocating
int alloc;
if(world.size() <= num_tasks)
alloc = 1;
else
alloc = ceil((world.size()-num_tasks)/2);
//Task 2 Node Allocating
int reduc_factor = 0;
int two_alloc = alloc;
if(world.size() % 2 != 0 && world.size() > num_tasks){
int two_alloc = alloc-1;
reduc_factor = 1;
}
//Create Communicators
boost::mpi::communicator tcomm = world.split(world.rank() > 0 && world.rank() <= alloc);
boost::mpi::communicator ttwo_comm = world.split(world.rank() == send_check(world, alloc) && world.rank() <= (2*two_alloc));
while (!localFrontier.empty() || !unex_empty() || world.rank() > 0){//while starts
//Node 0 needs to tell other nodes to continue
@ -403,34 +422,14 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
total_task0+=(t02.tv_sec-t01.tv_sec)*1000.0+(t02.tv_usec-t01.tv_usec)/1000.0;
}
//Task 1 Node Allocating
int alloc;
if(world.size() <= num_tasks)
alloc = 1;
else
alloc = ceil((world.size()-num_tasks)/2);
//Task 2 Node Allocating
int reduc_factor = 0;
int two_alloc = alloc;
if(world.size() % 2 != 0 && world.size() > num_tasks){
int two_alloc = alloc-1;
reduc_factor = 1;
}
//Create Communicators
boost::mpi::communicator tcomm = world.split(world.rank() > 0 && world.rank() <= alloc);
boost::mpi::communicator ttwo_comm = world.split(world.rank() == send_check(world, alloc) && world.rank() <= (2*two_alloc));
//Task 0 to Task 1 Communication
if(world.rank() == 0)
{
auto current_state = localFrontier.back();
auto current_hash = current_state.get_hash(instance.facts);
localFrontier.pop_back();
for(int l=0; l <= alloc; l++){
world.isend(send_check(world, world.rank()+l), 20, current_state);
for(int l=0; l < alloc; l++){
world.isend(send_check(world, l), 20, current_state);
}
}
@ -449,30 +448,42 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
//Execute Task 2
//if(world.rank() == send_check(world, alloc) && world.rank() <= send_check(world, 2*two_alloc))
if(world.rank() >= send_check(world, alloc) && world.rank() <= (2*two_alloc))
//Edge case: 2 nodes
if((world.size() == 2 && world.rank() == 0) || world.size() > 2)
{
//Execute Task 2
struct timeval t21,t22;
gettimeofday(&t21,NULL);
task_two(instance, alloc, two_alloc, world, localFrontier, mem_threshold,\
ttwo_comm, ex_groups, hash_map);
std::cout << "Finished Task 2 as Node: " << world.rank() << std::endl;
//Wait for all Task 2 nodes to finish
ttwo_comm.barrier();
gettimeofday(&t22,NULL);
total_task2+=(t22.tv_sec-t21.tv_sec)*1000.0+(t22.tv_usec-t21.tv_usec)/1000.0;
if(world.rank() >= send_check(world, alloc) && world.rank() <= (2*two_alloc))
{
//Execute Task 2
struct timeval t21,t22;
gettimeofday(&t21,NULL);
task_two(instance, alloc, two_alloc, world, localFrontier, mem_threshold,\
ttwo_comm, ex_groups, hash_map);
std::cout << "Finished Task 2 as Node: " << world.rank() << std::endl;
//Wait for all Task 2 nodes to finish
ttwo_comm.barrier();
gettimeofday(&t22,NULL);
total_task2+=(t22.tv_sec-t21.tv_sec)*1000.0+(t22.tv_usec-t21.tv_usec)/1000.0;
//Have the 0th Task 2 node tell the other world nodes that it's done
if(ttwo_comm.rank() == 0){
for (int w = 0; w < world.size(); w++)
{
if(w < world.rank() || w > (2*two_alloc))
//Have the 0th Task 2 node tell the other world nodes that it's done
if(ttwo_comm.rank() == 0){
for (int w = 0; w < world.size(); w++)
{
world.isend(w, 2, 1);
//2 Node Edge case
if(world.size() == 2){
world.isend(1, 2, 1);
std::cout << "Node " << world.rank() << " sent finished message." << std::endl;
break;
}
else if(w < world.rank() || w > 2*two_alloc)
{
world.isend(w, 2, 1);
}
}
std::cout << "Task 2 Finished." << std::endl;
}
std::cout << "Task 2 Finished." << std::endl;
}
}
else{
@ -487,8 +498,20 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
//If we haven't been told that task 2 is finished, and if we still more facts or states to update:
//while(!world.iprobe(1+alloc, 2) && world.iprobe(mpi::any_source, 3) && world.iprobe(mpi::any_source, 4) && world.iprobe(mpi::any_source, 5))
std::cout << "Node " << world.rank() << " is waiting for Task 2 completion." << std::endl;
while(!world.iprobe(send_check(world, alloc), 2) || world.iprobe(mpi::any_source, 3) || world.iprobe(mpi::any_source, 4) || world.iprobe(mpi::any_source, 5))
int finished_signal = 0;
if(world.iprobe(send_check(world, alloc), 2) || world.iprobe(0, 2)){
world.recv(mpi::any_source, finished_signal);
}
//while(!world.iprobe(send_check(world, alloc), 2) || world.iprobe(mpi::any_source, 3) || world.iprobe(mpi::any_source, 4) || world.iprobe(mpi::any_source, 5))
while(!finished_signal || world.iprobe(mpi::any_source, 3) || world.iprobe(mpi::any_source, 4) || world.iprobe(mpi::any_source, 5))
{
//Check for the finished signal
if(world.iprobe(send_check(world, alloc), 2) || world.iprobe(0, 2)){
std::cout << "Else nodes received task 2 completion message." << std::endl;
world.recv(mpi::any_source, 2, finished_signal);
std::cout << "Receive message complete." << std::endl;
}
//If we get a new fact and new state, update
if(world.iprobe(mpi::any_source, 3) && world.iprobe(mpi::any_source, 4)){
@ -522,12 +545,8 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
total_task3+=(t32.tv_sec-t31.tv_sec)*1000.0+(t32.tv_usec-t31.tv_usec)/1000.0;
}
}
}
//Receive the message so it doesn't just sit there
int ttwo_done;
world.recv(mpi::any_source, 2, ttwo_done);
std::cout << "Else nodes received task 2 completion message." << std::endl;
std::cout << "Node " << world.rank() << " has finished Task 2 waiting." << std::endl;
}
//Task Four
@ -549,6 +568,8 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
total_task4+=(t42.tv_sec-t41.tv_sec)*1000.0+(t42.tv_usec-t41.tv_usec)/1000.0;
}
}
std::cout << "Node " << world.rank() << " moving to next iter" << std::endl;
} //while ends
if(world.rank() == 0){