隐马尔可夫模型的学习问题
HMM 学习问题
HMM 学习问题:
已知观测序列O=(o_1,o_2,\cdots,o_T),估计模型\lambda=(A, B, \pi)参数,使得在该模型下观测序列概率P(O|\lambda)最大。
一、监督学习方法
假设已给训练数据包含 S个长度相同的观测序列和对应的状态序列 \{ (O_1,I_1), (O_2,I_2), \cdots, (O_S,I_S) \},那么可以利用极大似然估计来估计隐马尔科夫模型的参数。
- 转移概率 a_{ij}的估计:
设样本中时刻 t 处于状态 i 时刻 t+1 转移到状态 j 的频数为 A_{ij},那么状态转移概率 a_{ij} 的估计是
- 观测概率 b_j(k)的估计
设样本中状态为 j 并观测为 k 的频数为 B_{jk},那么状态为 j 观测为 k 的概率 b_j(k)的估计是
- 初始状态概率 \pi_i 的估计
\hat \pi_i为 S个样本中初始状态为 q_i的频率。
R示例:HMM参数监督学习
# 模拟参数A,B, Pi
hmm_sim_param <- function(Q,V){
N <- length(Q)
M <- length(V)
params <- list(
A = matrix(nrow = N,ncol = N),
B = matrix(nrow = N,ncol = M ),
Pi = c(),
N = N, M=M)
for (i in 1:N) {
temp <- runif(N,0,100)
Sum <- sum(temp)
for (j in 1:N) {
params$A[i,j] = temp[j]/Sum # 每行参数和为1
}
}
for (i in 1:N) {
temp <- runif(M,0,100)
Sum <- sum(temp)
for (j in 1:M) {
params$B[i,j] = temp[j]/Sum
}
}
temp <- runif(N,0,100)
Sum <- sum(temp)
params$Pi = temp/Sum
return(params)
}
# 模拟实验数据
hmm_sim_data <- function(n,len){
gen_data <- function(len = len){
#随机抽样
temp_I <- sample(Q,1,replace = T,prob = params$Pi)
I <- c(temp_I)
temp_O <- sample(V,1,replace = T,prob = params$B[temp_I,])
O <- c(temp_O)
for (i in 2:len) {
temp_I <- sample(Q,1,replace = T,prob = params$A[temp_I,])
I <- c(I,temp_I)
}
for (i in 2:len) {
temp_O <- sample(V,1,replace = T,prob = params$B[I[i],])
O <- c(O,temp_O)
}
return(list(I = I,O = O))
}
sim_data <- list()
for (x in 1:n) {
sim_data[[x]] <- gen_data(len = len)
}
return(sim_data)
}
# 监督学习法
# A
estimate_aij <- function(t,i,j){
t_i <- sapply(sim_data, function(x){return(x[["I"]][t]==i)})
next_j <- sapply(sim_data, function(x){return(x[["I"]][t+1]==j)})
i_j <- sum(t_i & next_j)
sum_ij <- 0
for (q in Q) {
next_q <- sapply(sim_data, function(x){return(x[["I"]][t+1]==q)})
sum_ij <- sum_ij + sum(t_i & next_q)
}
a_ij <- i_j/sum_ij
return(a_ij)
}
estimate_A <- function(t=2){
est_A <- matrix(nrow = params$N,ncol = params$N)
for (i in 1:params$N) {
for (j in 1:params$N) {
est_A[i,j] <- estimate_aij(t=t,i=i,j=j)
}
}
print("A实际参数:")
print(params$A)
print("A估计参数:")
print(est_A)
return(est_A)
}
# B
estimate_bjk <- function(j,k){
j_k <- sapply(sim_data, function(x){
return(sum(x$I == j & x$O == V[k]))
})
b_jk <- sum(j_k)
sum_jk <- 0
for (k in 1:params$M) {
tmp_j_k <- sapply(sim_data, function(x){
return(sum(x$I == j & x$O == V[k]))
})
sum_jk <- sum_jk+sum(tmp_j_k)
}
return(b_jk/sum_jk)
}
estimate_B <- function(){
est_B <- matrix(nrow = params$N,ncol = params$M)
for (j in 1:params$N) {
for (k in 1:params$M) {
est_B[j,k] <- estimate_bjk(j=j,k=k)
}
}
print("B实际参数:")
print(params$B)
print("B估计参数:")
print(est_B)
return(est_B)
}
# Pi
estimate_Pi <- function(){
t_1 <- sapply(sim_data, function(x){
x[['I']][1]
})
freq_t_1 <- table(t_1 )
est_Pi <- as.vector(prop.table(freq_t_1 ))
print("Pi实际参数:")
print(params$Pi)
print("Pi估计参数:")
print(est_Pi)
return(est_Pi)
}
# main
set.seed(1)
Q <- c(1,2,3) # 可能状态集合
V <- c("R","W") # 可能观测集合
params <- hmm_sim_param(Q = Q,V = V)
sim_data <- hmm_sim_data(params,n=1000,len = 10)
est_A <- estimate_A()
est_B <- estimate_B()
est_Pi <- estimate_Pi()
二、无监督学习(Baum-Welch 算法)
无监督学习即用极大似然估计的方法估计参数。Baum-Welch算法,也就是EM算法可以高效地对隐马尔可夫模型进行训练。它是一种非监督学习算法。
假设给定数据只包含了 S 个长度为 T 的观测序列 \{ O_1, O_2, \cdots, O_S \}而没有对应的状态序列,目标是学习隐马尔科夫模型 \lambda = (A,B,\pi)的参数。我们将观测是序列数据看作观测数据 O,状态序列数据看作不可观测的隐数据 I,那么 HMM 事实上是一个含隐变量的概率模型。
其参数学习可以由 EM 算法实现。
Baum-Welch 模型参数估计
输入:观测数据O=(o_1,o_2,\cdots,o_T)
输出:HMM模型参数
(1)初始化。对n=0, 取a_{ij}^{(0)},b_{j}(k)^{(0)},\pi_i^{(0)}, 得到模型\lambda^{(0)} =(A^{(0)},B^{(0)},\pi^{(0)})
(2)递推。对n=1,2,\cdots
右端各值按观测O=(o_1,o_2,\cdots,o_T)和模型\lambda^{(n)} =(A^{(n)},B^{(n)},\pi^{(n)})计算。
(3)终止。得到模型参数\lambda^{(n+1)}
其中,利用前向概率和后向概率, \gamma_t(i), \xi_t(i,j)由下公式给出。
- 给定模型 \lambda 和观测 O,在时刻 t 处于状态q_i 的概率,由前向概率\alpha_t(i)和后向概率\beta_t(i)定义,记:
- 给定模型 \lambda 和观测 O,在时刻 t 处于状态 q_i,且在时刻 t+1处于状态 q_j 的概率,记:
\gamma_t(i)和\xi_t(i,j)对各个时刻 t 求和,可以得到一些有用的期望值。
- 在观测O下状态i出现的期望值:
\sum_{t=1}^T\gamma_t(i)
- 在观测O下状态i转移的期望值:
\sum_{t=1}^{T-1}\gamma_t(i)
- 在观测O下由状态i转移到状态j的期望值:
\sum_{t=1}^{T-1}\xi_t(i,j)
R示例:Baum-Welch 模型估计HMM参数
gamma <- function(alpha,beta,i,t,N){
numerator <- alpha[[t]][i]*beta[[t]][i]
denominator <- 0
for (j in 1:N) {
denominator <- denominator+ alpha[[t]][j]*beta[[t]][j]
}
return(numerator/denominator)
}
ksi <- function(A,B,alpha,beta,i,j,t,N){
numerator <- alpha[[t]][i]*A[i,j]*B[j,O[t+1]]*beta[[t+1]][j]
denominator <- 0
for (i in 1:N) {
for (j in 1:N) {
denominator <- denominator+alpha[[t]][i]*A[i,j]*B[j,O[t+1]]*beta[[t+1]][j]
}
}
return(numerator/denominator)
}
forward <- function(A,B,Pi,T_){
alpha <- list()
for (t in 1:T_) {
if (t == 1) {
alpha[[t]] <- Pi * B[,O[t]]
} else {
alpha[[t]] <- colSums(alpha[[t-1]] * A) * B[,O[t]]
}
}
return(alpha)
}
backward <- function(A,B,N,T_){
beta <- list()
for (t in T_:1) {
if (t == T_) {
beta[[t]] <- rep(1,N)
} else {
beta[[t]] <- colSums(t(A) * B[,O[t+1]]*beta[[t+1]])
}
}
return(beta)
}
hmm_train <- function(params,O,Q,V){
A <- params$A
B <- params$B
Pi <- params$Pi
# N状态数 M观测数 T_时刻数
N <- length(Q)
M <- length(V)
T_ <- length(O)
tmp_A <- matrix(0,nrow = N,ncol = N)
tmp_B <- matrix(0,nrow = N,ncol = M,dimnames = list(Q,V))
tmp_Pi <- rep(0,N)
alpha <- forward(A,B,Pi,T_)
beta <- backward(A,B,N,T_)
# a_ij
for (i in 1:N) {
for (j in 1:N) {
numerator <- 0
denominator <- 0
for (t in 1:(T_-1)) {
numerator <- numerator+ksi(A,B,alpha,beta,i,j,t,N)
denominator <- denominator+gamma(alpha,beta,i,t,N)
}
tmp_A[i,j] <- numerator/denominator
}
}
#b_jk
for (j in 1:N) {
for (k in 1:M) {
numerator <- 0
denominator <- 0
for (t in 1:T_) {
if (V[k] == O[t]) {
numerator <- numerator+gamma(alpha,beta,j,t,N)
}
denominator <- denominator+gamma(alpha,beta,j,t,N)
}
tmp_B[j,k] <- numerator/denominator
}
}
#pi
for (i in 1:N) {
tmp_Pi[i] <- gamma(alpha,beta,i,1,N)
}
params$A <- tmp_A
params$B <- tmp_B
params$Pi <- tmp_Pi
return(params)
}
hmm_run <- function(params,O,Q,V,maxIter=100,criterion=10e-6){
for (n in 1:maxIter) {
old_params <- params
params <- hmm_train(params,O,Q,V)
delta_A <- max(abs(params$A-old_params$A))
delta_B <- max(abs(params$B-old_params$B))
delta_Pi <- max(abs(params$Pi-old_params$Pi))
tmp_delta <- c(delta_A,delta_B,delta_Pi)
cat("Iter: ", n,"\n",
"delta: ", tmp_delta,"\n")
if ( all(tmp_delta < criterion) | (n == maxIter)) {
print(params)
cat("delta: ", tmp_delta,"\n",
"Iter: ", n,"\n")
return(params)
}
}
}
random_init <- function(Q,V){
N <- length(Q)
M <- length(V)
params <- list(
A = matrix(nrow = N,ncol = N),
B = matrix(nrow = N,ncol = M,dimnames = list(Q,V)),
Pi = c()
)
for (i in 1:N) {
temp <- runif(N,0,100)
Sum <- sum(temp)
for (j in 1:N) {
params$A[i,j] = temp[j]/Sum
}
}
for (i in 1:N) {
temp <- runif(M,0,100)
Sum <- sum(temp)
for (j in 1:M) {
params$B[i,j] = temp[j]/Sum
}
}
temp <- runif(N,0,100)
Sum <- sum(temp)
params$Pi = temp/Sum
return(params)
}
set.seed(1)
Q <- c(1,2,3)
V <- c("R","W")
O <- c("R","W","R")
params <- random_init(Q,V)
params <- hmm_run(params = params,O,Q,V,maxIter=3)
R示例:HMM生成三角波图形
hmm_sim_data <- function(params,n,len){
gen_data <- function(len = len){
temp_I <- sample(Q,1,replace = T,prob = params$Pi)
I <- c(temp_I)
temp_O <- sample(V,1,replace = T,prob = params$B[temp_I,])
O <- c(temp_O)
for (i in 2:len) {
temp_I <- sample(Q,1,replace = T,prob = params$A[temp_I,])
I <- c(I,temp_I)
}
for (i in 2:len) {
temp_O <- sample(V,1,replace = T,prob = params$B[I[i],])
O <- c(O,temp_O)
}
return(list(I = I,O = O))
}
sim_data <- list()
for (x in 1:n) {
sim_data[[x]] <- gen_data(len = len)
}
return(sim_data)
}
set.seed(1)
Q <- 1:10
V <- 1:4
# 模拟三角波
O <- c(1,2,3,4,3,2,1,2,3,4,3,2,1,2,3,4,3,2,1,2)
par(mfrow=c(2,1))
plot(1:length(O),O,type = 'l',main = "实际图形")
params <- random_init(Q,V)
params <- hmm_run(params = params,O,Q,V,maxIter=100,criterion = 10e-6)
gen <- hmm_sim_data(params,n = 1,len = 100)
plot(1:length(gen[[1]]$O),gen[[1]]$O,type = "l",main = "HMM生成图形")