#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "../util/db_functions.h" #include "../util/avail_mem.h" #include "../util/odometer.h" #include "serialize_tuple.h" #include "../ag_gen/ag_gen.h" #include "tasks.h" void task_zero(AGGenInstance &instance, std::deque &localFrontier, double mem_threshold) { //std::cout << "Frontier empty, retrieving from database" << std::endl; double f_alpha = 0.0; double total_tt = 0.0; struct timeval tt1,tt2; int retrv_counter = 0; auto tot_sys_mem = getTotalSystemMemory(); gettimeofday(&tt1,NULL); //TODO: One (or a few) larger queries to pull in new states, rather than single queries that pull states one-by-one do { NetworkState db_new_state = fetch_unexplored(instance.facts); localFrontier.emplace_front(db_new_state); //alpha = get_alpha(); f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; retrv_counter += 1; } //Leave a 30% buffer in alpha while((f_alpha <= (mem_threshold * 0.7)) && !unex_empty()); //std::cout << "Retrieved " << retrv_counter << " factbases from the database." << std::endl; gettimeofday(&tt2,NULL); total_tt+=(tt2.tv_sec-tt1.tv_sec)*1000.0+(tt2.tv_usec-tt1.tv_usec)/1000.0; //printf("Retrieving from db took %lf s.\n", total_tt); } void task_one(AGGenInstance &instance, NetworkState ¤t_state,\ std::vector &exploit_list, std::unordered_map> &od_map,\ int alloc, int two_alloc, int reduc_factor, int num_tasks, boost::mpi::communicator &world,\ boost::mpi::communicator &tcomm){ std::cout << "Process rank " << world.rank() << " with " << alloc << " node(s) allocated has started Task 1." << std::endl; std::vector> appl_exploits; unsigned long esize = exploit_list.size(); //Distribute work to all nodes for (size_t i = 0; i < esize; i++) {//for loop for applicable exploits starts if (i % alloc != send_check(world, world.rank())) continue; auto e = exploit_list.at(i); size_t num_params = e.get_num_params(); auto preconds_q = e.precond_list_q(); auto preconds_t = e.precond_list_t(); auto perms = od_map[num_params]; std::vector asset_groups; for (auto perm : perms) { std::vector asset_group_quals; std::vector asset_group_topos; asset_group_quals.reserve(preconds_q.size()); asset_group_topos.reserve(preconds_t.size()); for (auto &precond : preconds_q) { asset_group_quals.emplace_back( perm[precond.get_param_num()], precond.name, precond.op, precond.value, instance.facts); } for (auto &precond : preconds_t) { auto dir = precond.get_dir(); auto prop = precond.get_property(); auto op = precond.get_operation(); auto val = precond.get_value(); asset_group_topos.emplace_back( perm[precond.get_from_param()], perm[precond.get_to_param()], dir, prop, op, val, instance.facts); } asset_groups.emplace_back(asset_group_quals, asset_group_topos, perm); } auto assetgroup_size = asset_groups.size(); for (size_t j = 0; j < assetgroup_size; j++) { auto asset_group = asset_groups.at(j); for (auto &quality : asset_group.get_hypo_quals()) { if (!current_state.get_factbase().find_quality(quality)) { goto LOOPCONTINUE1; } } for (auto &topology : asset_group.get_hypo_topos()) { if (!current_state.get_factbase().find_topology(topology)) { goto LOOPCONTINUE1; } } { auto new_appl_exploit = std::make_tuple(e, asset_group); appl_exploits.push_back(new_appl_exploit); } LOOPCONTINUE1:; } } //for loop for applicable exploits ends //Less nodes allocated to task 2 than task 1. //Distribute the appl_exploit list from the extra node in task 1 to all other nodes in this task if (two_alloc < alloc){ std::vector> partial_appl_exploits; mpi::scatter(tcomm, &appl_exploits, partial_appl_exploits, alloc); if(world.rank() < alloc){ for(auto itr=partial_appl_exploits.begin(); itr!=partial_appl_exploits.end(); itr++){ auto index_r=std::distance(partial_appl_exploits.begin(),itr); appl_exploits.push_back(partial_appl_exploits.at(index_r)); } } } //Send Applicable Exploit List int skip_greatest = 0; if (two_alloc < alloc) skip_greatest = 1; if(world.rank() <= alloc - skip_greatest){ std::cout << "Process " << world.rank() << " sending applicable exploit list of size " << appl_exploits.size() <<\ " to Process " << send_check(world, world.rank() + alloc -1) << std::endl; world.isend(send_check(world, world.rank() + alloc -1), 30, appl_exploits); world.isend(send_check(world, world.rank() + alloc -1), 40, current_state); } } void task_two(AGGenInstance &instance, int alloc, int two_alloc, boost::mpi::communicator &world,\ std::deque &localFrontier, double mem_threshold, boost::mpi::communicator &ttwo_comm,\ std::vector ex_groups, std::unordered_map &hash_map){ std::cout << "Process rank " << world.rank() << " with " << two_alloc <<\ " node(s) allocated has started Task 2 with local communicator rank of " << ttwo_comm.rank() << std::endl; NetworkState current_state; std::vector> appl_exploits; world.recv(mpi::any_source, 30, appl_exploits); world.recv(mpi::any_source, 40, current_state); std::cout << "Process " << world.rank() << " received Task 1 data." << std::endl; std::cout << "Appl exploit size " << appl_exploits.size() << std::endl; std::vector> partial_appl_exploits; if(ttwo_comm.size() > 1) mpi::scatter(ttwo_comm, &partial_appl_exploits, appl_exploits, 0); auto current_hash = current_state.get_hash(instance.facts); std::map group_fired; //Map to hold fired status per group std::map>> sync_vectors; //Map to hold all group exploits for (auto map_group : ex_groups) { group_fired.insert(std::pair (map_group, 0)); } std::string egroup; //Build up the map of synchronous fire exploits and send to 0th node of task 2 for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){ //auto e = appl_exploits.at(itr); auto e = *itr; egroup = std::get<0>(e).get_group(); if (egroup != "null"){ sync_vectors[egroup].push_back(e); } } if(ttwo_comm.rank() != 0){ for (auto map_group : ex_groups){ ttwo_comm.isend(0, 6, sync_vectors[egroup]); } for(auto itr = appl_exploits.begin(); itr != appl_exploits.end();){ egroup = std::get<0>(*itr).get_group(); if (egroup == "null"){ itr = appl_exploits.erase(itr); } else{ itr++; } } } if(ttwo_comm.rank() == 0 && ttwo_comm.size() > 1){ for(auto itr = appl_exploits.begin(); itr != appl_exploits.end();){ itr = appl_exploits.erase(itr); } for (int r = 0; r < ex_groups.size() * (ttwo_comm.size()-1); r++){ ttwo_comm.irecv(mpi::any_source, 6, partial_appl_exploits); for(auto itr=partial_appl_exploits.begin(); itr!=partial_appl_exploits.end(); itr++){ auto index_r=std::distance(partial_appl_exploits.begin(),itr); appl_exploits.push_back(partial_appl_exploits.at(index_r)); } } } //loop through the vector for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){ auto e = *itr; auto exploit = std::get<0>(e); auto assetGroup = std::get<1>(e); egroup=exploit.get_group(); if ((egroup != "null" && group_fired[egroup] == 0) || egroup == "null"){ NetworkState new_state{current_state}; std::vector> sync_exploits; if (egroup == "null") sync_exploits.push_back(e); else { sync_exploits = sync_vectors[egroup]; //TODO: Does not work if only some assets belong to a group. This only works if //all assets are in the group if(sync_exploits.size() < instance.assets.size()){ break; } } std::cout << "Sync exploit size " << sync_exploits.size() << std::endl; for(auto sync_itr=sync_exploits.begin(); sync_itr!=sync_exploits.end(); sync_itr++){ e = *sync_itr; exploit = std::get<0>(e); egroup=exploit.get_group(); assetGroup = std::get<1>(e); group_fired[egroup] = 1; auto postconditions = createPostConditions(e, instance.facts); auto qualities = std::get<0>(postconditions); auto topologies = std::get<1>(postconditions); for(auto &qual : qualities) { auto action = std::get<0>(qual); auto fact = std::get<1>(qual); switch(action) { case ADD_T: new_state.add_quality(fact); break; case UPDATE_T: new_state.update_quality(fact); //TODO: if fact!= "="" call new_state function, passing fact and instance.facts. Update the quality, and insert it into the hash_table instead of this convoluted mess if(fact.get_op()=="+="){ //std::cout<<" AFTER UPDATE "<::const_iterator got = instance.facts.hash_table.find(new_state.compound_assign(fact)); //If the value is not already in the hash_table, insert it. //Since the compound operators include a value that is not in the original Keyvalue object, the unordered map does not include it //As a result, you have to manually add it. if(got==instance.facts.hash_table.end()){ instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size(); instance.facts.length++; instance.facts.str_vector.push_back(new_state.compound_assign(fact)); for (int w = 0; w < world.size(); w++) { if(w < 1 + alloc && w > two_alloc) { world.isend(w, 3, new_state); world.isend(w, 4, fact); } } //Update ttwo_comm Nodes if (ttwo_comm.size() > 1){ for (int t = 0; t < ttwo_comm.size(); t++){ if (t != ttwo_comm.rank()){ ttwo_comm.isend(t, 3, new_state); ttwo_comm.isend(t, 4, fact); } } } } } break; case DELETE_T: new_state.delete_quality(fact); break; } } for(auto &topo : topologies) { auto action = std::get<0>(topo); auto fact = std::get<1>(topo); switch(action) { case ADD_T: new_state.add_topology(fact); break; case UPDATE_T: new_state.update_topology(fact); break; case DELETE_T: new_state.delete_topology(fact); break; } } }//Sync. Fire for auto hash_num = new_state.get_hash(instance.facts); if (hash_num == current_hash){ std::cout << "Same hash." << std::endl; continue; } //<6 Node Edge Case Prevention: Node 0 unable to execute task 3 if(world.rank() != 0){ std::cout << "Node " << world.rank() << " sending new state data to Node 0" << std::endl; world.isend(0, 5, new_state); world.isend(0, 6, current_state); world.isend(0, 10, exploit); world.isend(0, 11, assetGroup); } else { task_three(instance, new_state, localFrontier, mem_threshold, world,\ two_alloc, current_state, exploit, assetGroup, hash_map); } } else{ std::cout << "Breaking." << std::endl; break; } } } void task_three(AGGenInstance &instance, NetworkState &new_state, std::deque &localFrontier,\ double mem_threshold, boost::mpi::communicator &world, int two_alloc, NetworkState ¤t_state,\ Exploit &exploit, AssetGroup &assetGroup, std::unordered_map &hash_map){ std::cout << "Started Task 3." << std::endl; auto hash_num = new_state.get_hash(instance.facts); //although local frontier is updated, the global hash is also updated to avoid testing on explored states. if (hash_map.find(hash_num) == hash_map.end()) { new_state.set_id(); auto facts_tuple = new_state.get_factbase().get_facts_tuple(); FactbaseItems new_items = std::make_tuple(facts_tuple, new_state.get_id()); instance.factbase_items.push_back(new_items); instance.factbases.push_back(new_state.get_factbase()); hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id())); //See memory usage. If it exceeds the threshold, store new states in the DB double i_alpha = 0.0; //Get the most recent Factbase's size * total number of factbases, rough approximation of *2 to account for factbase_items double i_usage = instance.factbases.back().get_size() * instance.factbases.size() * 2 + sizeof(instance.edges[0]) * instance.edges.size(); auto tot_sys_mem = getTotalSystemMemory(); i_alpha = i_usage/tot_sys_mem; double f_alpha; if (!localFrontier.empty()) f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; else f_alpha = 0.0; if (f_alpha >= (mem_threshold/2)) { //std::cout << "Frontier Alpha prior to database storing: " << f_alpha << std::endl; save_unexplored_to_db(new_state); if (!localFrontier.empty()) f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; else f_alpha = 0; //std::cout << "Frontier Alpha after database storing: " << f_alpha << std::endl; } //Store new state in database to ensure proper ordering of the FIFO queue else if (!unex_empty()){ save_unexplored_to_db(new_state); } //Otherwise, we can just store in memory else { localFrontier.emplace_front(new_state); } if (i_alpha >= mem_threshold/2){ //std::cout << "Instance Alpha prior to database storing: " << i_alpha << std::endl; world.isend(send_check(world, two_alloc), 7, instance.factbases); world.isend(send_check(world, two_alloc), 8, instance.edges); //save_ag_to_db(instance, true); //Clear vectors and free memory std::vector().swap(instance.factbases); std::vector().swap(instance.factbase_items); std::vector().swap(instance.edges); i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\ sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\ sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size())); i_alpha = i_usage/tot_sys_mem; //std::cout << "Instance Alpha after database storing: " << i_alpha << std::endl; } Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup); ed.set_id(); instance.edges.push_back(ed); } //END if (hash_map.find(hash_num) == hash_map.end()) else { int id = hash_map[hash_num]; Edge ed(current_state.get_id(), id, exploit, assetGroup); ed.set_id(); instance.edges.push_back(ed); } } int send_check(boost::mpi::communicator &world, int curr_node){ int send_to = curr_node + 1; if (curr_node >= world.size()-1) send_to = 0; return send_to; }