# Project 1 for the University of Tulsa's CS-7313 Adv. AI Course
# Approximate Inference Methods for Bayesian Networks
# Professor: Dr. Sen, Fall 2021
# Noah Schrick - 1492657

import json
import random
import math
import os
import numpy as np
from shutil import copyfile
import argparse
from csv import writer
import re
from timeit import default_timer as timer


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--size", dest="NETWORK_SIZE", default=5, type=int,
                        help="Number of nodes in the network")
    parser.add_argument("-t", "--type", dest="NETWORK_TYPE", default="dag",
                        help="Type of network: dag or polytree")
    parser.add_argument("-n", "--num", dest="NUM_SAMPLES", default=100, type=int,
                        help="Number of samples to take")
    parser.add_argument("-a", "--alpha", dest="ALPHA", default=0.95, type=float,
                        help="Metropolis-Hastings split probability. Must be between 0 and 1.")
    args = parser.parse_args()

    # Generate a new BN. Specify type and number of nodes in the network.
    print("Generating a Bayesian Network of type", args.NETWORK_TYPE,
          "and with", args.NETWORK_SIZE, "nodes.")
    gen_json(args.NETWORK_TYPE, args.NETWORK_SIZE)

    # Get our BN
    bayes_net = import_bayes()

    # Generate random evidence
    E = gen_ev(bayes_net)
    print("Generating random evidence:", E)

    # Generate a random query that is not an evidence variable, in the form {var: val}
    X = gen_query(E, bayes_net)
    print("Generating random query:", X)
    print()

    query_var = list(X.items())[0][0]

    # Determine if the evidence is mostly upstream or downstream of the query
    upstream = True
    if check_upstream(query_var, E, bayes_net):
        upstream = False

    # Get probability of the query from LW given the evidence
    start = timer()
    LW_prob = likelihood_weighting(X, E, bayes_net, args.NUM_SAMPLES)
    end = timer()
    print("Probability of", X, "given", E, "with the LW algorithm and",
          args.NUM_SAMPLES, "samples is:")
    print(LW_prob)
    LW_time = end - start

    # Get probability of the query from GS given the evidence
    start = timer()
    GS_prob, tmp = gibbs_sampling(X, E, bayes_net, args.NUM_SAMPLES)
    end = timer()
    print("Probability of", X, "given", E, "with the GS algorithm and",
          args.NUM_SAMPLES, "samples is:")
    print(GS_prob)
    GS_time = end - start

    # Get probability of the query from MH given the evidence
    start = timer()
    MH_prob = metropolis_hastings(X, E, bayes_net, args.NUM_SAMPLES, args.ALPHA)
    end = timer()
    print("Probability of", X, "given", E, "with the MH algorithm and",
          args.NUM_SAMPLES, "samples, and a", args.ALPHA * 100, "/",
          100 - (args.ALPHA * 100), "split is:")
    print(MH_prob)
    MH_time = end - start

    query_val = list(X.values())[0]

    # Exact inference is only tractable for small networks
    if args.NETWORK_SIZE < 15:
        start = timer()
        exact_total = run_exact(query_var)
        end = timer()
        Exact_time = end - start
    else:
        Exact_time = 0
        Exact_prob = 0

    print("Probability of", X, "with the Variable Elimination algorithm is:")
    # Parse the probability out of the exact-inference text output
    if args.NETWORK_SIZE < 15:
        if query_val:
            match = re.search(r'\bP_True\b', exact_total)
            offset = 9
        else:
            match = re.search(r'\bP_False\b', exact_total)
            offset = 10
        match_start = match.span()[0]
        if query_val:
            Exact_prob = exact_total[(match_start + offset):].splitlines()[0]
        else:
            Exact_prob = exact_total[(match_start + offset):].split()[0]
        print(Exact_prob)

    to_write = [args.NETWORK_TYPE, args.NETWORK_SIZE, args.ALPHA, args.NUM_SAMPLES,
                LW_prob, LW_time, GS_prob, GS_time, MH_prob, MH_time,
                Exact_prob, Exact_time, upstream]
    append_csv(to_write)


# Append one result row to results.csv
def append_csv(list_of_ele):
    with open('results.csv', 'a+', newline='') as file:
        csv_writer = writer(file)
        csv_writer.writerow(list_of_ele)
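# Example invocation (note: "project1.py" is a placeholder for whatever name
# this script is saved under; the flags are the ones defined in main()):
#
#   python project1.py --type dag --size 5 --num 100 --alpha 0.95
#
# Each run appends one row to results.csv, in the order built by to_write:
#   [type, size, alpha, samples, LW_prob, LW_time, GS_prob, GS_time,
#    MH_prob, MH_time, Exact_prob, Exact_time, upstream]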
# Run the external exact-inference script on the query variable
def run_exact(query):
    # Get the dir of exact_inference
    os.chdir("./exact_inference")
    dirname = os.getcwd()
    # Go back up and into the gen_bn folder
    os.chdir("..")
    os.chdir("./gen_bn")
    # Copy the network file into the exact_inference folder
    copyfile("bn.json", dirname + '/bn.json')
    os.chdir("../exact_inference")
    # Run the exact inference on the query variable
    output = os.popen('python exact_inference.py -f bn.json -q ' + str(query)).read()
    os.chdir("..")
    return output


# Generate a new BN.
# Input: net_type ("dag" or "polytree")
# Input: number of nodes in the network
def gen_json(net_type, num_nodes):
    os.chdir("./gen_bn")
    os.system('python gen_bn.py ' + net_type + ' ' + str(num_nodes))
    os.chdir("..")


# Import the BN from the json
def import_bayes():
    with open("gen_bn/bn.json") as json_file:
        bayes_json = json.load(json_file)
    return bayes_json


# Generate a random set of evidence
def gen_ev(bayes_net):
    total_nodes = len(bayes_net)
    # Arbitrarily, generate at most ceil(total_nodes/2) evidence variables, but at least 1
    num_ev = random.randint(1, int(math.ceil(total_nodes / 2)))
    fixed_ev = []
    # Choose the nodes that will be fixed (repeated draws are simply skipped)
    for i in range(num_ev):
        fixed_var = random.randint(0, total_nodes - 1)
        if fixed_var not in fixed_ev:
            fixed_ev.append(fixed_var)
    # Now generate random values for the evidence:
    # draw a uniform double; >= 0.5 becomes True, < 0.5 becomes False
    E = {}
    for i in fixed_ev:
        if random.random() >= 0.5:
            E[str(i)] = True
        else:
            E[str(i)] = False
    return E


# Given the evidence variables and the bayes net, generate a random variable
# to query, along with a random value for it
def gen_query(ev, bayes_net):
    possible_vars = list(range(len(bayes_net)))
    for e in [*ev]:
        if int(e) in possible_vars:
            possible_vars.remove(int(e))
    query = str(random.choice(possible_vars))
    val = random.random() >= 0.5
    return {query: val}
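# For reference, a minimal sketch of the bn.json layout the functions above
# and below assume (the real file comes from gen_bn.py, so this example is
# illustrative only - row values may be stored as 0/1, which Python compares
# equal to False/True):
#
#   {
#     "0": {"parents": [], "prob": [[[], 0.3]]},
#     "1": {"parents": [0], "prob": [[[0], 0.2], [[1], 0.7]]},
#     "2": {"parents": [0, 1],
#           "prob": [[[0, 0], 0.1], [[0, 1], 0.5],
#                    [[1, 0], 0.4], [[1, 1], 0.9]]}
#   }
#
# Node keys are stringified ints, each "prob" row pairs a parent-value
# combination with P(node=True | parents), and a root's single entry
# lives at prob[0][1].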
# Checks if a node has parents. If not, it is a root.
def is_root(node, BN):
    return BN[str(node)]["parents"] == []


# Return a list of the root nodes
def get_root(BN):
    roots = []
    for node in range(len(BN)):
        if BN[str(node)]["parents"] == []:
            roots.append(str(node))
    return roots


# Return True if 'ancestor' is an ancestor of 'query' in the network
def is_ancestor(query, ancestor, BN):
    parsed = []
    # Parent ids are stored as ints in the json, so compare as strings throughout
    parents = [str(p) for p in get_parents(query, BN)]
    if str(ancestor) in parents:
        return True
    for parent in parents:
        if is_root(parent, BN):
            parsed.append(parent)
            continue
        gparents = [str(g) for g in get_parents(parent, BN)]
        if str(ancestor) in gparents:
            return True
        for gparent in gparents:
            if gparent not in parsed:
                parsed.append(gparent)
                # Recurse upward: is 'ancestor' an ancestor of this grandparent?
                if is_ancestor(gparent, ancestor, BN):
                    return True
    return False


# Return True if the evidence is mostly downstream of the query node
def check_upstream(node, e, bayes_net):
    downstream = []
    if node in get_root(bayes_net):
        return False
    for ev in e:
        # is_ancestor(ev, node, ...) holds when the query node is an ancestor
        # of the evidence variable, i.e. the evidence is downstream of the query
        if is_ancestor(ev, node, bayes_net):
            downstream.append(ev)
    return len(downstream) > (len(e) / 2)


# Get the parents of a node
def get_parents(node, BN):
    return BN[str(node)]["parents"]


"""NOTES"""
# bayes_json["x"]: the information about node x (an int key stored as a string)
# bayes_json["x"]["parents"]: the information about node x's parents
# bayes_json["x"]["prob"][0][0] returns the first row of the truth table (0, 0),
#   where [1][0] is the second (0, 1)
# bayes_json["x"]["prob"][i][1] returns the probability for that row of the table
# E is a dict in the form {"Node": Value}


# Compute the estimate of P(X|e), where X is the query variable and e holds the
# observed values for the evidence variables E
def likelihood_weighting(X, e, bayes_net, num_samples, MH=0):
    W = {}
    for i in range(num_samples):
        w = 1
        # Holds all the info on the sample, e.g. ~b, ~e, a, ~j, m
        samples = {}
        # Get all the roots to save traversal costs
        root_nodes = get_root(bayes_net)
        # Go through all the roots to get probabilities
        for root in root_nodes:
            if root in e and root not in samples:
                # The root is an evidence variable: fix it to the observed value
                samples[root] = e[root]
                # Adjust the weight by the likelihood of the observed value
                if samples[root]:
                    w = w * bayes_net[root]["prob"][0][1]
                else:
                    w = w * (1 - bayes_net[root]["prob"][0][1])
            elif root not in samples:
                # Otherwise, sample randomly from the root's prior
                if random.random() < bayes_net[root]["prob"][0][1]:
                    samples[root] = True
                else:
                    samples[root] = False
        # Now go through the BN for the non-root nodes
        for node in bayes_net:
            if node not in samples:
                # Get the probability, updated sample dict, and weight
                samples, prob, w = get_probability(str(node), samples, e, bayes_net, w)
        # Write to W. If this sample is already in W, don't add a new entry -
        # just fold the new weight into the existing one.
        written = False
        for tmp in range(len(W)):
            if samples in W[tmp].values():
                # Pull the weight associated with the sample and add the new weight
                key = list(W[tmp].items())[0][0]
                W[tmp] = {key + w: samples}
                written = True
        if not written:
            W[len(W)] = {w: samples}
    # The MH algorithm needs the raw weighted samples rather than the estimate
    if MH != 0:
        return W
    return compute_prob(X, e, W)
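# The weighted-sample store W built by likelihood_weighting maps an index to
# a single-entry {weight: sample} dict, e.g. (illustrative values only):
#
#   W = {0: {1.0:  {"0": True, "1": False, "2": True}},
#        1: {0.35: {"0": True, "1": True,  "2": False}}}
#
# Identical samples are merged by summing their weights into one key.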
# Return the updated sample dict, the node's probability, and the running weight,
# given the current samples and the fixed evidence.
# Uses recursion to pull probabilities and values down through the network.
def get_probability(node, samples, ev, bayes_net, w, G=0):
    parents = get_parents(node, bayes_net)
    for parent in parents:
        # If we already know the value of the parent, no need to re-obtain it
        if str(parent) in samples:
            continue
        # If we don't know the value, then we need to get it
        gparents = get_parents(parent, bayes_net)
        # If we have all of the parent's parents, we can just get the probability
        if all(str(g) in samples for g in gparents):
            samples, prob, w = translate_ev(gparents, parent, ev, samples, bayes_net, w)
        else:
            # Otherwise, get values for the ancestor nodes through recursion
            for gparent in gparents:
                if str(gparent) not in samples:
                    samples, prob, w = get_probability(gparent, samples, ev, bayes_net, w)
            samples, prob, w = translate_ev(gparents, parent, ev, samples, bayes_net, w)
    # Now that we have all the parents' values, we can get the node's value and
    # probability, and update the samples
    samples, prob, w = translate_ev(parents, node, ev, samples, bayes_net, w, G)
    return samples, prob, w


# Given a node and its parents, determine the node's value and its probability.
# If G=1, the state space has already been fixed to specific values, so only
# the probability of that configuration is needed, not new values; the get_cpt
# function below handles that case instead.
def translate_ev(parents, node, ev, samples, bayes_net, w, G=0):
    # Sort in ascending order so the value list lines up with the truth table
    parents.sort()
    node = str(node)
    value_list = []
    for parent in parents:
        value_list.append(samples[str(parent)])
    # See if this is an evidence node
    if node in ev:
        samples[node] = ev[node]
        get_weight = True
    else:
        get_weight = False
    table_prob = 0
    # The truth table has 2^len(parents) entries
    for i in range(2 ** len(parents)):
        # If the truth-table row matches the parent values we have
        if bayes_net[node]["prob"][i][0] == value_list:
            table_prob = bayes_net[node]["prob"][i][1]  # P(node=True | parents)
            if get_weight:
                # Evidence node: keep the observed value and weight by its likelihood
                if not samples[node]:
                    table_prob = 1 - table_prob
                w = w * table_prob
            else:
                # Non-evidence node: sample its value from the CPT
                if random.random() < table_prob:
                    samples[node] = True
                else:
                    samples[node] = False
                    table_prob = 1 - table_prob
            break
    return samples, table_prob, w


# Return the children of a node
def get_children(node, BN):
    children = []
    for x in range(len(BN)):
        # Parent ids are stored as ints in the json
        if int(node) in BN[str(x)]["parents"]:
            children.append(str(x))
    return children


# Product of an iterable of numbers
def product(nums):
    result = 1
    for num in nums:
        result *= num
    return result


# Given a node and the current state-space values, return the CPT probability
# of the node taking its current value, given its parents' current values
def get_cpt(node, samples, bayes_net):
    parents = bayes_net[str(node)]["parents"]
    parents.sort()
    value_list = []
    for parent in parents:
        value_list.append(samples[str(parent)])
    for i in range(2 ** len(parents)):
        # If the truth-table row matches the parent values we have
        if bayes_net[str(node)]["prob"][i][0] == value_list:
            p_true = bayes_net[str(node)]["prob"][i][1]
            return p_true if samples[str(node)] else 1 - p_true


# Compute the (unnormalized) Markov-blanket distribution of x_node:
# P(x | mb(x)) is proportional to P(x | parents) * product of P(child | parents)
def mb(x_node, e, bayes_net, state_space):
    children = get_children(x_node, bayes_net)
    state_space[x_node] = True
    x_true = get_cpt(x_node, state_space, bayes_net) * product(
        get_cpt(child, state_space, bayes_net) for child in children)
    state_space[x_node] = False
    x_false = get_cpt(x_node, state_space, bayes_net) * product(
        get_cpt(child, state_space, bayes_net) for child in children)
    return x_true, x_false


# Given a query, the evidence, and a dict of weighted samples, compute the
# probability of the query
def compute_prob(X, e, W):
    # Combine the query and evidence dicts for easier computation
    combined_dict = {**X, **e}
    # Initialize the probabilities of the query and the evidence
    query_prob = 0
    ev_prob = 0
    # Go through all the samples
    for i in range(len(W)):
        # Subset tests find every sample consistent with query+evidence
        # (numerator) and with the evidence alone (denominator).
        # Convert W[i].values() to a list, since Python 3 returns a "view" of
        # the dictionary; unwrap it with [0] and compare the nested dict
        # through .items()
        if combined_dict.items() <= list(W[i].values())[0].items():
            # Accumulate the weight
            query_prob += list(W[i].items())[0][0]
        if e.items() <= list(W[i].values())[0].items():
            ev_prob += list(W[i].items())[0][0]
    if ev_prob == 0:
        return 0
    return query_prob / ev_prob
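# compute_prob relies on dict-items subset comparison: in Python 3,
# a.items() <= b.items() is True exactly when every key/value pair of a also
# appears in b. For example:
#
#   >>> {"1": True}.items() <= {"0": False, "1": True}.items()
#   True
#   >>> {"1": False}.items() <= {"0": False, "1": True}.items()
#   False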
# Estimate P(X|e) by Gibbs sampling
def gibbs_sampling(X, e, bayes_net, num_samples):
    # State of the network, initialized with the evidence
    x = {**e}
    # Counts of how often each variable is True across the samples
    C = {}
    for j in range(len(bayes_net)):
        C[str(j)] = 0
    # Get a list of the non-evidence variables to make things easier
    Z = []
    for node in bayes_net:
        if node not in e:
            Z.append(node)
    # Initialize the rest of the network randomly
    for z in Z:
        if random.random() >= 0.5:
            x[z] = True
        else:
            x[z] = False
    for k in range(num_samples):
        zi = random.choice(Z)
        # Resample zi from its Markov-blanket distribution
        z_true, z_false = mb(str(zi), e, bayes_net, x)
        # Normalize before drawing the new value
        total = z_true + z_false
        p_true = z_true / total if total > 0 else 0.5
        if random.random() < p_true:
            x[zi] = True
        else:
            x[zi] = False
        # Tally the current state of every variable
        for j in x:
            if x[j]:
                C[j] += 1
    for j in range(len(bayes_net)):
        C[str(j)] /= num_samples
    query_var = list(X.items())[0][0]
    query_val = list(X.values())[0]
    if query_val:
        GS_prob = C[str(query_var)]
    else:
        GS_prob = 1.0 - C[str(query_var)]
    return GS_prob, x


# Use log-probabilities to prevent underflow in MH, as advised by Dr. Sen
def compute_log(prob):
    return -0.5 * np.sum(prob ** 2)


def acceptance(xprime, x, compute_log):
    return min(1, np.exp(compute_log(xprime) - compute_log(x)))
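# acceptance() above is not wired into metropolis_hastings below, which runs
# with 100% acceptance. A minimal sketch of the standard accept/reject step it
# supports, where x and x_prime are hypothetical numpy state vectors and
# propose() is a hypothetical proposal function:
#
#   x_prime = propose(x)
#   if random.random() < acceptance(x_prime, x, compute_log):
#       x = x_prime   # accept the proposal; otherwise keep the current state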
# Metropolis-Hastings with a mixture proposal: with probability alpha take a
# Gibbs step, otherwise draw a fresh likelihood-weighted sample. Three splits
# were run (95/5, 85/15, 75/25), with 100% acceptance.
def metropolis_hastings(X, e, bayes_net, num_samples, alpha):
    state_space = {**e}
    # Weighted-count store for the visited states
    W = {}
    # Get a list of the non-evidence variables
    Z = []
    for node in bayes_net:
        if node not in e:
            Z.append(node)
    # Initialize the rest of the network randomly
    for z in Z:
        if random.random() >= 0.5:
            state_space[z] = True
        else:
            state_space[z] = False
    for num in range(num_samples):
        proposal_choice = random.random()
        if proposal_choice <= (1 - alpha):
            # Weighted-sample proposal: draw one likelihood-weighted sample
            tmp = likelihood_weighting(X, e, bayes_net, 1, 1)
            sample = list(tmp[0].values())[0]
            # The proposal is always accepted, so the chain jumps to it
            state_space = dict(sample)
        else:
            # Gibbs proposal: resample one non-evidence variable from its
            # Markov-blanket distribution
            zi = random.choice(Z)
            z_true, z_false = mb(str(zi), e, bayes_net, state_space)
            total = z_true + z_false
            p_true = z_true / total if total > 0 else 0.5
            if random.random() < p_true:
                state_space[zi] = True
            else:
                state_space[zi] = False
        # Record the current state in W. If it is already there, don't add a
        # new entry - just increment its count.
        written = False
        for k in range(len(W)):
            if state_space in W[k].values():
                # Pull the count associated with the state and increment it
                count = list(W[k].items())[0][0]
                W[k] = {count + 1: state_space.copy()}
                written = True
        if not written:
            # Store a copy, since state_space keeps mutating between iterations
            W[len(W)] = {1: state_space.copy()}
    # Combine the query and evidence dicts for easier computation
    combined_dict = {**X, **e}
    # Initialize the probability of the query
    query_prob = 0
    # Go through all the recorded states; the subset comparison (see
    # compute_prob) finds every state where the query and evidence both match
    for i in range(len(W)):
        if combined_dict.items() <= list(W[i].values())[0].items():
            # Accumulate the count
            query_prob += list(W[i].items())[0][0]
    return query_prob / num_samples


if __name__ == '__main__':
    main()