Removing state queuing
This commit is contained in:
parent
048724cc43
commit
7add771ac3
BIN
build/ag_gen
BIN
build/ag_gen
Binary file not shown.
@ -398,15 +398,18 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
|
|||||||
ttwo_comm = world.split(world.rank() >= send_check(world, alloc) && world.rank() <= (alloc+two_alloc));
|
ttwo_comm = world.split(world.rank() >= send_check(world, alloc) && world.rank() <= (alloc+two_alloc));
|
||||||
//std::cout << "TCOMM SIZE: " << tcomm.size() << std::endl;
|
//std::cout << "TCOMM SIZE: " << tcomm.size() << std::endl;
|
||||||
//std::cout << "TTWO_COMM SIZE: " << ttwo_comm.size() << std::endl;
|
//std::cout << "TTWO_COMM SIZE: " << ttwo_comm.size() << std::endl;
|
||||||
while (!localFrontier.empty() || !unex_empty() || world.iprobe(mpi::any_source, 5) || world.rank() > 0 || (world.rank() ==0 && zero_queue > 0)){//while starts
|
while (!localFrontier.empty() || !unex_empty() || world.rank() > 0){//while starts
|
||||||
|
|
||||||
if (world.rank() != 0) {
|
if (world.rank() == 0){
|
||||||
|
int dummy = 1;
|
||||||
|
for (int w = 1; w < world.size(); w++){
|
||||||
|
mpi::request dum_req = world.isend(w, 14, dummy);
|
||||||
|
dum_req.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
//If we don't have the go-ahead, check for the Finalize message.
|
//If we don't have the go-ahead, check for the Finalize message.
|
||||||
//std::cout << "Node " << world.rank() << " is waiting for the go-ahead or finalize message." << std::endl;
|
//std::cout << "Node " << world.rank() << " is waiting for the go-ahead or finalize message." << std::endl;
|
||||||
if(!world.iprobe(0, 14)){
|
|
||||||
//Let Node 0 we're waiting
|
|
||||||
world.isend(0, 12, 1);
|
|
||||||
}
|
|
||||||
while(!world.iprobe(0, 14)){
|
while(!world.iprobe(0, 14)){
|
||||||
if(world.iprobe(0, 15)){
|
if(world.iprobe(0, 15)){
|
||||||
mpi_exit = 1;
|
mpi_exit = 1;
|
||||||
@ -423,7 +426,7 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
|
|||||||
//We need to refill the localFrontier with states from the database if it's empty
|
//We need to refill the localFrontier with states from the database if it's empty
|
||||||
//TODO: Investigate if it's faster to read from DB directly into Node 0
|
//TODO: Investigate if it's faster to read from DB directly into Node 0
|
||||||
//Or if faster to have worker node read from DB, then comm to Node 0
|
//Or if faster to have worker node read from DB, then comm to Node 0
|
||||||
if(localFrontier.empty() && world.rank() == 0 && !world.iprobe(mpi::any_source, 5) && zero_queue == 0) {
|
if(localFrontier.empty() && world.rank() == 0) {
|
||||||
struct timeval t01,t02;
|
struct timeval t01,t02;
|
||||||
gettimeofday(&t01,NULL);
|
gettimeofday(&t01,NULL);
|
||||||
task_zero(instance, localFrontier, mem_threshold);
|
task_zero(instance, localFrontier, mem_threshold);
|
||||||
@ -435,23 +438,12 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
|
|||||||
//Task 0 to Task 1 Communication
|
//Task 0 to Task 1 Communication
|
||||||
if(world.rank() == 0)
|
if(world.rank() == 0)
|
||||||
{
|
{
|
||||||
//Send at most 50 states at one time to Task 1, assuming we don't have more state data to add
|
auto current_state = localFrontier.back();
|
||||||
//if (!world.iprobe(mpi::any_source, 5)){
|
localFrontier.pop_back();
|
||||||
for (int f = 0; f < std::min(50, (int)localFrontier.size()); f++){
|
for(int l=0; l < alloc; l++){
|
||||||
auto current_state = localFrontier.back();
|
mpi::request cs_req = world.isend(send_check(world, l), 20, current_state);
|
||||||
localFrontier.pop_back();
|
cs_req.wait();
|
||||||
for(int l=0; l < alloc; l++){
|
}
|
||||||
mpi::request cs_req = world.isend(send_check(world, l), 20, current_state);
|
|
||||||
cs_req.wait();
|
|
||||||
}
|
|
||||||
//Node 0 needs to tell other nodes to continue the same number of times
|
|
||||||
int dummy = 1;
|
|
||||||
for (int w = 1; w < world.size(); w++){
|
|
||||||
mpi::request dum_req = world.isend(w, 14, dummy);
|
|
||||||
dum_req.wait();
|
|
||||||
}
|
|
||||||
zero_queue++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//Execute Task 1
|
//Execute Task 1
|
||||||
@ -641,9 +633,6 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(world.rank() == 0)
|
|
||||||
zero_queue--;
|
|
||||||
|
|
||||||
//std::cout << "Node " << world.rank() << " moving to next iter" << std::endl;
|
//std::cout << "Node " << world.rank() << " moving to next iter" << std::endl;
|
||||||
} //while ends
|
} //while ends
|
||||||
|
|
||||||
@ -1114,13 +1103,13 @@ AGGenInstance &AGGen::single_generate(bool batch_process, int batch_num, int num
|
|||||||
f_alpha = 0.0;
|
f_alpha = 0.0;
|
||||||
|
|
||||||
if (f_alpha >= (mem_threshold/2)) {
|
if (f_alpha >= (mem_threshold/2)) {
|
||||||
std::cout << "Frontier Alpha prior to database storing: " << f_alpha << std::endl;
|
//std::cout << "Frontier Alpha prior to database storing: " << f_alpha << std::endl;
|
||||||
save_unexplored_to_db(new_state);
|
save_unexplored_to_db(new_state);
|
||||||
if (!localFrontier.empty())
|
if (!localFrontier.empty())
|
||||||
f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem;
|
f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem;
|
||||||
else
|
else
|
||||||
f_alpha = 0;
|
f_alpha = 0;
|
||||||
std::cout << "Frontier Alpha after database storing: " << f_alpha << std::endl;
|
//std::cout << "Frontier Alpha after database storing: " << f_alpha << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
//Store new state in database to ensure proper ordering of the FIFO queue
|
//Store new state in database to ensure proper ordering of the FIFO queue
|
||||||
@ -1134,7 +1123,7 @@ AGGenInstance &AGGen::single_generate(bool batch_process, int batch_num, int num
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (i_alpha >= mem_threshold/2){
|
if (i_alpha >= mem_threshold/2){
|
||||||
std::cout << "Instance Alpha prior to database storing: " << i_alpha << std::endl;
|
//std::cout << "Instance Alpha prior to database storing: " << i_alpha << std::endl;
|
||||||
save_ag_to_db(instance, true);
|
save_ag_to_db(instance, true);
|
||||||
|
|
||||||
//Clear vectors and free memory
|
//Clear vectors and free memory
|
||||||
@ -1146,7 +1135,7 @@ AGGenInstance &AGGen::single_generate(bool batch_process, int batch_num, int num
|
|||||||
sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\
|
sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\
|
||||||
sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size()));
|
sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size()));
|
||||||
i_alpha = i_usage/tot_sys_mem;
|
i_alpha = i_usage/tot_sys_mem;
|
||||||
std::cout << "Instance Alpha after database storing: " << i_alpha << std::endl;
|
//std::cout << "Instance Alpha after database storing: " << i_alpha << std::endl;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user