1069 lines
47 KiB
C++
Executable File
1069 lines
47 KiB
C++
Executable File
// ag_gen.cpp contains the methods for building an attack graph and generating
|
|
// an attack graph's exploits and printing them
|
|
|
|
#include <algorithm>
|
|
#include <chrono>
|
|
#include <iostream>
|
|
#include <vector>
|
|
#include <tuple>
|
|
#include <unordered_map>
|
|
#include <omp.h>
|
|
#include <sys/time.h>
|
|
#include <string.h>
|
|
|
|
#include "ag_gen.h"
|
|
|
|
#include "../util/odometer.h"
|
|
#include "../util/db_functions.h"
|
|
#include "../util/avail_mem.h"
|
|
#include "../mpi/tasks.h"
|
|
|
|
#include <boost/archive/tmpdir.hpp>
|
|
#include <boost/archive/text_iarchive.hpp>
|
|
#include <boost/archive/text_oarchive.hpp>
|
|
|
|
#include <boost/serialization/base_object.hpp>
|
|
#include <boost/serialization/utility.hpp>
|
|
#include <boost/serialization/list.hpp>
|
|
#include <boost/serialization/assume_abstract.hpp>
|
|
#include <boost/serialization/string.hpp>
|
|
#include <boost/serialization/vector.hpp>
|
|
|
|
#include <boost/mpi.hpp>
|
|
#include <boost/mpi/environment.hpp>
|
|
#include <boost/mpi/communicator.hpp>
|
|
#include <boost/mpi/collectives.hpp>
|
|
|
|
|
|
#include <boost/serialization/is_bitwise_serializable.hpp>
|
|
#include <boost/range/irange.hpp>
|
|
|
|
namespace mpi = boost::mpi;
|
|
|
|
#ifdef REDIS
|
|
|
|
/**
|
|
* @brief Constructor for generator
|
|
* @details Builds a generator for creating attack graphs.
|
|
*
|
|
* @param _instance The initial information for generating the graph
|
|
*/
|
|
AGGen::AGGen(AGGenInstance &_instance, RedisManager &_rman) : instance(_instance), rman(&_rman) {
|
|
rman->clear();
|
|
auto init_quals = instance.initial_qualities;
|
|
auto init_topos = instance.initial_topologies;
|
|
NetworkState init_state(init_quals, init_topos);
|
|
init_state.set_id();
|
|
int init_id = init_state.get_id();
|
|
FactbaseItems init_items =
|
|
make_tuple(make_tuple(init_quals, init_topos), init_id);
|
|
instance.factbases.push_back(init_state.get_factbase());
|
|
instance.factbase_items.push_back(init_items);
|
|
std::string hash = std::to_string(init_state.get_hash(instance.facts));
|
|
// std::cout << "before init insertion" << std::endl;
|
|
rman->insert_factbase(hash, init_id);
|
|
// rman->insert_facts(hash, init_quals, init_topos);
|
|
rman->commit();
|
|
// std::cout << "after init insertion" << std::endl;
|
|
// hash_map.insert(std::make_pair(init_state.get_hash(instance.facts), init_id));
|
|
frontier.push_back(init_state);
|
|
use_redis = true;
|
|
}
|
|
|
|
#endif
|
|
|
|
/**
 * @brief Constructor for an in-memory (non-Redis) generator
 * @details Builds the initial network state from the instance's initial
 *          qualities and topologies, records it in the instance, registers
 *          its hash in the local hash_map, and seeds the frontier with it.
 *
 * @param _instance The initial information for generating the graph
 */
