Starting work on MPI

This commit is contained in:
Noah L. Schrick 2022-01-13 14:11:56 -06:00
parent f4b5e433f9
commit 7cf0889d31
13 changed files with 736 additions and 319 deletions

Binary file not shown.

View File

@ -153,6 +153,7 @@ createPostConditions(std::tuple<Exploit, AssetGroup> &group, Keyvalue &facts) {
AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, int initQSize,\ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, int initQSize,\
double mem_threshold, boost::mpi::communicator &world) { double mem_threshold, boost::mpi::communicator &world) {
//Init all Nodes with these variables
std::vector<Exploit> exploit_list = instance.exploits; std::vector<Exploit> exploit_list = instance.exploits;
//Create a vector that contains all the groups of exploits to be fired synchonously //Create a vector that contains all the groups of exploits to be fired synchonously
@ -181,7 +182,9 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
printf("esize: %lu\n", esize); printf("esize: %lu\n", esize);
bool save_queued = false; bool save_queued = false;
std::cout << "Generating Attack Graph" << std::endl; if (world.rank() == 0){
std::cout << "Generating Attack Graph" << std::endl;
}
std::unordered_map<size_t, PermSet<size_t>> od_map; std::unordered_map<size_t, PermSet<size_t>> od_map;
size_t assets_size = instance.assets.size(); size_t assets_size = instance.assets.size();
@ -332,352 +335,323 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
double total_t=0.0; double total_t=0.0;
struct timeval t1,t2; struct timeval t1,t2;
gettimeofday(&t1,NULL); gettimeofday(&t1,NULL);
//#pragma omp parallel for num_threads(numThrd) default(none) shared(esize,counter,exploit_list,od_map,frt_size,total_t,t1,t2) schedule(dynamic,1)
#pragma omp parallel for num_threads(numThrd) default(none) shared(esize,counter,exploit_list,od_map,frt_size,total_t,t1,t2,std::cout, mem_threshold) schedule(dynamic,1) int num_tasks = 6;
#pragma omp parallel for num_threads(numThrd) default(none) shared(esize,counter,exploit_list,od_map,frt_size,total_t,t1,t2,std::cout, mem_threshold, num_tasks) schedule(dynamic,1)
//auto ag_start = std::chrono::system_clock::now(); //auto ag_start = std::chrono::system_clock::now();
for(int k=0;k<frt_size;k++){ for(int k=0;k<frt_size;k++){
double f_alpha = 0.0;
auto tot_sys_mem = getTotalSystemMemory();
std::deque<NetworkState> localFrontier;
localFrontier.emplace_front(frontier[k]);
//TODO: Abort signal from Task 0 when all nodes should be done since they don't keep
//up with frontier
while (!localFrontier.empty() || !unex_empty()){//while starts
//We need to refill the localFrontier with states from the database if it's empty
if(localFrontier.empty() && world.rank() == 0) {
task_zero(instance, localFrontier, mem_threshold);
}
//Have all nodes wait until Frontier is refilled?
//world.barrier();
//printf("State %d in Frontier\n",k); //Task 1 Node Allocating
//double alpha = get_alpha(); int alloc;
double f_alpha = 0.0; if(world.size() <= num_tasks)
auto tot_sys_mem = getTotalSystemMemory(); alloc = 1;
std::deque<NetworkState> localFrontier; else
localFrontier.emplace_front(frontier[k]); alloc = ceil((world.size()-num_tasks)/2);
while (!localFrontier.empty() || !unex_empty()){//while starts
//We need to refill the localFrontier with states from the database if it's empty //Task 2 Node Allocating
if(localFrontier.empty()) { int reduc_factor = 0;
std::cout << "Frontier empty, retrieving from database" << std::endl; int two_alloc = alloc;
double total_tt = 0.0; if(world.size() % 2 != 0 && world.size() > num_tasks){
struct timeval tt1,tt2; int two_alloc = alloc-1;
gettimeofday(&tt1,NULL); reduc_factor = 1;
int retrv_counter = 0; }
//Task 0 to Task 1 Communication
if(world.rank() == 0)
{
auto current_state = localFrontier.back();
auto current_hash = current_state.get_hash(instance.facts);
localFrontier.pop_back();
for(l=0; l <= alloc; l++){
world.isend(send_check(world, world.rank()), 0, current_state);
}
}
//Execute Task 1
if (world.rank() > 0 && world.rank() <= alloc){
NetworkState{current_state};
world.irecv(mpi::any_source, 0, current_state);
task_one(instance, current_state, exploit_list, od_map, alloc, two_alloc, reduc_factor, num_tasks, world);
}
//Execute Task 2
if(world.rank() == 1 + alloc && world.rank() <= two_alloc)
{
task_two(instance);
}
//TODO: One (or a few) larger queries to pull in new states, rather than single queries that pull states one-by-one //recv
do { //world.isend(world.rank()+1+s, 0, appl_exploits);
NetworkState db_new_state = fetch_unexplored(instance.facts);
localFrontier.emplace_front(db_new_state);
//alpha = get_alpha();
f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem;
retrv_counter += 1;
}
//Leave a 30% buffer in alpha
while((f_alpha <= (mem_threshold * 0.7)) && !unex_empty());
std::cout << "Retrieved " << retrv_counter << " factbases from the database." << std::endl;
gettimeofday(&tt2,NULL);
total_tt+=(tt2.tv_sec-tt1.tv_sec)*1000.0+(tt2.tv_usec-tt1.tv_usec)/1000.0;
//printf("Retrieving from db took %lf s.\n", total_tt);
}
//std::cout<<"FRONTIER SIZE: "<<localFrontier.size()<<std::endl;
auto current_state = localFrontier.back();
auto current_hash = current_state.get_hash(instance.facts);
localFrontier.pop_back();
std::vector<std::tuple<Exploit, AssetGroup>> appl_exploits;
for (size_t i = 0; i < esize; i++) {//for loop for applicable exploits starts
auto e = exploit_list.at(i);
size_t num_params = e.get_num_params();
auto preconds_q = e.precond_list_q();
auto preconds_t = e.precond_list_t();
auto perms = od_map[num_params];
std::vector<AssetGroup> asset_groups;
for (auto perm : perms) {
std::vector<Quality> asset_group_quals;
std::vector<Topology> asset_group_topos;
asset_group_quals.reserve(preconds_q.size());
asset_group_topos.reserve(preconds_t.size());
//std::vector<int>::size_type sz;
//sz=asset_group_quals.capacity();
for (auto &precond : preconds_q) {
//Old quality encode caused this to crash
asset_group_quals.emplace_back(
perm[precond.get_param_num()], precond.name, precond.op,
precond.value, instance.facts);
}
for (auto &precond : preconds_t) {
auto dir = precond.get_dir();
auto prop = precond.get_property();
auto op = precond.get_operation();
auto val = precond.get_value();
asset_group_topos.emplace_back(
perm[precond.get_from_param()],
perm[precond.get_to_param()], dir, prop, op, val, instance.facts);
}
asset_groups.emplace_back(asset_group_quals, asset_group_topos,
perm);
}
auto assetgroup_size = asset_groups.size();
for (size_t j = 0; j < assetgroup_size; j++) {
auto asset_group = asset_groups.at(j);
for (auto &quality : asset_group.get_hypo_quals()) {
if (!current_state.get_factbase().find_quality(quality)) {
goto LOOPCONTINUE1;
}
}
for (auto &topology : asset_group.get_hypo_topos()) {
if (!current_state.get_factbase().find_topology(topology)) {
goto LOOPCONTINUE1;
}
}
{
auto new_appl_exploit = std::make_tuple(e, asset_group);
appl_exploits.push_back(new_appl_exploit);
}
LOOPCONTINUE1:;
}
} //for loop for applicable exploits ends
auto appl_expl_size = appl_exploits.size();
//skip flag is used to ensure that the egroup loop is not repeatedly run more than necessary
int skip_flag=0;
//for (size_t j = 0; j < appl_expl_size; j++) { //(OLD) for loop for new states starts
//vector for holding the appl_exploits indices at which groups exist
std::vector<int> idr_idx;
//vector for holding indices that have already fired //std::vector<std::tuple<Exploit, AssetGroup>> appl_exploits;
std::vector<int> fired_idx;
//iterator for the applicable exploits vector //if (world.size() > num_tasks){
auto itr=appl_exploits.begin(); // if (world.rank() >= two_alloc + 1 && world.rank() <= (2 * two_alloc + reduc_factor)){
//recv the appl exploits
// }
//}
int break_flag=0;
int testing_flag=0;
//loop through the vector
for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){
//keep track of index for later use
auto index=std::distance(appl_exploits.begin(), itr);
//reset break flag
break_flag=0;
//To avoid double-fire, check if an index has already been run.
//If it has, then there is no need to run through this loop again.
for(auto itr_f=fired_idx.begin(); itr_f!=fired_idx.end(); itr_f++){
auto index_f=std::distance(fired_idx.begin(),itr_f);
if(index==index_f)
break_flag=1;
}
if (break_flag==1) //task_two();
break;
//empty the appl_exploits index vector at the start of each loop so that
//it doesn't contain stale data from a previous loop
idr_idx.clear();
NetworkState new_state{current_state};
//auto e = appl_exploits.at(j);
/* Synchronous fire function
First: double/sanity checks to see if there are other exploits that need to be fired
This also prevents the firing from occurring when it shouldn't via a regular passthrough
After popping, it checks if the vector is empty. If it is, then we no longer need to
re-fill the vector since we've gone through all possibilities
*/
SYNCH_FIRE:;
if(!idr_idx.empty()){
//std::cout<<"IDR Size " << idr_idx.size()<<std::endl;
index=idr_idx.back();
idr_idx.pop_back();
if(idr_idx.empty())
skip_flag=1;
fired_idx.push_back(index);
}
auto appl_expl_size = appl_exploits.size();
//skip flag is used to ensure that the egroup loop is not repeatedly run more than necessary
int skip_flag=0;
auto e = appl_exploits.at(index); //for (size_t j = 0; j < appl_expl_size; j++) { //(OLD) for loop for new states starts
auto exploit = std::get<0>(e);
//std::cout<<exploit.get_name()<<std::endl;
//For synchronous firing: get indices of all exploits in the same group and
//push them onto the index vector for later use
auto egroup=exploit.get_group();
if (egroup!="null" && idr_idx.empty() && skip_flag==0){ //vector for holding the appl_exploits indices at which groups exist
for(int i=0; i!=appl_exploits.size(); i++){ std::vector<int> idr_idx;
if((std::get<0>(appl_exploits.at(i))).get_group()==egroup && i!=index){
idr_idx.emplace_back(i); //vector for holding indices that have already fired
} std::vector<int> fired_idx;
//iterator for the applicable exploits vector
auto itr=appl_exploits.begin();
int break_flag=0;
int testing_flag=0;
//loop through the vector
for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){
//keep track of index for later use
auto index=std::distance(appl_exploits.begin(), itr);
//reset break flag
break_flag=0;
//To avoid double-fire, check if an index has already been run.
//If it has, then there is no need to run through this loop again.
for(auto itr_f=fired_idx.begin(); itr_f!=fired_idx.end(); itr_f++){
auto index_f=std::distance(fired_idx.begin(),itr_f);
if(index==index_f)
break_flag=1;
} }
//TODO: If there are other assets in group, if (break_flag==1)
//but you check idr_idx after filling and it's still empty
//you know that the other asset isn't ready to be fired yet, so wait.
//THIS BREAKS CODE IF ONLY 1 ASSET IN GROUP EXPLOIT. NEED TO FIGURE OUT HOW TO SEE HOW MANY ASSETS ARE IN GROUP
//std::cout<<std::get<1>(e).size()<<std::endl;
//if(std::get<1>(e).size()>1){
if(idr_idx.empty()){
testing_flag=1;
}
// }
}
if(testing_flag==1)
break;
skip_flag=0;
auto assetGroup = std::get<1>(e);
//assetGroup.print_group();
//std::cout<<std::endl;
auto postconditions = createPostConditions(e, instance.facts);
auto qualities = std::get<0>(postconditions);
auto topologies = std::get<1>(postconditions);
for(auto &qual : qualities) {
auto action = std::get<0>(qual);
auto fact = std::get<1>(qual);
switch(action) {
case ADD_T:
new_state.add_quality(fact);
break; break;
case UPDATE_T:
new_state.update_quality(fact);
//TODO: if fact!= "="" call new_state function, passing fact and instance.facts. Update the quality, and insert it into the hash_table instead of this convoluted mess
if(fact.get_op()=="+="){
//std::cout<<" AFTER UPDATE "<<new_state.compound_assign(fact)<<std::endl;
std::unordered_map<std::string,int>::const_iterator got = instance.facts.hash_table.find(new_state.compound_assign(fact));
//If the value is not already in the hash_table, insert it. //empty the appl_exploits index vector at the start of each loop so that
//Since the compound operators include a value that is not in the original Keyvalue object, the unordered map does not include it //it doesn't contain stale data from a previous loop
//As a result, you have to manually add it. idr_idx.clear();
if(got==instance.facts.hash_table.end()){
instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size(); NetworkState new_state{current_state};
instance.facts.length++; //auto e = appl_exploits.at(j);
instance.facts.str_vector.push_back(new_state.compound_assign(fact));
/* Synchronous fire function
First: double/sanity checks to see if there are other exploits that need to be fired
This also prevents the firing from occurring when it shouldn't via a regular passthrough
(as in, when this gets checked from NOT the goto.)
After popping, it checks if the vector is empty. If it is, then we no longer need to
re-fill the vector since we've gone through all possibilities
*/
SYNCH_FIRE:;
if(!idr_idx.empty()){
//std::cout<<"IDR Size " << idr_idx.size()<<std::endl;
index=idr_idx.back();
idr_idx.pop_back();
if(idr_idx.empty())
skip_flag=1;
fired_idx.push_back(index);
}
auto e = appl_exploits.at(index);
auto exploit = std::get<0>(e);
//std::cout<<exploit.get_name()<<std::endl;
//For synchronous firing: get indices of all exploits in the same group and
//push them onto the index vector for later use
auto egroup=exploit.get_group();
if (egroup!="null" && idr_idx.empty() && skip_flag==0){
for(int i=0; i!=appl_exploits.size(); i++){
if((std::get<0>(appl_exploits.at(i))).get_group()==egroup && i!=index){
idr_idx.emplace_back(i);
} }
} }
break;
case DELETE_T: //TODO: If there are other assets in group,
new_state.delete_quality(fact); //but you check idr_idx after filling and it's still empty
break; //you know that the other asset isn't ready to be fired yet, so wait.
//THIS BREAKS CODE IF ONLY 1 ASSET IN GROUP EXPLOIT. NEED TO FIGURE OUT HOW TO SEE HOW MANY ASSETS ARE IN GROUP
//std::cout<<std::get<1>(e).size()<<std::endl;
//if(std::get<1>(e).size()>1){
if(idr_idx.empty()){
testing_flag=1;
}
// }
} }
} if(testing_flag==1)
for(auto &topo : topologies) { break;
auto action = std::get<0>(topo); skip_flag=0;
auto fact = std::get<1>(topo); auto assetGroup = std::get<1>(e);
switch(action) { //assetGroup.print_group();
case ADD_T: //std::cout<<std::endl;
new_state.add_topology(fact); auto postconditions = createPostConditions(e, instance.facts);
break; auto qualities = std::get<0>(postconditions);
case UPDATE_T: auto topologies = std::get<1>(postconditions);
new_state.update_topology(fact);
break; for(auto &qual : qualities) {
case DELETE_T: auto action = std::get<0>(qual);
new_state.delete_topology(fact); auto fact = std::get<1>(qual);
break; switch(action) {
case ADD_T:
new_state.add_quality(fact);
break;
case UPDATE_T:
new_state.update_quality(fact);
//TODO: if fact!= "="" call new_state function, passing fact and instance.facts. Update the quality, and insert it into the hash_table instead of this convoluted mess
if(fact.get_op()=="+="){
//std::cout<<" AFTER UPDATE "<<new_state.compound_assign(fact)<<std::endl;
std::unordered_map<std::string,int>::const_iterator got = instance.facts.hash_table.find(new_state.compound_assign(fact));
//If the value is not already in the hash_table, insert it.
//Since the compound operators include a value that is not in the original Keyvalue object, the unordered map does not include it
//As a result, you have to manually add it.
if(got==instance.facts.hash_table.end()){
instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size();
instance.facts.length++;
instance.facts.str_vector.push_back(new_state.compound_assign(fact));
}
}
break;
case DELETE_T:
new_state.delete_quality(fact);
break;
}
} }
} for(auto &topo : topologies) {
//appl_exploits.erase(appl_exploits.begin()+index); auto action = std::get<0>(topo);
auto fact = std::get<1>(topo);
if(!idr_idx.empty()) switch(action) {
goto SYNCH_FIRE; case ADD_T:
new_state.add_topology(fact);
break;
case UPDATE_T:
new_state.update_topology(fact);
break;
case DELETE_T:
new_state.delete_topology(fact);
break;
}
}
//appl_exploits.erase(appl_exploits.begin()+index);
if(!idr_idx.empty())
goto SYNCH_FIRE;
//THIS ALSO CRASHES auto hash_num = new_state.get_hash(instance.facts);
auto hash_num = new_state.get_hash(instance.facts);
if (hash_num == current_hash)
if (hash_num == current_hash) continue;
continue; //gettimeofday(&t1,NULL);
//gettimeofday(&t1,NULL); #pragma omp critical
#pragma omp critical if (hash_map.find(hash_num) == hash_map.end()) {//although local frontier is updated, the global hash is also updated to avoid testing on explored states.
if (hash_map.find(hash_num) == hash_map.end()) {//although local frontier is updated, the global hash is also updated to avoid testing on explored states. new_state.set_id();
new_state.set_id(); auto facts_tuple = new_state.get_factbase().get_facts_tuple();
auto facts_tuple = new_state.get_factbase().get_facts_tuple(); FactbaseItems new_items =
FactbaseItems new_items = std::make_tuple(facts_tuple, new_state.get_id());
std::make_tuple(facts_tuple, new_state.get_id()); instance.factbase_items.push_back(new_items);
instance.factbase_items.push_back(new_items); instance.factbases.push_back(new_state.get_factbase());
instance.factbases.push_back(new_state.get_factbase()); hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id()));
hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id()));
//localFrontier.emplace_front(new_state);
//localFrontier.emplace_front(new_state); //See memory usage. If it exceeds the threshold, store new states in the DB
//See memory usage. If it exceeds the threshold, store new states in the DB double i_alpha = 0.0;
double i_alpha = 0.0; //double i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\
//double i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\ sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\
sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\ sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size()));
sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size())); //Get the most recent Factbase's size * total number of factbases, rough approximation of *2 to account for factbase_items
//Get the most recent Factbase's size * total number of factbases, rough approximation of *2 to account for factbase_items double i_usage = instance.factbases.back().get_size() * instance.factbases.size() * 2 + sizeof(instance.edges[0]) * instance.edges.size();
double i_usage = instance.factbases.back().get_size() * instance.factbases.size() * 2 + sizeof(instance.edges[0]) * instance.edges.size();
i_alpha = i_usage/tot_sys_mem; i_alpha = i_usage/tot_sys_mem;
if (!localFrontier.empty())
f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem;
else
f_alpha = 0.0;
//std::cout << "Frontier Alpha: " << f_alpha << std::endl;
//std::cout << "Instance Alpha: " << i_alpha << std::endl;
//std::cout << "Mem Threshold: " << mem_threshold << std::endl;
if (f_alpha >= (mem_threshold/2)) {
std::cout << "Frontier Alpha prior to database storing: " << f_alpha << std::endl;
//std::cout << "Factbase Usage Before: " << sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) << std::endl;
//std::cout << "Factbase Item Usage Before: " << sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) << std::endl;
//std::cout << "Edge Usage Before: " << sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size()) << std::endl;
save_unexplored_to_db(new_state);
if (!localFrontier.empty()) if (!localFrontier.empty())
f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem; f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem;
else else
f_alpha = 0; f_alpha = 0.0;
std::cout << "Frontier Alpha after database storing: " << f_alpha << std::endl; //std::cout << "Frontier Alpha: " << f_alpha << std::endl;
//std::cout << "Instance Alpha: " << i_alpha << std::endl;
//std::cout << "Mem Threshold: " << mem_threshold << std::endl;
if (f_alpha >= (mem_threshold/2)) {
std::cout << "Frontier Alpha prior to database storing: " << f_alpha << std::endl;
//std::cout << "Factbase Usage Before: " << sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) << std::endl;
//std::cout << "Factbase Item Usage Before: " << sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) << std::endl;
//std::cout << "Edge Usage Before: " << sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size()) << std::endl;
//std::cout << "Storing in database." << std::endl; save_unexplored_to_db(new_state);
} if (!localFrontier.empty())
f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem;
else
f_alpha = 0;
std::cout << "Frontier Alpha after database storing: " << f_alpha << std::endl;
//Store new state in database to ensure proper ordering of the FIFO queue //std::cout << "Storing in database." << std::endl;
else if (!unex_empty()){ }
save_unexplored_to_db(new_state);
}
//Otherwise, we can just store in memory
else {
localFrontier.emplace_front(new_state);
}
if (i_alpha >= mem_threshold/2){ //Store new state in database to ensure proper ordering of the FIFO queue
std::cout << "Instance Alpha prior to database storing: " << i_alpha << std::endl; else if (!unex_empty()){
save_ag_to_db(instance, true); save_unexplored_to_db(new_state);
}
//Otherwise, we can just store in memory
else {
localFrontier.emplace_front(new_state);
}
//Clear vectors and free memory if (i_alpha >= mem_threshold/2){
std::vector<Factbase>().swap(instance.factbases); std::cout << "Instance Alpha prior to database storing: " << i_alpha << std::endl;
std::vector<FactbaseItems>().swap(instance.factbase_items); save_ag_to_db(instance, true);
std::vector<Edge>().swap(instance.edges);
//Clear vectors and free memory
std::vector<Factbase>().swap(instance.factbases);
std::vector<FactbaseItems>().swap(instance.factbase_items);
std::vector<Edge>().swap(instance.edges);
i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\
sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\
sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size()));
i_alpha = i_usage/tot_sys_mem;
std::cout << "Instance Alpha after database storing: " << i_alpha << std::endl;
}
i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\ //NetworkState new_state = fetch_unexplored(instance.facts);
sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\
sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size())); Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup);
i_alpha = i_usage/tot_sys_mem; ed.set_id();
std::cout << "Instance Alpha after database storing: " << i_alpha << std::endl; instance.edges.push_back(ed);
counter++;
} }
else {
//NetworkState new_state = fetch_unexplored(instance.facts); int id = hash_map[hash_num];
Edge ed(current_state.get_id(), id, exploit, assetGroup);
Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup); ed.set_id();
ed.set_id(); instance.edges.push_back(ed);
instance.edges.push_back(ed); }
counter++; } //for loop for new states ends
} } //while ends
else { auto ag_end= std::chrono::system_clock::now();
int id = hash_map[hash_num];
Edge ed(current_state.get_id(), id, exploit, assetGroup);
ed.set_id();
instance.edges.push_back(ed);
}
//gettimeofday(&t2,NULL);
//total_t+=(t2.tv_sec-t1.tv_sec)*1000.0+(t2.tv_usec-t1.tv_usec)/1000.0;
} //for loop for new states ends
} //while ends
auto ag_end= std::chrono::system_clock::now();
// std::chrono::duration<double> ag_elapsed = ag_end - ag_start;
//std::cout << "AG TOOK" << ag_elapsed.count() << std::endl;
}//OpenMP block ends }//OpenMP block ends
//printf("The critical took %lf s.\n",total_t);
gettimeofday(&t2,NULL); gettimeofday(&t2,NULL);
total_t+=(t2.tv_sec-t1.tv_sec)*1000.0+(t2.tv_usec-t1.tv_usec)/1000.0; total_t+=(t2.tv_sec-t1.tv_sec)*1000.0+(t2.tv_usec-t1.tv_usec)/1000.0;
std::cout << "Graph generation took " << total_t << " ms for process " << world.rank() << std::endl; std::cout << "Graph generation took " << total_t << " ms for process " << world.rank() << std::endl;

