The Learning Problem of Hidden Markov Models

The HMM learning problem

Given an observation sequence O=(o_1,o_2,\cdots,o_T), estimate the parameters of the model \lambda=(A, B, \pi) so that the observation-sequence probability P(O|\lambda) under that model is maximized.

I. Supervised Learning

Suppose the training data consist of S observation sequences of equal length together with their corresponding state sequences, \{ (O_1,I_1), (O_2,I_2), \cdots, (O_S,I_S) \}. The parameters of the hidden Markov model can then be estimated by maximum likelihood.

  1. Estimating the transition probabilities a_{ij}:
    Let A_{ij} be the number of times in the samples that the chain is in state i at time t and moves to state j at time t+1. The estimate of the transition probability a_{ij} is
\hat a_{ij} = \frac{A_{ij}}{\sum_{j=1}^{N}A_{ij}}, \ i=1,2,\cdots,N; \ j = 1,2,\cdots,N
  2. Estimating the observation probabilities b_j(k):
    Let B_{jk} be the number of times the state is j and the observation is v_k. The estimate of the probability b_j(k) of observing v_k in state j is
\hat b_j(k) = \frac{B_{jk}}{\sum_{k=1}^{M}B_{jk}}, \ j=1,2,\cdots,N; \ k = 1,2,\cdots, M
  3. Estimating the initial state probabilities \pi_i:
    \hat \pi_i is the frequency of initial state q_i among the S samples.

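As a minimal, self-contained sketch of the counting estimates above (the toy labelled sequences here are invented for illustration, separate from the simulation example that follows):

```r
# Toy labelled data (invented): state sequence I over states 1..N and
# observation sequence O over symbols in V
I <- c(1, 2, 2, 1, 2, 1, 1, 2, 2, 2)
O <- c("R", "W", "W", "R", "W", "R", "R", "W", "R", "W")
N <- 2; V <- c("R", "W"); M <- length(V)

# A_ij: count transitions i -> j over all t; normalize each row to get a_ij
A_count <- matrix(0, N, N)
for (t in 1:(length(I) - 1)) {
  A_count[I[t], I[t + 1]] <- A_count[I[t], I[t + 1]] + 1
}
A_hat <- A_count / rowSums(A_count)

# B_jk: count (state j, observation v_k) pairs; normalize rows to get b_j(k)
B_count <- matrix(0, N, M, dimnames = list(1:N, V))
for (t in seq_along(I)) {
  B_count[I[t], O[t]] <- B_count[I[t], O[t]] + 1
}
B_hat <- B_count / rowSums(B_count)
```

For example, state 1 is followed by state 2 in three of its four transitions here, so A_hat[1, 2] is 3/4.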
R example: supervised learning of HMM parameters

# Simulate the parameters A, B, Pi
hmm_sim_param <- function(Q,V){
  N <- length(Q) 
  M <- length(V)
  params <- list(
    A = matrix(nrow = N,ncol = N),
    B = matrix(nrow = N,ncol = M ),
    Pi = c(),
    N = N, M=M)
  
  for (i in 1:N) {  
    temp <- runif(N,0,100)
    Sum <- sum(temp)
    for (j in 1:N) {
      params$A[i,j] = temp[j]/Sum  # each row sums to 1
    }
  }
  
  for (i in 1:N) {
    temp <- runif(M,0,100)
    Sum <- sum(temp)
    for (j in 1:M) {
      params$B[i,j] = temp[j]/Sum
    }
  }
  
  temp <- runif(N,0,100)
  Sum <- sum(temp)
  params$Pi = temp/Sum
  
  return(params)
}
# Simulate experimental data
hmm_sim_data <- function(params,n,len){
  gen_data <- function(len = len){
	   # random sampling
    temp_I <- sample(Q,1,replace = T,prob = params$Pi)
    I <- c(temp_I)
    temp_O <- sample(V,1,replace = T,prob = params$B[temp_I,])
    O <- c(temp_O)
  
    for (i in 2:len) {
      temp_I  <- sample(Q,1,replace = T,prob = params$A[temp_I,])
      I <- c(I,temp_I)
    }
  
    for (i in 2:len) {
      temp_O <- sample(V,1,replace = T,prob = params$B[I[i],])
      O <- c(O,temp_O)
    }
    return(list(I = I,O = O))
  }
  
  sim_data <- list()
  for (x in 1:n) {
    sim_data[[x]] <- gen_data(len = len)
  }
  return(sim_data)
}

# Supervised learning
# A
estimate_aij <- function(t,i,j){
  
  t_i <- sapply(sim_data, function(x){return(x[["I"]][t]==i)})
  next_j <- sapply(sim_data, function(x){return(x[["I"]][t+1]==j)})
  i_j <- sum(t_i & next_j)
  
  sum_ij <- 0
  for (q in Q) {
    next_q <- sapply(sim_data, function(x){return(x[["I"]][t+1]==q)})
    sum_ij <- sum_ij + sum(t_i & next_q) 
  }
  
  a_ij <- i_j/sum_ij
  
  return(a_ij)
}
estimate_A <- function(t=2){
  est_A <-  matrix(nrow = params$N,ncol = params$N)
  
  for (i in 1:params$N) {
    for (j in 1:params$N) {
      est_A[i,j] <- estimate_aij(t=t,i=i,j=j)
    }
  }
  
  print("True A:")
  print(params$A)
  print("Estimated A:")
  print(est_A)
  return(est_A)
}
# B
estimate_bjk <- function(j,k){
  
  j_k <- sapply(sim_data, function(x){
    return(sum(x$I == j & x$O == V[k]))
  })
  b_jk <- sum(j_k)
  
  sum_jk <- 0
  for (kk in 1:params$M) {  # kk, not k: avoid shadowing the argument
    tmp_j_k <- sapply(sim_data, function(x){
      return(sum(x$I == j & x$O == V[kk]))
    })
    sum_jk <- sum_jk+sum(tmp_j_k)
  }
  
  return(b_jk/sum_jk)
}
estimate_B <- function(){
  est_B <-  matrix(nrow = params$N,ncol = params$M)
  
  for (j in 1:params$N) {
    for (k in 1:params$M) {
      est_B[j,k] <- estimate_bjk(j=j,k=k)
    }
  }
  
  print("True B:")
  print(params$B)
  print("Estimated B:")
  print(est_B)
  return(est_B)
}
# Pi
estimate_Pi <- function(){
  t_1 <- sapply(sim_data, function(x){
    x[['I']][1]
  })
  # tabulate over all states so states never seen at t = 1 get probability 0
  freq_t_1 <- table(factor(t_1, levels = Q))
  est_Pi <- as.vector(prop.table(freq_t_1))
  
  print("True Pi:")
  print(params$Pi)
  print("Estimated Pi:")
  print(est_Pi)
  return(est_Pi)
}

# main
set.seed(1)
Q <- c(1,2,3) # set of possible states
V <- c("R","W") # set of possible observations

params <- hmm_sim_param(Q = Q,V = V)
sim_data <- hmm_sim_data(params,n=1000,len = 10)

est_A <- estimate_A()
est_B <- estimate_B()
est_Pi <- estimate_Pi()


II. Unsupervised Learning (the Baum-Welch Algorithm)

In the unsupervised setting the parameters are still estimated by maximum likelihood, but the state sequences are unobserved. The Baum-Welch algorithm, an instance of the EM algorithm, trains hidden Markov models efficiently in this setting; it is an unsupervised learning algorithm.

Suppose the given data contain only S observation sequences of length T, \{ O_1, O_2, \cdots, O_S \}, without the corresponding state sequences, and the goal is to learn the parameters of the hidden Markov model \lambda = (A,B,\pi). Treating the observation sequences as observed data O and the state sequences as unobservable hidden data I, the HMM is in fact a probabilistic model with hidden variables:

P(O|\lambda) = \sum_{I}P(O|I,\lambda)P(I|\lambda)

Its parameters can therefore be learned with the EM algorithm.
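Concretely, in the standard EM derivation for HMMs, the M-step maximizes the Q function of the complete data (a sketch; \bar\lambda denotes the current parameter estimate):

```latex
Q(\lambda, \bar\lambda) = \sum_{I} \log P(O,I|\lambda)\, P(O,I|\bar\lambda),
\qquad
\log P(O,I|\lambda) = \log \pi_{i_1}
  + \sum_{t=1}^{T-1} \log a_{i_t i_{t+1}}
  + \sum_{t=1}^{T} \log b_{i_t}(o_t)
```

Because the three terms involve disjoint parameter groups, \pi, A, and B can be maximized separately, which is what yields the re-estimation formulas below.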

Baum-Welch parameter estimation

Input: observation data O=(o_1,o_2,\cdots,o_T)
Output: HMM model parameters
(1) Initialization. For n=0, choose a_{ij}^{(0)},b_{j}(k)^{(0)},\pi_i^{(0)}, giving the model \lambda^{(0)} =(A^{(0)},B^{(0)},\pi^{(0)}).
(2) Recursion. For n=1,2,\cdots

a_{ij}^{(n+1)} = \frac{\sum_{t=1}^{T-1}\xi_t(i,j)}{\sum_{t=1}^{T-1}\gamma_t(i)}
b_j(k)^{(n+1)} = \frac{\sum_{t=1,o_t = v_k}^{T}\gamma_t(j)}{\sum_{t=1}^{T}\gamma_t(j)}
\pi_i^{(n+1)} = \gamma_1(i)

The quantities on the right-hand side are computed from the observations O=(o_1,o_2,\cdots,o_T) and the model \lambda^{(n)} =(A^{(n)},B^{(n)},\pi^{(n)}).
(3) Termination. Output the model parameters \lambda^{(n+1)}.

Here, in terms of the forward and backward probabilities, \gamma_t(i) and \xi_t(i,j) are given by the following formulas.

  1. Given the model \lambda and observations O, the probability of being in state q_i at time t, expressed via the forward probability \alpha_t(i) and backward probability \beta_t(i):
\gamma_t(i)=P(i_t=q_i|O, \lambda) = \frac{P(i_t=q_i,O| \lambda)}{P(O|\lambda)}=\frac{\alpha_t(i)\beta_t(i) }{\sum_{j=1}^N{\alpha_t(j)\beta_t(j)}}
  2. Given the model \lambda and observations O, the probability of being in state q_i at time t and in state q_j at time t+1:
\begin{aligned} \xi_t(i,j)&=P(i_t=q_i,i_{t+1}=q_j|O, \lambda) \\ &= \frac{P(i_t=q_i,i_{t+1}=q_j,O| \lambda)}{P(O|\lambda)}\\ &= \frac{\alpha_t(i)a_{ij}b_j(o_{t+1})\beta_{t+1}(j)}{\sum_{i=1}^N\sum_{j=1}^N{\alpha_t(i)a_{ij}b_j(o_{t+1})\beta_{t+1}(j)}} \\ \end{aligned}

Summing \gamma_t(i) and \xi_t(i,j) over the time steps t yields several useful expected values.

  • Expected number of times state i occurs under observation O:
    \sum_{t=1}^T\gamma_t(i)
  • Expected number of transitions out of state i under observation O:
    \sum_{t=1}^{T-1}\gamma_t(i)
  • Expected number of transitions from state i to state j under observation O:
    \sum_{t=1}^{T-1}\xi_t(i,j)
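A minimal, self-contained sketch of computing \gamma_t(i) and the first expected value above, using a small 2-state model whose parameter values are invented for illustration:

```r
# Invented 2-state, 2-symbol HMM; observations are column indices of B
A  <- matrix(c(0.7, 0.3,
               0.4, 0.6), nrow = 2, byrow = TRUE)   # a_ij
B  <- matrix(c(0.9, 0.1,
               0.2, 0.8), nrow = 2, byrow = TRUE)   # b_j(k)
Pi <- c(0.6, 0.4)
O  <- c(1, 2, 1)
T_ <- length(O); N <- 2

# Forward probabilities alpha_t(i)
alpha <- matrix(0, T_, N)
alpha[1, ] <- Pi * B[, O[1]]
for (t in 2:T_) alpha[t, ] <- (alpha[t - 1, ] %*% A) * B[, O[t]]

# Backward probabilities beta_t(i)
beta <- matrix(0, T_, N)
beta[T_, ] <- 1
for (t in (T_ - 1):1) beta[t, ] <- A %*% (B[, O[t + 1]] * beta[t + 1, ])

# gamma_t(i) = alpha_t(i) beta_t(i) / sum_j alpha_t(j) beta_t(j)
gam <- alpha * beta
gam <- gam / rowSums(gam)

# Expected number of times each state occurs: sum over t of gamma_t(i)
expected_visits <- colSums(gam)
```

Each row of gam sums to 1, and the entries of expected_visits sum to T_, since exactly one state is occupied at each time step.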

R example: estimating HMM parameters with Baum-Welch

gamma <- function(alpha,beta,i,t,N){
  
  numerator <- alpha[[t]][i]*beta[[t]][i]
  denominator <- 0
  for (j in 1:N) {
    denominator <- denominator+ alpha[[t]][j]*beta[[t]][j]
  }
  return(numerator/denominator)
}

ksi <- function(A,B,alpha,beta,i,j,t,N){
  
  numerator <- alpha[[t]][i]*A[i,j]*B[j,O[t+1]]*beta[[t+1]][j]
  denominator  <-  0
  for (ii in 1:N) {    # ii/jj, not i/j: avoid shadowing the arguments
    for (jj in 1:N) {
      denominator <- denominator+alpha[[t]][ii]*A[ii,jj]*B[jj,O[t+1]]*beta[[t+1]][jj]
    }
  }
  return(numerator/denominator) 
}

forward <- function(A,B,Pi,T_){
  alpha <- list()
  for (t in 1:T_) { 
    if (t == 1) {
      alpha[[t]] <- Pi * B[,O[t]]
    } else {
      alpha[[t]] <- colSums(alpha[[t-1]] * A) * B[,O[t]]
    }
  }
  return(alpha)
}

backward <- function(A,B,N,T_){
  beta <- list()
  for (t in T_:1) { 
    if (t == T_) {
      beta[[t]] <- rep(1,N)
    } else {
      beta[[t]] <- colSums(t(A) * B[,O[t+1]]*beta[[t+1]])
    }
  }
  return(beta)
}

hmm_train <- function(params,O,Q,V){
  A <- params$A
  B <- params$B
  Pi <- params$Pi
  # N: number of states; M: number of observations; T_: number of time steps
  N <- length(Q)
  M <- length(V)
  T_ <- length(O)
  
  tmp_A <- matrix(0,nrow = N,ncol = N)
  tmp_B <- matrix(0,nrow = N,ncol = M,dimnames = list(Q,V))
  tmp_Pi <- rep(0,N)
  
  alpha <- forward(A,B,Pi,T_)
  beta <- backward(A,B,N,T_)
  
  # a_ij
  for (i in 1:N) {
    for (j in 1:N) {
      numerator <- 0
      denominator  <-  0
      for (t in 1:(T_-1)) {
        numerator <- numerator+ksi(A,B,alpha,beta,i,j,t,N)
        denominator <- denominator+gamma(alpha,beta,i,t,N)
      }
      tmp_A[i,j] <- numerator/denominator
    }
  }
  
  #b_jk
  for (j in 1:N) {
    for (k in 1:M) {
      numerator <- 0
      denominator  <-  0
      for (t in 1:T_) {
        if (V[k] == O[t]) {
          numerator <- numerator+gamma(alpha,beta,j,t,N)
        }
        denominator <- denominator+gamma(alpha,beta,j,t,N)
      }
  
      tmp_B[j,k] <- numerator/denominator
    }
  }
  
  #pi
  for (i in 1:N) {
    tmp_Pi[i] <- gamma(alpha,beta,i,1,N)
  }

  params$A <- tmp_A
  params$B <- tmp_B
  params$Pi <- tmp_Pi
  
  return(params)
}

hmm_run <- function(params,O,Q,V,maxIter=100,criterion=10e-6){
  
  for (n in 1:maxIter) {
    old_params <- params
    params <- hmm_train(params,O,Q,V)
  
    delta_A <- max(abs(params$A-old_params$A))
    delta_B <- max(abs(params$B-old_params$B))
    delta_Pi <- max(abs(params$Pi-old_params$Pi))
    tmp_delta <-  c(delta_A,delta_B,delta_Pi)
    cat("Iter: ", n,"\n",
      "delta: ", tmp_delta,"\n")
    if ( all(tmp_delta < criterion) | (n == maxIter)) {
      print(params)
      cat("delta: ", tmp_delta,"\n",
          "Iter: ", n,"\n")
      return(params)
    }
  }
}

random_init <- function(Q,V){
  N <- length(Q) 
  M <- length(V)
  params <- list(
    A = matrix(nrow = N,ncol = N),
    B = matrix(nrow = N,ncol = M,dimnames = list(Q,V)),
    Pi = c()
    )

  for (i in 1:N) {
    temp <- runif(N,0,100)
    Sum <- sum(temp)
    for (j in 1:N) {
      params$A[i,j] = temp[j]/Sum
    }
  }
  
  for (i in 1:N) {
    temp <- runif(M,0,100)
    Sum <- sum(temp)
    for (j in 1:M) {
      params$B[i,j] = temp[j]/Sum
    }
  }
  
  temp <- runif(N,0,100)
  Sum <- sum(temp)
  params$Pi = temp/Sum
  return(params)
}
set.seed(1)
Q <- c(1,2,3) 
V <- c("R","W")
O <- c("R","W","R")

params <- random_init(Q,V)
params <- hmm_run(params = params,O,Q,V,maxIter=3)

R example: generating a triangle wave with an HMM

hmm_sim_data <- function(params,n,len){
  
  gen_data <- function(len = len){
    temp_I <- sample(Q,1,replace = T,prob = params$Pi)
    I <- c(temp_I)
    temp_O <- sample(V,1,replace = T,prob = params$B[temp_I,])
    O <- c(temp_O)
  
    for (i in 2:len) {
      temp_I  <- sample(Q,1,replace = T,prob = params$A[temp_I,])
      I <- c(I,temp_I)
    }
  
    for (i in 2:len) {
      temp_O <- sample(V,1,replace = T,prob = params$B[I[i],])
      O <- c(O,temp_O)
    }
    return(list(I = I,O = O))
  }
  
  sim_data <- list()
  for (x in 1:n) {
    sim_data[[x]] <- gen_data(len = len)
  }
  return(sim_data)
}

set.seed(1)
Q <- 1:10
V <-  1:4
# Simulate a triangle wave
O <- c(1,2,3,4,3,2,1,2,3,4,3,2,1,2,3,4,3,2,1,2)
par(mfrow=c(2,1))
plot(1:length(O),O,type = 'l',main = "Actual waveform")

params <- random_init(Q,V)
params <- hmm_run(params = params,O,Q,V,maxIter=100,criterion = 10e-6)

gen <- hmm_sim_data(params,n = 1,len = 100)
plot(1:length(gen[[1]]$O),gen[[1]]$O,type = "l",main = "HMM-generated waveform")