AGGen::AGGen(AGGenInstance &_instance) : instance(_instance) {
    auto init_quals = instance.initial_qualities;
    auto init_topos = instance.initial_topologies;

    // Instantiate the initial state from the initial input and give it an id.
    NetworkState init_state(init_quals, init_topos);
    init_state.set_id();
    int init_id = init_state.get_id();

    FactbaseItems init_items =
        std::make_tuple(std::make_tuple(init_quals, init_topos), init_id);
    instance.factbases.push_back(init_state.get_factbase());
    instance.factbase_items.push_back(init_items);

    // The hash is computed once; the previous stringified copy was never used.
    hash_map.insert(std::make_pair(init_state.get_hash(instance.facts), init_id));

    frontier.push_back(init_state);
    use_redis = false;
}
|
|
|
|
/**
|
|
* @brief Generates exploit postconditions
|
|
* @details When an exploit is known to apply to a set of assets,
|
|
* the postconditions must be generated. This is done by iterating
|
|
* through each parameterized fact and inserting the applicable
|
|
* assets.
|
|
*
|
|
* @param group A tuple containing the exploit and applicable assets
|
|
* @return A tuple containing the "real" qualities and "real" topologies
|
|
*/
|
|
std::tuple<std::vector<std::tuple<ACTION_T, Quality>>, std::vector<std::tuple<ACTION_T, Topology>>>
|
|
createPostConditions(std::tuple<Exploit, AssetGroup> &group, Keyvalue &facts) {
|
|
auto ex = std::get<0>(group);
|
|
auto ag = std::get<1>(group);
|
|
|
|
auto perm = ag.get_perm();
|
|
|
|
auto param_postconds_q = ex.postcond_list_q();
|
|
auto param_postconds_t = ex.postcond_list_t();
|
|
|
|
std::vector<std::tuple<ACTION_T, Quality>> postconds_q;
|
|
std::vector<std::tuple<ACTION_T, Topology>> postconds_t;
|
|
|
|
for (auto &postcond : param_postconds_q) {
|
|
auto action = std::get<0>(postcond);
|
|
auto fact = std::get<1>(postcond);
|
|
|
|
Quality q(perm[fact.get_param_num()], fact.name, fact.op,
|
|
fact.value, facts);
|
|
postconds_q.emplace_back(action, q);
|
|
}
|
|
|
|
for (auto &postcond : param_postconds_t) {
|
|
auto action = std::get<0>(postcond);
|
|
auto fact = std::get<1>(postcond);
|
|
|
|
auto dir = fact.get_dir();
|
|
auto prop = fact.get_property();
|
|
auto op = fact.get_operation();
|
|
auto val = fact.get_value();
|
|
|
|
Topology t(perm[fact.get_from_param()],
|
|
perm[fact.get_to_param()], dir, prop, op, val, facts);
|
|
postconds_t.emplace_back(action, t);
|
|
}
|
|
return make_tuple(postconds_q, postconds_t);
|
|
|
|
}
|
|
|
|
/**
|
|
* @brief Generate attack graph
|
|
* @details Begin the generation of the attack graph. The algorithm is as
|
|
* follows:
|
|
*
|
|
* 1. Fetch next factbase to expand from the frontier
|
|
* 2. Fetch all exploits
|
|
* 3. Loop over each exploit to determine if it is applicable.
|
|
* a. Fetch preconditions of the exploit
|
|
* b. Generate all permutations of assets using the Odometer utility
|
|
* c. Apply each permutation of the assets to the preconditions.
|
|
* d. Check if ALL generated preconditions are present in the current
|
|
* factbase. 4a. If all preconditions are found, apply the matching asset group
|
|
* to the postconditions of the exploit. 4b. If not all preconditions are found,
|
|
* break and continue checking with the next exploit.
|
|
* 5. Push the new network state onto the frontier to be expanded later.
|
|
*/
|
|
AGGenInstance &AGGen::generate(bool batch_process, int batch_size, int numThrd, int initQSize,\
|
|
double mem_threshold, boost::mpi::communicator &world) {
|
|
|
|
//Init all Nodes with these variables
|
|
std::vector<Exploit> exploit_list = instance.exploits;
|
|
|
|
//Create a vector that contains all the groups of exploits to be fired synchonously
|
|
std::vector<std::string> ex_groups;
|
|
for (const auto &ex : exploit_list) {
|
|
//If the group isn't already in the vector
|
|
if(!(std::find(ex_groups.begin(), ex_groups.end(), ex.get_group()) !=ex_groups.end())) {
|
|
//Don't include the "no" group
|
|
if(ex.get_group()!="null")
|
|
ex_groups.emplace_back(ex.get_group());
|
|
}
|
|
|
|
}
|
|
|
|
//Print out the groups if desired
|
|
std::cout <<"\nThere are "<<ex_groups.size()<<" groups: ";
|
|
for(int i=0; i<ex_groups.size(); i++){
|
|
std::cout<<ex_groups[i] << ". ";
|
|
}
|
|
std::cout<<"\n";
|
|
|
|
auto counter = 0;
|
|
auto start = std::chrono::system_clock::now();
|
|
|
|
unsigned long esize = exploit_list.size();
|
|
printf("esize: %lu\n", esize);
|
|
bool save_queued = false;
|
|
|
|
if (world.rank() == 0){
|
|
std::cout << "Generating Attack Graph" << std::endl;
|
|
}
|
|
|
|
std::unordered_map<size_t, PermSet<size_t>> od_map;
|
|
size_t assets_size = instance.assets.size();
|
|
for (const auto &ex : exploit_list) {
|
|
size_t num_params = ex.get_num_params();
|
|
if (od_map.find(num_params) == od_map.end()) {
|
|
Odometer<size_t> od(num_params, assets_size);
|
|
od_map[num_params] = od.get_all();
|
|
}
|
|
}
|
|
/*
|
|
//might be where to apply parallelization.
|
|
while (frontier.size()<initQSize){//while starts, test multiple thread case THIS WAS THE ONE MING USED
|
|
//while (frontier.size()!=0){//while starts, test single thread case
|
|
// while(!frontier.empty()) {
|
|
auto current_state = frontier.back();
|
|
auto current_hash = current_state.get_hash(instance.facts);
|
|
frontier.pop_back();
|
|
std::vector<std::tuple<Exploit, AssetGroup>> appl_exploits;
|
|
for (size_t i = 0; i < esize; i++) {//for loop for applicable exploits starts
|
|
auto e = exploit_list.at(i);
|
|
size_t num_params = e.get_num_params();
|
|
auto preconds_q = e.precond_list_q();
|
|
auto preconds_t = e.precond_list_t();
|
|
auto perms = od_map[num_params];
|
|
std::vector<AssetGroup> asset_groups;
|
|
for (auto perm : perms) {
|
|
std::vector<Quality> asset_group_quals;
|
|
std::vector<Topology> asset_group_topos;
|
|
asset_group_quals.reserve(preconds_q.size());
|
|
asset_group_topos.reserve(preconds_t.size());
|
|
for (auto &precond : preconds_q) {
|
|
asset_group_quals.emplace_back(
|
|
perm[precond.get_param_num()], precond.name, precond.op,
|
|
precond.value, instance.facts);
|
|
}
|
|
for (auto &precond : preconds_t) {
|
|
auto dir = precond.get_dir();
|
|
auto prop = precond.get_property();
|
|
auto op = precond.get_operation();
|
|
auto val = precond.get_value();
|
|
|
|
asset_group_topos.emplace_back(
|
|
perm[precond.get_from_param()],
|
|
perm[precond.get_to_param()], dir, prop, op, val, instance.facts);
|
|
}
|
|
|
|
asset_groups.emplace_back(asset_group_quals, asset_group_topos,
|
|
perm);
|
|
}
|
|
auto assetgroup_size = asset_groups.size();
|
|
for (size_t j = 0; j < assetgroup_size; j++) {
|
|
auto asset_group = asset_groups.at(j);
|
|
for (auto &quality : asset_group.get_hypo_quals()) {
|
|
if (!current_state.get_factbase().find_quality(quality)) {
|
|
goto LOOPCONTINUE;
|
|
}
|
|
}
|
|
for (auto &topology : asset_group.get_hypo_topos()) {
|
|
if (!current_state.get_factbase().find_topology(topology)) {
|
|
goto LOOPCONTINUE;
|
|
}
|
|
}
|
|
//MING DID NOT HAVE THIS CRITICAL, BUT KYLE DID
|
|
// #pragma omp critical
|
|
{
|
|
auto new_appl_exploit = std::make_tuple(e, asset_group);
|
|
appl_exploits.push_back(new_appl_exploit);
|
|
}
|
|
LOOPCONTINUE:;
|
|
}
|
|
} //for loop for applicable exploits ends
|
|
|
|
auto appl_expl_size = appl_exploits.size();
|
|
for (size_t j = 0; j < appl_expl_size; j++) { //for loop for new states starts
|
|
auto e = appl_exploits.at(j);
|
|
auto exploit = std::get<0>(e);
|
|
auto assetGroup = std::get<1>(e);
|
|
auto postconditions = createPostConditions(e, instance.facts);
|
|
auto qualities = std::get<0>(postconditions);
|
|
auto topologies = std::get<1>(postconditions);
|
|
NetworkState new_state{current_state};
|
|
for(auto &qual : qualities) {
|
|
auto action = std::get<0>(qual);
|
|
auto fact = std::get<1>(qual);
|
|
switch(action) {
|
|
case ADD_T:
|
|
new_state.add_quality(fact);
|
|
break;
|
|
case UPDATE_T:
|
|
new_state.update_quality(fact);
|
|
break;
|
|
case DELETE_T:
|
|
new_state.delete_quality(fact);
|
|
break;
|
|
}
|
|
}
|
|
for(auto &topo : topologies) {
|
|
auto action = std::get<0>(topo);
|
|
auto fact = std::get<1>(topo);
|
|
switch(action) {
|
|
case ADD_T:
|
|
new_state.add_topology(fact);
|
|
break;
|
|
case UPDATE_T:
|
|
new_state.update_topology(fact);
|
|
break;
|
|
case DELETE_T:
|
|
new_state.delete_topology(fact);
|
|
break;
|
|
}
|
|
}
|
|
|
|
auto hash_num = new_state.get_hash(instance.facts);
|
|
if (hash_num == current_hash)
|
|
continue;
|
|
if (hash_map.find(hash_num) == hash_map.end()) {
|
|
new_state.set_id();
|
|
auto facts_tuple = new_state.get_factbase().get_facts_tuple();
|
|
FactbaseItems new_items =
|
|
std::make_tuple(facts_tuple, new_state.get_id());
|
|
instance.factbase_items.push_back(new_items);
|
|
instance.factbases.push_back(new_state.get_factbase());
|
|
hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id()));
|
|
frontier.emplace_front(new_state);
|
|
Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup);
|
|
ed.set_id();
|
|
instance.edges.push_back(ed);
|
|
counter++;
|
|
}
|
|
else {
|
|
int id = hash_map[hash_num];
|
|
Edge ed(current_state.get_id(), id, exploit, assetGroup);
|
|
ed.set_id();
|
|
instance.edges.push_back(ed);
|
|
}
|
|
} //for loop for new states ends
|
|
} //while ends
|
|
|
|
//int numThrd=32;
|
|
printf("The number of threads used is %d\n",numThrd);
|
|
printf("The initial QSize is %d\n",initQSize);
|
|
*/
|
|
|
|
int frt_size=frontier.size();
|
|
printf("The actual QSize to start using multiple threads is %d\n",frt_size);
|
|
|
|
|
|
double total_t=0.0;
|
|
//unti:ms
|
|
double total_task0, total_task1, total_task2, total_task3, total_task4 = 0.0;
|
|
|
|
struct timeval t1,t2;
|
|
gettimeofday(&t1,NULL);
|
|
|
|
int num_tasks = 6;
|
|
#pragma omp parallel for num_threads(numThrd) default(none) shared(esize,counter,\
|
|
exploit_list,od_map,frt_size,total_t,t1,t2,std::cout, mem_threshold, num_tasks,\
|
|
ex_groups, world, total_task0, total_task1, total_task2, total_task3, total_task4) schedule(dynamic,1)
|
|
//auto ag_start = std::chrono::system_clock::now();
|
|
for(int k=0;k<frt_size;k++){
|
|
|
|
int mpi_exit = 0;
|
|
double f_alpha = 0.0;
|
|
auto tot_sys_mem = getTotalSystemMemory();
|
|
std::deque<NetworkState> localFrontier;
|
|
localFrontier.emplace_front(frontier[k]);
|
|
|
|
if (mpi_exit == 1){
|
|
#pragma omp cancel for
|
|
}
|
|
|
|
while (!localFrontier.empty() || !unex_empty() || world.rank() > 0){//while starts
|
|
|
|
//Node 0 needs to tell other nodes to continue
|
|
if(world.rank() == 0){
|
|
int dummy = 1;
|
|
for (int w = 1; w < world.size(); w++)
|
|
world.isend(w, 14, dummy);
|
|
}
|
|
|
|
else {
|
|
//If we don't have the go-ahead, check for the Finalize message.
|
|
while(!world.iprobe(0, 14)){
|
|
if(world.iprobe(0, 15)){
|
|
mpi_exit = 1;
|
|
break;
|
|
}
|
|
}
|
|
if(mpi_exit == 1)
|
|
break;
|
|
//Receive the message so it's not just sitting in the queue.
|
|
int dummy;
|
|
world.irecv(0, 14, dummy);
|
|
}
|
|
|
|
//We need to refill the localFrontier with states from the database if it's empty
|
|
//TODO: Investigate if it's faster to read from DB directly into Node 0
|
|
//Or if faster to have worker node read from DB, then comm to Node 0
|
|
if(localFrontier.empty() && world.rank() == 0) {
|
|
struct timeval t01,t02;
|
|
gettimeofday(&t01,NULL);
|
|
task_zero(instance, localFrontier, mem_threshold);
|
|
gettimeofday(&t02,NULL);
|
|
total_task0+=(t02.tv_sec-t01.tv_sec)*1000.0+(t02.tv_usec-t01.tv_usec)/1000.0;
|
|
}
|
|
|
|
//Task 1 Node Allocating
|
|
int alloc;
|
|
if(world.size() <= num_tasks)
|
|
alloc = 1;
|
|
else
|
|
alloc = ceil((world.size()-num_tasks)/2);
|
|
|
|
//Task 2 Node Allocating
|
|
int reduc_factor = 0;
|
|
int two_alloc = alloc;
|
|
if(world.size() % 2 != 0 && world.size() > num_tasks){
|
|
int two_alloc = alloc-1;
|
|
reduc_factor = 1;
|
|
}
|
|
|
|
//Create Communicators
|
|
boost::mpi::communicator tcomm = world.split(world.rank() > 0 && world.rank() <= alloc);
|
|
|
|
boost::mpi::communicator ttwo_comm = world.split(world.rank() == send_check(world, alloc) && world.rank() <= send_check(world, 2*two_alloc));
|
|
|
|
//Task 0 to Task 1 Communication
|
|
if(world.rank() == 0)
|
|
{
|
|
auto current_state = localFrontier.back();
|
|
auto current_hash = current_state.get_hash(instance.facts);
|
|
localFrontier.pop_back();
|
|
for(int l=0; l <= alloc; l++){
|
|
world.isend(send_check(world, world.rank()+l), 20, current_state);
|
|
}
|
|
}
|
|
|
|
//Execute Task 1
|
|
if (world.rank() > 0 && world.rank() <= alloc){
|
|
NetworkState current_state;
|
|
world.recv(mpi::any_source, 20, current_state);
|
|
|
|
struct timeval t11,t12;
|
|
gettimeofday(&t11,NULL);
|
|
task_one(instance, current_state, exploit_list, od_map, alloc, two_alloc, reduc_factor, num_tasks, world, tcomm);
|
|
gettimeofday(&t12,NULL);
|
|
total_task1+=(t12.tv_sec-t11.tv_sec)*1000.0+(t12.tv_usec-t11.tv_usec)/1000.0;
|
|
}
|
|
|
|
//Execute Task 2
|
|
if(world.rank() == send_check(world, alloc) && world.rank() <= send_check(world, 2*two_alloc))
|
|
{
|
|
//Execute Task 2
|
|
struct timeval t21,t22;
|
|
gettimeofday(&t21,NULL);
|
|
task_two(instance, alloc, two_alloc, world, localFrontier, mem_threshold,\
|
|
ttwo_comm, ex_groups, hash_map);
|
|
//Wait for all Task 2 nodes to finish
|
|
ttwo_comm.barrier();
|
|
gettimeofday(&t22,NULL);
|
|
total_task2+=(t22.tv_sec-t21.tv_sec)*1000.0+(t22.tv_usec-t21.tv_usec)/1000.0;
|
|
|
|
|
|
//Have the 0th Task 2 node tell the other world nodes that it's done
|
|
if(ttwo_comm.rank() == 0){
|
|
for (int w = 0; w < world.size(); w++)
|
|
{
|
|
if(w != world.rank() && w > send_check(world, 2*two_alloc))
|
|
{
|
|
world.isend(w, 2, 1);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else{
|
|
/*
|
|
One, need to listen for Updates to instances.facts
|
|
MPI TAGS:
|
|
Tag 2 = Task 2 is done
|
|
Tag 3 = New fact
|
|
Tag 4 = Hash New State
|
|
Tag 5 = Critical New State
|
|
*/
|
|
//If we haven't been told that task 2 is finished, and if we still more facts or states to update:
|
|
//while(!world.iprobe(1+alloc, 2) && world.iprobe(mpi::any_source, 3) && world.iprobe(mpi::any_source, 4) && world.iprobe(mpi::any_source, 5))
|
|
|
|
while(!world.iprobe(send_check(world, alloc), 2) || world.iprobe(mpi::any_source, 3) || world.iprobe(mpi::any_source, 4) || world.iprobe(mpi::any_source, 5))
|
|
{
|
|
|
|
//If we get a new fact and new state, update
|
|
if(world.iprobe(mpi::any_source, 3) && world.iprobe(mpi::any_source, 4)){
|
|
NetworkState new_state;
|
|
Quality fact;
|
|
|
|
world.irecv(mpi::any_source, 3, fact);
|
|
world.irecv(mpi::any_source, 4, new_state);
|
|
|
|
instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size();
|
|
instance.facts.length++;
|
|
instance.facts.str_vector.push_back(new_state.compound_assign(fact));
|
|
}
|
|
if(world.rank() == 0){
|
|
if(world.iprobe(mpi::any_source, 5)){
|
|
NetworkState critical_state;
|
|
NetworkState current_state;
|
|
Exploit exploit;
|
|
AssetGroup assetGroup;
|
|
world.irecv(mpi::any_source, 5, critical_state);
|
|
world.irecv(mpi::any_source, 6, current_state);
|
|
world.irecv(mpi::any_source, 10, exploit);
|
|
world.irecv(mpi::any_source, 11, assetGroup);
|
|
|
|
struct timeval t31,t32;
|
|
gettimeofday(&t31,NULL);
|
|
task_three(instance, critical_state, localFrontier, mem_threshold, world,\
|
|
two_alloc, current_state, exploit, assetGroup, hash_map);
|
|
gettimeofday(&t32,NULL);
|
|
total_task3+=(t32.tv_sec-t31.tv_sec)*1000.0+(t32.tv_usec-t31.tv_usec)/1000.0;
|
|
}
|
|
}
|
|
|
|
}
|
|
//Receive the message so it doesn't just sit there
|
|
int ttwo_done;
|
|
world.recv(mpi::any_source, 2, ttwo_done);
|
|
}
|
|
|
|
//Task Four
|
|
if (world.rank() == send_check(world, 2*two_alloc+1)){
|
|
if(world.iprobe(0, 7) || world.iprobe(0, 8)){
|
|
std::vector<Factbase> factbases_dump;
|
|
std::vector<Edge> edges_dump;
|
|
world.irecv(0, 7, factbases_dump);
|
|
world.irecv(0, 8, edges_dump);
|
|
instance.factbases = factbases_dump;
|
|
instance.edges = edges_dump;
|
|
//task_four(instance);
|
|
struct timeval t41,t42;
|
|
gettimeofday(&t41,NULL);
|
|
save_ag_to_db(instance, true);
|
|
gettimeofday(&t42,NULL);
|
|
total_task4+=(t42.tv_sec-t41.tv_sec)*1000.0+(t42.tv_usec-t41.tv_usec)/1000.0;
|
|
}
|
|
}
|
|
} //while ends
|
|
|
|
if(world.rank() == 0){
|
|
for (int w = 1; w < world.size(); w++)
|
|
world.isend(w, 15, 1);
|
|
}
|
|
|
|
if (mpi_exit == 1){
|
|
#pragma omp cancel for
|
|
}
|
|
|
|
auto ag_end= std::chrono::system_clock::now();
|
|
|
|
}//OpenMP block ends
|
|
std::cout << "Process " << world.rank() << " is finishing." << std::endl;
|
|
//Wait for db ops to finish
|
|
world.barrier();
|
|
if(world.rank() == 0){
|
|
gettimeofday(&t2,NULL);
|
|
total_t+=(t2.tv_sec-t1.tv_sec)*1000.0+(t2.tv_usec-t1.tv_usec)/1000.0;
|
|
printf("AG TOOK %lf ms.\n", total_t);
|
|
|
|
auto end = std::chrono::system_clock::now();
|
|
|
|
if(world.rank() != 0){
|
|
if (total_task1 != 0.0)
|
|
world.send(0, 24, total_task1);
|
|
if (total_task2 != 0.0)
|
|
world.send(0, 25, total_task2);
|
|
if (total_task3 != 0.0)
|
|
world.send(0, 26, total_task3);
|
|
if (total_task4 != 0.0)
|
|
world.send(0, 27, total_task4);
|
|
}
|
|
//Don't really care about each node's time, we just want a time (node 1 time ~= node 2 time)
|
|
//Just grab any value.
|
|
else{
|
|
world.irecv(mpi::any_source, 24, total_task1);
|
|
world.irecv(mpi::any_source, 25, total_task2);
|
|
world.irecv(mpi::any_source, 26, total_task3);
|
|
world.irecv(mpi::any_source, 27, total_task4);
|
|
|
|
std::cout << "Task 0 time: " << total_task0 << "ms" << std::endl;
|
|
std::cout << "Task 1 time: " << total_task1 << "ms" << std::endl;
|
|
std::cout << "Task 2 time: " << total_task2 << "ms" << std::endl;
|
|
std::cout << "Task 3 time: " << total_task3 << "ms" << std::endl;
|
|
std::cout << "Task 4 time: " << total_task4 << "ms" << std::endl;
|
|
}
|
|
|
|
|
|
std::chrono::duration<double> elapsed_seconds = end - start;
|
|
instance.elapsed_seconds = elapsed_seconds;
|
|
}
|
|
|
|
return instance;
|
|
}
|
|
|
|
AGGenInstance &AGGen::single_generate(bool batch_process, int batch_num, int numThrd,\
|
|
int initQSize, double mem_threshold, boost::mpi::communicator &world){
|
|
|
|
std::vector<Exploit> exploit_list = instance.exploits;
|
|
|
|
//Create a vector that contains all the groups of exploits to be fired synchonously
|
|
std::vector<std::string> ex_groups;
|
|
for (const auto &ex : exploit_list) {
|
|
//If the group isn't already in the vector
|
|
if(!(std::find(ex_groups.begin(), ex_groups.end(), ex.get_group()) !=ex_groups.end())) {
|
|
//Don't include the "no" group
|
|
if(ex.get_group()!="null")
|
|
ex_groups.emplace_back(ex.get_group());
|
|
}
|
|
|
|
}
|
|
|
|
//Print out the groups if desired
|
|
std::cout <<"\nThere are "<<ex_groups.size()<<" groups: ";
|
|
for(int i=0; i<ex_groups.size(); i++){
|
|
std::cout<<ex_groups[i] << ". ";
|
|
}
|
|
std::cout<<"\n";
|
|
|
|
auto start = std::chrono::system_clock::now();
|
|
|
|
unsigned long esize = exploit_list.size();
|
|
printf("esize: %lu\n", esize);
|
|
bool save_queued = false;
|
|
|
|
std::cout << "Generating Attack Graph" << std::endl;
|
|
|
|
std::unordered_map<size_t, PermSet<size_t>> od_map;
|
|
size_t assets_size = instance.assets.size();
|
|
for (const auto &ex : exploit_list) {
|
|
size_t num_params = ex.get_num_params();
|
|
if (od_map.find(num_params) == od_map.end()) {
|
|
Odometer<size_t> od(num_params, assets_size);
|
|
od_map[num_params] = od.get_all();
|
|
}
|
|
}
|
|
/*
|
|
//might be where to apply parallelization.
|
|
while (frontier.size()<initQSize){//while starts, test multiple thread case THIS WAS THE ONE MING USED
|
|
//while (frontier.size()!=0){//while starts, test single thread case
|
|
// while(!frontier.empty()) {
|
|
auto current_state = frontier.back();
|
|
auto current_hash = current_state.get_hash(instance.facts);
|
|
frontier.pop_back();
|
|
std::vector<std::tuple<Exploit, AssetGroup>> appl_exploits;
|
|
for (size_t i = 0; i < esize; i++) {//for loop for applicable exploits starts
|
|
auto e = exploit_list.at(i);
|
|
size_t num_params = e.get_num_params();
|
|
auto preconds_q = e.precond_list_q();
|
|
auto preconds_t = e.precond_list_t();
|
|
auto perms = od_map[num_params];
|
|
std::vector<AssetGroup> asset_groups;
|
|
for (auto perm : perms) {
|
|
std::vector<Quality> asset_group_quals;
|
|
std::vector<Topology> asset_group_topos;
|
|
asset_group_quals.reserve(preconds_q.size());
|
|
asset_group_topos.reserve(preconds_t.size());
|
|
for (auto &precond : preconds_q) {
|
|
asset_group_quals.emplace_back(
|
|
perm[precond.get_param_num()], precond.name, precond.op,
|
|
precond.value, instance.facts);
|
|
}
|
|
for (auto &precond : preconds_t) {
|
|
auto dir = precond.get_dir();
|
|
auto prop = precond.get_property();
|
|
auto op = precond.get_operation();
|
|
auto val = precond.get_value();
|
|
|
|
asset_group_topos.emplace_back(
|
|
perm[precond.get_from_param()],
|
|
perm[precond.get_to_param()], dir, prop, op, val, instance.facts);
|
|
}
|
|
|
|
asset_groups.emplace_back(asset_group_quals, asset_group_topos,
|
|
perm);
|
|
}
|
|
auto assetgroup_size = asset_groups.size();
|
|
for (size_t j = 0; j < assetgroup_size; j++) {
|
|
auto asset_group = asset_groups.at(j);
|
|
for (auto &quality : asset_group.get_hypo_quals()) {
|
|
if (!current_state.get_factbase().find_quality(quality)) {
|
|
goto LOOPCONTINUE;
|
|
}
|
|
}
|
|
for (auto &topology : asset_group.get_hypo_topos()) {
|
|
if (!current_state.get_factbase().find_topology(topology)) {
|
|
goto LOOPCONTINUE;
|
|
}
|
|
}
|
|
//MING DID NOT HAVE THIS CRITICAL, BUT KYLE DID
|
|
// #pragma omp critical
|
|
{
|
|
auto new_appl_exploit = std::make_tuple(e, asset_group);
|
|
appl_exploits.push_back(new_appl_exploit);
|
|
}
|
|
LOOPCONTINUE:;
|
|
}
|
|
} //for loop for applicable exploits ends
|
|
|
|
auto appl_expl_size = appl_exploits.size();
|
|
for (size_t j = 0; j < appl_expl_size; j++) { //for loop for new states starts
|
|
auto e = appl_exploits.at(j);
|
|
auto exploit = std::get<0>(e);
|
|
auto assetGroup = std::get<1>(e);
|
|
auto postconditions = createPostConditions(e, instance.facts);
|
|
auto qualities = std::get<0>(postconditions);
|
|
auto topologies = std::get<1>(postconditions);
|
|
NetworkState new_state{current_state};
|
|
for(auto &qual : qualities) {
|
|
auto action = std::get<0>(qual);
|
|
auto fact = std::get<1>(qual);
|
|
switch(action) {
|
|
case ADD_T:
|
|
new_state.add_quality(fact);
|
|
break;
|
|
case UPDATE_T:
|
|
new_state.update_quality(fact);
|
|
break;
|
|
case DELETE_T:
|
|
new_state.delete_quality(fact);
|
|
break;
|
|
}
|
|
}
|
|
for(auto &topo : topologies) {
|
|
auto action = std::get<0>(topo);
|
|
auto fact = std::get<1>(topo);
|
|
switch(action) {
|
|
case ADD_T:
|
|
new_state.add_topology(fact);
|
|
break;
|
|
case UPDATE_T:
|
|
new_state.update_topology(fact);
|
|
break;
|
|
case DELETE_T:
|
|
new_state.delete_topology(fact);
|
|
break;
|
|
}
|
|
}
|
|
|
|
auto hash_num = new_state.get_hash(instance.facts);
|
|
if (hash_num == current_hash)
|
|
continue;
|
|
if (hash_map.find(hash_num) == hash_map.end()) {
|
|
new_state.set_id();
|
|
auto facts_tuple = new_state.get_factbase().get_facts_tuple();
|
|
FactbaseItems new_items =
|
|
std::make_tuple(facts_tuple, new_state.get_id());
|
|
instance.factbase_items.push_back(new_items);
|
|
instance.factbases.push_back(new_state.get_factbase());
|
|
hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id()));
|
|
frontier.emplace_front(new_state);
|
|
Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup);
|
|
ed.set_id();
|
|
instance.edges.push_back(ed);
|
|
counter++;
|
|
}
|
|
else {
|
|
int id = hash_map[hash_num];
|
|
Edge ed(current_state.get_id(), id, exploit, assetGroup);
|
|
ed.set_id();
|
|
instance.edges.push_back(ed);
|
|
}
|
|
} //for loop for new states ends
|
|
} //while ends
|
|
|
|
//int numThrd=32;
|
|
printf("The number of threads used is %d\n",numThrd);
|
|
printf("The initial QSize is %d\n",initQSize);
|
|
*/
|
|
int frt_size=frontier.size();
|
|
printf("The actual QSize to start using multiple threads is %d\n",frt_size);
|
|
|
|
|
|
double total_t=0.0;
|
|
struct timeval t1,t2;
|
|
gettimeofday(&t1,NULL);
|
|
//#pragma omp parallel for num_threads(numThrd) default(none) shared(esize,counter,exploit_list,od_map,frt_size,total_t,t1,t2) schedule(dynamic,1)
|
|
#pragma omp parallel for num_threads(numThrd) default(none) shared(esize,exploit_list,od_map,frt_size,total_t,t1,t2,std::cout, mem_threshold, ex_groups) schedule(dynamic,1)
|
|
//auto ag_start = std::chrono::system_clock::now();
|
|
for(int k=0;k<frt_size;k++){
|
|
|
|
double f_alpha = 0.0;
|
|
auto tot_sys_mem = getTotalSystemMemory();
|
|
std::deque<NetworkState> localFrontier;
|
|
localFrontier.emplace_front(frontier[k]);
|
|
while (!localFrontier.empty() || !unex_empty()){//while starts
|
|
//We need to refill the localFrontier with states from the database if it's empty
|
|
if(localFrontier.empty()) {
|
|
std::cout << "Frontier empty, retrieving from database" << std::endl;
|
|
double total_tt = 0.0;
|
|
struct timeval tt1,tt2;
|
|
gettimeofday(&tt1,NULL);
|
|
int retrv_counter = 0;
|
|
|
|
//TODO: One (or a few) larger queries to pull in new states, rather than single queries that pull states one-by-one
|
|
do {
|
|
NetworkState db_new_state = fetch_unexplored(instance.facts);
|
|
localFrontier.emplace_front(db_new_state);
|
|
//alpha = get_alpha();
|
|
f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem;
|
|
retrv_counter += 1;
|
|
}
|
|
//Leave a 30% buffer in alpha
|
|
while((f_alpha <= (mem_threshold * 0.7)) && !unex_empty());
|
|
|
|
std::cout << "Retrieved " << retrv_counter << " factbases from the database." << std::endl;
|
|
gettimeofday(&tt2,NULL);
|
|
total_tt+=(tt2.tv_sec-tt1.tv_sec)*1000.0+(tt2.tv_usec-tt1.tv_usec)/1000.0;
|
|
//printf("Retrieving from db took %lf s.\n", total_tt);
|
|
}
|
|
//std::cout<<"FRONTIER SIZE: "<<localFrontier.size()<<std::endl;
|
|
auto current_state = localFrontier.back();
|
|
auto current_hash = current_state.get_hash(instance.facts);
|
|
localFrontier.pop_back();
|
|
std::vector<std::tuple<Exploit, AssetGroup>> appl_exploits;
|
|
for (size_t i = 0; i < esize; i++) {//for loop for applicable exploits starts
|
|
auto e = exploit_list.at(i);
|
|
size_t num_params = e.get_num_params();
|
|
auto preconds_q = e.precond_list_q();
|
|
auto preconds_t = e.precond_list_t();
|
|
auto perms = od_map[num_params];
|
|
std::vector<AssetGroup> asset_groups;
|
|
for (auto perm : perms) {
|
|
std::vector<Quality> asset_group_quals;
|
|
std::vector<Topology> asset_group_topos;
|
|
asset_group_quals.reserve(preconds_q.size());
|
|
asset_group_topos.reserve(preconds_t.size());
|
|
|
|
|
|
//std::vector<int>::size_type sz;
|
|
//sz=asset_group_quals.capacity();
|
|
for (auto &precond : preconds_q) {
|
|
|
|
//Old quality encode caused this to crash
|
|
asset_group_quals.emplace_back(
|
|
perm[precond.get_param_num()], precond.name, precond.op,
|
|
precond.value, instance.facts);
|
|
}
|
|
for (auto &precond : preconds_t) {
|
|
auto dir = precond.get_dir();
|
|
auto prop = precond.get_property();
|
|
auto op = precond.get_operation();
|
|
auto val = precond.get_value();
|
|
asset_group_topos.emplace_back(
|
|
perm[precond.get_from_param()],
|
|
perm[precond.get_to_param()], dir, prop, op, val, instance.facts);
|
|
}
|
|
asset_groups.emplace_back(asset_group_quals, asset_group_topos,
|
|
perm);
|
|
}
|
|
auto assetgroup_size = asset_groups.size();
|
|
for (size_t j = 0; j < assetgroup_size; j++) {
|
|
auto asset_group = asset_groups.at(j);
|
|
for (auto &quality : asset_group.get_hypo_quals()) {
|
|
if (!current_state.get_factbase().find_quality(quality)) {
|
|
goto LOOPCONTINUE1;
|
|
}
|
|
}
|
|
for (auto &topology : asset_group.get_hypo_topos()) {
|
|
if (!current_state.get_factbase().find_topology(topology)) {
|
|
goto LOOPCONTINUE1;
|
|
}
|
|
}
|
|
{
|
|
auto new_appl_exploit = std::make_tuple(e, asset_group);
|
|
appl_exploits.push_back(new_appl_exploit);
|
|
}
|
|
LOOPCONTINUE1:;
|
|
}
|
|
} //for loop for creating applicable exploits ends
|
|
|
|
std::map<std::string, int> group_fired; //Map to hold fired status per group
|
|
std::map<std::string, std::vector<std::tuple<Exploit, AssetGroup>>> sync_vectors; //Map to hold all group exploits
|
|
|
|
for (auto map_group : ex_groups)
|
|
{
|
|
group_fired.insert(std::pair<std::string, int> (map_group, 0));
|
|
}
|
|
|
|
//Build up the map of synchronous fire exploits
|
|
for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){
|
|
//auto e = appl_exploits.at(itr);
|
|
auto e = *itr;
|
|
auto egroup = std::get<0>(e).get_group();
|
|
|
|
if (egroup != "null"){
|
|
sync_vectors[egroup].push_back(e);
|
|
}
|
|
}
|
|
|
|
//loop through the vector
|
|
for(auto itr=appl_exploits.begin(); itr!=appl_exploits.end(); itr++){
|
|
|
|
auto e = *itr;
|
|
auto exploit = std::get<0>(e);
|
|
auto assetGroup = std::get<1>(e);
|
|
//std::cout<<exploit.get_name()<<std::endl;
|
|
|
|
auto egroup=exploit.get_group();
|
|
|
|
if ((egroup != "null" && group_fired[egroup] == 0) || egroup == "null"){
|
|
NetworkState new_state{current_state};
|
|
std::vector<std::tuple<Exploit, AssetGroup>> sync_exploits;
|
|
|
|
if (egroup == "null")
|
|
sync_exploits.push_back(e);
|
|
|
|
else {
|
|
sync_exploits = sync_vectors[egroup];
|
|
|
|
//TODO: Does not work if only some assets belong to a group. This only works if
|
|
//all assets are in the group
|
|
if(sync_exploits.size() < instance.assets.size()){
|
|
break;
|
|
}
|
|
}
|
|
|
|
for(auto sync_itr=sync_exploits.begin(); sync_itr!=sync_exploits.end(); sync_itr++){
|
|
e = *sync_itr;
|
|
exploit = std::get<0>(e);
|
|
egroup=exploit.get_group();
|
|
assetGroup = std::get<1>(e);
|
|
group_fired[egroup] = 1;
|
|
|
|
auto postconditions = createPostConditions(e, instance.facts);
|
|
auto qualities = std::get<0>(postconditions);
|
|
auto topologies = std::get<1>(postconditions);
|
|
|
|
for(auto &qual : qualities) {
|
|
auto action = std::get<0>(qual);
|
|
auto fact = std::get<1>(qual);
|
|
switch(action) {
|
|
case ADD_T:
|
|
new_state.add_quality(fact);
|
|
break;
|
|
case UPDATE_T:
|
|
new_state.update_quality(fact);
|
|
|
|
//TODO: if the fact's operator is not "=", call a new_state function, passing fact and instance.facts, that updates the quality and inserts it into the hash_table, instead of this convoluted mess
|
|
if(fact.get_op()=="+="){
|
|
|
|
//std::cout<<" AFTER UPDATE "<<new_state.compound_assign(fact)<<std::endl;
|
|
std::unordered_map<std::string,int>::const_iterator got = instance.facts.hash_table.find(new_state.compound_assign(fact));
|
|
|
|
//If the value is not already in the hash_table, insert it.
|
|
//Since the compound operators include a value that is not in the original Keyvalue object, the unordered map does not include it
|
|
//As a result, you have to manually add it.
|
|
if(got==instance.facts.hash_table.end()){
|
|
instance.facts.hash_table[new_state.compound_assign(fact)]=instance.facts.size();
|
|
instance.facts.length++;
|
|
instance.facts.str_vector.push_back(new_state.compound_assign(fact));
|
|
}
|
|
}
|
|
break;
|
|
case DELETE_T:
|
|
new_state.delete_quality(fact);
|
|
break;
|
|
}
|
|
}
|
|
|
|
for(auto &topo : topologies) {
|
|
auto action = std::get<0>(topo);
|
|
auto fact = std::get<1>(topo);
|
|
switch(action) {
|
|
case ADD_T:
|
|
new_state.add_topology(fact);
|
|
break;
|
|
case UPDATE_T:
|
|
new_state.update_topology(fact);
|
|
break;
|
|
case DELETE_T:
|
|
new_state.delete_topology(fact);
|
|
break;
|
|
}
|
|
}
|
|
}//Sync. Fire for
|
|
|
|
auto hash_num = new_state.get_hash(instance.facts);
|
|
|
|
if (hash_num == current_hash)
|
|
continue;
|
|
|
|
#pragma omp critical
|
|
//although local frontier is updated, the global hash is also updated to avoid testing on explored states.
|
|
if (hash_map.find(hash_num) == hash_map.end()) {
|
|
new_state.set_id();
|
|
auto facts_tuple = new_state.get_factbase().get_facts_tuple();
|
|
FactbaseItems new_items = std::make_tuple(facts_tuple, new_state.get_id());
|
|
instance.factbase_items.push_back(new_items);
|
|
instance.factbases.push_back(new_state.get_factbase());
|
|
hash_map.insert(std::make_pair(new_state.get_hash(instance.facts), new_state.get_id()));
|
|
|
|
//See memory usage. If it exceeds the threshold, store new states in the DB
|
|
double i_alpha = 0.0;
|
|
//Rough estimate of instance memory: the most recent Factbase's size * total number of factbases, doubled to account for factbase_items, plus the size of the stored edges
|
|
double i_usage = instance.factbases.back().get_size() * instance.factbases.size() * 2 + sizeof(instance.edges[0]) * instance.edges.size();
|
|
|
|
i_alpha = i_usage/tot_sys_mem;
|
|
double f_alpha;
|
|
if (!localFrontier.empty())
|
|
f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem;
|
|
else
|
|
f_alpha = 0.0;
|
|
|
|
if (f_alpha >= (mem_threshold/2)) {
|
|
std::cout << "Frontier Alpha prior to database storing: " << f_alpha << std::endl;
|
|
save_unexplored_to_db(new_state);
|
|
if (!localFrontier.empty())
|
|
f_alpha = (static_cast<double>(localFrontier.size()) * (localFrontier.back().get_size()))/tot_sys_mem;
|
|
else
|
|
f_alpha = 0;
|
|
std::cout << "Frontier Alpha after database storing: " << f_alpha << std::endl;
|
|
}
|
|
|
|
//Store new state in database to ensure proper ordering of the FIFO queue
|
|
else if (!unex_empty()){
|
|
save_unexplored_to_db(new_state);
|
|
}
|
|
|
|
//Otherwise, we can just store in memory
|
|
else {
|
|
localFrontier.emplace_front(new_state);
|
|
}
|
|
|
|
if (i_alpha >= mem_threshold/2){
|
|
std::cout << "Instance Alpha prior to database storing: " << i_alpha << std::endl;
|
|
save_ag_to_db(instance, true);
|
|
|
|
//Clear vectors and free memory
|
|
std::vector<Factbase>().swap(instance.factbases);
|
|
std::vector<FactbaseItems>().swap(instance.factbase_items);
|
|
std::vector<Edge>().swap(instance.edges);
|
|
|
|
i_usage = (sizeof(instance.factbases) + (sizeof(instance.factbases[0]) * instance.factbases.size()) +\
|
|
sizeof(instance.factbase_items) + (sizeof(instance.factbase_items[0]) * instance.factbase_items.size()) +\
|
|
sizeof(instance.edges) + (sizeof(instance.edges[0]) * instance.edges.size()));
|
|
i_alpha = i_usage/tot_sys_mem;
|
|
std::cout << "Instance Alpha after database storing: " << i_alpha << std::endl;
|
|
|
|
}
|
|
|
|
Edge ed(current_state.get_id(), new_state.get_id(), exploit, assetGroup);
|
|
ed.set_id();
|
|
instance.edges.push_back(ed);
|
|
} //END if (hash_map.find(hash_num) == hash_map.end())
|
|
|
|
else {
|
|
int id = hash_map[hash_num];
|
|
Edge ed(current_state.get_id(), id, exploit, assetGroup);
|
|
ed.set_id();
|
|
instance.edges.push_back(ed);
|
|
}
|
|
} //sync fire if
|
|
else
|
|
break;
|
|
} //for loop for new states ends
|
|
} //while frontier ends
|
|
auto ag_end= std::chrono::system_clock::now();
|
|
}//OpenMP block ends
|
|
gettimeofday(&t2,NULL);
|
|
total_t+=(t2.tv_sec-t1.tv_sec)*1000.0+(t2.tv_usec-t1.tv_usec)/1000.0;
|
|
printf("AG TOOK %lf ms.\n", total_t);
|
|
|
|
auto end = std::chrono::system_clock::now();
|
|
|
|
std::chrono::duration<double> elapsed_seconds = end - start;
|
|
instance.elapsed_seconds = elapsed_seconds;
|
|
|
|
return instance;
|
|
}
|