【生成模型】简述概率密度函数可处理流模型
本期将介绍第二种非常优雅的生成模型—流模型,它也是一种概率密度函数可处理的生成模型。本文将对其原理进行介绍,并对nice模型的源码进行讲解。
作者&编辑 | 小米粥
1 流模型
这是一种想法比较直接但实际不容易构造的生成模型,它通过可逆的非线性变换等技巧使得似然函数可以被精确计算出来。对于分布比较简单(例如高斯分布)的隐变量z,其概率分布为 pz(z) ,这时若存在一个连续、可微、可逆的非线性变换g(z),将简单的潜变量z的分布转换成关于样本x的一个复杂分布,将非线性变换g(z)的逆变换记为f(x),则可得到样本x的准确的概率密度函数 px(x)
注意,非线性变换g(z)会引起空间的变形,即px(x)不等于 pz(f(x)),且有pz(z)dz=px(x)dx。若上述模型构建成功,则生成样本时只需从简单分布pz(z)中随机采样然后将其变换为 x=g(z) 即可。
为了训练非线性独立成分估计模型,我们必须计算样本的概率密度函数px(x)。分析上式,概率密度函数px(x)的计算需要计算pz(z)和雅可比矩阵的行列式绝对值。对于前者,要构建一个 g(z) 的逆变换f(x),这样当给定一个样本 x 后,就可以通过z=f(x)计算得到其对应的隐变量,pz(z)通常设计为简单的分布,容易计算;对于后者,要将f(x)设计为某种特殊的形式,使得雅可比矩阵的行列式易于计算。另外,变换的可逆性要求样本x和隐变量z具有相同的维度。综上,需要将生成模型精心设计成一种易于处理且灵活的双射模型,使其逆变换f(x)存在,且对应的雅可比矩阵的行列式可计算。
为了对这类模型有更深刻的理解,我们对NICE模型进行详细的介绍。NICE模型的逆变换f(x)由多个加性耦合层和一个尺度变换层构成。在每个加性耦合层中,首先将 n 维样本 x 分解为两部分x1和x2,例如将x的第1,3,5...个元素划入x1部分,将第2,4,6...个元素划入x2部分,每个部分的维度均为n/2;也可以将 x 的第2,4,6...个元素划入x1部分,将第1,3,5...个元素划入 x2 部分;或者使用其他划分方式。然后对两部分其进行变换:
其中m()为任意函数,注意这里要保证m()的输出结果维度与 x2 保持一致,NICE模型使用多层全连接网络和ReLU激活函数来构建 m() 。容易发现,使用加性耦合层作为逆变换f(x)的其中一部分,它是可逆的并且雅可比矩阵的行列式也是容易计算的。当已知h1和h2时,可得其逆变换:
其雅可比矩阵为:
根据三角矩阵的性质,其行列式为对角元素的乘积,故加性耦合层雅可比矩阵的行列式绝对值为1。当将多个加性耦合层串联时,如下图所示。由于每个层的逆变换是容易计算的,则串联后的逆变换仍然是容易计算的。此时的雅可比矩阵为:
根据矩阵行列式的性质,有:
需要说明的是,必须注意在不同的加性耦合层使用不同的划分策略,使得样本不同维度的信息充分混淆。在尺度变换层,定义了包含n个非负参数的向量s=[s1,s2,...,sn],将加性耦合层的输出结果h(l)与s逐元素相乘可得到对应的隐变量z。这里s用于控制每个维度的特征变换的尺度,可以表征维度的重要性,对应维度的数值较大表明这一维度的重要性低,因为生成样本时隐变量需要先经过尺度变换层,隐变量在尺度变换层需要逐元素乘1/s。显然,尺度变换层的逆变换只需逐元素乘1/s,为了计算雅可比矩阵,将尺度变换写为对角矩阵的形式:
则其雅可比矩阵的行列式为s1s2...sn。现在,我们构造了可逆的、雅可比矩阵的行列式绝对值易于计算的逆变换f(x),对于隐变量z,NICE模型假设其n个维度彼此独立,即
若选择z为高斯分布,则样本x的似然函数为:
若选择z为logistic分布,即
则样本x的似然函数为
现在,我们可以使用极大似然法对NICE模型进行训练,训练完成后也得到了生成模型g(z)。若 z 为高斯分布,则直接从高斯分布中采样可得到z;若选择 为logistic分布,可现在0-1之间的均匀分布中采样得到\epsilon ,然后使用变换z=t(ε)得到隐变量。根据两个随机变量的映射关系
则有
将隐变量z使用非线性变换g(z),即经过尺度变换层的逆变换、多个加性耦合层的逆变换可得到生成样本x。
2 NICE 代码
接下来我们将提供一份完整的NICE的代码讲解,其中训练集为mnist数据集。
首先读取相关python库:
import argparse
import torch
import torchvision
import numpy as np
import niceimport utils
定义主函数,设置相关参数,并进行模型的训练和验证
定义二维掩膜卷积核,其中有A与B两种类型,区别之处在于中心位置是否被卷积计算:
def main(args):
device = torch.device("cuda:0")
# 训练相关参数设置
dataset = args.dataset
batch_size = args.batch_size
latent = args.latent
max_iter = args.max_iter
sample_size = args.sample_size
coupling = 4 mask_config = 1.
# 优化器设置
lr = args.lr
momentum = args.momentum
decay = args.decay
zca = None
mean = None
# 选择相应的训练数据集
if dataset == 'mnist':
mean = torch.load('./statistics/mnist_mean.pt') (full_dim, mid_dim, hidden) = (1 * 28 * 28, 1000, 5) transform = torchvision.transforms.ToTensor() trainset = torchvision.datasets.MNIST(root='~/torch/data/MNIST', train=True, download=True, transform=transform) trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)
# 使用正态分布或逻辑回归分布
if latent == 'normal':
prior = torch.distributions.Normal(
torch.tensor(0.).to(device), torch.tensor(1.).to(device))
elif latent == 'logistic':
prior = utils.StandardLogistic()
filename = '%s_' % dataset \
+ 'bs%d_' % batch_size \
+ '%s_' % latent \
+ 'cp%d_' % coupling \
+ 'md%d_' % mid_dim \
+ 'hd%d_' % hidden
# 实例化流模型
flow = nice.NICE(prior=prior,
coupling=coupling,
in_out_dim=full_dim,
mid_dim=mid_dim,
hidden=hidden,
mask_config=mask_config).to(device)
optimizer = torch.optim.Adam(
flow.parameters(), lr=lr, betas=(momentum, decay), eps=1e-4)
total_iter = 0
train = True
running_loss = 0
# 训练模型
while train:
for _, data in enumerate(trainloader, 1):
flow.train()
# set to training mode
if total_iter == max_iter:
train = False
break
total_iter += 1
optimizer.zero_grad()
# clear gradient tensors
inputs, _ = data
inputs = utils.prepare_data(
inputs, dataset, zca=zca,
mean=mean).to(device)
# log-likelihood of input minibatch
loss = -flow(inputs).mean()
running_loss += float(loss)
# backprop and update parameters
loss.backward()
optimizer.step()
if total_iter % 10 == 0:
mean_loss = running_loss / 2000
bit_per_dim = (mean_loss + np.log(256.) * full_dim) \
/ (full_dim * np.log(2.))
print('iter %s:' % total_iter,
'loss = %.3f' % mean_loss,
'bits/dim = %.3f' % bit_per_dim)
running_loss = 0.0
if total_iter % 1000 == 0:
flow.eval()
# set to inference mode
with torch.no_grad():
samples = flow.sample(sample_size).cpu()
samples = utils.prepare_data( samples, dataset, zca=zca, mean=mean, reverse=True)
torchvision.utils.save_image(torchvision.utils.make_grid(samples),
'./samples/' + filename +'iter%d.png' % total_iter)
# 保存模型
torch.save({
'total_iter': total_iter,
'model_state_dict': flow.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'dataset': dataset,
'batch_size': batch_size,
'latent': latent,
'coupling': coupling,
'mid_dim': mid_dim,
'hidden': hidden,
'mask_config': mask_config},
'./models/mnist/' + filename +'iter%d.tar' % total_iter)
print('Checkpoint Saved')
if __name__ == '__main__':
parser = argparse.ArgumentParser('MNIST NICE PyTorch implementation')
parser.add_argument('--dataset',
help='dataset to be modeled.',
type=str,
default='mnist') parser.add_argument('--batch_size',
help='number of images in a mini-batch.',
type=int,
default=256)
parser.add_argument('--latent',
help='latent distribution.',
type=str,
default='logistic')
parser.add_argument('--max_iter',
help='maximum number of iterations.',
type=int,
default=20000)
parser.add_argument('--sample_size',
help='number of images to generate.',
type=int,
default=64)
parser.add_argument('--lr',
help='initial learning rate.',
type=float,
default=1e-3)
parser.add_argument('--momentum',
help='beta1 in Adam optimizer.',
type=float,
default=0.9)
parser.add_argument('--decay',
help='beta2 in Adam optimizer.',
type=float,
default=0.999)
args = parser.parse_args()
main(args)
NICE流模型的核心代码:
# 加性耦合层class Coupling(nn.Module):
def __init__(self, in_out_dim, mid_dim, hidden, mask_config):
super(Coupling, self).__init__()
self.mask_config = mask_config
self.in_block = nn.Sequential(
nn.Linear(in_out_dim//2, mid_dim),
nn.ReLU())
self.mid_block = nn.ModuleList([
nn.Sequential(
nn.Linear(mid_dim, mid_dim),
nn.ReLU()) for _ in range(hidden - 1)])
self.out_block = nn.Linear(mid_dim, in_out_dim//2)
def forward(self, x, reverse=False):
[B, W] = list(x.size())
x = x.reshape((B, W//2, 2))
if self.mask_config:
on, off = x[:, :, 0], x[:, :, 1]
else:
off, on = x[:, :, 0], x[:, :, 1]
off_ = self.in_block(off)
for i in range(len(self.mid_block)):
off_ = self.mid_block[i](off_)
shift = self.out_block(off_)
if reverse:
on = on - shift
else:
on = on + shift
if self.mask_config:
x = torch.stack((on, off), dim=2)
else:
x = torch.stack((off, on), dim=2)
return x.reshape((B, W))
# 尺度变换层
class Scaling(nn.Module):
def __init__(self, dim):
super(Scaling, self).__init__()
self.scale = nn.Parameter(
torch.zeros((1, dim)), requires_grad=True)
def forward(self, x, reverse=False):
log_det_J = torch.sum(self.scale)
if reverse:
x = x * torch.exp(-self.scale)
else:
x = x * torch.exp(self.scale)
return x, log_det_J
# NICE模型
class NICE(nn.Module):
def __init__(self, prior, coupling,
in_out_dim, mid_dim, hidden, mask_config):
self.prior = prior
self.in_out_dim = in_out_dim
self.coupling = nn.ModuleList([
Coupling(in_out_dim=in_out_dim,
mid_dim=mid_dim,
hidden=hidden,
mask_config=(mask_config+i)%2) \ for i in range(coupling)])
self.scaling = Scaling(in_out_dim)
def g(self, z):
x, _ = self.scaling(z, reverse=True)
for i in reversed(range(len(self.coupling))):
x = self.coupling[i](x, reverse=True)
return x
def f(self, x):
for i in range(len(self.coupling)):
x = self.coupling[i](x)
return self.scaling(x)
def log_prob(self, x):
z, log_det_J = self.f(x)
log_ll = torch.sum(self.prior.log_prob(z), dim=1)
return log_ll + log_det_J
def sample(self, size):
z = self.prior.sample((size, self.in_out_dim)).cuda() return self.g(z)
def forward(self, x):
return self.log_prob(x)
[1] Dinh L , Krueger D , Bengio Y . NICE: Non-linear Independent Components Estimation[J]. Computer ence, 2014.
[2]Diederik P. Kingma*†, Prafulla Dhariwal⇤. Glow: Generative Flow with Invertible 1x1 Convolutions[J]. 2018.
总结
本期带大家学习了流模型,流模型的生成效果也非常卓越,但是其网络设计比较复杂,训练难度比较大。下一期将为大家继续讲解其他生成模型。