View File

@ -48,3 +48,8 @@ void AssetGroup::print_group() {
int AssetGroup::size() { int AssetGroup::size() {
return perm.size(); return perm.size();
} }
AssetGroup::AssetGroup()
{
}

View File

@ -10,6 +10,18 @@
#include "quality.h" #include "quality.h"
#include "topology.h" #include "topology.h"
#include <boost/archive/tmpdir.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/list.hpp>
#include <boost/serialization/assume_abstract.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/is_bitwise_serializable.hpp>
/** AssetGroup class /** AssetGroup class
* @brief Holds information about multiple Assets * @brief Holds information about multiple Assets
* @details Holds the hypothetical qualities and topologies of * @details Holds the hypothetical qualities and topologies of
@ -23,6 +35,11 @@ class AssetGroup {
std::vector<size_t> perm; std::vector<size_t> perm;
template<class Archive>
void serialize(Archive &ar, const unsigned int /* file_version */){
ar & hypothetical_qualities & hypothetical_topologies & perm;
}
public: public:
/** /**
* @brief Constructor for AssetGroup * @brief Constructor for AssetGroup
@ -32,6 +49,7 @@ class AssetGroup {
* @param hypo_topos The hyptothetcial topologies of Assets * @param hypo_topos The hyptothetcial topologies of Assets
* @param pperm IDs of the Assets * @param pperm IDs of the Assets
*/ */
AssetGroup();
AssetGroup(std::vector<Quality> hypo_quals, AssetGroup(std::vector<Quality> hypo_quals,
std::vector<Topology> hypo_topos, std::vector<size_t> pperm) std::vector<Topology> hypo_topos, std::vector<size_t> pperm)
: hypothetical_qualities(move(hypo_quals)), : hypothetical_qualities(move(hypo_quals)),

