diff --git a/build/ag_gen b/build/ag_gen index 22e69ba..dabd297 100755 Binary files a/build/ag_gen and b/build/ag_gen differ diff --git a/src/ag_gen/ag_gen.cpp b/src/ag_gen/ag_gen.cpp index 455abc3..ac7c643 100755 --- a/src/ag_gen/ag_gen.cpp +++ b/src/ag_gen/ag_gen.cpp @@ -568,7 +568,7 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, } //Task Four - if (world.rank() == send_check(world, 2*two_alloc+1)){ + if (world.rank() == send_check(world, 2*two_alloc)){ if(world.iprobe(0, 7) || world.iprobe(0, 8)){ std::cout << "Node " << world.rank() << " is starting Task 4." << std::endl; std::vector factbases_dump; @@ -587,6 +587,38 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, } } + //Task Five + if (world.rank() == 0){ + double f_alpha; + if (!localFrontier.empty()) + f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; + else + f_alpha = 0.0; + + if (f_alpha >= (mem_threshold/2)) { + //std::cout << "Frontier Alpha prior to database storing: " << f_alpha << std::endl; + int q_eles = localFrontier.size(); + for (int q = 0; q < q_eles; q++) { + auto rem_state = localFrontier.front(); + localFrontier.pop_front(); + world.send(send_check(world, 2*two_alloc+1), 50, rem_state); + } + if (!localFrontier.empty()) + f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; + else + f_alpha = 0; + //std::cout << "Frontier Alpha after database storing: " << f_alpha << std::endl; + } + } + + if(world.rank() == send_check(world, 2*two_alloc+2)){ + while(world.iprobe(0, 50)){ + NetworkState save_state; + world.recv(0, 50, save_state); + save_unexplored_to_db(save_state); + } + } + std::cout << "Node " << world.rank() << " moving to next iter" << std::endl; } //while ends diff --git a/src/mpi/tasks.cpp b/src/mpi/tasks.cpp index cf3403b..dbdcc42 100644 --- a/src/mpi/tasks.cpp +++ b/src/mpi/tasks.cpp @@ -35,6 +35,8 @@ #include "../ag_gen/ag_gen.h" #include "tasks.h" +namespace mpi = boost::mpi; + void task_zero(AGGenInstance &instance, std::deque &localFrontier, double mem_threshold) { //std::cout << "Frontier empty, retrieving from database" << std::endl; @@ -165,15 +167,19 @@ void task_two(AGGenInstance &instance, int alloc, int two_alloc, boost::mpi::com NetworkState current_state; std::vector> appl_exploits; - world.recv(mpi::any_source, 30, appl_exploits); + if(ttwo_comm.rank() == 0) + world.recv(mpi::any_source, 30, appl_exploits); world.recv(mpi::any_source, 40, current_state); std::cout << "Process " << world.rank() << " received Task 1 data." << std::endl; std::cout << "Appl exploit size " << appl_exploits.size() << std::endl; std::vector> partial_appl_exploits; - if(ttwo_comm.size() > 1) - mpi::scatter(ttwo_comm, &partial_appl_exploits, appl_exploits, 0); + + //Don't think we need this? All nodes get their own appl_expl list + //if(ttwo_comm.size() > 1) + //mpi::scatter(ttwo_comm, partial_appl_exploits, appl_exploits.data(), 0); + //mpi::scatter(ttwo_comm, &partial_appl_exploits, appl_exploits, 0); auto current_hash = current_state.get_hash(instance.facts); @@ -399,8 +405,8 @@ void task_three(AGGenInstance &instance, NetworkState &new_state, std::deque= mem_threshold/2){ //std::cout << "Instance Alpha prior to database storing: " << i_alpha << std::endl; - world.isend(send_check(world, two_alloc), 7, instance.factbases); - world.isend(send_check(world, two_alloc), 8, instance.edges); + world.isend(send_check(world, 2*two_alloc), 7, instance.factbases); + world.isend(send_check(world, 2*two_alloc), 8, instance.edges); //save_ag_to_db(instance, true); //Clear vectors and free memory