diff --git a/build/ag_gen b/build/ag_gen index a6ded4b..967061b 100755 Binary files a/build/ag_gen and b/build/ag_gen differ diff --git a/src/ag_gen/ag_gen.cpp b/src/ag_gen/ag_gen.cpp index 52916f3..4a16f79 100755 --- a/src/ag_gen/ag_gen.cpp +++ b/src/ag_gen/ag_gen.cpp @@ -367,6 +367,25 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, #pragma omp cancel for } + //Task 1 Node Allocating + int alloc; + if(world.size() <= num_tasks) + alloc = 1; + else + alloc = ceil((world.size()-num_tasks)/2); + + //Task 2 Node Allocating + int reduc_factor = 0; + int two_alloc = alloc; + if(world.size() % 2 != 0 && world.size() > num_tasks){ + int two_alloc = alloc-1; + reduc_factor = 1; + } + + //Create Communicators + boost::mpi::communicator tcomm = world.split(world.rank() > 0 && world.rank() <= alloc); + boost::mpi::communicator ttwo_comm = world.split(world.rank() == send_check(world, alloc) && world.rank() <= (2*two_alloc)); + while (!localFrontier.empty() || !unex_empty() || world.rank() > 0){//while starts //Node 0 needs to tell other nodes to continue @@ -403,34 +422,14 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, total_task0+=(t02.tv_sec-t01.tv_sec)*1000.0+(t02.tv_usec-t01.tv_usec)/1000.0; } - //Task 1 Node Allocating - int alloc; - if(world.size() <= num_tasks) - alloc = 1; - else - alloc = ceil((world.size()-num_tasks)/2); - - //Task 2 Node Allocating - int reduc_factor = 0; - int two_alloc = alloc; - if(world.size() % 2 != 0 && world.size() > num_tasks){ - int two_alloc = alloc-1; - reduc_factor = 1; - } - - //Create Communicators - boost::mpi::communicator tcomm = world.split(world.rank() > 0 && world.rank() <= alloc); - - boost::mpi::communicator ttwo_comm = world.split(world.rank() == send_check(world, alloc) && world.rank() <= (2*two_alloc)); - //Task 0 to Task 1 Communication if(world.rank() == 0) { auto current_state = localFrontier.back(); auto current_hash = current_state.get_hash(instance.facts); localFrontier.pop_back(); - for(int l=0; l <= alloc; l++){ - world.isend(send_check(world, world.rank()+l), 20, current_state); + for(int l=0; l < alloc; l++){ + world.isend(send_check(world, l), 20, current_state); } } @@ -449,30 +448,42 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, //Execute Task 2 //if(world.rank() == send_check(world, alloc) && world.rank() <= send_check(world, 2*two_alloc)) - if(world.rank() >= send_check(world, alloc) && world.rank() <= (2*two_alloc)) + //Edge case: 2 nodes + if((world.size() == 2 && world.rank() == 0) || world.size() > 2) { - //Execute Task 2 - struct timeval t21,t22; - gettimeofday(&t21,NULL); - task_two(instance, alloc, two_alloc, world, localFrontier, mem_threshold,\ - ttwo_comm, ex_groups, hash_map); - std::cout << "Finished Task 2 as Node: " << world.rank() << std::endl; - //Wait for all Task 2 nodes to finish - ttwo_comm.barrier(); - gettimeofday(&t22,NULL); - total_task2+=(t22.tv_sec-t21.tv_sec)*1000.0+(t22.tv_usec-t21.tv_usec)/1000.0; + if(world.rank() >= send_check(world, alloc) && world.rank() <= (2*two_alloc)) + { + //Execute Task 2 + struct timeval t21,t22; + gettimeofday(&t21,NULL); + task_two(instance, alloc, two_alloc, world, localFrontier, mem_threshold,\ + ttwo_comm, ex_groups, hash_map); + std::cout << "Finished Task 2 as Node: " << world.rank() << std::endl; + //Wait for all Task 2 nodes to finish + ttwo_comm.barrier(); + gettimeofday(&t22,NULL); + total_task2+=(t22.tv_sec-t21.tv_sec)*1000.0+(t22.tv_usec-t21.tv_usec)/1000.0; - //Have the 0th Task 2 node tell the other world nodes that it's done - if(ttwo_comm.rank() == 0){ - for (int w = 0; w < world.size(); w++) - { - if(w < world.rank() || w > (2*two_alloc)) + //Have the 0th Task 2 node tell the other world nodes that it's done + if(ttwo_comm.rank() == 0){ + for (int w = 0; w < world.size(); w++) { - world.isend(w, 2, 1); + //2 Node Edge case + if(world.size() == 2){ + world.isend(1, 2, 1); + std::cout << "Node " << world.rank() << " sent finished message." << std::endl; + break; + } + + else if(w < world.rank() || w > 2*two_alloc) + { + world.isend(w, 2, 1); + } + } + std::cout << "Task 2 Finished." << std::endl; } - std::cout << "Task 2 Finished." << std::endl; } } else{ @@ -487,8 +498,20 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, //If we haven't been told that task 2 is finished, and if we still more facts or states to update: //while(!world.iprobe(1+alloc, 2) && world.iprobe(mpi::any_source, 3) && world.iprobe(mpi::any_source, 4) && world.iprobe(mpi::any_source, 5)) std::cout << "Node " << world.rank() << " is waiting for Task 2 completion." << std::endl; - while(!world.iprobe(send_check(world, alloc), 2) || world.iprobe(mpi::any_source, 3) || world.iprobe(mpi::any_source, 4) || world.iprobe(mpi::any_source, 5)) + int finished_signal = 0; + if(world.iprobe(send_check(world, alloc), 2) || world.iprobe(0, 2)){ + world.recv(mpi::any_source, finished_signal); + } + + //while(!world.iprobe(send_check(world, alloc), 2) || world.iprobe(mpi::any_source, 3) || world.iprobe(mpi::any_source, 4) || world.iprobe(mpi::any_source, 5)) + while(!finished_signal || world.iprobe(mpi::any_source, 3) || world.iprobe(mpi::any_source, 4) || world.iprobe(mpi::any_source, 5)) { + //Check for the finished signal + if(world.iprobe(send_check(world, alloc), 2) || world.iprobe(0, 2)){ + std::cout << "Else nodes received task 2 completion message." << std::endl; + world.recv(mpi::any_source, 2, finished_signal); + std::cout << "Receive message complete." << std::endl; + } //If we get a new fact and new state, update if(world.iprobe(mpi::any_source, 3) && world.iprobe(mpi::any_source, 4)){ @@ -522,12 +545,8 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, total_task3+=(t32.tv_sec-t31.tv_sec)*1000.0+(t32.tv_usec-t31.tv_usec)/1000.0; } } - } - //Receive the message so it doesn't just sit there - int ttwo_done; - world.recv(mpi::any_source, 2, ttwo_done); - std::cout << "Else nodes received task 2 completion message." << std::endl; + std::cout << "Node " << world.rank() << " has finished Task 2 waiting." << std::endl; } //Task Four @@ -549,6 +568,8 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, total_task4+=(t42.tv_sec-t41.tv_sec)*1000.0+(t42.tv_usec-t41.tv_usec)/1000.0; } } + + std::cout << "Node " << world.rank() << " moving to next iter" << std::endl; } //while ends if(world.rank() == 0){