diff --git a/build/ag_gen b/build/ag_gen index 66de6b6..a329b3c 100755 Binary files a/build/ag_gen and b/build/ag_gen differ diff --git a/src/ag_gen/ag_gen.cpp b/src/ag_gen/ag_gen.cpp index 8477e60..21f5f5e 100755 --- a/src/ag_gen/ag_gen.cpp +++ b/src/ag_gen/ag_gen.cpp @@ -153,6 +153,7 @@ createPostConditions(std::tuple &group, Keyvalue &facts) { AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, int initQSize,\ double mem_threshold, boost::mpi::communicator &world) { + //Init all Nodes with these variables std::vector exploit_list = instance.exploits; //Create a vector that contains all the groups of exploits to be fired synchonously @@ -181,7 +182,9 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, printf("esize: %lu\n", esize); bool save_queued = false; - std::cout << "Generating Attack Graph" << std::endl; + if (world.rank() == 0){ + std::cout << "Generating Attack Graph" << std::endl; + } std::unordered_map> od_map; size_t assets_size = instance.assets.size(); @@ -332,352 +335,323 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, double total_t=0.0; struct timeval t1,t2; gettimeofday(&t1,NULL); - //#pragma omp parallel for num_threads(numThrd) default(none) shared(esize,counter,exploit_list,od_map,frt_size,total_t,t1,t2) schedule(dynamic,1) - #pragma omp parallel for num_threads(numThrd) default(none) shared(esize,counter,exploit_list,od_map,frt_size,total_t,t1,t2,std::cout, mem_threshold) schedule(dynamic,1) + + int num_tasks = 6; + #pragma omp parallel for num_threads(numThrd) default(none) shared(esize,counter,exploit_list,od_map,frt_size,total_t,t1,t2,std::cout, mem_threshold, num_tasks) schedule(dynamic,1) //auto ag_start = std::chrono::system_clock::now(); for(int k=0;k localFrontier; + localFrontier.emplace_front(frontier[k]); + + //TODO: Abort signal from Task 0 when all nodes should be done since they don't keep + //up with frontier + while (!localFrontier.empty() || !unex_empty()){//while starts + //We need to refill the localFrontier with states from the database if it's empty + if(localFrontier.empty() && world.rank() == 0) { + task_zero(instance, localFrontier, mem_threshold); + } + //Have all nodes wait until Frontier is refilled? + //world.barrier(); - //printf("State %d in Frontier\n",k); - //double alpha = get_alpha(); - double f_alpha = 0.0; - auto tot_sys_mem = getTotalSystemMemory(); - std::deque localFrontier; - localFrontier.emplace_front(frontier[k]); - while (!localFrontier.empty() || !unex_empty()){//while starts - //We need to refill the localFrontier with states from the database if it's empty - if(localFrontier.empty()) { - std::cout << "Frontier empty, retrieving from database" << std::endl; - double total_tt = 0.0; - struct timeval tt1,tt2; - gettimeofday(&tt1,NULL); - int retrv_counter = 0; + //Task 1 Node Allocating + int alloc; + if(world.size() <= num_tasks) + alloc = 1; + else + alloc = ceil((world.size()-num_tasks)/2); + + //Task 2 Node Allocating + int reduc_factor = 0; + int two_alloc = alloc; + if(world.size() % 2 != 0 && world.size() > num_tasks){ + int two_alloc = alloc-1; + reduc_factor = 1; + } + + //Task 0 to Task 1 Communication + if(world.rank() == 0) + { + auto current_state = localFrontier.back(); + auto current_hash = current_state.get_hash(instance.facts); + localFrontier.pop_back(); + for(l=0; l <= alloc; l++){ + world.isend(send_check(world, world.rank()), 0, current_state); + } + } + + //Execute Task 1 + if (world.rank() > 0 && world.rank() <= alloc){ + NetworkState{current_state}; + world.irecv(mpi::any_source, 0, current_state); + task_one(instance, current_state, exploit_list, od_map, alloc, two_alloc, reduc_factor, num_tasks, world); + } + + //Execute Task 2 + if(world.rank() == 1 + alloc && world.rank() <= two_alloc) + { + task_two(instance); + + } - //TODO: One (or a few) larger queries to pull in new states, rather than single queries that pull states one-by-one - do { - NetworkState db_new_state = fetch_unexplored(instance.facts); - localFrontier.emplace_front(db_new_state); - //alpha = get_alpha(); - f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; - retrv_counter += 1; - } - //Leave a 30% buffer in alpha - while((f_alpha <= (mem_threshold * 0.7)) && !unex_empty()); - - std::cout << "Retrieved " << retrv_counter << " factbases from the database." << std::endl; - gettimeofday(&tt2,NULL); - total_tt+=(tt2.tv_sec-tt1.tv_sec)*1000.0+(tt2.tv_usec-tt1.tv_usec)/1000.0; - //printf("Retrieving from db took %lf s.\n", total_tt); - } - //std::cout<<"FRONTIER SIZE: "<> appl_exploits; - for (size_t i = 0; i < esize; i++) {//for loop for applicable exploits starts - auto e = exploit_list.at(i); - size_t num_params = e.get_num_params(); - auto preconds_q = e.precond_list_q(); - auto preconds_t = e.precond_list_t(); - auto perms = od_map[num_params]; - std::vector asset_groups; - for (auto perm : perms) { - std::vector asset_group_quals; - std::vector asset_group_topos; - asset_group_quals.reserve(preconds_q.size()); - asset_group_topos.reserve(preconds_t.size()); - - - //std::vector::size_type sz; - //sz=asset_group_quals.capacity(); - for (auto &precond : preconds_q) { - - //Old quality encode caused this to crash - asset_group_quals.emplace_back( - perm[precond.get_param_num()], precond.name, precond.op, - precond.value, instance.facts); - } - for (auto &precond : preconds_t) { - auto dir = precond.get_dir(); - auto prop = precond.get_property(); - auto op = precond.get_operation(); - auto val = precond.get_value(); - asset_group_topos.emplace_back( - perm[precond.get_from_param()], - perm[precond.get_to_param()], dir, prop, op, val, instance.facts); - } - asset_groups.emplace_back(asset_group_quals, asset_group_topos, - perm); - } - auto assetgroup_size = asset_groups.size(); - for (size_t j = 0; j < assetgroup_size; j++) { - auto asset_group = asset_groups.at(j); - for (auto &quality : asset_group.get_hypo_quals()) { - if (!current_state.get_factbase().find_quality(quality)) { - goto LOOPCONTINUE1; - } - } - for (auto &topology : asset_group.get_hypo_topos()) { - if (!current_state.get_factbase().find_topology(topology)) { - goto LOOPCONTINUE1; - } - } - { - auto new_appl_exploit = std::make_tuple(e, asset_group); - appl_exploits.push_back(new_appl_exploit); - } - LOOPCONTINUE1:; - } - } //for loop for applicable exploits ends - - - auto appl_expl_size = appl_exploits.size(); - - //skip flag is used to ensure that the egroup loop is not repeatedly run more than necessary - int skip_flag=0; - - //for (size_t j = 0; j < appl_expl_size; j++) { //(OLD) for loop for new states starts + //recv + //world.isend(world.rank()+1+s, 0, appl_exploits); - //vector for holding the appl_exploits indices at which groups exist - std::vector idr_idx; - //vector for holding indices that have already fired - std::vector fired_idx; + //std::vector> appl_exploits; - //iterator for the applicable exploits vector - auto itr=appl_exploits.begin(); + //if (world.size() > num_tasks){ + // if (world.rank() >= two_alloc + 1 && world.rank() <= (2 * two_alloc + reduc_factor)){ + //recv the appl exploits + // } + //} - int break_flag=0; - int testing_flag=0; - //loop through the vector - for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){ - //keep track of index for later use - auto index=std::distance(appl_exploits.begin(), itr); - //reset break flag - break_flag=0; - //To avoid double-fire, check if an index has already been run. - //If it has, then there is no need to run through this loop again. - for(auto itr_f=fired_idx.begin(); itr_f!=fired_idx.end(); itr_f++){ - auto index_f=std::distance(fired_idx.begin(),itr_f); - if(index==index_f) - break_flag=1; - } - if (break_flag==1) - break; - - //empty the appl_exploits index vector at the start of each loop so that - //it doesn't contain stale data from a previous loop - idr_idx.clear(); - - NetworkState new_state{current_state}; - //auto e = appl_exploits.at(j); - - /* Synchronous fire function - - First: double/sanity checks to see if there are other exploits that need to be fired - This also prevents the firing from occurring when it shouldn't via a regular passthrough - After popping, it checks if the vector is empty. If it is, then we no longer need to - re-fill the vector since we've gone through all possibilities - */ - SYNCH_FIRE:; - if(!idr_idx.empty()){ - //std::cout<<"IDR Size " << idr_idx.size()<(e); - //std::cout<(appl_exploits.at(i))).get_group()==egroup && i!=index){ - idr_idx.emplace_back(i); - } + //vector for holding the appl_exploits indices at which groups exist + std::vector idr_idx; + + //vector for holding indices that have already fired + std::vector fired_idx; + + //iterator for the applicable exploits vector + auto itr=appl_exploits.begin(); + + int break_flag=0; + int testing_flag=0; + //loop through the vector + for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){ + //keep track of index for later use + auto index=std::distance(appl_exploits.begin(), itr); + //reset break flag + break_flag=0; + + //To avoid double-fire, check if an index has already been run. + //If it has, then there is no need to run through this loop again. + for(auto itr_f=fired_idx.begin(); itr_f!=fired_idx.end(); itr_f++){ + auto index_f=std::distance(fired_idx.begin(),itr_f); + if(index==index_f) + break_flag=1; } - //TODO: If there are other assets in group, - //but you check idr_idx after filling and it's still empty - //you know that the other asset isn't ready to be fired yet, so wait. - //THIS BREAKS CODE IF ONLY 1 ASSET IN GROUP EXPLOIT. NEED TO FIGURE OUT HOW TO SEE HOW MANY ASSETS ARE IN GROUP - //std::cout<(e).size()<(e).size()>1){ - if(idr_idx.empty()){ - testing_flag=1; - } - // } - } - if(testing_flag==1) - break; - skip_flag=0; - auto assetGroup = std::get<1>(e); - //assetGroup.print_group(); - //std::cout<(postconditions); - auto topologies = std::get<1>(postconditions); - - for(auto &qual : qualities) { - auto action = std::get<0>(qual); - auto fact = std::get<1>(qual); - switch(action) { - case ADD_T: - new_state.add_quality(fact); + if (break_flag==1) break; - case UPDATE_T: - new_state.update_quality(fact); - - //TODO: if fact!= "="" call new_state function, passing fact and instance.facts. Update the quality, and insert it into the hash_table instead of this convoluted mess - if(fact.get_op()=="+="){ - - //std::cout<<" AFTER UPDATE "<::const_iterator got = instance.facts.hash_table.find(new_state.compound_assign(fact)); - //If the value is not already in the hash_table, insert it. - //Since the compound operators include a value that is not in the original Keyvalue object, the unordered map does not include it - //As a result, you have to manually add it. - if(got==instance.facts.hash_table.end()){ - instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size(); - instance.facts.length++; - instance.facts.str_vector.push_back(new_state.compound_assign(fact)); + //empty the appl_exploits index vector at the start of each loop so that + //it doesn't contain stale data from a previous loop + idr_idx.clear(); + + NetworkState new_state{current_state}; + //auto e = appl_exploits.at(j); + + /* Synchronous fire function + + First: double/sanity checks to see if there are other exploits that need to be fired + This also prevents the firing from occurring when it shouldn't via a regular passthrough + (as in, when this gets checked from NOT the goto.) + After popping, it checks if the vector is empty. If it is, then we no longer need to + re-fill the vector since we've gone through all possibilities + */ + SYNCH_FIRE:; + if(!idr_idx.empty()){ + //std::cout<<"IDR Size " << idr_idx.size()<(e); + //std::cout<(appl_exploits.at(i))).get_group()==egroup && i!=index){ + idr_idx.emplace_back(i); } } - break; - case DELETE_T: - new_state.delete_quality(fact); - break; + + //TODO: If there are other assets in group, + //but you check idr_idx after filling and it's still empty + //you know that the other asset isn't ready to be fired yet, so wait. + //THIS BREAKS CODE IF ONLY 1 ASSET IN GROUP EXPLOIT. NEED TO FIGURE OUT HOW TO SEE HOW MANY ASSETS ARE IN GROUP + //std::cout<(e).size()<(e).size()>1){ + if(idr_idx.empty()){ + testing_flag=1; + } + // } } - } - for(auto &topo : topologies) { - auto action = std::get<0>(topo); - auto fact = std::get<1>(topo); - switch(action) { - case ADD_T: - new_state.add_topology(fact); - break; - case UPDATE_T: - new_state.update_topology(fact); - break; - case DELETE_T: - new_state.delete_topology(fact); - break; + if(testing_flag==1) + break; + skip_flag=0; + auto assetGroup = std::get<1>(e); + //assetGroup.print_group(); + //std::cout<(postconditions); + auto topologies = std::get<1>(postconditions); + + for(auto &qual : qualities) { + auto action = std::get<0>(qual); + auto fact = std::get<1>(qual); + switch(action) { + case ADD_T: + new_state.add_quality(fact); + break; + case UPDATE_T: + new_state.update_quality(fact); + + //TODO: if fact!= "="" call new_state function, passing fact and instance.facts. Update the quality, and insert it into the hash_table instead of this convoluted mess + if(fact.get_op()=="+="){ + + //std::cout<<" AFTER UPDATE "<::const_iterator got = instance.facts.hash_table.find(new_state.compound_assign(fact)); + + //If the value is not already in the hash_table, insert it. + //Since the compound operators include a value that is not in the original Keyvalue object, the unordered map does not include it + //As a result, you have to manually add it. + if(got==instance.facts.hash_table.end()){ + instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size(); + instance.facts.length++; + instance.facts.str_vector.push_back(new_state.compound_assign(fact)); + } + } + break; + case DELETE_T: + new_state.delete_quality(fact); + break; + } } - } - //appl_exploits.erase(appl_exploits.begin()+index); - - if(!idr_idx.empty()) - goto SYNCH_FIRE; - + for(auto &topo : topologies) { + auto action = std::get<0>(topo); + auto fact = std::get<1>(topo); + switch(action) { + case ADD_T: + new_state.add_topology(fact); + break; + case UPDATE_T: + new_state.update_topology(fact); + break; + case DELETE_T: + new_state.delete_topology(fact); + break; + } + } + //appl_exploits.erase(appl_exploits.begin()+index); + + if(!idr_idx.empty()) + goto SYNCH_FIRE; + - //THIS ALSO CRASHES - auto hash_num = new_state.get_hash(instance.facts); - - if (hash_num == current_hash) - continue; - //gettimeofday(&t1,NULL); - #pragma omp critical - if (hash_map.find(hash_num) == hash_map.end()) {//although local frontier is updated, the global hash is also updated to avoid testing on explored states. - new_state.set_id(); - auto facts_tuple = new_state.get_factbase().get_facts_tuple(); - FactbaseItems new_items = - std::make_tuple(facts_tuple, new_state.get_id()); - instance.factbase_items.push_back(new_items); - instance.factbases.push_back(new_state.get_factbase()); - hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id())); - - //localFrontier.emplace_front(new_state); - //See memory usage. If it exceeds the threshold, store new states in the DB - double i_alpha = 0.0; - //double i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\ - sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\ - sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size())); - //Get the most recent Factbase's size * total number of factbases, rough approximation of *2 to account for factbase_items - double i_usage = instance.factbases.back().get_size() * instance.factbases.size() * 2 + sizeof(instance.edges[0]) * instance.edges.size(); + auto hash_num = new_state.get_hash(instance.facts); + + if (hash_num == current_hash) + continue; + //gettimeofday(&t1,NULL); + #pragma omp critical + if (hash_map.find(hash_num) == hash_map.end()) {//although local frontier is updated, the global hash is also updated to avoid testing on explored states. + new_state.set_id(); + auto facts_tuple = new_state.get_factbase().get_facts_tuple(); + FactbaseItems new_items = + std::make_tuple(facts_tuple, new_state.get_id()); + instance.factbase_items.push_back(new_items); + instance.factbases.push_back(new_state.get_factbase()); + hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id())); + + //localFrontier.emplace_front(new_state); + //See memory usage. If it exceeds the threshold, store new states in the DB + double i_alpha = 0.0; + //double i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\ + sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\ + sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size())); + //Get the most recent Factbase's size * total number of factbases, rough approximation of *2 to account for factbase_items + double i_usage = instance.factbases.back().get_size() * instance.factbases.size() * 2 + sizeof(instance.edges[0]) * instance.edges.size(); - i_alpha = i_usage/tot_sys_mem; - if (!localFrontier.empty()) - f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; - else - f_alpha = 0.0; - //std::cout << "Frontier Alpha: " << f_alpha << std::endl; - //std::cout << "Instance Alpha: " << i_alpha << std::endl; - //std::cout << "Mem Threshold: " << mem_threshold << std::endl; - if (f_alpha >= (mem_threshold/2)) { - std::cout << "Frontier Alpha prior to database storing: " << f_alpha << std::endl; - //std::cout << "Factbase Usage Before: " << sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) << std::endl; - //std::cout << "Factbase Item Usage Before: " << sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) << std::endl; - //std::cout << "Edge Usage Before: " << sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size()) << std::endl; - - save_unexplored_to_db(new_state); + i_alpha = i_usage/tot_sys_mem; if (!localFrontier.empty()) f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; else - f_alpha = 0; - std::cout << "Frontier Alpha after database storing: " << f_alpha << std::endl; + f_alpha = 0.0; + //std::cout << "Frontier Alpha: " << f_alpha << std::endl; + //std::cout << "Instance Alpha: " << i_alpha << std::endl; + //std::cout << "Mem Threshold: " << mem_threshold << std::endl; + if (f_alpha >= (mem_threshold/2)) { + std::cout << "Frontier Alpha prior to database storing: " << f_alpha << std::endl; + //std::cout << "Factbase Usage Before: " << sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) << std::endl; + //std::cout << "Factbase Item Usage Before: " << sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) << std::endl; + //std::cout << "Edge Usage Before: " << sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size()) << std::endl; - //std::cout << "Storing in database." << std::endl; - } + save_unexplored_to_db(new_state); + if (!localFrontier.empty()) + f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; + else + f_alpha = 0; + std::cout << "Frontier Alpha after database storing: " << f_alpha << std::endl; - //Store new state in database to ensure proper ordering of the FIFO queue - else if (!unex_empty()){ - save_unexplored_to_db(new_state); - } - //Otherwise, we can just store in memory - else { - localFrontier.emplace_front(new_state); - } + //std::cout << "Storing in database." << std::endl; + } - if (i_alpha >= mem_threshold/2){ - std::cout << "Instance Alpha prior to database storing: " << i_alpha << std::endl; - save_ag_to_db(instance, true); + //Store new state in database to ensure proper ordering of the FIFO queue + else if (!unex_empty()){ + save_unexplored_to_db(new_state); + } + //Otherwise, we can just store in memory + else { + localFrontier.emplace_front(new_state); + } - //Clear vectors and free memory - std::vector().swap(instance.factbases); - std::vector().swap(instance.factbase_items); - std::vector().swap(instance.edges); + if (i_alpha >= mem_threshold/2){ + std::cout << "Instance Alpha prior to database storing: " << i_alpha << std::endl; + save_ag_to_db(instance, true); + + //Clear vectors and free memory + std::vector().swap(instance.factbases); + std::vector().swap(instance.factbase_items); + std::vector().swap(instance.edges); + + i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\ + sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\ + sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size())); + i_alpha = i_usage/tot_sys_mem; + std::cout << "Instance Alpha after database storing: " << i_alpha << std::endl; + + } - i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\ - sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\ - sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size())); - i_alpha = i_usage/tot_sys_mem; - std::cout << "Instance Alpha after database storing: " << i_alpha << std::endl; - - } - - //NetworkState new_state = fetch_unexplored(instance.facts); - - Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup); - ed.set_id(); - instance.edges.push_back(ed); - counter++; - } - else { - int id = hash_map[hash_num]; - Edge ed(current_state.get_id(), id, exploit, assetGroup); - ed.set_id(); - instance.edges.push_back(ed); - } - //gettimeofday(&t2,NULL); - //total_t+=(t2.tv_sec-t1.tv_sec)*1000.0+(t2.tv_usec-t1.tv_usec)/1000.0; - } //for loop for new states ends - } //while ends - auto ag_end= std::chrono::system_clock::now(); - // std::chrono::duration ag_elapsed = ag_end - ag_start; - //std::cout << "AG TOOK" << ag_elapsed.count() << std::endl; - + //NetworkState new_state = fetch_unexplored(instance.facts); + + Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup); + ed.set_id(); + instance.edges.push_back(ed); + counter++; + } + else { + int id = hash_map[hash_num]; + Edge ed(current_state.get_id(), id, exploit, assetGroup); + ed.set_id(); + instance.edges.push_back(ed); + } + } //for loop for new states ends + } //while ends + auto ag_end= std::chrono::system_clock::now(); }//OpenMP block ends - //printf("The critical took %lf s.\n",total_t); gettimeofday(&t2,NULL); total_t+=(t2.tv_sec-t1.tv_sec)*1000.0+(t2.tv_usec-t1.tv_usec)/1000.0; std::cout << "Graph generation took " << total_t << " ms for process " << world.rank() << std::endl; diff --git a/src/ag_gen/assetgroup.cpp b/src/ag_gen/assetgroup.cpp index 8d4091d..fd94dec 100755 --- a/src/ag_gen/assetgroup.cpp +++ b/src/ag_gen/assetgroup.cpp @@ -48,3 +48,8 @@ void AssetGroup::print_group() { int AssetGroup::size() { return perm.size(); } + +AssetGroup::AssetGroup() +{ + +} diff --git a/src/ag_gen/assetgroup.h b/src/ag_gen/assetgroup.h index 19105a4..f1fe0a0 100755 --- a/src/ag_gen/assetgroup.h +++ b/src/ag_gen/assetgroup.h @@ -10,6 +10,18 @@ #include "quality.h" #include "topology.h" +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + /** AssetGroup class * @brief Holds information about multiple Assets * @details Holds the hypothetical qualities and topologies of @@ -23,6 +35,11 @@ class AssetGroup { std::vector perm; + template + void serialize(Archive &ar, const unsigned int /* file_version */){ + ar & hypothetical_qualities & hypothetical_topologies & perm; + } + public: /** * @brief Constructor for AssetGroup @@ -32,6 +49,7 @@ class AssetGroup { * @param hypo_topos The hyptothetcial topologies of Assets * @param pperm IDs of the Assets */ + AssetGroup(); AssetGroup(std::vector hypo_quals, std::vector hypo_topos, std::vector pperm) : hypothetical_qualities(move(hypo_quals)), diff --git a/src/ag_gen/exploit.cpp b/src/ag_gen/exploit.cpp index 37841ea..039d849 100755 --- a/src/ag_gen/exploit.cpp +++ b/src/ag_gen/exploit.cpp @@ -66,6 +66,11 @@ Exploit::Exploit(int preId, string &preName, int preNumParams, preconds_q(std::get<0>(preconds)), preconds_t(std::get<1>(preconds)), postconds_q(std::get<0>(postconds)), postconds_t(std::get<1>(postconds)) {} +Exploit::Exploit() +{ + +} + /** * @brief Prints the Exploit ID */ diff --git a/src/ag_gen/exploit.h b/src/ag_gen/exploit.h index 654520d..0d4ac47 100755 --- a/src/ag_gen/exploit.h +++ b/src/ag_gen/exploit.h @@ -9,6 +9,19 @@ #include "topology.h" #include "../util/build_sql.h" +#include "../mpi/serialize_tuple.h" +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + typedef std::tuple PostconditionQ; typedef std::tuple PostconditionT; @@ -33,6 +46,10 @@ class Exploit { std::vector postconds_q; std::vector postconds_t; + template + void serialize(Archive &ar, const unsigned int /* file_version */){ + ar & id & num_params & group & preconds_q & preconds_t & postconds_q & postconds_t; + } public: Exploit(int preId, std::string &preName, int preNumParams, std::string &groupName, @@ -42,6 +59,7 @@ class Exploit { std::tuple, std::vector> postconds); + Exploit(); int get_id() const { return id; } diff --git a/src/ag_gen/quality.h b/src/ag_gen/quality.h index ee6ce13..d513d79 100755 --- a/src/ag_gen/quality.h +++ b/src/ag_gen/quality.h @@ -7,6 +7,7 @@ #include +#include "../mpi/serialize_tuple.h" #include #include #include @@ -54,6 +55,7 @@ struct ParameterizedQuality { std::cout << "Value: " + value << std::endl << std::endl; } }; +BOOST_IS_BITWISE_SERIALIZABLE(ParameterizedQuality) using PostconditionQuality = std::tuple; diff --git a/src/ag_gen/topology.h b/src/ag_gen/topology.h index 8483055..b21e073 100755 --- a/src/ag_gen/topology.h +++ b/src/ag_gen/topology.h @@ -3,6 +3,8 @@ #include #include + +#include "../mpi/serialize_tuple.h" #include #include #include @@ -64,6 +66,9 @@ struct ParameterizedTopology { } }; +BOOST_IS_BITWISE_SERIALIZABLE(ParameterizedTopology) + + using PostconditionTopology = std::tuple; /** Topology class diff --git a/src/mpi/serialize.cpp b/src/mpi/serialize.cpp index d0ca58f..0fb37a4 100644 --- a/src/mpi/serialize.cpp +++ b/src/mpi/serialize.cpp @@ -189,8 +189,6 @@ void serialization_unit_testing(AGGenInstance &instance, boost::mpi::communicato } world.barrier(); - //std::cout << "\nHello from process " << world.rank() << " of " << world.size() - // << " running on " << hammer_host << "." << std::endl; std::string rollcall = "Hello from process " + std::to_string(world.rank())\ + " of " + std::to_string(world.size()) + " running on "\ + str_host + "."; diff --git a/src/mpi/serialize_tuple.h b/src/mpi/serialize_tuple.h new file mode 100644 index 0000000..0e6c0b5 --- /dev/null +++ b/src/mpi/serialize_tuple.h @@ -0,0 +1,65 @@ +/* +Copyright 2011 Christopher Allen Ogden. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are +permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, this list of + conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, this list + of conditions and the following disclaimer in the documentation and/or other materials + provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY CHRISTOPHER ALLEN OGDEN ``AS IS'' AND ANY EXPRESS OR IMPLIED +WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CHRISTOPHER ALLEN OGDEN OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +The views and conclusions contained in the software and documentation are those of the +authors and should not be interpreted as representing official policies, either expressed +or implied, of Christopher Allen Ogden. +*/ + +#pragma once +#include + +namespace boost { +namespace serialization { + +template +struct Serialize +{ + template + static void serialize(Archive & ar, std::tuple & t, const unsigned int version) + { + ar & std::get(t); + Serialize::serialize(ar, t, version); + } +}; + +template<> +struct Serialize<0> +{ + template + static void serialize(Archive & ar, std::tuple & t, const unsigned int version) + { + (void) ar; + (void) t; + (void) version; + } +}; + +template +void serialize(Archive & ar, std::tuple & t, const unsigned int version) +{ + Serialize::serialize(ar, t, version); +} + +} +} \ No newline at end of file diff --git a/src/mpi/tasks.cpp b/src/mpi/tasks.cpp index a076d78..7cdb11b 100644 --- a/src/mpi/tasks.cpp +++ b/src/mpi/tasks.cpp @@ -1,2 +1,305 @@ -#include #include +#include +#include + +#include "../util/db_functions.h" +#include "../util/avail_mem.h" +#include "../util/odometer.h" +#include "serialize_tuple.h" + + +void task_zero(AGGenInstance &instance, std::deque &localFrontier, double mem_threshold) +{ + //std::cout << "Frontier empty, retrieving from database" << std::endl; + double f_alpha = 0.0; + double total_tt = 0.0; + struct timeval tt1,tt2; + int retrv_counter = 0; + + auto tot_sys_mem = getTotalSystemMemory(); + gettimeofday(&tt1,NULL); + + //TODO: One (or a few) larger queries to pull in new states, rather than single queries that pull states one-by-one + do { + NetworkState db_new_state = fetch_unexplored(instance.facts); + localFrontier.emplace_front(db_new_state); + //alpha = get_alpha(); + f_alpha = (static_cast(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; + retrv_counter += 1; + } + //Leave a 30% buffer in alpha + while((f_alpha <= (mem_threshold * 0.7)) && !unex_empty()); + + std::cout << "Retrieved " << retrv_counter << " factbases from the database." << std::endl; + gettimeofday(&tt2,NULL); + total_tt+=(tt2.tv_sec-tt1.tv_sec)*1000.0+(tt2.tv_usec-tt1.tv_usec)/1000.0; + //printf("Retrieving from db took %lf s.\n", total_tt); +} + +//TODO: These nodes need an updated instance +void task_one(AGGenInstance &instance, NetworkState ¤t_state,\ + std::vector &exploit_list, std::unordered_map> &od_map,\ + int alloc, int two_alloc, int reduc_factor, int num_tasks, boost::mpi::communicator &world){ + + boost::mpi::communicator tcomm = world.split(world.rank() > 0 && world.rank() <= alloc); + + std::vector> appl_exploits; + unsigned long esize = exploit_list.size(); + + //Distribute work to all nodes + for (size_t i = 0; i < esize; i++) {//for loop for applicable exploits starts + if (i % alloc != world.rank()+1) + continue; + auto e = exploit_list.at(i); + size_t num_params = e.get_num_params(); + auto preconds_q = e.precond_list_q(); + auto preconds_t = e.precond_list_t(); + auto perms = od_map[num_params]; + std::vector asset_groups; + for (auto perm : perms) { + std::vector asset_group_quals; + std::vector asset_group_topos; + asset_group_quals.reserve(preconds_q.size()); + asset_group_topos.reserve(preconds_t.size()); + + + //std::vector::size_type sz; + //sz=asset_group_quals.capacity(); + for (auto &precond : preconds_q) { + + //Old quality encode caused this to crash + asset_group_quals.emplace_back( + perm[precond.get_param_num()], precond.name, precond.op, + precond.value, instance.facts); + } + for (auto &precond : preconds_t) { + auto dir = precond.get_dir(); + auto prop = precond.get_property(); + auto op = precond.get_operation(); + auto val = precond.get_value(); + asset_group_topos.emplace_back( + perm[precond.get_from_param()], + perm[precond.get_to_param()], dir, prop, op, val, instance.facts); + } + asset_groups.emplace_back(asset_group_quals, asset_group_topos, + perm); + } + auto assetgroup_size = asset_groups.size(); + for (size_t j = 0; j < assetgroup_size; j++) { + auto asset_group = asset_groups.at(j); + for (auto &quality : asset_group.get_hypo_quals()) { + if (!current_state.get_factbase().find_quality(quality)) { + goto LOOPCONTINUE1; + } + } + for (auto &topology : asset_group.get_hypo_topos()) { + if (!current_state.get_factbase().find_topology(topology)) { + goto LOOPCONTINUE1; + } + } + { + auto new_appl_exploit = std::make_tuple(e, asset_group); + appl_exploits.push_back(new_appl_exploit); + } + LOOPCONTINUE1:; + } + } //for loop for applicable exploits ends + + //Less nodes allocated to task 2 than task 1. + //Distribute the appl_exploit list from the extra node in task 1 to all other nodes in this task + if (two_alloc < alloc){ + std::vector> partial_appl_exploits; + mpi::scatter(local, appl_exploits, partial_appl_exploits, world.rank()==alloc); + + if(world.rank() < alloc){ + for(auto itr=partial_appl_exploits.begin(); itr!=partial_appl_exploits.end(); itr++){ + auto index_r=std::distance(partial_appl_exploits.begin(),itr); + appl_exploits.push_back(partial_appl_exploits.at(index_r)); + } + } + + //Send Applicable Exploit List + int skip_greatest = 0; + if (two_alloc < alloc) + skip_greatest = 1; + if(world.rank() <= alloc - skip_greatest){ + world.isend(world.rank() + alloc, 0, appl_exploits); + world.isend(world.rank() + alloc, 0, current_state); + } +} + +//Note: This means that these nodes also need to update their instance!! +void task_two(AGGenInstance &instance) +{ + NetworkState{current_state}; + std::vector> appl_exploits; + world.irecv(mpi::any_source, 0, current_state); + world.irecv(mpi::any_source, 0, appl_exploits); + + auto current_hash = current_state.get_hash(instance.facts); + auto appl_expl_size = appl_exploits.size(); + + //Sync Fire work with lowest rank node + //ifworld.rank() == lowest + + //skip flag is used to ensure that the egroup loop is not repeatedly run more than necessary + int skip_flag=0; + + //vector for holding the appl_exploits indices at which groups exist + std::vector idr_idx; + + //vector for holding indices that have already fired + std::vector fired_idx; + + //iterator for the applicable exploits vector + auto itr=appl_exploits.begin(); + + int break_flag=0; + int testing_flag=0; + //loop through the vector + for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){ + //keep track of index for later use + auto index=std::distance(appl_exploits.begin(), itr); + //reset break flag + break_flag=0; + + //To avoid double-fire, check if an index has already been run. + //If it has, then there is no need to run through this loop again. + for(auto itr_f=fired_idx.begin(); itr_f!=fired_idx.end(); itr_f++){ + auto index_f=std::distance(fired_idx.begin(),itr_f); + if(index==index_f) + break_flag=1; + } + + if (break_flag==1) + break; + + //empty the appl_exploits index vector at the start of each loop so that + //it doesn't contain stale data from a previous loop + idr_idx.clear(); + + NetworkState new_state{current_state}; + //auto e = appl_exploits.at(j); + + /* Synchronous fire function + + First: double/sanity checks to see if there are other exploits that need to be fired + This also prevents the firing from occurring when it shouldn't via a regular passthrough + After popping, it checks if the vector is empty. If it is, then we no longer need to + re-fill the vector since we've gone through all possibilities + */ + SYNCH_FIRE:; + if(!idr_idx.empty()){ + //std::cout<<"IDR Size " << idr_idx.size()<(e); + //std::cout<(appl_exploits.at(i))).get_group()==egroup && i!=index){ + idr_idx.emplace_back(i); + } + } + + //TODO: If there are other assets in group, + //but you check idr_idx after filling and it's still empty + //you know that the other asset isn't ready to be fired yet, so wait. + //THIS BREAKS CODE IF ONLY 1 ASSET IN GROUP EXPLOIT. NEED TO FIGURE OUT HOW TO SEE HOW MANY ASSETS ARE IN GROUP + //std::cout<(e).size()<(e).size()>1){ + if(idr_idx.empty()){ + testing_flag=1; + } + // } + } + if(testing_flag==1) + break; + skip_flag=0; + auto assetGroup = std::get<1>(e); + //assetGroup.print_group(); + //std::cout<(postconditions); + auto topologies = std::get<1>(postconditions); + + for(auto &qual : qualities) { + auto action = std::get<0>(qual); + auto fact = std::get<1>(qual); + switch(action) { + case ADD_T: + new_state.add_quality(fact); + break; + case UPDATE_T: + new_state.update_quality(fact); + + //TODO: if fact!= "="" call new_state function, passing fact and instance.facts. Update the quality, and insert it into the hash_table instead of this convoluted mess + if(fact.get_op()=="+="){ + + //std::cout<<" AFTER UPDATE "<::const_iterator got = instance.facts.hash_table.find(new_state.compound_assign(fact)); + + //If the value is not already in the hash_table, insert it. + //Since the compound operators include a value that is not in the original Keyvalue object, the unordered map does not include it + //As a result, you have to manually add it. + if(got==instance.facts.hash_table.end()){ + instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size(); + instance.facts.length++; + instance.facts.str_vector.push_back(new_state.compound_assign(fact)); + } + } + break; + case DELETE_T: + new_state.delete_quality(fact); + break; + } + } + for(auto &topo : topologies) { + auto action = std::get<0>(topo); + auto fact = std::get<1>(topo); + switch(action) { + case ADD_T: + new_state.add_topology(fact); + break; + case UPDATE_T: + new_state.update_topology(fact); + break; + case DELETE_T: + new_state.delete_topology(fact); + break; + } + } + + if(!idr_idx.empty()) + goto SYNCH_FIRE; + + auto hash_num = new_state.get_hash(instance.facts); + + if (hash_num == current_hash) + continue; + //gettimeofday(&t1,NULL); + + //CRTICIAL IS IN HERE + } +} + + +int send_check(boost::mpi::communicator &world, int curr_node){ + int send_to = curr_node + 1; + if (curr_node == world.size()-1) + send_to = 0; + + return send_to; +} \ No newline at end of file diff --git a/src/mpi/tasks.h b/src/mpi/tasks.h index e69de29..7529584 100644 --- a/src/mpi/tasks.h +++ b/src/mpi/tasks.h @@ -0,0 +1,14 @@ +#ifndef TASKS_H +#define TASKS_H + +void task_zero(AGGenInstance &instance, std::deque &localFrontier, double mem_threshold); + +void task_one(AGGenInstance &instance, NetworkState ¤t_state,\ + std::vector &exploit_list, std::unordered_map> &od_map,\ + int alloc, int two_alloc, int reduc_factor, int num_tasks, boost::mpi::communicator &world); + +void task_two(AGGenInstance &instance); + +int send_check(boost::mpi::communicator &world, int curr_node); + +#endif //TASKS_H diff --git a/src/util/db_functions.cpp b/src/util/db_functions.cpp index 2844ffc..08a4fd1 100755 --- a/src/util/db_functions.cpp +++ b/src/util/db_functions.cpp @@ -12,6 +12,7 @@ #include "db_functions.h" +#include "avail_mem.h" #include "keyvalue.h" #include "db.h" @@ -573,6 +574,7 @@ void save_ag_to_db(AGGenInstance &instance, bool save_keyvalue){ std::vector& factbase_items = instance.factbase_items; std::vector& factbases = instance.factbases; std::vector& edges = instance.edges; + auto tot_sys_mem = getTotalSystemMemory(); Keyvalue& factlist = instance.facts; @@ -592,6 +594,8 @@ void save_ag_to_db(AGGenInstance &instance, bool save_keyvalue){ std::string factbase_sql_query = "INSERT INTO factbase VALUES "; int factbase_counter = 0; int flag = 0; + double fbsize = 0; + for (int i = 0; i < factbases.size(); ++i) { @@ -611,7 +615,8 @@ void save_ag_to_db(AGGenInstance &instance, bool save_keyvalue){ } //Break up query due to memory constraints. Suboptimal approach. factbase_counter += 1; - if (factbase_counter >= 500){ + fbsize = sizeof(factbase_sql_query); + if (fbsize/tot_sys_mem >= 0.1){ factbase_sql_query += ";"; db.execAsync(factbase_sql_query); flag = 1; @@ -778,6 +783,8 @@ void save_ag_to_db(AGGenInstance &instance, bool save_keyvalue){ std::string edge_sql_query = "INSERT INTO edge VALUES "; std::string edge_assets_sql_query = "INSERT INTO edge_asset_binding VALUES "; int ii = 0; + double esql = 0; + double easql = 0; for (auto ei : eq) { int i = ei.second; @@ -796,7 +803,10 @@ void save_ag_to_db(AGGenInstance &instance, bool save_keyvalue){ ++ii; //Break up query due to memory constraints. Suboptimal approach. edge_counter += 1; - if (edge_counter >= 500){ + esql = sizeof(edge_sql_query); + easql = sizeof(edge_assets_sql_query); + + if (esql/tot_sys_mem >= 0.1 || easql/tot_sys_mem >= 0.1){ edge_sql_query += "ON CONFLICT DO NOTHING;"; edge_assets_sql_query += "ON CONFLICT DO NOTHING;"; db.execAsync(edge_sql_query);