View File

@ -66,6 +66,11 @@ Exploit::Exploit(int preId, string &preName, int preNumParams,
preconds_q(std::get<0>(preconds)), preconds_t(std::get<1>(preconds)), preconds_q(std::get<0>(preconds)), preconds_t(std::get<1>(preconds)),
postconds_q(std::get<0>(postconds)), postconds_t(std::get<1>(postconds)) {} postconds_q(std::get<0>(postconds)), postconds_t(std::get<1>(postconds)) {}
Exploit::Exploit()
{
}
/** /**
* @brief Prints the Exploit ID * @brief Prints the Exploit ID
*/ */

View File

@ -9,6 +9,19 @@
#include "topology.h" #include "topology.h"
#include "../util/build_sql.h" #include "../util/build_sql.h"
#include "../mpi/serialize_tuple.h"
#include <boost/archive/tmpdir.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/serialization/utility.hpp>
#include <boost/serialization/list.hpp>
#include <boost/serialization/assume_abstract.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/is_bitwise_serializable.hpp>
typedef std::tuple<ACTION_T, ParameterizedQuality> PostconditionQ; typedef std::tuple<ACTION_T, ParameterizedQuality> PostconditionQ;
typedef std::tuple<ACTION_T, ParameterizedTopology> PostconditionT; typedef std::tuple<ACTION_T, ParameterizedTopology> PostconditionT;
@ -33,6 +46,10 @@ class Exploit {
std::vector<PostconditionQ> postconds_q; std::vector<PostconditionQ> postconds_q;
std::vector<PostconditionT> postconds_t; std::vector<PostconditionT> postconds_t;
template<class Archive>
void serialize(Archive &ar, const unsigned int /* file_version */){
ar & id & num_params & group & preconds_q & preconds_t & postconds_q & postconds_t;
}
public: public:
Exploit(int preId, std::string &preName, int preNumParams, Exploit(int preId, std::string &preName, int preNumParams,
std::string &groupName, std::string &groupName,
@ -42,6 +59,7 @@ class Exploit {
std::tuple<std::vector<PostconditionQ>, std::tuple<std::vector<PostconditionQ>,
std::vector<PostconditionT>> std::vector<PostconditionT>>
postconds); postconds);
Exploit();
int get_id() const { return id; } int get_id() const { return id; }

View File

@ -7,6 +7,7 @@
#include <string> #include <string>
#include "../mpi/serialize_tuple.h"
#include <boost/archive/tmpdir.hpp> #include <boost/archive/tmpdir.hpp>
#include <boost/archive/text_iarchive.hpp> #include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp> #include <boost/archive/text_oarchive.hpp>
@ -54,6 +55,7 @@ struct ParameterizedQuality {
std::cout << "Value: " + value << std::endl << std::endl; std::cout << "Value: " + value << std::endl << std::endl;
} }
}; };
BOOST_IS_BITWISE_SERIALIZABLE(ParameterizedQuality)
using PostconditionQuality = std::tuple<ParameterizedQuality, std::string>; using PostconditionQuality = std::tuple<ParameterizedQuality, std::string>;

