#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "../util/db_functions.h" #include "../util/avail_mem.h" #include "../util/odometer.h" #include "serialize_tuple.h" #include "../ag_gen/ag_gen.h" #include "tasks.h" namespace mpi = boost::mpi; void task_zero(AGGenInstance &instance, std::deque &localFrontier, double mem_threshold) { //std::cout << "Frontier empty, retrieving from database" << std::endl; double f_alpha = 0.0; double total_tt = 0.0; struct timeval tt1,tt2; int retrv_counter = 0; auto tot_sys_mem = getTotalSystemMemory(); gettimeofday(&tt1,NULL); //TODO: One (or a few) larger queries to pull in new states, rather than single queries that pull states one-by-one do { NetworkState db_new_state = fetch_unexplored(instance.facts); localFrontier.emplace_front(db_new_state); //alpha = get_alpha(); f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; retrv_counter += 1; } //Leave a 30% buffer in alpha while((f_alpha <= (mem_threshold * 0.7)) && !unex_empty()); gettimeofday(&tt2,NULL); total_tt+=(tt2.tv_sec-tt1.tv_sec)*1000.0+(tt2.tv_usec-tt1.tv_usec)/1000.0; //printf("Retrieving from db took %lf s.\n", total_tt); } void task_one(AGGenInstance &instance, NetworkState ¤t_state,\ std::vector &exploit_list, std::unordered_map> &od_map,\ int alloc, int two_alloc, int reduc_factor, int num_tasks, mpi::communicator &world,\ mpi::communicator &tcomm){ std::vector> appl_exploits; unsigned long esize = exploit_list.size(); //Distribute work to all nodes for (size_t i = 0; i < esize; i++) {//for loop for applicable exploits starts if (i % alloc != tcomm.rank()) continue; auto e = exploit_list.at(i); size_t num_params = e.get_num_params(); auto preconds_q = e.precond_list_q(); auto preconds_t = e.precond_list_t(); auto perms = od_map[num_params]; std::vector asset_groups; for (auto perm : perms) { std::vector asset_group_quals; std::vector asset_group_topos; asset_group_quals.reserve(preconds_q.size()); asset_group_topos.reserve(preconds_t.size()); for (auto &precond : preconds_q) { asset_group_quals.emplace_back( perm[precond.get_param_num()], precond.name, precond.op, precond.value, instance.facts); } for (auto &precond : preconds_t) { auto dir = precond.get_dir(); auto prop = precond.get_property(); auto op = precond.get_operation(); auto val = precond.get_value(); asset_group_topos.emplace_back( perm[precond.get_from_param()], perm[precond.get_to_param()], dir, prop, op, val, instance.facts); } asset_groups.emplace_back(asset_group_quals, asset_group_topos, perm); } auto assetgroup_size = asset_groups.size(); for (size_t j = 0; j < assetgroup_size; j++) { auto asset_group = asset_groups.at(j); for (auto &quality : asset_group.get_hypo_quals()) { if (!current_state.get_factbase().find_quality(quality)) { goto LOOPCONTINUE1; } } for (auto &topology : asset_group.get_hypo_topos()) { if (!current_state.get_factbase().find_topology(topology)) { goto LOOPCONTINUE1; } } { auto new_appl_exploit = std::make_tuple(e, asset_group); appl_exploits.push_back(new_appl_exploit); } LOOPCONTINUE1:; } } //for loop for applicable exploits ends //Less nodes allocated to task 2 than task 1. //Distribute the appl_exploit list from the extra node in task 1 to all other nodes in this task if (alloc > two_alloc){ //Need to tell other nodes that we have data so they know to expect it int has_data; if (world.rank() == alloc){ if (appl_exploits.size() > 0){ has_data = 1; } else{ has_data = 0; } } //Convert world.rank() of alloc to tcomm rank by -1 broadcast(tcomm, has_data, alloc-1); if(has_data==1){ std::vector>> sub_partials; std::vector> partial_appl_exploits; if (world.rank() == alloc){ size_t num_data = appl_exploits.size()/(tcomm.size()-1); size_t remain = appl_exploits.size()%(tcomm.size()-1); int begin = 0; int end = 0; for (int i = 0; i < std::min((int)(tcomm.size()-1), (int)appl_exploits.size()); ++i){ //if our remainder hasn't been depleted, add num_data + 1 from remain, then decrement, otherwise just num_data end += (remain > 0) ? (num_data + !!(remain--)) : num_data; sub_partials.push_back(std::vector>(appl_exploits.begin()+begin, appl_exploits.begin()+end)); begin = end; if(!sub_partials.at(i).empty()){ mpi::request scatter_req = tcomm.isend(i, 21, sub_partials.at(i)); scatter_req.wait(); } } } tcomm.barrier(); if(world.rank() < alloc){ if(tcomm.iprobe(alloc-1, 21)){ tcomm.recv(alloc-1, 21, partial_appl_exploits); appl_exploits.insert(std::end(appl_exploits), std::begin(partial_appl_exploits), std::end(partial_appl_exploits)); } } } } //Send Applicable Exploit List int skip_greatest = 0; if (alloc > two_alloc) skip_greatest = 1; if(world.rank() <= (alloc - skip_greatest)){ mpi::request appl_req = world.isend(send_check(world, world.rank() + alloc -1), 30, appl_exploits); mpi::request cs_req = world.isend(send_check(world, world.rank() + alloc -1), 40, current_state); appl_req.wait(); cs_req.wait(); } } void task_two(AGGenInstance &instance, int alloc, int two_alloc, boost::mpi::communicator &world,\ std::deque &localFrontier, double mem_threshold, boost::mpi::communicator &ttwo_comm,\ std::vector ex_groups, std::unordered_map &hash_map){ NetworkState current_state; std::vector> appl_exploits; world.recv(mpi::any_source, 30, appl_exploits); world.recv(mpi::any_source, 40, current_state); //Check for new fact and new state that caused an update in the hash table and facts while(world.iprobe(mpi::any_source, 3) || world.iprobe(mpi::any_source, 4)){ NetworkState update_state; Quality update_fact; world.recv(mpi::any_source, 3, update_state); world.recv(mpi::any_source, 4, update_fact); instance.facts.hash_table[update_state.compound_assign(update_fact)]=instance.facts.size(); instance.facts.length++; instance.facts.str_vector.push_back(update_state.compound_assign(update_fact)); } std::vector> partial_appl_exploits; auto current_hash = current_state.get_hash(instance.facts); std::map group_fired; //Map to hold fired status per group std::map>> sync_vectors; //Map to hold all group exploits for (auto map_group : ex_groups) { group_fired.insert(std::pair (map_group, 0)); } std::string egroup; //Build up the map of synchronous fire exploits and send to 0th node of task 2 for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end();){ auto e = *itr; egroup = std::get<0>(e).get_group(); int tflag = 0; if (egroup != "null"){ sync_vectors[egroup].push_back(e); if(ttwo_comm.rank() != 0){ itr = appl_exploits.erase(itr); tflag = 1; } } if(tflag == 0) itr++; } if(ttwo_comm.rank() != 0){ for (auto map_group : ex_groups){ if(!sync_vectors[map_group].empty()){ mpi::request sync_req = ttwo_comm.isend(0, 9, std::make_tuple(map_group, sync_vectors[map_group])); sync_req.wait(); sync_vectors[map_group].clear(); } } } ttwo_comm.barrier(); if(ttwo_comm.rank() == 0 && ttwo_comm.size() > 1){ while(ttwo_comm.iprobe(mpi::any_source, 9)){ std::tuple>> sync_recv; ttwo_comm.recv(mpi::any_source, 9, sync_recv); partial_appl_exploits = std::get<1>(sync_recv); appl_exploits.insert(std::end(appl_exploits), std::begin(partial_appl_exploits), std::end(partial_appl_exploits)); sync_vectors[std::get<0>(sync_recv)].insert(std::end(sync_vectors[std::get<0>(sync_recv)]), std::begin(partial_appl_exploits), std::end(partial_appl_exploits)); } } //loop through the vector for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){ //Check for new fact and new state that caused an update in the hash table and facts while(world.iprobe(mpi::any_source, 3) || world.iprobe(mpi::any_source, 4)){ NetworkState update_state; Quality update_fact; world.recv(mpi::any_source, 3, update_state); world.recv(mpi::any_source, 4, update_fact); //Update instance.facts.hash_table[update_state.compound_assign(update_fact)]=instance.facts.size(); instance.facts.length++; instance.facts.str_vector.push_back(update_state.compound_assign(update_fact)); } auto e = *itr; auto exploit = std::get<0>(e); auto assetGroup = std::get<1>(e); egroup=exploit.get_group(); if ((egroup != "null" && group_fired[egroup] == 0) || egroup == "null"){ NetworkState new_state{current_state}; std::vector> sync_exploits; if (egroup == "null") sync_exploits.push_back(e); else { sync_exploits = sync_vectors[egroup]; //TODO: Does not work if only some assets belong to a group. This only works if //all assets are in the group if(sync_exploits.size() < instance.assets.size()){ continue; } } for(auto sync_itr=sync_exploits.begin(); sync_itr!=sync_exploits.end(); sync_itr++){ e = *sync_itr; exploit = std::get<0>(e); egroup=exploit.get_group(); assetGroup = std::get<1>(e); group_fired[egroup] = 1; auto postconditions = createPostConditions(e, instance.facts); auto qualities = std::get<0>(postconditions); auto topologies = std::get<1>(postconditions); for(auto &qual : qualities) { auto action = std::get<0>(qual); auto fact = std::get<1>(qual); switch(action) { case ADD_T: new_state.add_quality(fact); break; case UPDATE_T: new_state.update_quality(fact); /* //TODO: if fact!= "="" call new_state function, passing fact and instance.facts. Update the quality, and insert it into the hash_table instead of this convoluted mess if(fact.get_op()=="+="){ std::unordered_map::const_iterator got = instance.facts.hash_table.find(new_state.compound_assign(fact)); //If the value is not already in the hash_table, insert it. //Since the compound operators include a value that is not in the original Keyvalue object, the unordered map does not include it //As a result, you have to manually add it. if(got==instance.facts.hash_table.end()){ instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size(); instance.facts.length++; instance.facts.str_vector.push_back(new_state.compound_assign(fact)); //Update ALL nodes (include ttwo_comm nodes) with new data for (int w = 0; w < world.size(); w++) { if(w != world.rank()) { mpi::request ns_req = world.isend(w, 3, new_state); mpi::request fact_req = world.isend(w, 4, fact); ns_req.wait(); fact_req.wait(); } } } } */ break; case DELETE_T: new_state.delete_quality(fact); break; } } for(auto &topo : topologies) { auto action = std::get<0>(topo); auto fact = std::get<1>(topo); switch(action) { case ADD_T: new_state.add_topology(fact); break; case UPDATE_T: new_state.update_topology(fact); break; case DELETE_T: new_state.delete_topology(fact); break; } } }//Sync. Fire for auto hash_num = new_state.get_hash(instance.facts); if (hash_num == current_hash){ continue; } //<6 Node Edge Case Prevention: Node 0 unable to execute task 3 if(world.rank() != 0){ //std::cout << "Node " << world.rank() << " sending new state data to Node 0" << std::endl; mpi::request ns_req = world.isend(0, 5, new_state); mpi::request cs_req = world.isend(0, 6, current_state); mpi::request ex_req = world.isend(0, 10, exploit); mpi::request ag_req = world.isend(0, 11, assetGroup); ns_req.wait(); cs_req.wait(); ex_req.wait(); ag_req.wait(); } else { int last_known_id; if(instance.factbases.size() == 0){ last_known_id = 0; } else last_known_id = instance.factbases.size()-1; if(get_num_states() > 1) last_known_id += get_num_states()-1; task_three(instance, new_state, localFrontier, mem_threshold, world,\ alloc, two_alloc, current_state, exploit, assetGroup, hash_map, last_known_id); } } else{ continue; } } } void task_three(AGGenInstance &instance, NetworkState &new_state, std::deque &localFrontier,\ double mem_threshold, boost::mpi::communicator &world, int alloc, int two_alloc, NetworkState ¤t_state,\ Exploit &exploit, AssetGroup &assetGroup, std::unordered_map &hash_map, int last_known_id){ auto hash_num = new_state.get_hash(instance.facts); //although local frontier is updated, the global hash is also updated to avoid testing on explored states. if (hash_map.find(hash_num) == hash_map.end()) { new_state.reset_curr_id(last_known_id+1); new_state.set_id(); auto new_id = new_state.get_id(); last_known_id = new_id; auto facts_tuple = new_state.get_factbase().get_facts_tuple(); //FactbaseItems new_items = std::make_tuple(facts_tuple, new_state.get_id()); //instance.factbase_items.push_back(new_items); instance.factbases.push_back(new_state.get_factbase()); hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id())); //See memory usage. If it exceeds the threshold, store new states in the DB double i_alpha = 0.0; //Get the most recent Factbase's size * total number of factbases, rough approximation of *2 to account for factbase_items double i_usage = instance.factbases.back().get_size() * instance.factbases.size() * 2 + sizeof(instance.edges[0]) * instance.edges.size(); auto tot_sys_mem = getTotalSystemMemory(); i_alpha = i_usage/tot_sys_mem; double f_alpha; if (!localFrontier.empty()) f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; else f_alpha = 0.0; if (f_alpha >= (mem_threshold/2)) { mpi::request ns_req = world.isend(send_check(world, alloc+two_alloc+1), 50, new_state); ns_req.wait(); //save_unexplored_to_db(new_state); if (!localFrontier.empty()) f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; else f_alpha = 0; } //Store new state in database to ensure proper ordering of the FIFO queue else if (!unex_empty()){ mpi::request ns_req = world.isend(send_check(world, alloc+two_alloc+1), 50, new_state); ns_req.wait(); //save_unexplored_to_db(new_state); } //Otherwise, we can just store in memory else { localFrontier.emplace_front(new_state); } if (i_alpha >= mem_threshold/2){ mpi::request fb_req = world.isend(send_check(world, alloc+two_alloc), 7, instance.factbases); edge_adjustment(instance, hash_map); mpi::request ed_req = world.isend(send_check(world, alloc+two_alloc), 8, instance.edges); //save_ag_to_db(instance, true); fb_req.wait(); ed_req.wait(); //Clear vectors and free memory std::vector().swap(instance.factbases); std::vector().swap(instance.factbase_items); std::vector().swap(instance.edges); i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\ sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\ sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size())); i_alpha = i_usage/tot_sys_mem; } //Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup); Edge ed(current_state.get_hash(instance.facts), new_state.get_hash(instance.facts), exploit, assetGroup); ed.set_id(); instance.edges.push_back(ed); } //END if (hash_map.find(hash_num) == hash_map.end()) else{ int id = hash_map[hash_num]; //Edge ed(current_state.get_id(), id, exploit, assetGroup); Edge ed(current_state.get_hash(instance.facts), hash_num, exploit, assetGroup); ed.set_id(); instance.edges.push_back(ed); } } int send_check(boost::mpi::communicator &world, int curr_node){ int send_to = curr_node + 1; if (curr_node >= world.size()-1) send_to = 0; return send_to; } void state_merge(std::vector &node_factbases, std::vector &node_edges,\ std::unordered_map &hash_map, AGGenInstance &instance, double mem_threshold,\ mpi::communicator &world){ auto tot_sys_mem = getTotalSystemMemory(); for(auto &fb : node_factbases){ auto hash_num = fb.hash(instance.facts); //although local frontier is updated, the global hash is also updated to avoid testing on explored states. if (hash_map.find(hash_num) == hash_map.end()) { //Each MPI Node has its own copy of states and factbases: we need to correct it with the Root's IDs int reset_num = instance.factbases.size(); if(get_num_states() > 1) reset_num += get_num_states()-1; fb.reset_curr_id(reset_num); fb.set_id(); instance.factbases.push_back(fb); hash_map.insert(std::make_pair(hash_num, fb.get_id())); //See memory usage. If it exceeds the threshold, store new states in the DB double i_alpha = 0.0; //Get the most recent Factbase's size * total number of factbases, rough approximation of *2 to account for factbase_items double i_usage = instance.factbases.back().get_size() * instance.factbases.size() * 2 + sizeof(instance.edges[0]) * instance.edges.size(); if (i_alpha >= mem_threshold/2){ edge_adjustment(instance, hash_map); mpi::request fb_req = world.isend(1, 7, instance.factbases); mpi::request ed_req = world.isend(1, 8, instance.edges); //save_ag_to_db(instance, true); fb_req.wait(); ed_req.wait(); //Clear vectors and free memory std::vector().swap(instance.factbases); std::vector().swap(instance.factbase_items); std::vector().swap(instance.edges); i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\ sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\ sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size())); i_alpha = i_usage/tot_sys_mem; } } else{ instance.duplicates++; } } //Note: This does add duplicates. Having one duplicate removal sweep at the end is preferable to duplicate checking //every time we want to push new edges back on the frontier - verified through timing for both small and large networks. instance.edges.insert(std::end(instance.edges), std::begin(node_edges), std::end(node_edges)); } //Convert Edge to_hash and from_hash to to_id and from_id void edge_adjustment(AGGenInstance &instance, std::unordered_map &hash_map) { // 2 Loops: Can't convert hash->ID while simultaneously checking for duplicates for (auto &ed : instance.edges){ auto itr = hash_map.find(ed.get_to_id()); if(itr != hash_map.end()){ auto to_id = std::get<1>(*itr); ed.force_to_id(to_id); } auto itr2 = hash_map.find(ed.get_from_id()); if(itr2 != hash_map.end()){ auto from_id = std::get<1>(*itr2); ed.force_from_id(from_id); } } //Remove duplicates //Poor Approach, but functional. Examine replacing instance.edges with a set instead of a vector //Tmp Vector std::vector clean; int i = instance.edges.size() + get_num_edges(); for(auto itr = instance.edges.begin(); itr != instance.edges.end();){ int seen = 0; for(auto eitr = clean.begin(); eitr != clean.end(); eitr++){ if((*eitr).operator==(*itr)){ seen = 1; break; } } if(seen == 1){ //Remove duplicate itr = instance.edges.erase(itr); } else{ clean.push_back(*itr); (*itr).force_set_id(i); i++; itr++; } } }