#Project 1 for the University of Tulsa's CS-7313 Adv. AI Course
#Approximate Inference Methods for Bayesian Networks
#Professor: Dr. Sen, Fall 2021
#Noah Schrick - 1492657

import json
import random
import math
import os
import numpy as np
from shutil import copyfile
import argparse
from csv import writer
import re
from timeit import default_timer as timer

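#Example invocation (hypothetical script name; the helpers below assume the
#gen_bn/ and exact_inference/ folders sit next to this file):
#  python project1.py -s 10 -t polytree -n 1000 -a 0.85
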
def main():
    parser = argparse.ArgumentParser()

    parser.add_argument("-s", "--size", dest="NETWORK_SIZE", default=5, type=int, help="Number of nodes in the network")
    parser.add_argument("-t", "--type", dest="NETWORK_TYPE", default="dag", help="Type of network: dag or polytree")
    parser.add_argument("-n", "--num", dest="NUM_SAMPLES", default=100, type=int, help="Number of samples to take")
    parser.add_argument("-a", "--alpha", dest="ALPHA", default=0.95, type=float, help="Metropolis-Hastings split probability. Must be between 0 and 1.")

    args = parser.parse_args()

    #Generate a new BN. Specify type and number of nodes in network
    print("Generating a Bayesian Network of type", args.NETWORK_TYPE, "with", args.NETWORK_SIZE, "nodes.")
    gen_json(args.NETWORK_TYPE, args.NETWORK_SIZE)

    #Get our BN
    bayes_net = import_bayes()

    #Generate random evidence
    E = gen_ev(bayes_net)
    print("Generating random evidence:", E)

    #Generate a random query that is not an evidence variable, in the form {var : val}
    X = gen_query(E, bayes_net)
    print("Generating random query:", X)
    print()
    query_var = list(X.items())[0][0]

    #Determine whether the evidence is mostly upstream or mostly downstream of the query
    upstream = True
    if check_upstream(query_var, E, bayes_net):
        upstream = False

    #Get probability of query from LW given evidence
    start = timer()
    LW_prob = likelihood_weighting(X, E, bayes_net, args.NUM_SAMPLES)
    end = timer()
    print("Probability of", X, "given", E, "with the LW algorithm and", args.NUM_SAMPLES, "samples is:")
    print(LW_prob)
    LW_time = end - start

    #Get probability of query from Gibbs sampling given evidence
    start = timer()
    GS_prob, tmp = gibbs_sampling(X, E, bayes_net, args.NUM_SAMPLES)
    end = timer()
    print("Probability of", X, "given", E, "with the GS algorithm and", args.NUM_SAMPLES, "samples is:")
    print(GS_prob)
    GS_time = end - start

    #Get probability of query from MH given evidence
    start = timer()
    MH_prob = metropolis_hastings(X, E, bayes_net, args.NUM_SAMPLES, args.ALPHA)
    end = timer()
    print("Probability of", X, "given", E, "with the MH algorithm and", args.NUM_SAMPLES, "samples, and a", args.ALPHA*100, "/", 100-(args.ALPHA*100), "split is:")
    print(MH_prob)
    MH_time = end - start

    query_val = list(X.values())[0]

    #Exact inference is only run for small networks, since it gets expensive quickly
    if args.NETWORK_SIZE < 15:
        start = timer()
        exact_total = run_exact(query_var)
        end = timer()
        Exact_time = end - start
    else:
        Exact_time = 0
        Exact_prob = 0

    #Parse the probability of the queried value out of the exact-inference output
    if args.NETWORK_SIZE < 15:
        print("Probability of", X, "with the Variable Elimination algorithm is:")
        if query_val:
            match = re.search(r'\bP_True\b', exact_total)
            offset = 9
        else:
            match = re.search(r'\bP_False\b', exact_total)
            offset = 10
        start = match.span()[0]
        if query_val:
            Exact_prob = exact_total[(start+offset):].splitlines()[0]
        else:
            Exact_prob = exact_total[(start+offset):].split()[0]
        print(Exact_prob)

    to_write = [args.NETWORK_TYPE, args.NETWORK_SIZE, args.ALPHA, args.NUM_SAMPLES, LW_prob, LW_time, GS_prob, GS_time, MH_prob, MH_time, Exact_prob, Exact_time, upstream]
    append_csv(to_write)

#Append a row of results to the results CSV
def append_csv(list_of_ele):
    #The with-statement closes the file automatically, so no explicit close() is needed
    with open('results.csv', 'a+', newline='') as file:
        csv_writer = writer(file)
        csv_writer.writerow(list_of_ele)

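#For reference, the columns written to results.csv, in order (matching to_write in main):
#  type, size, alpha, num_samples, LW_prob, LW_time, GS_prob, GS_time,
#  MH_prob, MH_time, Exact_prob, Exact_time, upstream
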
#Run the external exact-inference script (variable elimination) on the query variable
def run_exact(query):
    #Get the dir of exact_inference
    os.chdir("./exact_inference")
    dirname = os.getcwd()
    #Go back up and into the gen_bn folder
    os.chdir("..")
    os.chdir("./gen_bn")
    #Copy the generated network into the exact_inference folder
    copyfile("bn.json", dirname + '/bn.json')
    os.chdir("../exact_inference")
    #Run the exact inference on the query variable and capture its stdout
    output = os.popen('python exact_inference.py -f bn.json -q ' + str(query)).read()
    os.chdir("..")
    return output

#Generate a new BN.
#Input: net_type ("dag" or "polytree")
#Input: num_nodes, the number of nodes in the network
def gen_json(net_type, num_nodes):
    os.chdir("./gen_bn")
    os.system('python gen_bn.py ' + net_type + ' ' + str(num_nodes))
    os.chdir("..")

#Import the BN from the json
def import_bayes():
    #The with-statement closes the file automatically
    with open("gen_bn/bn.json") as json_file:
        bayes_json = json.load(json_file)
    return bayes_json

#Generate a random set of evidence
def gen_ev(bayes_net):
    total_nodes = len(bayes_net)
    #Arbitrarily, generate at most ceil(total_nodes/2) evidence variables, but at least 1.
    #Duplicate draws are skipped, so the actual count may come out lower.
    num_ev = random.randint(1, int(math.ceil(total_nodes/2)))
    fixed_ev = []
    #Pick the nodes that will be fixed as evidence
    for i in range(num_ev):
        fixed_var = random.randint(0, total_nodes-1)
        if fixed_var not in fixed_ev:
            fixed_ev.append(fixed_var)
    #Now generate random values for the evidence:
    #draw a uniform double; >=0.5 maps to True, <0.5 maps to False
    E = {}
    for i in fixed_ev:
        val_p = random.random()
        if val_p >= 0.5:
            E[str(i)] = True
        else:
            E[str(i)] = False
    return E

#Given the evidence variables and the Bayes net, generate a random variable to query and a value for it
def gen_query(ev, bayes_net):
    possible_vars = list(range(len(bayes_net)))
    ev_vars = [*ev]
    #Remove the evidence variables from the candidate pool
    for e in ev_vars:
        if int(e) in possible_vars:
            possible_vars.remove(int(e))
    query = str(random.choice(possible_vars))
    #Pick the queried value uniformly at random
    rand_prob = random.random()
    if rand_prob >= 0.5:
        val = True
    else:
        val = False

    return {query : val}

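#For example (hypothetical draw on a 5-node network): gen_ev may return
#E = {'1': True, '4': False}, after which gen_query can only pick node 0, 2, or 3,
#e.g. X = {'2': False}.
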
#Checks if a node has parents. If not, it is a root
def is_root(node, BN):
    return (BN[node]["parents"]) == []

#Return a list of the root nodes
def get_root(BN):
    roots = []
    for node in range(len(BN)):
        if is_root(str(node), BN):
            roots.append(str(node))
    return roots

#Return True if 'ancestor' is an ancestor of 'query' in the network
def is_ancestor(query, ancestor, BN):
    #Compare node ids as strings, since parent lists may store them as ints
    parents = [str(p) for p in get_parents(query, BN)]
    if str(ancestor) in parents:
        return True
    #Walk up through every parent; the recursion bottoms out at the roots, which
    #have no parents. Only report True when some path actually contains 'ancestor'.
    for parent in parents:
        if is_ancestor(parent, ancestor, BN):
            return True

    return False

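#For example, in a chain network 0 -> 1 -> 2, is_ancestor('2', '0', BN) is True
#(0 is a grandparent of 2), while is_ancestor('0', '2', BN) is False.
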
#Return True if more than half of the evidence variables lie downstream of 'node'
def check_upstream(node, e, bayes_net):
    downstream = []
    roots = get_root(bayes_net)
    if node in roots:
        return False
    for ev in e:
        #If 'node' is an ancestor of the evidence variable, that evidence is downstream of it
        if is_ancestor(ev, node, bayes_net):
            downstream.append(ev)

    return len(downstream) > (len(e) / 2)

#Get the parents of a node
def get_parents(node, BN):
    return BN[str(node)]["parents"]

"""NOTES"""
|
|
#(bayes_json["x"]): the information about node x (an int)
|
|
#(bayes_json["x"]["parents"] the information about node x's parents
|
|
#bayes_json["x"]["prob"][0][0] returns the first set of truth table (0, 0), where [1][0] is the second (0,1)
|
|
#bayes_json["x"]["prob"][parent][1] returns probability of the set evidence variable
|
|
#E is a dict in the form of {"Node" : Value}
|
|
|
|
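#A minimal sketch of the bn.json layout this code assumes (illustrative values only;
#the real file comes from gen_bn.py):
#  {
#    "0": {"parents": [],  "prob": [[[], 0.3]]},
#    "1": {"parents": [0], "prob": [[[False], 0.2], [[True], 0.7]]}
#  }
#where each "prob" row is [list of parent values, P(node=True | those values)].
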
#Compute the estimate of P(X|e), where X is the query variable and e is the observed values for variables E
def likelihood_weighting(X, e, bayes_net, num_samples, MH=0):
    W = {}

    #Get all the roots once up front to save traversal costs
    root_nodes = get_root(bayes_net)

    for i in range(num_samples):
        w = 1
        #Holds all the info on the samples. EX: ~b, ~e, a, ~j, m
        samples = {}

        #Go through all the roots to get probabilities
        for root in root_nodes:
            root_prob = bayes_net[root]["prob"][0][1]
            #If the root is an evidence variable
            if root in e:
                #Just set the value to the observed value
                samples[root] = e[root]
                #Adjust the weight by the likelihood of the observed value
                #(the table entry is taken to be P(node=True))
                w = w * (root_prob if e[root] else (1 - root_prob))
            else:
                #Otherwise, sample the root randomly from its prior
                samples[root] = random.random() < root_prob

        #Now go through the BN for non-root nodes
        for node in bayes_net:
            if node not in samples:
                #Get the probability, updated sample dict, and weight
                samples, prob, w = get_probability(str(node), samples, e, bayes_net, w)

        #We now need to write to W.
        #If this sample is already in W, don't add a new entry - only adjust the weight
        written = False
        for tmp in range(len(W)):
            #If the sample is already in W
            if samples in W[tmp].values():
                #Pull the weight that's associated with the sample
                key = list(W[tmp].items())[0][0]
                #Add the new weight to the existing weight
                new_key = key + w
                #Store it all back into W
                W[tmp] = {new_key : samples}
                #Note that we've already written to W in this loop, so we don't write it again
                written = True

        #If the sample wasn't already in W, put it in there now
        if not written:
            W[len(W)] = {w : samples}

    #For the MH algorithm, return the raw weighted samples instead of a probability
    if MH != 0:
        return W

    return compute_prob(X, e, W)

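#For reference, the W built above is shaped like {index : {accumulated weight : sample}},
#e.g. (hypothetical values):
#  {0: {0.12: {'0': True, '1': False}}, 1: {0.05: {'0': False, '1': False}}}
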
#Return the probability of a node and the value dict, given the current evidence and fixed evidence.
#Uses recursion to pull probabilities and values down through the network.
def get_probability(node, samples, ev, bayes_net, w, G=0):
    parents = get_parents(node, bayes_net)
    for parent in parents:
        #If we already know the value of the parent, no need to reobtain it
        if str(parent) in samples:
            continue
        #If we don't know the value, then we need to get it
        else:
            gparents = get_parents(parent, bayes_net)
            #If we have all of the parent's parents, then we can just get the probability
            if all(str(eles) in samples for eles in gparents):
                samples, prob, w = translate_ev(gparents, parent, ev, samples, bayes_net, w)
            #Otherwise, we need to get values for the ancestor nodes - use recursion
            else:
                for gparent in gparents:
                    if str(gparent) not in samples:
                        #Capture the returned weight so recursive weight updates aren't lost
                        samples, prob, w = get_probability(str(gparent), samples, ev, bayes_net, w)
                samples, prob, w = translate_ev(gparents, parent, ev, samples, bayes_net, w)

    #Now that we have all the parents' values, we can get the node's value and probability, and update samples
    samples, prob, w = translate_ev(parents, node, ev, samples, bayes_net, w, G)
    return samples, prob, w

#Given a node and its parents, determine the node's value and its probability
def translate_ev(parents, node, ev, samples, bayes_net, w, G=0):
    #If G=1, the state space has already been set to specific values, so we only
    #need the probability of that assignment rather than sampling new values

    #Sort in ascending order to match the truth-table row ordering
    parents.sort()
    node = str(node)
    value_list = []
    for parent in parents:
        value_list.append(samples[str(parent)])

    #See if this is an evidence node
    if node in ev:
        samples[node] = ev[node]
        get_weight = True
    else:
        get_weight = False

    #The truth table has 2^len(parents) entries
    table_prob = 0  #Default in case no row matches
    for i in range(2**len(parents)):
        #If the truth table row matches the value combination we have
        if bayes_net[node]["prob"][i][0] == value_list:
            #The table entry is taken to be P(node=True | parents)
            table_prob = bayes_net[node]["prob"][i][1]
            if get_weight:
                #Evidence node: keep the observed value and weight by its likelihood
                if not samples[node]:
                    table_prob = 1 - table_prob
                w = w * table_prob
            else:
                #Non-evidence node: sample its value from the CPT
                if random.random() < table_prob:
                    samples[node] = True
                else:
                    samples[node] = False
                    table_prob = 1 - table_prob

    return samples, table_prob, w

#Return all nodes that list 'node' among their parents
def get_children(node, BN):
    children = []
    for x in range(len(BN)):
        #Compare ids as strings, since parent lists may store them as ints
        if str(node) in [str(p) for p in BN[str(x)]["parents"]]:
            children.append(str(x))
    return children

#Multiply together an iterable of numbers
def product(nums):
    result = 1
    for num in nums:
        result *= num
    return result

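#e.g. product([0.5, 0.2]) evaluates to 0.1; an empty iterable gives 1
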
#Given a node and state-space values, get its probability entry from the CPT.
#Returns the table entry for the current assignment of the node's parents
#(taken to be P(node=True | those parent values)).
def get_cpt(node, samples, bayes_net):
    #Sort a copy so the shared parent list in the network isn't mutated
    parents = sorted(bayes_net[str(node)]["parents"])
    value_list = []

    for parent in parents:
        value_list.append(samples[str(parent)])

    for i in range(2**len(parents)):
        #If the truth table row matches the value combination we have
        if bayes_net[str(node)]["prob"][i][0] == value_list:
            return bayes_net[str(node)]["prob"][i][1]

#Probability of each value of x_node given its Markov blanket:
#P(x | mb(x)) is proportional to P(x | parents(x)) * product over children c of P(c | parents(c))
def mb(x_node, e, bayes_net, state_space):
    #Probability of a node's *current* value in state_space
    #(the CPT entry is taken to be P(node=True | parents))
    def value_prob(node):
        p = get_cpt(node, state_space, bayes_net)
        return p if state_space[str(node)] else 1 - p

    state_space[x_node] = True
    x_true = value_prob(x_node) * product(value_prob(child) for child in get_children(x_node, bayes_net))

    state_space[x_node] = False
    x_false = value_prob(x_node) * product(value_prob(child) for child in get_children(x_node, bayes_net))

    #Normalize so the two cases sum to 1
    norm = x_true + x_false
    if norm > 0:
        x_true, x_false = x_true / norm, x_false / norm
    return x_true, x_false

#Given a query, the evidence, and a dict of weighted samples, compute the prob of the query
def compute_prob(X, e, W):
    #Combine the query and ev dicts for easier computation
    combined_dict = {**X, **e}
    #Initialize the accumulated weights
    query_prob = 0
    ev_prob = 0

    #Go through all the samples
    for i in range(len(W)):
        #Easy way to find all the instances where query var and ev match the samples - use subsets.
        #Convert W[i].values() to a list, since Python 3 returns a "view" of the dictionary with .values(),
        #then remove the list aspect through [0], and obtain the values from the nested dict through .items()
        if combined_dict.items() <= list(W[i].values())[0].items():
            #Get the weight
            query_prob += list(W[i].items())[0][0]
        #See if the evidence alone is a subset
        if e.items() <= list(W[i].values())[0].items():
            ev_prob += list(W[i].items())[0][0]

    if ev_prob == 0:
        return 0
    else:
        return query_prob / ev_prob

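#Tiny worked example of compute_prob (hypothetical weights):
#  W = {0: {0.6: {'0': True,  '1': True}},
#       1: {0.2: {'0': False, '1': True}}}
#  X = {'0': True}, e = {'1': True}
#  query_prob = 0.6, ev_prob = 0.6 + 0.2 = 0.8, so the estimate is 0.75
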
def gibbs_sampling(X, e, bayes_net, num_samples):
    #State of the network, initialized with the evidence
    x = {**e}

    #Counts of True assignments for each variable
    C = {}
    for j in range(len(bayes_net)):
        C[str(j)] = 0

    #Get a list of non-evidence variables to make things easier
    Z = []
    for node in bayes_net:
        if node not in e:
            Z.append(node)

    #Initialize the rest of the network randomly
    for z in Z:
        x[z] = random.random() >= 0.5

    for k in range(num_samples):
        zi = random.choice(Z)
        #Resample zi from its Markov blanket (mb returns normalized probabilities)
        z_true, z_false = mb(str(zi), e, bayes_net, x)
        x[zi] = random.random() < z_true
        #Tally the current state of every non-evidence variable for this sample
        for z in Z:
            if x[z]:
                C[z] += 1

    #Turn the counts into relative frequencies
    for j in range(len(bayes_net)):
        C[str(j)] /= num_samples

    query_var = list(X.items())[0][0]
    query_val = list(X.values())[0]

    if query_val:
        GS_Prob = C[str(query_var)]
    else:
        GS_Prob = 1.0 - C[str(query_var)]

    return GS_Prob, x

#Use log-probabilities to prevent underflow in MH, as advised by Dr. Sen.
#This is the (unnormalized) log-density of a standard normal
def compute_log(prob):
    return -0.5 * np.sum(prob ** 2)

def acceptance(xprime, x, compute_log):
    return min(1, np.exp(compute_log(xprime) - compute_log(x)))

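#For reference, the usual MH acceptance rule is a = min(1, p(x') / p(x)); computing it
#as min(1, exp(log p(x') - log p(x))) avoids underflow for very small probabilities.
#Note: these two helpers are not currently called by metropolis_hastings below,
#which accepts every proposal.
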
def metropolis_hastings(X, e, bayes_net, num_samples, alpha):
    state_space = {**e}
    Z = []

    #Counts of samples, keyed by index: {index : {count : sample assignment}}
    W = {}

    for node in bayes_net:
        if node not in e:
            Z.append(node)

    #Initialize the rest of the network randomly
    for z in Z:
        state_space[z] = random.random() >= 0.5

    #Experimental splits: with probability alpha propose via Gibbs, otherwise via a
    #weighted sample (e.g. 95/5, then 85/15, then 75/25). Every proposal is accepted.
    for num in range(num_samples):
        proposal_choice = random.random()

        #Weighted-Sample proposal
        if proposal_choice <= (1 - alpha):
            tmp = likelihood_weighting(X, e, bayes_net, 1, 1)
            sample = list(tmp[0].values())[0]

            #If this sample is already in W, don't add a new entry - only bump the count
            written = False
            for k in range(len(W)):
                #If the sample is already in W
                if sample in W[k].values():
                    #Pull the count that's associated with the sample
                    key = list(W[k].items())[0][0]
                    #Increment the count
                    new_key = key + 1
                    #Store it all back into W
                    W[k] = {new_key : sample}
                    #Note that we've already written to W in this loop
                    written = True

            #If the sample wasn't already in W, put it in there now
            if not written:
                W[len(W)] = {1 : sample}

        #Gibbs proposal
        else:
            zi = random.choice(Z)
            #Resample zi from its Markov blanket (mb returns normalized probabilities)
            z_true, z_false = mb(str(zi), e, bayes_net, state_space)
            state_space[zi] = random.random() < z_true

            #Store a snapshot, not the live dict, so later updates don't rewrite stored samples
            snapshot = dict(state_space)

            #If this sample is already in W, don't add a new entry - only bump the count
            written = False
            for tmp in range(len(W)):
                #If the sample is already in W
                if snapshot in W[tmp].values():
                    #Pull the count that's associated with the sample
                    key = list(W[tmp].items())[0][0]
                    #Increment the count
                    new_key = key + 1
                    #Store it all back into W
                    W[tmp] = {new_key : snapshot}
                    #Note that we've already written to W in this loop
                    written = True

            #If the sample wasn't already in W, put it in there now
            if not written:
                W[len(W)] = {1 : snapshot}

    #Combine the query and ev dicts for easier computation
    combined_dict = {**X, **e}
    #Initialize the probability of the query
    query_prob = 0
    #Go through all the samples and count every stored assignment that agrees with
    #the query and evidence (same subset trick as compute_prob)
    for i in range(len(W)):
        if combined_dict.items() <= list(W[i].values())[0].items():
            #Add that sample's count
            query_prob += list(W[i].items())[0][0]

    return query_prob / num_samples

if __name__ == '__main__':
    main()