MPI Subgraphing

hammer 2022-02-11 02:02:45 -06:00
parent 52aed47d00
commit daf5d6bbbc
16 changed files with 16648 additions and 2453 deletions

Binary file not shown.


@ -58,7 +58,7 @@ if [ "$TYPE" == "$strval1" ]; then
if [ "$(dnsdomainname)" = "hammer.esg.utulsa.edu" ]; then
#4 Exploit Option
mpiexec --mca btl_openib_allow_ib 1 --mca btl openib,self,vader --mca opal_warn_on_missing_libcuda 0 --bind-to numa --map-by numa -np "$NODES" --timeout 129600 ./ag_gen -n ../Oct_2021/nm_files/"$CARS"_car_timeline_maintenance.nm -x ../Oct_2021/Sync/4_Exploits/"$NUM_SERV"_Serv/sync_timeline_maintenance.xp -t "$NUM_THREADS" -q 1 -p -a 0.6 -z "$DBNAME" -s -l 20
mpiexec --mca btl_openib_allow_ib 1 --mca btl openib,self,vader --mca opal_warn_on_missing_libcuda 0 --bind-to numa --map-by numa -np "$NODES" --timeout 129600 ./ag_gen -n ../Oct_2021/nm_files/"$CARS"_car_timeline_maintenance.nm -x ../Oct_2021/Sync/4_Exploits/"$NUM_SERV"_Serv/sync_timeline_maintenance.xp -t "$NUM_THREADS" -q 1 -p -a 0.6 -z "$DBNAME" -l 40 -s
#6 Exploit Option
#mpiexec --mca btl_openib_allow_ib 1 --mca btl openib,self,vader --mca opal_warn_on_missing_libcuda 0 --bind-to numa --map-by numa -np "$NODES" --timeout 129600 ./ag_gen -n ../Oct_2021/nm_files/"$CARS"_car_timeline_maintenance.nm -x ../Oct_2021/Sync/6_Exploits/"$NUM_SERV"_Serv/sync_timeline_maintenance.xp -t "$NUM_THREADS" -q 1 -p -a 0.6 -z "$DBNAME" -s -l 20
else


@ -308,7 +308,7 @@ ALTER SEQUENCE public.factbase_id_seq OWNED BY public.factbase.id;
CREATE TABLE public.factbase_item (
factbase_id integer NOT NULL,
f bigint NOT NULL,
f integer NOT NULL,
type text NOT NULL
);

File diff suppressed because it is too large


@ -13,6 +13,7 @@
#include <map>
#include <random>
#include <unordered_set>
#include <set>
#include "ag_gen.h"
@ -24,6 +25,7 @@
#include <boost/archive/tmpdir.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/function_output_iterator.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/serialization/utility.hpp>
@ -178,7 +180,8 @@ AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd,
//Print out the groups if desired
if (world.rank() == 0){
std::cout <<"\nThere are "<<ex_groups.size()<<" groups: ";
std::cout <<"\nGenerating through MPI Tasking" << std::endl;
std::cout <<"There are "<<ex_groups.size()<<" groups: ";
for(int i=0; i<ex_groups.size(); i++){
std::cout<<ex_groups[i] << ". ";
}
@ -1246,10 +1249,13 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
int finished_signal = 0;
int send_msg = 0;
int state_count = 0;
int ns_inc = 0;
int first_loop = 1;
int msg_sends = 0;
NetworkState buffer_state;
Quality buffer_fact;
std::unordered_set<size_t> front_expl;
std::random_device rd; // obtain a random number from hardware
std::mt19937 gen(rd()); // seed the generator
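The std::random_device / std::mt19937 pair introduced here feeds the random deque_marker assignment further down, where a worker with no pending marker is handed an arbitrary index into the root's frontier. A minimal, self-contained sketch of that pattern, with a std::deque<int> standing in for the NetworkState frontier (all names in the sketch are illustrative):

#include <deque>
#include <iostream>
#include <random>

int main() {
    std::deque<int> frontier = {10, 11, 12, 13};   // stand-in for the NetworkState frontier
    std::random_device rd;                         // obtain a seed from hardware
    std::mt19937 gen(rd());                        // seed the Mersenne Twister engine
    if (!frontier.empty()) {
        // draw a uniformly distributed index into the current frontier
        std::uniform_int_distribution<> distr(0, static_cast<int>(frontier.size()) - 1);
        int marker = distr(gen);
        std::cout << "randomly assigned marker: " << marker
                  << " -> state " << frontier.at(marker) << std::endl;
    }
    return 0;
}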
@ -1258,12 +1264,14 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
mpi::communicator work_comm = world.split(world.rank() != 1);
//std::unordered_set<NetworkState> localFrontier_seen;
int last_known_id;
//Send new Network State to all worker nodes, if we have enough unex states to do so
if(world.rank() == 0){
//2 offset for root node and db node
for (int w = 0; w < std::min((int)world.size()-2, (int)localFrontier.size()); w++){
localFrontier.front().force_set_id(localFrontier.front().get_hash(instance.facts));
//localFrontier.front().force_set_id(localFrontier.front().get_hash(instance.facts));
std::cout << "Root says initial state has ID: " << localFrontier.front().get_id() << std::endl;
last_known_id = localFrontier.front().get_id();
mpi::request state_req = world.isend(w+2, 1, localFrontier.front());
localFrontier.pop_front();
state_req.wait();
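This hunk splits rank 1 (the database rank) into its own communicator and seeds each worker rank with one frontier state via a non-blocking send on tag 1. A minimal Boost.MPI sketch of that distribution step, with plain ints in place of serialized NetworkState objects; the frontier contents and variable names are illustrative only:

#include <algorithm>
#include <boost/mpi.hpp>
#include <deque>
#include <iostream>
namespace mpi = boost::mpi;

int main(int argc, char* argv[]) {
    mpi::environment env(argc, argv);
    mpi::communicator world;
    // Same color => same sub-communicator: rank 1 ends up alone,
    // every other rank shares work_comm (unused in this sketch).
    mpi::communicator work_comm = world.split(world.rank() != 1);

    if (world.rank() == 0) {
        std::deque<int> frontier;                      // stand-in for the NetworkState frontier
        for (int i = 0; i < world.size() + 2; ++i) frontier.push_back(100 + i);
        // Workers start at rank 2 (rank 0 = root, rank 1 = database rank).
        int nworkers = std::min((int)world.size() - 2, (int)frontier.size());
        for (int w = 0; w < nworkers; ++w) {
            int state = frontier.front();
            frontier.pop_front();
            mpi::request req = world.isend(w + 2, 1, state);  // tag 1 = "new state to explore"
            req.wait();                                       // complete before `state` goes out of scope
        }
    } else if (world.rank() > 1) {
        int state = 0;
        world.recv(0, 1, state);                              // matching blocking receive
        std::cout << "rank " << world.rank() << " got state " << state << std::endl;
    }
    return 0;
}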
@ -1274,12 +1282,16 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
std::cout << "World Size: " << world.size() << std::endl;
}
mpi::request requests[20];
//Main Work Loop - Just make this a do while instead of first_loop
while(!localFrontier.empty() || finished_signal == 0 || world.rank() > 0 || first_loop == 1){
first_loop = 0;
//Refill localFrontier if needed
if(localFrontier.empty() && world.rank() == 0) {
if(localFrontier.empty() && world.rank() == 0 && !unex_empty()) {
std::cout << "Running Task 0" << std::endl;
task_zero(instance, localFrontier, mem_threshold);
std::cout << "Frontier is now of size: " << localFrontier.size() << std::endl;
}
if (world.rank() > 1){
@ -1299,11 +1311,12 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
std::vector<FactbaseItems>().swap(instance.factbase_items);
NetworkState current_state;
current_state.reset_curr_id();
world.recv(0, 1, current_state);
state_count = 0;
send_msg = 1;
localFrontier.emplace_front(current_state);
std::cout << "Node " << world.rank() << " received a new state to explore" << std::endl;
std::cout << "Node " << world.rank() << " received a new state to explore with ID " <<current_state.get_id() << std::endl;
while(!localFrontier.empty()){
if (state_count < state_limit){
//Do work
@ -1447,16 +1460,16 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size();
instance.facts.length++;
instance.facts.str_vector.push_back(new_state.compound_assign(fact));
buffer_state = new_state;
buffer_fact = fact;
//buffer_state = new_state;
//buffer_fact = fact;
//Update ALL nodes (including ttwo_comm nodes) with new data
for (int w = 0; w < world.size(); w++)
{
if(w != world.rank() && w != 1)
{
mpi::request ns_req = world.isend(w, 5, buffer_state);
mpi::request fact_req = world.isend(w, 6, buffer_fact);
requests[0] = world.isend(w, 5, new_state);
requests[1] = world.isend(w, 6, fact);
//ns_req.wait();
//fact_req.wait();
}
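When a worker mints a genuinely new fact, this hunk now pushes the updated state and the fact itself to every other compute rank (skipping itself and the database rank 1) with tagged non-blocking sends, parking the requests in the requests[] array for the wait_all at the end of the loop. A stripped-down helper sketching that fan-out; std::string stands in for the serialized NetworkState/Quality payloads and the function name is invented for illustration:

#include <boost/mpi.hpp>
#include <string>
#include <vector>
namespace mpi = boost::mpi;

// Fan a (state, fact) update out to every rank except ourselves and rank 1.
// The caller must keep the returned requests alive until it calls mpi::wait_all on them.
std::vector<mpi::request> broadcast_update(mpi::communicator& world,
                                           const std::string& state,   // stand-in for NetworkState
                                           const std::string& fact) {  // stand-in for Quality
    std::vector<mpi::request> reqs;
    for (int w = 0; w < world.size(); ++w) {
        if (w != world.rank() && w != 1) {
            reqs.push_back(world.isend(w, 5, state));  // tag 5: updated network state
            reqs.push_back(world.isend(w, 6, fact));   // tag 6: the new fact itself
        }
    }
    return reqs;
}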
@ -1495,12 +1508,14 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
#pragma omp critical
//although local frontier is updated, the global hash is also updated to avoid testing on explored states.
if (hash_map.find(hash_num) == hash_map.end()) {
new_state.force_set_id(hash_num);
new_state.set_id();
//new_state.force_set_id(hash_num);
instance.factbases.push_back(new_state.get_factbase());
hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id()));
localFrontier.emplace_front(new_state);
Edge ed(current_state.get_hash(instance.facts), new_state.get_id(), exploit, assetGroup);
Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup);
//Edge ed(current_state.get_hash(instance.facts), new_state.get_id(), exploit, assetGroup);
ed.set_id();
instance.edges.push_back(ed);
} //END if (hash_map.find(hash_num) == hash_map.end())
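The deduplication above hashes every candidate state, keeps it only if the hash is absent from the shared map, and, with this commit, builds the edge from the parent's integer ID rather than its hash. A minimal sketch of that hash-gated insert, with plain structs standing in for NetworkState and Edge and an explicit next_id counter in place of the class-static ID:

#include <cstddef>
#include <deque>
#include <unordered_map>
#include <vector>

struct State { int id; std::size_t hash; };            // stand-in for NetworkState
struct Link  { int from; int to; };                     // stand-in for the real Edge

// Keep new_state only if its hash is unseen; record the edge parent -> child by ID.
void keep_if_new(const State& parent, State new_state,
                 std::unordered_map<std::size_t, int>& hash_map,
                 std::vector<State>& factbases,
                 std::deque<State>& frontier,
                 std::vector<Link>& edges,
                 int& next_id) {
    if (hash_map.find(new_state.hash) == hash_map.end()) {
        new_state.id = next_id++;                        // assign the next sequential ID
        factbases.push_back(new_state);                  // remember the state itself
        hash_map.insert({new_state.hash, new_state.id}); // global dedup table: hash -> ID
        frontier.emplace_front(new_state);               // schedule it for exploration
        edges.push_back({parent.id, new_state.id});      // edge now uses IDs on both ends
    }
}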
@ -1530,28 +1545,32 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
send_msg = 0;
std::cout << "Node " << world.rank() << " finished subgraphing work." << std::endl;
if(localFrontier.size() != 0){
mpi::request lf_req = world.isend(0, 3, localFrontier);
if(!localFrontier.empty()){
//std::vector<NetworkState> tmp_lf(localFrontier.begin(), localFrontier.end());
//std::cout << "Tmp LF size: " << tmp_lf.size() << std::endl;
requests[2] = world.isend(0, 3, localFrontier);
//mpi::request lf_req = world.isend(0, 3, tmp_lf);
std::cout << "Node " << world.rank() << " has sent a localFrontier of size " << localFrontier.size() << std::endl;
//lf_req.wait();
//std::cout << "Node " << world.rank() << " has finished sending localFrontier" << std::endl;
}
//Send new states and edges, then clear worker instance
if(instance.factbases.size() != 0){
std::cout << "Node " << world.rank() << " preparing to send new factbases" << std::endl;
mpi::request if_req = world.isend(0, 10, instance.factbases);
if(!instance.factbases.empty()){
std::cout << "Node " << world.rank() << " preparing to send " << instance.factbases.size() << " factbases" << std::endl;
requests[3] = world.isend(0, 10, instance.factbases);
//if_req.wait();
//std::cout << "Node " << world.rank() << " finished sending new factbases" << std::endl;
}
if(instance.edges.size() != 0){
std::cout << "Node " << world.rank() << " preparing to send new edges" << std::endl;
mpi::request ie_req = world.isend(0, 11, instance.edges);
if(!instance.edges.empty()){
std::cout << "Node " << world.rank() << " preparing to send new edges: " << instance.edges.size() << std::endl;
requests[4] = world.isend(0, 11, instance.edges);
//ie_req.wait();
//std::cout << "Node " << world.rank() << " finished sending new edges" << std::endl;
}
//Finish Signal
world.isend(0, 2, 1);
std::cout << "Node " << world.rank() << " is sending the finish message" << std::endl;
requests[5] = world.isend(0, 2, 1);
}
//Check for new fact and new state that caused an update in the hash table and facts
@ -1573,6 +1592,8 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
instance.facts.length++;
instance.facts.str_vector.push_back(update_state.compound_assign(update_fact));
}
//std::cout << "Node " << world.rank() << " finished loop " << std::endl;
mpi::wait_all(requests, requests+5);
} //end worker nodes
else if (world.rank() == 1){
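The commit replaces the per-message mpi::request temporaries with a requests[20] array whose entries are completed in one mpi::wait_all at the bottom of the worker loop. A minimal runnable sketch of that batch-then-wait pattern with two ranks and integer payloads (the tags mirror the diff, everything else is illustrative):

#include <boost/mpi.hpp>
#include <boost/mpi/nonblocking.hpp>
#include <vector>
namespace mpi = boost::mpi;

int main(int argc, char* argv[]) {
    mpi::environment env(argc, argv);
    mpi::communicator world;
    if (world.size() < 2) return 0;

    if (world.rank() == 0) {
        int factbase_count = 42, edge_count = 7, done = 1;   // payloads must outlive wait_all
        std::vector<mpi::request> reqs;
        reqs.push_back(world.isend(1, 10, factbase_count));  // tag 10: factbases
        reqs.push_back(world.isend(1, 11, edge_count));      // tag 11: edges
        reqs.push_back(world.isend(1, 2, done));             // tag  2: finish signal
        mpi::wait_all(reqs.begin(), reqs.end());             // complete every queued send at once
    } else if (world.rank() == 1) {
        int a = 0, b = 0, c = 0;
        world.recv(0, 10, a);
        world.recv(0, 11, b);
        world.recv(0, 2, c);
    }
    return 0;
}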
@ -1634,9 +1655,11 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
if(world.iprobe(mpi::any_source, 11)){
world.recv(mpi::any_source, 11, node_edges);
}
if(node_factbases.size() != 0 && node_edges.size() != 0){
if(!node_factbases.empty() || !node_edges.empty()){
std::cout << "Merging states. Starting with: " << instance.factbases.size() << " states." << std::endl;
state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world);
std::cout << "We are merging " << node_factbases.size() << " node fbs and " << node_edges.size() << " node edges" << std::endl;
state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world, last_known_id);
last_known_id = instance.factbases.size()-1;
std::cout << "Finished merging. Now have: " << instance.factbases.size() << " states." << std::endl;
}
}
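On the root side, pending factbase and edge messages are drained with iprobe before committing to a blocking receive, and the merge condition is relaxed to fire when either container is non-empty. A compact sketch of that probe-then-receive step; std::vector<int> models the serialized Factbase/Edge vectors and drain_updates is an invented helper name:

#include <boost/mpi.hpp>
#include <boost/serialization/vector.hpp>
#include <vector>
namespace mpi = boost::mpi;

// Poll for worker results without blocking when nothing is queued.
void drain_updates(mpi::communicator& world,
                   std::vector<int>& node_factbases,
                   std::vector<int>& node_edges) {
    if (world.iprobe(mpi::any_source, 10)) {        // a tag-10 message (factbases) is waiting
        world.recv(mpi::any_source, 10, node_factbases);
    }
    if (world.iprobe(mpi::any_source, 11)) {        // a tag-11 message (edges) is waiting
        world.recv(mpi::any_source, 11, node_edges);
    }
    if (!node_factbases.empty() || !node_edges.empty()) {
        // ...merge into the root instance here (state_merge in the real code)...
    }
}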
@ -1654,7 +1677,7 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
//Check for new fact and new state that caused an update in the hash table and facts
//while(world.iprobe(mpi::any_source, 5) || world.iprobe(mpi::any_source, 6)){
while(true){
if(!world.iprobe(mpi::any_source, 5) && !world.iprobe(mpi::any_source, 6)){
if(!world.iprobe(mpi::any_source, 5) || !world.iprobe(mpi::any_source, 6)){
break;
}
NetworkState update_state;
@ -1698,33 +1721,59 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
instance.facts.str_vector.push_back(update_state.compound_assign(update_fact));
}
std::cout << "Root has received all hash update messages. " << std::endl;
//Check for factbases or nodes, and add them to our network
while(true){
if(!world.iprobe(mpi::any_source, 10) && !world.iprobe(mpi::any_source, 11)){
break;
}
if(world.iprobe(mpi::any_source, 10)){
world.recv(mpi::any_source, 10, node_factbases);
}
if(world.iprobe(mpi::any_source, 11)){
world.recv(mpi::any_source, 11, node_edges);
}
if(node_factbases.size() != 0 || node_edges.size() != 0){
if(!node_factbases.empty() || !node_edges.empty()){
std::cout << "Merging states. Starting with: " << instance.factbases.size() << " states." << std::endl;
state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world);
state_merge(node_factbases, node_edges, hash_map, instance, mem_threshold, world, last_known_id);
last_known_id = instance.factbases.size()-1;
std::cout << "Finished merging. Now have: " << instance.factbases.size() << " states." << std::endl;
}
else{
break;
}
}
std::cout << "Root has collected all merging messages." << std::endl;
//Receive node frontiers and merge them into root frontier
for(int w = 2; w < world.size(); w++){
//if(world.iprobe(w, 3)){
if(world.iprobe(w, 3)){
std::deque<NetworkState> nodeFrontier;
std::cout << "Root attempting to receive the new Node Frontier from Node " << w << std::endl;
//std::vector<NetworkState> nodeFrontier;
world.recv(w, 3, nodeFrontier);
localFrontier.insert(localFrontier.begin(), \
std::make_move_iterator(nodeFrontier.begin()),\
std::make_move_iterator(nodeFrontier.end()));
//std::set_difference(nodeFrontier.begin(), nodeFrontier.end(), node_factbases.begin(), node_factbases.end(),
// std::inserter(front_expl, front_expl.begin()),[&](NetworkState &ns) -> size_t {return ns.get_hash(instance.facts);});
//std::set_difference(nodeFrontier.begin(), nodeFrontier.end(), node_factbases.begin(), node_factbases.end(), boost::make_function_output_iterator([&front_expl](NetworkState &ns, AGGenInstance &instance) {front_expl.insert(ns.get_hash(instance.facts));}));
//for(auto itr = nodeFrontier.begin(); itr != nodeFrontier.end(); itr++){
for (NetworkState ns : nodeFrontier){
front_expl.insert(ns.get_hash(instance.facts));
}
//for(auto itr = node_factbases.begin(), itr != node_factbases.end(); itr++){
for (Factbase fb : node_factbases){
if (front_expl.count(fb.hash(instance.facts))){
front_expl.erase(fb.hash(instance.facts));
}
}
// std::set_difference(nodeFrontier.begin(), nodeFrontier.end(), node_factbases.begin(), node_factbases.end(), boost::make_function_output_iterator([&front_expl](NetworkState &ns, AGGenInstance &instance) {front_expl.insert(ns.get_hash(instance.facts));}), [](const NetworkState &ns1, const NetworkState &ns2, AGGenInstance &instance) const {return ns1.get_hash(instance.facts) < ns2.get_hash(instance.facts);});
//std::set_difference(nodeFrontier.begin(), nodeFrontier.end(), node_factbases.begin(), node_factbases.end(), funcs{[](NetworkState &ns1, NetworkState &ns2, AGGenInstance &instance) {return ns1.get_hash(instance.facts) < ns2.get_hash(instance.facts);}, [](NetworkState &ns2, NetworkState &ns1, AGGenInstance &instance) {return ns2.get_hash(instance.facts) < ns1.get_hash(instance.facts);}}boost::make_function_output_iterator([&front_expl](NetworkState &ns, AGGenInstance &instance) {front_expl.insert(ns.get_hash(instance.facts));}) funcs{[](NetworkState &ns1, NetworkState &ns2, AGGenInstance &instance) {return ns1.get_hash(instance.facts) < ns2.get_hash(instance.facts);}, [](NetworkState &ns2, NetworkState &ns1, AGGenInstance &instance) {return ns2.get_hash(instance.facts) < ns1.get_hash(instance.facts);}});
//std::set_difference(nodeFrontier.begin(), nodeFrontier.end(), node_factbases.begin(), node_factbases.end(), std::transform(
//std::set_difference(nodeFrontier.begin(), nodeFrontier.end(), node_factbases.begin(), node_factbases.end(), std::transform(
std::cout << "Root received the frontier. Now attempting to add it to our local Frontier, which is currently of size " << localFrontier.size() << std::endl;
localFrontier.insert(std::end(localFrontier), std::begin(nodeFrontier), std::end(nodeFrontier));
std::cout << "Local Frontier now has size: " << localFrontier.size() << std::endl;
deque_marker[w] = (int)(localFrontier.size()-1);
std::cout << "Deque marker for node " << w << " is " << deque_marker[w] << std::endl;
@ -1732,8 +1781,13 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
//If a node doesn't have a specific state to pull, randomly assign it one
if (!deque_marker.count(w)){
//Randomly assign a state index to pop
if(!localFrontier.empty()){
std::uniform_int_distribution<> distr(0, localFrontier.size()-1); // define the range
deque_marker[w] = distr(gen);
}
else{
deque_marker[w] = 0;
}
std::cout << "Random deque marker for node " << w << " is " << deque_marker[w] << std::endl;
}
}
@ -1746,18 +1800,17 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
for (int w = 2; w < world.size(); w++){
std::cout << "Attempting to send a new state to node " << w << std::endl;
int proceed = 0;
int break_flag = 0;
while(proceed == 0){
while(deque_marker[w] >= localFrontier.size()){
if(localFrontier.size() == 1 || localFrontier.size() == 0){
while(proceed == 0 && !localFrontier.empty()){
if(deque_marker[w] >= localFrontier.size()){
if(localFrontier.empty()){
deque_marker[w] = 0;
break;
}
deque_marker[w] = localFrontier.size()-1;
}
std::cout << "<While> LF Size: " << localFrontier.size() << std::endl;
if(localFrontier.size() != 0){
std::cout << "<IF> LF Size: " << localFrontier.size() << std::endl;
if(!localFrontier.empty()){
//while(deque_marker[w] >= localFrontier.size())
// deque_marker[w]--;
//auto deque_access = localFrontier.begin() + deque_marker[w+2];
@ -1766,7 +1819,7 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
NetworkState send_state = localFrontier.at(deque_access);
std::cout << "Got our state from deque" << std::endl;
//Don't explore on states we already have explored
if(hash_map.find(send_state.get_id()) != hash_map.end()){
if(front_expl.find(send_state.get_hash(instance.facts)) != front_expl.end()){
std::cout << "Already explored this state." << std::endl;
if(localFrontier.size() == 1){
localFrontier.clear();
@ -1781,16 +1834,25 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
}
else{
std::cout << " We have not seen this state yet." << std::endl;
//localFrontier.erase(localFrontier.begin()+deque_access);
proceed = 1;
}
if(proceed == 1){
std::cout << "Preparing the send to World Rank " << w << std::endl;
//Reset State back to ID 0
send_state.force_set_id(0);
mpi::request state_req = world.isend(w, 1, send_state);
state_req.wait();
if(localFrontier.size() == 1){
localFrontier.clear();
}
else{
localFrontier.erase(localFrontier.begin()+deque_access);
}
//state_req.wait();
msg_sends++;
std::cout << "Root sent new states to World Rank " << w << std::endl;
std::cout << "Root sent new states to World Rank " << w << " with state ID " << send_state.get_id() << std::endl;
}
}
@ -1829,6 +1891,7 @@ AGGenInstance &AGGen::sg_generate(bool batch_process, int batch_num, int numThrd
std::chrono::duration<double> elapsed_seconds = end - start;
instance.elapsed_seconds = elapsed_seconds;
std::cout << "TOTAL STATES IN LOOP: " << instance.factbases.size() << std::endl;
}
return instance;


@ -20,7 +20,7 @@
//Edge::Edge(int iFrom, int iTo, Exploit &ex, AssetGroup &ag)
// : from_node(iFrom), to_node(iTo), exploit(ex), assetGroup(ag), deleted(false) {}
Edge::Edge(size_t iFrom, size_t iTo, Exploit &ex, AssetGroup &ag)
Edge::Edge(int iFrom, int iTo, Exploit &ex, AssetGroup &ag)
: from_node(iFrom), to_node(iTo), exploit(ex), assetGroup(ag), deleted(false) {}
Edge::Edge()
@ -44,7 +44,7 @@ bool Edge::is_deleted() { return deleted; }
//}
size_t Edge::get_from_id()
int Edge::get_from_id()
{
return from_node;
}
@ -56,7 +56,7 @@ size_t Edge::get_from_id()
//}
size_t Edge::get_to_id()
int Edge::get_to_id()
{
return to_node;
}
@ -86,6 +86,14 @@ int Edge::set_id() {
return id;
}
void Edge::force_from_id(int i) {
from_node = i;
}
void Edge::force_to_id(int i) {
to_node = i;
}
int Edge::edge_current_id = 0;
/**


@ -62,8 +62,8 @@ class Edge {
class Edge {
static int edge_current_id;
int id;
size_t from_node;
size_t to_node;
int from_node;
int to_node;
Exploit exploit;
AssetGroup assetGroup;
bool deleted;
@ -77,7 +77,7 @@ class Edge {
}
public:
Edge(size_t, size_t, Exploit &, AssetGroup &);
Edge(int, int, Exploit &, AssetGroup &);
Edge();
std::string get_query();
@ -85,8 +85,10 @@ class Edge {
int get_id();
int set_id();
size_t get_from_id();
size_t get_to_id();
int get_from_id();
int get_to_id();
void force_from_id(int i);
void force_to_id(int i);
int get_exploit_id();
void set_deleted();
bool is_deleted();


@ -49,12 +49,20 @@ void Factbase::force_set_id(size_t i) {
/**
* @return The current Factbase ID.
*/
size_t Factbase::get_id() const { return id; }
int Factbase::get_id() const { return id; }
std::tuple<std::vector<Quality>, std::vector<Topology>> Factbase::get_facts_tuple() const {
return std::make_tuple(qualities, topologies);
}
void Factbase::reset_curr_id() {
current_id = 0;
}
void Factbase::reset_curr_id(int i){
current_id = i;
}
/**
* @brief Searches for a Quality in the Factbase.
* @details Returns true if the Quality is found and false otherwise.
@ -168,13 +176,13 @@ void Factbase::delete_topology(Topology &t) {
* @param factlist The current Keyvalue
* @return The hash of the Factbase
*/
size_t Factbase::hash(Keyvalue &factlist) const {
size_t Factbase::hash(Keyvalue &factlist){
// size_t hash = 0xf848b64e; // Random seed value
// size_t seed = 0x0c32a12fe19d2119;
size_t seed = 0;
std::set<size_t> factset_q;
std::transform(qualities.begin(), qualities.end(), std::inserter(factset_q, factset_q.end()),
[&](const Quality &q) -> size_t {
[&](Quality &q) -> size_t {
return q.encode(factlist).enc;});
std::for_each(factset_q.begin(), factset_q.end(),
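Factbase::hash drops its const qualifier because Quality::encode (see the quality.cpp hunk further down) may now grow the Keyvalue table during encoding. The hashing idea the surrounding std::transform / std::for_each implements is: encode each fact, deduplicate and order the encodings in a std::set, then fold them into a single seed. A hedged sketch of that shape, with std::hash<std::string> standing in for Quality::encode and an arbitrary hash-combine constant:

#include <cstddef>
#include <functional>
#include <set>
#include <string>
#include <vector>

// Order-independent hash of a bag of facts: encode, sort/dedup via std::set, then fold.
std::size_t hash_facts(const std::vector<std::string>& facts) {
    std::set<std::size_t> encoded;
    for (const std::string& f : facts) {
        encoded.insert(std::hash<std::string>{}(f));     // stand-in for Quality::encode
    }
    std::size_t seed = 0;
    for (std::size_t e : encoded) {
        // boost::hash_combine-style mixing step
        seed ^= e + 0x9e3779b97f4a7c15ULL + (seed << 6) + (seed >> 2);
    }
    return seed;
}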


@ -29,10 +29,10 @@ class NetworkState;
*/
class Factbase {
static int current_id;
//int current_id;
friend std::ostream & operator << (std::ostream &os, const Factbase &fb);
friend class boost::serialization::access;
size_t id;
int id;
int qsize;
int tsize;
@ -46,8 +46,8 @@ class Factbase {
template<class Archive>
void serialize(Archive &ar, const unsigned int /* file_version */){
ar & qualities & topologies & id;
//ar & current_id & id & qsize & tsize & qualities & topologies;
//ar & qualities & topologies & id;
ar & current_id & id & qsize & tsize & qualities & topologies;
//ar & qualities;
//ar & topologies;
}
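The serialize() member now archives current_id, qsize and tsize alongside the qualities and topologies, so a Factbase shipped to another rank carries its ID bookkeeping with it. A minimal intrusive Boost.Serialization sketch of that pattern using the same text archives the file already includes; MiniFactbase and its fields are illustrative stand-ins:

#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/vector.hpp>
#include <sstream>
#include <vector>

class MiniFactbase {
    friend class boost::serialization::access;
    int id = 0;
    std::vector<int> qualities;            // stand-in for the real Quality vector

    template <class Archive>
    void serialize(Archive& ar, const unsigned int /*version*/) {
        ar & id & qualities;               // every archived field must appear here
    }
public:
    MiniFactbase() = default;
    MiniFactbase(int i, std::vector<int> q) : id(i), qualities(std::move(q)) {}
    int get_id() const { return id; }
};

int main() {
    std::stringstream ss;
    {
        boost::archive::text_oarchive oa(ss);
        const MiniFactbase fb(7, {1, 2, 3});
        oa << fb;                          // serialize to text
    }
    boost::archive::text_iarchive ia(ss);
    MiniFactbase copy;
    ia >> copy;                            // deserialize; copy.get_id() == 7
    return 0;
}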
@ -74,9 +74,10 @@ class Factbase {
void set_id();
void force_set_id(int i);
void force_set_id(size_t i);
size_t get_id() const;
size_t hash(Keyvalue &factlist) const;
void reset_curr_id();
void reset_curr_id(int i);
int get_id() const;
size_t hash(Keyvalue &factlist);
int get_size();
};


@ -42,7 +42,7 @@ void NetworkState::force_set_id(int i) { factbase.force_set_id(i); }
/**
* @return The ID of the NetworkState
*/
size_t NetworkState::get_id() { return factbase.get_id(); }
int NetworkState::get_id() { return factbase.get_id(); }
int NetworkState::get_size() { return factbase.get_size(); }
@ -57,10 +57,17 @@ const Factbase &NetworkState::get_factbase() const { return factbase; }
* @param factlist The current Keyvalue
* @return The hash of the Factbase
*/
size_t NetworkState::get_hash(Keyvalue &factlist) const {
size_t NetworkState::get_hash(Keyvalue &factlist){
return factbase.hash(factlist);
}
void NetworkState::reset_curr_id(){
factbase.reset_curr_id();
}
void NetworkState::reset_curr_id(int i){
factbase.reset_curr_id(i);
}
/**
* @brief Adds all unique Quality elements of a vector to the Factbase
*


@ -53,13 +53,14 @@ class NetworkState {
NetworkState();
const Factbase &get_factbase() const;
size_t get_hash(Keyvalue &factlist) const;
size_t get_hash(Keyvalue &factlist);
void set_id();
void force_set_id(int i);
size_t get_id();
int get_id();
int get_size();
void reset_curr_id();
void reset_curr_id(int i);
void add_qualities(std::vector<Quality> q);
void add_topologies(std::vector<Topology> t);
@ -73,7 +74,7 @@ class NetworkState {
void delete_quality(Quality &q);
void delete_topology(Topology &t);
//bool operator==(NetworkState& foo) {return get_id() == foo.get_id();}
//bool operator<(const NetworkState& foo) const {return get_hash(Keyvalue &factlist) < foo.get_hash(Keyvalue &factlist);}
};
BOOST_SERIALIZATION_ASSUME_ABSTRACT(NetworkState)


@ -16,13 +16,13 @@
//ORIGINAL QUALITY
//Quality::Quality(int asset, std::string qualName, std::string o, std::string qualValue, Keyvalue &facts)
// : asset_id(asset), name(std::move(qualName)), op(std::move(o)), value(std::move(qualValue)), encoded(encode(facts).enc) {}
Quality::Quality(int asset, std::string qualName, std::string o, std::string qualValue, Keyvalue &facts)
: asset_id(asset), name(std::move(qualName)), op(std::move(o)), value(std::move(qualValue)), encoded(encode(facts).enc) {}
//TESTING ENCODING ERROR
Quality::Quality(int asset, std::string qualName, std::string o, std::string qualValue, Keyvalue &facts)
: asset_id(asset), name(std::move(qualName)), op(std::move(o)), value(std::move(qualValue)), encoded((size_t)facts.size()) {}
//Quality::Quality(int asset, std::string qualName, std::string o, std::string qualValue, Keyvalue &facts)
// : asset_id(asset), name(std::move(qualName)), op(std::move(o)), value(std::move(qualValue)), encoded((size_t)facts.size()) {}
Quality::Quality()
{
@ -80,37 +80,20 @@ void Quality::print() const {
*
* @return The EncodedQuality
*/
EncodedQuality Quality::encode(const Keyvalue &kv_facts) const {
EncodedQuality Quality::encode(Keyvalue &kv_facts) {
EncodedQuality qual{};
qual.dec.asset_id = asset_id;
qual.dec.attr = kv_facts[name];
//std::cout<<"DONE PART 2"<<std::endl;
//std::cout<<"ASSET ID " <<asset_id<<std::endl;
//std::cout<<" NAME" <<kv_facts[name]<<std::endl;
//std::cout<<"VALUE " <<value<<std::endl;
//THIS IS THE ONE THAT THROWS THE OUT OF RANGE FOR THE UNORDERED MAP
// std::cout<<"Break"<<std::endl;
//std::cout<<"ATTEMPTING TO FIND " <<value<<std::endl;
//std::cout<<"SIZE FROM ENCODE " <<kv_facts.size() << std::endl;
//std::unordered_map<std::string,int>::const_iterator got = kv_facts.hash_table.find(value);
//if(got==kv_facts.hash_table.end())
// std::cout<< " NOT FOUND"<<std::endl;
//else
// std::cout<<" YES FOUND " <<std::endl;
//std::cout<<"NAME "<<name<<"VALUE " <<value<< "KEY OR W/E "<< kv_facts[value]<<std::endl;
// std::cout<<"SIZE FROM ENCODE "<<kv_facts.size()<<std::endl;
//try {
std::unordered_map<std::string,int>::const_iterator got = kv_facts.hash_table.find(value);
if(got == kv_facts.hash_table.end()){
kv_facts.hash_table[value] = kv_facts.length;
kv_facts.str_vector.push_back(value);
kv_facts.length++;
}
qual.dec.val = kv_facts[value];
//}
//catch (std::out_of_range outofrange)
// kv_facts.hash_table.insert(value, kv_facts.size()+1);
//kv_facts.length+=1;
//std::cout<<"YES" << std::endl;
//qual.dec.val=kv_facts[value];
//std::cout<<"DONE PART 3"<<std::endl;
return qual;
}
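Quality::encode now inserts an unknown value into the Keyvalue dictionary on the fly (hash_table, str_vector and length are all updated) instead of letting the lookup fail, which is why encode and its Keyvalue parameter lose their const. The underlying pattern is a get-or-insert string interning table; a minimal sketch with an invented KeyvalueLite stand-in:

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Tiny stand-in for the Keyvalue fact dictionary: string -> dense integer code.
struct KeyvalueLite {
    std::unordered_map<std::string, int> hash_table;
    std::vector<std::string> str_vector;
    int length = 0;

    // Return the code for `value`, adding it to the dictionary if it is new.
    int intern(const std::string& value) {
        auto got = hash_table.find(value);
        if (got == hash_table.end()) {
            hash_table[value] = length;      // new entry gets the next dense code
            str_vector.push_back(value);
            return length++;
        }
        return got->second;
    }
};

int main() {
    KeyvalueLite kv;
    std::cout << kv.intern("root_access") << " "   // 0 (newly inserted)
              << kv.intern("user_access") << " "   // 1 (newly inserted)
              << kv.intern("root_access") << "\n"; // 0 (already interned)
    return 0;
}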


@ -89,7 +89,7 @@ class Quality {
size_t encoded;
EncodedQuality encode(const Keyvalue &kv_facts) const;
EncodedQuality encode(Keyvalue &kv_facts);
friend class Factbase;


@ -452,11 +452,12 @@ int main(int argc, char *argv[]) {
bool use_redis = false;
bool use_postgres = false;
bool mpi_subgraphing = false;
bool mpi_tasking = false;
double alpha = 0.5;
int opt;
while ((opt = getopt(argc, argv, "rb:g:dhc:l:n:x:t:q:pa:sm:z:")) != -1) {
while ((opt = getopt(argc, argv, "rb:g:dhc:l:n:x:t:q:pa:sem:z:")) != -1) {
switch (opt) {
case 'g':
should_graph = true;
@ -503,6 +504,9 @@ int main(int argc, char *argv[]) {
case 's':
mpi_subgraphing = true;
break;
case 'e':
mpi_tasking = true;
break;
case 'm':
mpi_nodes = atoi(optarg);
break;
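The getopt string gains an 'e' switch so MPI tasking can be selected independently of '-s' (subgraphing). A minimal sketch of wiring up such a boolean flag; the surrounding program skeleton is illustrative, only the '-s'/'-e' meanings come from the diff:

#include <cstdio>
#include <unistd.h>

int main(int argc, char* argv[]) {
    bool mpi_subgraphing = false;
    bool mpi_tasking = false;
    int opt;
    // "se" declares two argument-less switches; a trailing ':' would mark an option
    // as taking a value (as with -m <nodes> in the real option string).
    while ((opt = getopt(argc, argv, "se")) != -1) {
        switch (opt) {
        case 's':
            mpi_subgraphing = true;   // MPI subgraph generation
            break;
        case 'e':
            mpi_tasking = true;       // MPI tasking generation
            break;
        default:
            std::fprintf(stderr, "usage: %s [-s] [-e]\n", argv[0]);
            return 1;
        }
    }
    std::printf("subgraphing=%d tasking=%d\n", mpi_subgraphing, mpi_tasking);
    return 0;
}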
@ -688,12 +692,12 @@ int main(int argc, char *argv[]) {
if (mpi_subgraphing && world.size() > 3)
postinstance = gen.sg_generate(batch_process, batch_size, thread_count, init_qsize, alpha, world, depth_limit); //The method call to generate the attack graph, defined in ag_gen.cpp.
else if (world.size() > 1)
else if (mpi_tasking && world.size() > 1)
postinstance = gen.generate(batch_process, batch_size, thread_count, init_qsize, alpha, world); //The method call to generate the attack graph, defined in ag_gen.cpp.
else
postinstance = gen.single_generate(batch_process, batch_size, thread_count, init_qsize, alpha, world); //The method call to generate the attack graph, defined in ag_gen.cpp.
world.barrier();
//world.barrier();
//Serialization Unit Testing on Postinstance Data
//serialization_unit_testing(postinstance, world);
world.barrier();
@ -704,8 +708,9 @@ int main(int argc, char *argv[]) {
//std::cout << "# of factbase_item " <<postinstance.factbase_items.size()<<std::endl;
if(world.rank() == 0){
std::cout << "Saving to Database." << std::endl;
save_ag_to_db(postinstance, true);
std::cout << "PostInstance states: " << postinstance.factbases.size() << std::endl;
std::cout << "Total Edges: " << get_num_edges() << std::endl;
std::cout << "Total Time: " << postinstance.elapsed_seconds.count() << " seconds\n";


@ -508,19 +508,35 @@ int send_check(boost::mpi::communicator &world, int curr_node){
}
void state_merge(std::vector<Factbase> node_factbases, std::vector<Edge> node_edges,\
std::unordered_map<size_t, int> &hash_map, AGGenInstance &instance, double mem_threshold, mpi::communicator &world){
std::unordered_map<size_t, int> &hash_map, AGGenInstance &instance, double mem_threshold,\
mpi::communicator &world, int last_known_id){
auto tot_sys_mem = getTotalSystemMemory();
for(auto fb : node_factbases){
//auto fb = ns.get_factbase();
//std::cout << "Started Task 3." << std::endl;
auto hash_num = fb.get_id();
auto hash_num = fb.hash(instance.facts);
//although local frontier is updated, the global hash is also updated to avoid testing on explored states.
if (hash_map.find(hash_num) == hash_map.end()) {
auto old_id = fb.get_id();
fb.reset_curr_id(last_known_id+1);
fb.set_id();
auto new_id = fb.get_id();
last_known_id = new_id;
for(auto ed: node_edges){
if (ed.get_from_id() == old_id){
ed.force_from_id(new_id);
}
if (ed.get_to_id() == old_id){
ed.force_to_id(new_id);
}
}
instance.factbases.push_back(fb);
hash_map.insert(std::make_pair(fb.get_id(), fb.get_id()));
hash_map.insert(std::make_pair(fb.hash(instance.facts), fb.get_id()));
//See memory usage. If it exceeds the threshold, store new states in the DB
double i_alpha = 0.0;
@ -549,7 +565,7 @@ void state_merge(std::vector<Factbase> node_factbases, std::vector<Edge> node_ed
}
}
//This does add duplicate edges
//This does add duplicate edges - taken care of through graphviz' "strict" graphing
for (auto ed : node_edges){
instance.edges.push_back(ed);
}
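state_merge now receives last_known_id so that factbases arriving from a worker can be renumbered into the root's contiguous ID space, with every edge that referenced the worker-local ID patched to the new one before insertion. A condensed sketch of that renumber-and-remap step, using plain structs in place of Factbase and Edge:

#include <cstddef>
#include <unordered_map>
#include <vector>

struct Fb { int id; std::size_t hash; };       // stand-in for Factbase
struct Ed { int from; int to; };               // stand-in for Edge

// Merge worker-produced factbases into the root instance, rewriting IDs as we go.
void merge_states(std::vector<Fb> node_factbases, std::vector<Ed>& node_edges,
                  std::unordered_map<std::size_t, int>& hash_map,
                  std::vector<Fb>& root_factbases, int& last_known_id) {
    for (Fb& fb : node_factbases) {
        if (hash_map.find(fb.hash) == hash_map.end()) {
            int old_id = fb.id;
            fb.id = ++last_known_id;                  // renumber into the root's ID space
            for (Ed& ed : node_edges) {               // patch every edge that used the old ID
                if (ed.from == old_id) ed.from = fb.id;
                if (ed.to   == old_id) ed.to   = fb.id;
            }
            root_factbases.push_back(fb);
            hash_map.insert({fb.hash, fb.id});        // hash -> new ID for deduplication
        }
    }
}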


@ -24,6 +24,7 @@ void task_four(NetworkState &new_state);
int send_check(boost::mpi::communicator &world, int curr_node);
void state_merge(std::vector<Factbase> node_factbases, std::vector<Edge> node_edges,\
std::unordered_map<size_t, int> &hash_map, AGGenInstance &instance, double mem_threshold, mpi::communicator &world);
std::unordered_map<size_t, int> &hash_map, AGGenInstance &instance, double mem_threshold,\
mpi::communicator &world, int last_known_id);
#endif //TASKS_H