View File

@ -3,6 +3,8 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "../mpi/serialize_tuple.h"
#include <boost/archive/tmpdir.hpp> #include <boost/archive/tmpdir.hpp>
#include <boost/archive/text_iarchive.hpp> #include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp> #include <boost/archive/text_oarchive.hpp>
@ -64,6 +66,9 @@ struct ParameterizedTopology {
} }
}; };
BOOST_IS_BITWISE_SERIALIZABLE(ParameterizedTopology)
using PostconditionTopology = std::tuple<ParameterizedTopology, std::string>; using PostconditionTopology = std::tuple<ParameterizedTopology, std::string>;
/** Topology class /** Topology class

View File

@ -189,8 +189,6 @@ void serialization_unit_testing(AGGenInstance &instance, boost::mpi::communicato
} }
world.barrier(); world.barrier();
//std::cout << "\nHello from process " << world.rank() << " of " << world.size()
// << " running on " << hammer_host << "." << std::endl;
std::string rollcall = "Hello from process " + std::to_string(world.rank())\ std::string rollcall = "Hello from process " + std::to_string(world.rank())\
+ " of " + std::to_string(world.size()) + " running on "\ + " of " + std::to_string(world.size()) + " running on "\
+ str_host + "."; + str_host + ".";

65
src/mpi/serialize_tuple.h Normal file
View File

@ -0,0 +1,65 @@
/*
Copyright 2011 Christopher Allen Ogden. All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are
permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list
of conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.
THIS SOFTWARE IS PROVIDED BY CHRISTOPHER ALLEN OGDEN ``AS IS'' AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL CHRISTOPHER ALLEN OGDEN OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The views and conclusions contained in the software and documentation are those of the
authors and should not be interpreted as representing official policies, either expressed
or implied, of Christopher Allen Ogden.
*/
#pragma once
#include <tuple>
namespace boost {
namespace serialization {
template<uint N>
struct Serialize
{
template<class Archive, typename... Args>
static void serialize(Archive & ar, std::tuple<Args...> & t, const unsigned int version)
{
ar & std::get<N-1>(t);
Serialize<N-1>::serialize(ar, t, version);
}
};
template<>
struct Serialize<0>
{
template<class Archive, typename... Args>
static void serialize(Archive & ar, std::tuple<Args...> & t, const unsigned int version)
{
(void) ar;
(void) t;
(void) version;
}
};
template<class Archive, typename... Args>
void serialize(Archive & ar, std::tuple<Args...> & t, const unsigned int version)
{
Serialize<sizeof...(Args)>::serialize(ar, t, version);
}
}
}

