diff --git a/build/ag_gen b/build/ag_gen index 05c51ee..8b59d6c 100755 Binary files a/build/ag_gen and b/build/ag_gen differ diff --git a/build/run.sh b/build/run.sh index e2e2d1f..ba3e55d 100755 --- a/build/run.sh +++ b/build/run.sh @@ -62,7 +62,7 @@ if [ "$TYPE" == "$strval1" ]; then #6 Exploit Option #mpiexec --mca btl_openib_allow_ib 1 --mca btl openib,self,vader --mca opal_warn_on_missing_libcuda 0 --bind-to numa --map-by numa -np "$NODES" --timeout 129600 ./ag_gen -n ../Oct_2021/nm_files/"$CARS"_car_timeline_maintenance.nm -x ../Oct_2021/Sync/6_Exploits/"$NUM_SERV"_Serv/sync_timeline_maintenance.xp -t "$NUM_THREADS" -q 1 -p -a 0.6 -z "$DBNAME" -s -l 20 else - mpiexec --mca btl_openib_allow_ib 1 --mca opal_warn_on_missing_libcuda 0 --bind-to numa --map-by numa -np "$NODES" --timeout 129600 ./ag_gen -n ../Oct_2021/nm_files/"$CARS"_car_timeline_maintenance.nm -x ../Oct_2021/Sync/4_Exploits/"$NUM_SERV"_Serv/sync_timeline_maintenance.xp -t "$NUM_THREADS" -q 1 -p -a 0.6 -g DOTFILE.dot -z "$DBNAME" -l 20 + mpiexec --mca btl_openib_allow_ib 1 --mca opal_warn_on_missing_libcuda 0 --bind-to numa --map-by numa -np "$NODES" --timeout 129600 ./ag_gen -n ../Oct_2021/nm_files/"$CARS"_car_timeline_maintenance.nm -x ../Oct_2021/Sync/4_Exploits/"$NUM_SERV"_Serv/sync_timeline_maintenance.xp -t "$NUM_THREADS" -q 1 -p -a 0.6 -g DOTFILE.dot -z "$DBNAME" -l 20 -e fi # 4 Exploit #mpiexec -np "$NODES" --bind-to numa --map-by numa ./ag_gen -n ../Oct_2021/nm_files/"$CARS"_car_timeline_maintenance.nm -x ../Oct_2021/Sync/4_Exploits/"$NUM_SERV"_Serv/sync_timeline_maintenance.xp -t 1 -q 1 -p -a 0.6 -z "$DBNAME" diff --git a/src/ag_gen/ag_gen.cpp b/src/ag_gen/ag_gen.cpp index 35e9982..cab91db 100755 --- a/src/ag_gen/ag_gen.cpp +++ b/src/ag_gen/ag_gen.cpp @@ -216,7 +216,7 @@ double mem_threshold, mpi::communicator &world) double total_t=0.0; //unti:ms - double total_task0, total_task1, total_task2, total_task3, total_task4 = 0.0; + double total_task0, total_task1, total_task2, total_task3, total_task4, total_task5, total_task6 = 0.0; struct timeval t1,t2; gettimeofday(&t1,NULL); @@ -224,7 +224,7 @@ double mem_threshold, mpi::communicator &world) int num_tasks = 6; #pragma omp parallel for num_threads(numThrd) default(none) shared(esize,counter,\ exploit_list,od_map,frt_size,total_t,t1,t2,std::cout, mem_threshold, num_tasks,\ - ex_groups, world, total_task0, total_task1, total_task2, total_task3, total_task4, last_known_id) schedule(dynamic,1) + ex_groups, world, total_task0, total_task1, total_task2, total_task3, total_task4, total_task5, last_known_id) schedule(dynamic,1) //auto ag_start = std::chrono::system_clock::now(); for(int k=0;k 0.0) world.isend(0, 27, total_task4); + if (total_task5 > 0.0) + world.isend(0, 28, total_task5); } world.barrier(); - //Don't really care about each node's time, we just want a time (node 1 time ~= node 2 time) - //Just grab any value. if(world.rank() == 0){ - if(world.iprobe(mpi::any_source, 24)) - world.recv(mpi::any_source, 24, total_task1); - if(world.iprobe(mpi::any_source, 25)) - world.recv(mpi::any_source, 25, total_task2); - if(world.iprobe(mpi::any_source, 26)) - world.recv(mpi::any_source, 26, total_task3); - if(world.iprobe(mpi::any_source, 27)) - world.recv(mpi::any_source, 27, total_task4); + double task1_count, task2_count, task3_count, task4_count, task5_count = 0; + while(world.iprobe(mpi::any_source, 24)){ + double tmp; + world.recv(mpi::any_source, 24, tmp); + total_task1 += tmp; + task1_count++; + } + while(world.iprobe(mpi::any_source, 25)){ + double tmp; + world.recv(mpi::any_source, 25, tmp); + total_task2 += tmp; + task2_count++; + } + while(world.iprobe(mpi::any_source, 26)){ + double tmp; + world.recv(mpi::any_source, 26, tmp); + total_task3 += tmp; + task3_count++; + } + while(world.iprobe(mpi::any_source, 27)){ + double tmp; + world.recv(mpi::any_source, 27, tmp); + total_task4 += tmp; + task4_count++; + } + while(world.iprobe(mpi::any_source, 28)){ + double tmp; + world.recv(mpi::any_source, 28, tmp); + total_task5 += tmp; + task5_count++; + } - std::cout << "Task 0 time: " << total_task0 << "ms" << std::endl; - std::cout << "Task 1 time: " << total_task1 << "ms" << std::endl; - std::cout << "Task 2 time: " << total_task2 << "ms" << std::endl; - std::cout << "Task 3 time: " << total_task3 << "ms" << std::endl; - std::cout << "Task 4 time: " << total_task4 << "ms" << std::endl; + std::cout << "Average Task 0 time: " << total_task0 << "ms" << std::endl; + std::cout << "Average Task 1 time: " << total_task1/task1_count << "ms" << std::endl; + std::cout << "Average Task 2 time: " << total_task2/task2_count << "ms" << std::endl; + std::cout << "Average Task 3 time: " << total_task3/task3_count << "ms" << std::endl; + std::cout << "Average Task 4 time: " << total_task4/task4_count << "ms" << std::endl; + std::cout << "Average Task 5 time: " << total_task5/task5_count << "ms" << std::endl; + + edge_adjustment(instance, hash_map); } std::chrono::duration elapsed_seconds = end - start; @@ -962,6 +992,7 @@ int initQSize, double mem_threshold, mpi::communicator &world) } if (i_alpha >= mem_threshold/2){ + edge_adjustment(instance, hash_map); save_ag_to_db(instance, true); //Clear vectors and free memory @@ -1080,8 +1111,7 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) int ns_inc = 0; int first_loop = 1; int msg_sends = 0; - NetworkState buffer_state; - Quality buffer_fact; + std::unordered_set front_expl; std::random_device rd; // obtain a random number from hardware @@ -1091,15 +1121,23 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) mpi::communicator work_comm = world.split(world.rank() != 1); //std::unordered_set localFrontier_seen; - int last_known_id; + int last_known_id = 0; //Send new Network State to all worker nodes, if we have enough unex states to do so if(world.rank() == 0){ //2 offset for root node and db node for (int w = 0; w < std::min((int)world.size()-2, (int)localFrontier.size()); w++){ - last_known_id = localFrontier.front().get_id(); - mpi::request state_req = world.isend(w+2, 1, localFrontier.front()); + //last_known_id = localFrontier.front().get_id(); + if(world.size() >= 3){ + mpi::request state_req = world.isend(w+2, 1, localFrontier.front()); + state_req.wait(); + + } + else{ + mpi::request state_req = world.isend(w, 1, localFrontier.front()); + state_req.wait(); + + } localFrontier.pop_front(); - state_req.wait(); msg_sends++; } } @@ -1260,36 +1298,6 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) break; case UPDATE_T: new_state.update_quality(fact); - /* - //TODO: if fact!= "="" call new_state function, passing fact and instance.facts. Update the quality, and insert it into the hash_table instead of this convoluted mess - if(fact.get_op()=="+="){ - - std::unordered_map::const_iterator got = instance.facts.hash_table.find(new_state.compound_assign(fact)); - - //If the value is not already in the hash_table, insert it. - //Since the compound operators include a value that is not in the original Keyvalue object, the unordered map does not include it - //As a result, you have to manually add it. - if(got==instance.facts.hash_table.end()){ - instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size(); - instance.facts.length++; - instance.facts.str_vector.push_back(new_state.compound_assign(fact)); - //buffer_state = new_state; - //buffer_fact = fact; - - //Update ALL nodes (include ttwo_comm nodes) with new data - for (int w = 0; w < world.size(); w++) - { - if(w != world.rank() && w != 1) - { - requests[0] = world.isend(w, 5, new_state); - requests[1] = world.isend(w, 6, fact); - //ns_req.wait(); - //fact_req.wait(); - } - } - } - } - */ break; case DELETE_T: new_state.delete_quality(fact); @@ -1327,14 +1335,16 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id())); localFrontier.emplace_front(new_state); - Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup); + //Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup); + Edge ed(current_state.get_hash(instance.facts), new_state.get_hash(instance.facts), exploit, assetGroup); ed.set_id(); instance.edges.push_back(ed); } //END if (hash_map.find(hash_num) == hash_map.end()) else { auto id = hash_map[hash_num]; - Edge ed(current_state.get_id(), id, exploit, assetGroup); + //Edge ed(current_state.get_id(), id, exploit, assetGroup); + Edge ed(current_state.get_hash(instance.facts), hash_num, exploit, assetGroup); ed.set_id(); instance.edges.push_back(ed); } @@ -1370,12 +1380,7 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) } //Check for new fact and new state that caused an update in the hash table and facts - //Note: Had difficulties using while with probes. Sloppy solution at time of writing: if and breaks. - //while(world.iprobe(mpi::any_source, 5) || world.iprobe(mpi::any_source, 6)){ - while(true){ - if(!world.iprobe(mpi::any_source, 5) && !world.iprobe(mpi::any_source, 6)){ - break; - } + while(world.iprobe(mpi::any_source, 5) || world.iprobe(mpi::any_source, 6)){ NetworkState update_state; Quality update_fact; world.recv(mpi::any_source, 5, update_state); @@ -1414,10 +1419,7 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) } //Check for frontier storage requests - while(true){ - if(!world.iprobe(0, 50)){ - break; - } + while(world.iprobe(0, 50)){ NetworkState save_state; world.recv(0, 50, save_state); save_unexplored_to_db(save_state); @@ -1433,11 +1435,7 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) std::vector node_edges; //Receive states and edges when nodes finish while(finish_count != msg_sends){ - while(true){ - if(!world.iprobe(mpi::any_source, 10) && !world.iprobe(mpi::any_source, 11)){ - break; - } - + while(world.iprobe(mpi::any_source, 10) || world.iprobe(mpi::any_source, 11)){ if(world.iprobe(mpi::any_source, 10)){ world.recv(mpi::any_source, 10, node_factbases); } @@ -1445,8 +1443,7 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) world.recv(mpi::any_source, 11, node_edges); } if(!node_factbases.empty() || !node_edges.empty()){ - state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world, last_known_id); - last_known_id = instance.factbases.size()-1; + state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world); } } @@ -1460,10 +1457,7 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) } //Check for new fact and new state that caused an update in the hash table and facts - while(true){ - if(!world.iprobe(mpi::any_source, 5) || !world.iprobe(mpi::any_source, 6)){ - break; - } + while(world.iprobe(mpi::any_source, 5) || world.iprobe(mpi::any_source, 6)){ NetworkState update_state; Quality update_fact; @@ -1478,10 +1472,7 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) } //Check for Hash Table updates - while(true){ - if(!world.iprobe(mpi::any_source, 5) && !world.iprobe(mpi::any_source, 6)){ - break; - } + while(world.iprobe(mpi::any_source, 5) || world.iprobe(mpi::any_source, 6)){ NetworkState update_state; Quality update_fact; @@ -1494,10 +1485,7 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) } //Check for factbases or nodes, and add them to our network - while(true){ - if(!world.iprobe(mpi::any_source, 10) && !world.iprobe(mpi::any_source, 11)){ - break; - } + while(world.iprobe(mpi::any_source, 10) || world.iprobe(mpi::any_source, 11)){ if(world.iprobe(mpi::any_source, 10)){ world.recv(mpi::any_source, 10, node_factbases); } @@ -1505,11 +1493,7 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) world.recv(mpi::any_source, 11, node_edges); } if(!node_factbases.empty() || !node_edges.empty()){ - state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world, last_known_id); - last_known_id = instance.factbases.size()-1; - } - else{ - break; + state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world); } } @@ -1521,7 +1505,6 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) continue; } } - //if(world.iprobe(w, 3)){ if(world.iprobe(w, 3)){ std::deque nodeFrontier; world.recv(w, 3, nodeFrontier); @@ -1636,6 +1619,7 @@ int initQSize, double mem_threshold, mpi::communicator &world, int state_limit) gettimeofday(&t2,NULL); total_t+=(t2.tv_sec-t1.tv_sec)*1000.0+(t2.tv_usec-t1.tv_usec)/1000.0; printf("AG TOOK %lf ms.\n", total_t); + edge_adjustment(instance, hash_map); auto end = std::chrono::system_clock::now(); diff --git a/src/ag_gen/edge.cpp b/src/ag_gen/edge.cpp index 1dbb7c3..99276f5 100755 --- a/src/ag_gen/edge.cpp +++ b/src/ag_gen/edge.cpp @@ -23,6 +23,9 @@ Edge::Edge(int iFrom, int iTo, Exploit &ex, AssetGroup &ag) : from_node(iFrom), to_node(iTo), exploit(ex), assetGroup(ag), deleted(false) {} +Edge::Edge(unsigned long iFrom, unsigned long iTo, Exploit &ex, AssetGroup &ag) + : from_node(iFrom), to_node(iTo), exploit(ex), assetGroup(ag), deleted(false) {} + Edge::Edge() { @@ -44,7 +47,7 @@ bool Edge::is_deleted() { return deleted; } //} -int Edge::get_from_id() +unsigned long Edge::get_from_id() { return from_node; } @@ -56,7 +59,7 @@ int Edge::get_from_id() //} -int Edge::get_to_id() +unsigned long Edge::get_to_id() { return to_node; } @@ -86,11 +89,11 @@ int Edge::set_id() { return id; } -void Edge::force_from_id(int i) { +void Edge::force_from_id(unsigned long i) { from_node = i; } -void Edge::force_to_id(int i) { +void Edge::force_to_id(unsigned long i) { to_node = i; } diff --git a/src/ag_gen/edge.h b/src/ag_gen/edge.h index 2233187..8885fec 100755 --- a/src/ag_gen/edge.h +++ b/src/ag_gen/edge.h @@ -23,12 +23,13 @@ /** Edge class * @brief Edge of the graph based on integer id. */ -/* class Edge { static int edge_current_id; int id; - int from_node; - int to_node; + //int from_node; + //int to_node; + unsigned long from_node; + unsigned long to_node; Exploit exploit; AssetGroup assetGroup; bool deleted; @@ -43,6 +44,7 @@ class Edge { public: Edge(int, int, Exploit &, AssetGroup &); + Edge(unsigned long, unsigned long, Exploit &, AssetGroup &); Edge(); std::string get_query(); @@ -50,45 +52,12 @@ class Edge { int get_id(); int set_id(); - int get_from_id(); - int get_to_id(); - int get_exploit_id(); - void set_deleted(); - bool is_deleted(); -}; -*/ - -// Edge class based on hash -class Edge { - static int edge_current_id; - int id; - int from_node; - int to_node; - Exploit exploit; - AssetGroup assetGroup; - bool deleted; - - friend std::ostream & operator << (std::ostream &os, const Edge &ed); - friend class boost::serialization::access; - - template - void serialize(Archive &ar, const unsigned int version){ - ar & edge_current_id & id & from_node & to_node & exploit & assetGroup & deleted; - } - - public: - Edge(int, int, Exploit &, AssetGroup &); - Edge(); - - std::string get_query(); - std::string get_asset_query(); - - int get_id(); - int set_id(); - int get_from_id(); - int get_to_id(); - void force_from_id(int i); - void force_to_id(int i); + //int get_from_id(); + //int get_to_id(); + unsigned long get_from_id(); + unsigned long get_to_id(); + void force_from_id(unsigned long i); + void force_to_id(unsigned long i); int get_exploit_id(); void set_deleted(); bool is_deleted(); diff --git a/src/ag_gen/network_state.cpp b/src/ag_gen/network_state.cpp index 8474693..a44f73e 100755 --- a/src/ag_gen/network_state.cpp +++ b/src/ag_gen/network_state.cpp @@ -111,22 +111,33 @@ void NetworkState::update_quality(Quality &q) { auto attr = q.get_name(); auto val = q.get_value(); auto op = q.get_op(); - //std::cout<<"OP" << op<::const_iterator got = kv_facts.hash_table.find(value); if(got == kv_facts.hash_table.end()){ - kv_facts.hash_table[value] = kv_facts.length; - kv_facts.str_vector.push_back(value); - kv_facts.length++; + //kv_facts.hash_table[value] = kv_facts.length; + //Reasoning: Populate fills table with all string values. If we run across an instance where + //we don't have a value, we know that it must be due to a compound operator, so we can safely assign it its own value. + kv_facts.hash_table[value] = stoi(value); + kv_facts.str_vector.push_back(value); + kv_facts.length++; } qual.dec.val = kv_facts[value]; diff --git a/src/main.cpp b/src/main.cpp index d476695..112d549 100755 --- a/src/main.cpp +++ b/src/main.cpp @@ -704,16 +704,16 @@ int main(int argc, char *argv[]) { //Serialization Unit Testing on Postinstance Data //serialization_unit_testing(postinstance, world); world.barrier(); - std::cout << "Finished generation." << std::endl; //std::cout << "# of edges " <= mem_threshold/2){ mpi::request fb_req = world.isend(send_check(world, alloc+two_alloc), 7, instance.factbases); + edge_adjustment(instance, hash_map); mpi::request ed_req = world.isend(send_check(world, alloc+two_alloc), 8, instance.edges); //save_ag_to_db(instance, true); fb_req.wait(); @@ -481,14 +482,16 @@ void task_three(AGGenInstance &instance, NetworkState &new_state, std::deque node_factbases, std::vector node_edges,\ std::unordered_map &hash_map, AGGenInstance &instance, double mem_threshold,\ - mpi::communicator &world, int last_known_id){ + mpi::communicator &world){ auto tot_sys_mem = getTotalSystemMemory(); - for(auto fb : node_factbases){ + for(auto &fb : node_factbases){ auto hash_num = fb.hash(instance.facts); //although local frontier is updated, the global hash is also updated to avoid testing on explored states. if (hash_map.find(hash_num) == hash_map.end()) { //Each MPI Node has its own copy of states and factbases: we need to correct it with the Root's IDs - auto old_id = fb.get_id(); - fb.reset_curr_id(last_known_id+1); + int reset_num = instance.factbases.size(); + if(get_num_states() > 1) + reset_num += get_num_states()-1; + fb.reset_curr_id(reset_num); fb.set_id(); - auto new_id = fb.get_id(); - last_known_id = new_id; - for(auto ed: node_edges){ - if (ed.get_from_id() == old_id){ - ed.force_from_id(new_id); - } - if (ed.get_to_id() == old_id){ - ed.force_to_id(new_id); - } - } instance.factbases.push_back(fb); hash_map.insert(std::make_pair(fb.hash(instance.facts), fb.get_id())); @@ -538,6 +533,7 @@ void state_merge(std::vector node_factbases, std::vector node_ed double i_usage = instance.factbases.back().get_size() * instance.factbases.size() * 2 + sizeof(instance.edges[0]) * instance.edges.size(); if (i_alpha >= mem_threshold/2){ + edge_adjustment(instance, hash_map); mpi::request fb_req = world.isend(1, 7, instance.factbases); mpi::request ed_req = world.isend(1, 8, instance.edges); //save_ag_to_db(instance, true); @@ -557,24 +553,32 @@ void state_merge(std::vector node_factbases, std::vector node_ed else{ //Each MPI Node has its own copy of states and factbases: we need to correct it with the Root's IDs - auto old_id = fb.get_id(); - fb.reset_curr_id(last_known_id+1); - fb.set_id(); - auto new_id = fb.get_id(); - last_known_id = new_id; - for(auto ed: node_edges){ - if (ed.get_from_id() == old_id){ - ed.force_from_id(new_id); - } - if (ed.get_to_id() == old_id){ - ed.force_to_id(new_id); - } - } + int reset_num = instance.factbases.size(); + if(get_num_states() > 1) + reset_num += get_num_states()-1; + fb.reset_curr_id(reset_num); + fb.set_id(); } } - //This does add duplicate edges - taken care of through graphviz' "strict" graphing for (auto ed : node_edges){ instance.edges.push_back(ed); } +} + +void edge_adjustment(AGGenInstance &instance, std::unordered_map &hash_map) +{ + for (auto &ed : instance.edges){ + auto itr = hash_map.find(ed.get_to_id()); + if(itr != hash_map.end()){ + auto to_id = std::get<1>(*itr); + ed.force_to_id(to_id); + } + + auto itr2 = hash_map.find(ed.get_from_id()); + if(itr2 != hash_map.end()){ + auto from_id = std::get<1>(*itr2); + ed.force_from_id(from_id); + } + } } \ No newline at end of file diff --git a/src/mpi/tasks.h b/src/mpi/tasks.h index 8811bbb..8e12e40 100644 --- a/src/mpi/tasks.h +++ b/src/mpi/tasks.h @@ -25,6 +25,8 @@ int send_check(boost::mpi::communicator &world, int curr_node); void state_merge(std::vector node_factbases, std::vector node_edges,\ std::unordered_map &hash_map, AGGenInstance &instance, double mem_threshold,\ - mpi::communicator &world, int last_known_id); + mpi::communicator &world); + +void edge_adjustment(AGGenInstance &instance, std::unordered_map &hash_map); #endif //TASKS_H