作业2
实现能处理连续属性的贝叶斯网络。
思路
- 怎么自动判断该属性是离散还是连续:计算该属性的不同值有多少个,超过10个就认为是连续,否则是离散的。
- 离散的:统计该类、该属性、该值的各个数量,计算概率,为后续计算做准备
- 连续的:统计该类、该属性的平均值、方差、标准差,为后续计算做准备
- 预测:
- 离散的:answer=whichjinmax(P(Cj)∏P(wi∣Cj))answer=which \ \ j \ \ in \ \ max(P(C_j)\prod P(w_i|C_j))answer=which j in max(P(Cj)∏P(wi∣Cj))
- 连续的:answer=whichjinmax(12πσje−(x−uj)22σj2)answer=which \ \ j \ \ in \ \ max(\frac{1}{\sqrt{2π}σ_j} e^{-\frac{(x-u_j)^2}{2σ_j^2}})answer=which j in max(2πσj1e−2σj2(x−uj)2)
'''
6. 实现了训练、预测、输出精确度的功能
7. 实现自动判断特征是离散 还是连续,分别对离散、连续特征进行计算
8. 离散连续的判断依据:整个训练集中 该特征的特征值的数量 是否大于10?
'''
import numpy as np
import pandas as pd
from sklearn.datasets import make_blobs
from sklearn.model_selection import train_test_splitclass naive_bayes():def __init__(self):self.pi=3.1415926passdef fit(self,x,y):self.sign_feature=[]#标记特征为离散 false 连续truefor k in range(x.shape[1]):#特征kcount=x[x.columns[k]].unique()#计算特征k有多少不同的值if len(count)<10:#如果小于10个 处理为离散值 否则处理为连续值self.sign_feature.append('false')#标记该特征为离散else:self.sign_feature.append('true')#标记该特征为连续self.classes=y[y.columns[0]].unique()#统计y不同的值 self.x_avg={}#计算类j特征k的各个数值平均self.s_2={}#方差self.s={}#标准差self.class_condition_prob={}#类条件概率初始化为字典for j in self.classes:#遍历类jfor k in range(x.shape[1]):#特征kif self.sign_feature[k]:#如果是连续值x_avg=0x_x=[]for i in range(x.shape[0]):#遍历xif y[y.columns[0]][i]==j :x_avg+=x[x.columns[k]][i]x_x.append(x[x.columns[k]][i])x_avg/=len(x_x)s_2=0for i in range(len(x_x)):s_2+=(x_x[i]-x_avg)*(x_x[i]-x_avg)s_2/=len(x_x)s=np.sqrt(s_2)self.x_avg[(j,k)]=x_avg#平均值self.s_2[(j,k)]=s_2#方差self.s[(j,k)]=s#标准差else:class_count=y[y.columns[0]].value_counts()#即每个类中样本数 类数*属于该类的样本数#类先验概率self.class_prior=class_count/len(y) #属于该类的样本数/总样本数p_x_y=x[(y==j).values][x.columns[k]].value_counts()for i in p_x_y.index:self.class_condition_prob[(j,k,i)] = p_x_y[i]/class_count[j]def predict(self,x):ans=[]for i in range(len(x)):res=[]for j in self.classes:#遍历类 p_x_y=1for k in range(x.shape[1]):#遍历特征值if self.sign_feature[k]:#如果是连续值temp0=1/(np.sqrt(2*self.pi)*self.s[tuple([j]+[k])])temp1=-(self.x_avg[tuple([j]+[k])]-x[x.columns[k]][i])*(self.x_avg[tuple([j]+[k])]-x[x.columns[k]][i])temp2=temp1/(2*self.s_2[tuple([j]+[k])])p_x_y*=temp0*np.exp(temp2)else:p_y=self.class_prior[j]#类先验概率 p_x_y*=p_ytemp=x[x.columns[k]][i]p_x_y*=self.class_condition_prob[tuple([j]+[k]+[temp])]res.append(p_x_y)#获得每个类下的概率ans.append( self.classes[np.argmax(res)] )# np.argmax(res) 返回最大值的索引return ansdef accuracy(self,y_hat,y):#计算准确率ans=0for i in range(len(y_hat)):if y_hat[i]==y[i]:ans+=1print(y_hat==y)#查看错误了哪个? 只错了1个return ans/len(y)#构造数据集
x,y=make_blobs(n_samples=500, #样本数n_features=2, #特征值数centers=2, #类标签数cluster_std=[1.0,3.0], #每个类别的方差shuffle=True, #默认为Truerandom_state=8)feature1=np.random.randint(5,size=x.shape[0])x=np.column_stack((feature1,x))#增加离散的特征值x_train,x_test,y_train,y_test=train_test_split(x,y,random_state=8)x_list=['x1','x2','x3']
y_list=['y']
x_train=pd.DataFrame(x_train,columns=x_list)
x_test=pd.DataFrame(x_test,columns=x_list)
y_train=pd.DataFrame(y_train,columns=y_list)#使用列表形式model=naive_bayes()
model.fit(x_train,y_train)#训练y_hat=model.predict(x_test)#预测
print(y_hat)#输出预测结果score=model.accuracy(y_hat,y_test)
print(score)#输出准确率