View File

@ -1,2 +1,305 @@
#include <boost/mpi.hpp>
#include <iostream> #include <iostream>
#include <sys/time.h>
#include <chrono>
#include "../util/db_functions.h"
#include "../util/avail_mem.h"
#include "../util/odometer.h"
#include "serialize_tuple.h"
void task_zero(AGGenInstance &instance, std::deque<NetworkState> &localFrontier, double mem_threshold)
{
//std::cout << "Frontier empty, retrieving from database" << std::endl;
double f_alpha = 0.0;
double total_tt = 0.0;
struct timeval tt1,tt2;
int retrv_counter = 0;
auto tot_sys_mem = getTotalSystemMemory();
gettimeofday(&tt1,NULL);
//TODO: One (or a few) larger queries to pull in new states, rather than single queries that pull states one-by-one
do {
NetworkState db_new_state = fetch_unexplored(instance.facts);
localFrontier.emplace_front(db_new_state);
//alpha = get_alpha();
f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem;
retrv_counter += 1;
}
//Leave a 30% buffer in alpha
while((f_alpha <= (mem_threshold * 0.7)) && !unex_empty());
std::cout << "Retrieved " << retrv_counter << " factbases from the database." << std::endl;
gettimeofday(&tt2,NULL);
total_tt+=(tt2.tv_sec-tt1.tv_sec)*1000.0+(tt2.tv_usec-tt1.tv_usec)/1000.0;
//printf("Retrieving from db took %lf s.\n", total_tt);
}
//TODO: These nodes need an updated instance
void task_one(AGGenInstance &instance, NetworkState &current_state,\
std::vector<Exploit> &exploit_list, std::unordered_map<size_t, PermSet<size_t>> &od_map,\
int alloc, int two_alloc, int reduc_factor, int num_tasks, boost::mpi::communicator &world){
boost::mpi::communicator tcomm = world.split(world.rank() > 0 && world.rank() <= alloc);
std::vector<std::tuple<Exploit, AssetGroup>> appl_exploits;
unsigned long esize = exploit_list.size();
//Distribute work to all nodes
for (size_t i = 0; i < esize; i++) {//for loop for applicable exploits starts
if (i % alloc != world.rank()+1)
continue;
auto e = exploit_list.at(i);
size_t num_params = e.get_num_params();
auto preconds_q = e.precond_list_q();
auto preconds_t = e.precond_list_t();
auto perms = od_map[num_params];
std::vector<AssetGroup> asset_groups;
for (auto perm : perms) {
std::vector<Quality> asset_group_quals;
std::vector<Topology> asset_group_topos;
asset_group_quals.reserve(preconds_q.size());
asset_group_topos.reserve(preconds_t.size());
//std::vector<int>::size_type sz;
//sz=asset_group_quals.capacity();
for (auto &precond : preconds_q) {
//Old quality encode caused this to crash
asset_group_quals.emplace_back(
perm[precond.get_param_num()], precond.name, precond.op,
precond.value, instance.facts);
}
for (auto &precond : preconds_t) {
auto dir = precond.get_dir();
auto prop = precond.get_property();
auto op = precond.get_operation();
auto val = precond.get_value();
asset_group_topos.emplace_back(
perm[precond.get_from_param()],
perm[precond.get_to_param()], dir, prop, op, val, instance.facts);
}
asset_groups.emplace_back(asset_group_quals, asset_group_topos,
perm);
}
auto assetgroup_size = asset_groups.size();
for (size_t j = 0; j < assetgroup_size; j++) {
auto asset_group = asset_groups.at(j);
for (auto &quality : asset_group.get_hypo_quals()) {
if (!current_state.get_factbase().find_quality(quality)) {
goto LOOPCONTINUE1;
}
}
for (auto &topology : asset_group.get_hypo_topos()) {
if (!current_state.get_factbase().find_topology(topology)) {
goto LOOPCONTINUE1;
}
}
{
auto new_appl_exploit = std::make_tuple(e, asset_group);
appl_exploits.push_back(new_appl_exploit);
}
LOOPCONTINUE1:;
}
} //for loop for applicable exploits ends
//Less nodes allocated to task 2 than task 1.
//Distribute the appl_exploit list from the extra node in task 1 to all other nodes in this task
if (two_alloc < alloc){
std::vector<std::tuple<Exploit, AssetGroup>> partial_appl_exploits;
mpi::scatter(local, appl_exploits, partial_appl_exploits, world.rank()==alloc);
if(world.rank() < alloc){
for(auto itr=partial_appl_exploits.begin(); itr!=partial_appl_exploits.end(); itr++){
auto index_r=std::distance(partial_appl_exploits.begin(),itr);
appl_exploits.push_back(partial_appl_exploits.at(index_r));
}
}
//Send Applicable Exploit List
int skip_greatest = 0;
if (two_alloc < alloc)
skip_greatest = 1;
if(world.rank() <= alloc - skip_greatest){
world.isend(world.rank() + alloc, 0, appl_exploits);
world.isend(world.rank() + alloc, 0, current_state);
}
}
//Note: This means that these nodes also need to update their instance!!
void task_two(AGGenInstance &instance)
{
NetworkState{current_state};
std::vector<std::tuple<Exploit, AssetGroup>> appl_exploits;
world.irecv(mpi::any_source, 0, current_state);
world.irecv(mpi::any_source, 0, appl_exploits);
auto current_hash = current_state.get_hash(instance.facts);
auto appl_expl_size = appl_exploits.size();
//Sync Fire work with lowest rank node
//ifworld.rank() == lowest
//skip flag is used to ensure that the egroup loop is not repeatedly run more than necessary
int skip_flag=0;
//vector for holding the appl_exploits indices at which groups exist
std::vector<int> idr_idx;
//vector for holding indices that have already fired
std::vector<int> fired_idx;
//iterator for the applicable exploits vector
auto itr=appl_exploits.begin();
int break_flag=0;
int testing_flag=0;
//loop through the vector
for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){
//keep track of index for later use
auto index=std::distance(appl_exploits.begin(), itr);
//reset break flag
break_flag=0;
//To avoid double-fire, check if an index has already been run.
//If it has, then there is no need to run through this loop again.
for(auto itr_f=fired_idx.begin(); itr_f!=fired_idx.end(); itr_f++){
auto index_f=std::distance(fired_idx.begin(),itr_f);
if(index==index_f)
break_flag=1;
}
if (break_flag==1)
break;
//empty the appl_exploits index vector at the start of each loop so that
//it doesn't contain stale data from a previous loop
idr_idx.clear();
NetworkState new_state{current_state};
//auto e = appl_exploits.at(j);
/* Synchronous fire function
First: double/sanity checks to see if there are other exploits that need to be fired
This also prevents the firing from occurring when it shouldn't via a regular passthrough
After popping, it checks if the vector is empty. If it is, then we no longer need to
re-fill the vector since we've gone through all possibilities
*/
SYNCH_FIRE:;
if(!idr_idx.empty()){
//std::cout<<"IDR Size " << idr_idx.size()<<std::endl;
index=idr_idx.back();
idr_idx.pop_back();
if(idr_idx.empty())
skip_flag=1;
fired_idx.push_back(index);
}
auto e = appl_exploits.at(index);
auto exploit = std::get<0>(e);
//std::cout<<exploit.get_name()<<std::endl;
//For synchronous firing: get indices of all exploits in the same group and
//push them onto the index vector for later use
auto egroup=exploit.get_group();
if (egroup!="null" && idr_idx.empty() && skip_flag==0){
for(int i=0; i!=appl_exploits.size(); i++){
if((std::get<0>(appl_exploits.at(i))).get_group()==egroup && i!=index){
idr_idx.emplace_back(i);
}
}
//TODO: If there are other assets in group,
//but you check idr_idx after filling and it's still empty
//you know that the other asset isn't ready to be fired yet, so wait.
//THIS BREAKS CODE IF ONLY 1 ASSET IN GROUP EXPLOIT. NEED TO FIGURE OUT HOW TO SEE HOW MANY ASSETS ARE IN GROUP
//std::cout<<std::get<1>(e).size()<<std::endl;
//if(std::get<1>(e).size()>1){
if(idr_idx.empty()){
testing_flag=1;
}
// }
}
if(testing_flag==1)
break;
skip_flag=0;
auto assetGroup = std::get<1>(e);
//assetGroup.print_group();
//std::cout<<std::endl;
auto postconditions = createPostConditions(e, instance.facts);
auto qualities = std::get<0>(postconditions);
auto topologies = std::get<1>(postconditions);
for(auto &qual : qualities) {
auto action = std::get<0>(qual);
auto fact = std::get<1>(qual);
switch(action) {
case ADD_T:
new_state.add_quality(fact);
break;
case UPDATE_T:
new_state.update_quality(fact);
//TODO: if fact!= "="" call new_state function, passing fact and instance.facts. Update the quality, and insert it into the hash_table instead of this convoluted mess
if(fact.get_op()=="+="){
//std::cout<<" AFTER UPDATE "<<new_state.compound_assign(fact)<<std::endl;
std::unordered_map<std::string,int>::const_iterator got = instance.facts.hash_table.find(new_state.compound_assign(fact));
//If the value is not already in the hash_table, insert it.
//Since the compound operators include a value that is not in the original Keyvalue object, the unordered map does not include it
//As a result, you have to manually add it.
if(got==instance.facts.hash_table.end()){
instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size();
instance.facts.length++;
instance.facts.str_vector.push_back(new_state.compound_assign(fact));
}
}
break;
case DELETE_T:
new_state.delete_quality(fact);
break;
}
}
for(auto &topo : topologies) {
auto action = std::get<0>(topo);
auto fact = std::get<1>(topo);
switch(action) {
case ADD_T:
new_state.add_topology(fact);
break;
case UPDATE_T:
new_state.update_topology(fact);
break;
case DELETE_T:
new_state.delete_topology(fact);
break;
}
}
if(!idr_idx.empty())
goto SYNCH_FIRE;
auto hash_num = new_state.get_hash(instance.facts);
if (hash_num == current_hash)
continue;
//gettimeofday(&t1,NULL);
//CRTICIAL IS IN HERE
}
}
int send_check(boost::mpi::communicator &world, int curr_node){
int send_to = curr_node + 1;
if (curr_node == world.size()-1)
send_to = 0;
return send_to;
}

View File

@ -0,0 +1,14 @@
#ifndef TASKS_H
#define TASKS_H
void task_zero(AGGenInstance &instance, std::deque<NetworkState> &localFrontier, double mem_threshold);
void task_one(AGGenInstance &instance, NetworkState &current_state,\
std::vector<Exploit> &exploit_list, std::unordered_map<size_t, PermSet<size_t>> &od_map,\
int alloc, int two_alloc, int reduc_factor, int num_tasks, boost::mpi::communicator &world);
void task_two(AGGenInstance &instance);
int send_check(boost::mpi::communicator &world, int curr_node);
#endif //TASKS_H

View File

@ -12,6 +12,7 @@
#include "db_functions.h" #include "db_functions.h"
#include "avail_mem.h"
#include "keyvalue.h" #include "keyvalue.h"
#include "db.h" #include "db.h"
@ -573,6 +574,7 @@ void save_ag_to_db(AGGenInstance &instance, bool save_keyvalue){
std::vector<FactbaseItems>& factbase_items = instance.factbase_items; std::vector<FactbaseItems>& factbase_items = instance.factbase_items;
std::vector<Factbase>& factbases = instance.factbases; std::vector<Factbase>& factbases = instance.factbases;
std::vector<Edge>& edges = instance.edges; std::vector<Edge>& edges = instance.edges;
auto tot_sys_mem = getTotalSystemMemory();
Keyvalue& factlist = instance.facts; Keyvalue& factlist = instance.facts;
@ -592,6 +594,8 @@ void save_ag_to_db(AGGenInstance &instance, bool save_keyvalue){
std::string factbase_sql_query = "INSERT INTO factbase VALUES "; std::string factbase_sql_query = "INSERT INTO factbase VALUES ";
int factbase_counter = 0; int factbase_counter = 0;
int flag = 0; int flag = 0;
double fbsize = 0;
for (int i = 0; i < factbases.size(); ++i) { for (int i = 0; i < factbases.size(); ++i) {
@ -611,7 +615,8 @@ void save_ag_to_db(AGGenInstance &instance, bool save_keyvalue){
} }
//Break up query due to memory constraints. Suboptimal approach. //Break up query due to memory constraints. Suboptimal approach.
factbase_counter += 1; factbase_counter += 1;
if (factbase_counter >= 500){ fbsize = sizeof(factbase_sql_query);
if (fbsize/tot_sys_mem >= 0.1){
factbase_sql_query += ";"; factbase_sql_query += ";";
db.execAsync(factbase_sql_query); db.execAsync(factbase_sql_query);
flag = 1; flag = 1;
@ -778,6 +783,8 @@ void save_ag_to_db(AGGenInstance &instance, bool save_keyvalue){
std::string edge_sql_query = "INSERT INTO edge VALUES "; std::string edge_sql_query = "INSERT INTO edge VALUES ";
std::string edge_assets_sql_query = "INSERT INTO edge_asset_binding VALUES "; std::string edge_assets_sql_query = "INSERT INTO edge_asset_binding VALUES ";
int ii = 0; int ii = 0;
double esql = 0;
double easql = 0;
for (auto ei : eq) { for (auto ei : eq) {
int i = ei.second; int i = ei.second;
@ -796,7 +803,10 @@ void save_ag_to_db(AGGenInstance &instance, bool save_keyvalue){
++ii; ++ii;
//Break up query due to memory constraints. Suboptimal approach. //Break up query due to memory constraints. Suboptimal approach.
edge_counter += 1; edge_counter += 1;
if (edge_counter >= 500){ esql = sizeof(edge_sql_query);
easql = sizeof(edge_assets_sql_query);
if (esql/tot_sys_mem >= 0.1 || easql/tot_sys_mem >= 0.1){
edge_sql_query += "ON CONFLICT DO NOTHING;"; edge_sql_query += "ON CONFLICT DO NOTHING;";
edge_assets_sql_query += "ON CONFLICT DO NOTHING;"; edge_assets_sql_query += "ON CONFLICT DO NOTHING;";
db.execAsync(edge_sql_query); db.execAsync(edge_sql_query);