【生成模型】简述概率密度函数可处理流模型

本期将介绍第二种非常优雅的生成模型—流模型,它也是一种概率密度函数可处理的生成模型。本文将对其原理进行介绍,并对nice模型的源码进行讲解。

作者&编辑 | 小米粥

1 流模型

这是一种想法比较直接但实际不容易构造的生成模型,它通过可逆的非线性变换等技巧使得似然函数可以被精确计算出来。对于分布比较简单(例如高斯分布)的隐变量z,其概率分布为 pz(z) ,这时若存在一个连续、可微、可逆的非线性变换g(z),将简单的潜变量z的分布转换成关于样本x的一个复杂分布,将非线性变换g(z)的逆变换记为f(x),则可得到样本x的准确的概率密度函数 px(x)

注意,非线性变换g(z)会引起空间的变形,即px(x)不等于 pz(f(x)),且有pz(z)dz=px(x)dx。若上述模型构建成功,则生成样本时只需从简单分布pz(z)中随机采样然后将其变换为 x=g(z) 即可。

为了训练非线性独立成分估计模型,我们必须计算样本的概率密度函数px(x)。分析上式,概率密度函数px(x)的计算需要计算pz(z)和雅可比矩阵的行列式绝对值。对于前者,要构建一个 g(z) 的逆变换f(x),这样当给定一个样本 x 后,就可以通过z=f(x)计算得到其对应的隐变量,pz(z)通常设计为简单的分布,容易计算;对于后者,要将f(x)设计为某种特殊的形式,使得雅可比矩阵的行列式易于计算。另外,变换的可逆性要求样本x和隐变量z具有相同的维度。综上,需要将生成模型精心设计成一种易于处理且灵活的双射模型,使其逆变换f(x)存在,且对应的雅可比矩阵的行列式可计算。

为了对这类模型有更深刻的理解,我们对NICE模型进行详细的介绍。NICE模型的逆变换f(x)由多个加性耦合层和一个尺度变换层构成。在每个加性耦合层中,首先将 n 维样本 x 分解为两部分x1和x2,例如将x的第1,3,5...个元素划入x1部分,将第2,4,6...个元素划入x2部分,每个部分的维度均为n/2;也可以将 x 的第2,4,6...个元素划入x1部分,将第1,3,5...个元素划入 x2 部分;或者使用其他划分方式。然后对两部分其进行变换:

其中m()为任意函数,注意这里要保证m()的输出结果维度与 x2 保持一致,NICE模型使用多层全连接网络和ReLU激活函数来构建 m() 。容易发现,使用加性耦合层作为逆变换f(x)的其中一部分,它是可逆的并且雅可比矩阵的行列式也是容易计算的。当已知h1和h2时,可得其逆变换:

其雅可比矩阵为:

根据三角矩阵的性质,其行列式为对角元素的乘积,故加性耦合层雅可比矩阵的行列式绝对值为1。当将多个加性耦合层串联时,如下图所示。由于每个层的逆变换是容易计算的,则串联后的逆变换仍然是容易计算的。此时的雅可比矩阵为:

根据矩阵行列式的性质,有:

需要说明的是,必须注意在不同的加性耦合层使用不同的划分策略,使得样本不同维度的信息充分混淆。在尺度变换层,定义了包含n个非负参数的向量s=[s1,s2,...,sn],将加性耦合层的输出结果h(l)与s逐元素相乘可得到对应的隐变量z。这里s用于控制每个维度的特征变换的尺度,可以表征维度的重要性,对应维度的数值较大表明这一维度的重要性低,因为生成样本时隐变量需要先经过尺度变换层,隐变量在尺度变换层需要逐元素乘1/s。显然,尺度变换层的逆变换只需逐元素乘1/s,为了计算雅可比矩阵,将尺度变换写为对角矩阵的形式:

则其雅可比矩阵的行列式为s1s2...sn。现在,我们构造了可逆的、雅可比矩阵的行列式绝对值易于计算的逆变换f(x),对于隐变量z,NICE模型假设其n个维度彼此独立,即

若选择z为高斯分布,则样本x的似然函数为:

若选择z为logistic分布,即

则样本x的似然函数为

现在,我们可以使用极大似然法对NICE模型进行训练,训练完成后也得到了生成模型g(z)。若  z 为高斯分布,则直接从高斯分布中采样可得到z;若选择  为logistic分布,可现在0-1之间的均匀分布中采样得到\epsilon ,然后使用变换z=t(ε)得到隐变量。根据两个随机变量的映射关系

则有

将隐变量z使用非线性变换g(z),即经过尺度变换层的逆变换、多个加性耦合层的逆变换可得到生成样本x。

2 NICE 代码

接下来我们将提供一份完整的NICE的代码讲解,其中训练集为mnist数据集。

首先读取相关python库:

import argparse

import torch

import torchvision

import numpy as np

import niceimport utils

定义主函数,设置相关参数,并进行模型的训练和验证

定义二维掩膜卷积核,其中有A与B两种类型,区别之处在于中心位置是否被卷积计算:

def main(args):

device = torch.device("cuda:0")

# 训练相关参数设置

dataset = args.dataset

batch_size = args.batch_size

latent = args.latent

max_iter = args.max_iter

sample_size = args.sample_size

coupling = 4    mask_config = 1.

# 优化器设置

lr = args.lr

momentum = args.momentum

decay = args.decay

zca = None

mean = None

# 选择相应的训练数据集

if dataset == 'mnist':

mean = torch.load('./statistics/mnist_mean.pt')        (full_dim, mid_dim, hidden) = (1 * 28 * 28, 1000, 5)        transform = torchvision.transforms.ToTensor()        trainset = torchvision.datasets.MNIST(root='~/torch/data/MNIST',            train=True, download=True, transform=transform)        trainloader = torch.utils.data.DataLoader(trainset,            batch_size=batch_size, shuffle=True, num_workers=2)

# 使用正态分布或逻辑回归分布

if latent == 'normal':

prior = torch.distributions.Normal(

torch.tensor(0.).to(device), torch.tensor(1.).to(device))

elif latent == 'logistic':

prior = utils.StandardLogistic()

filename = '%s_' % dataset \

+ 'bs%d_' % batch_size \

+ '%s_' % latent \

+ 'cp%d_' % coupling \

+ 'md%d_' % mid_dim \

+ 'hd%d_' % hidden

# 实例化流模型

flow = nice.NICE(prior=prior,

coupling=coupling,

in_out_dim=full_dim,

mid_dim=mid_dim,

hidden=hidden,

mask_config=mask_config).to(device)

optimizer = torch.optim.Adam(

flow.parameters(), lr=lr, betas=(momentum, decay), eps=1e-4)

total_iter = 0

train = True

running_loss = 0

# 训练模型

while train:

for _, data in enumerate(trainloader, 1):

flow.train()

# set to training mode

if total_iter == max_iter:

train = False

break

total_iter += 1

optimizer.zero_grad()

# clear gradient tensors

inputs, _ = data

inputs = utils.prepare_data(

inputs, dataset, zca=zca,

mean=mean).to(device)

# log-likelihood of input minibatch

loss = -flow(inputs).mean()

running_loss += float(loss)

# backprop and update parameters

loss.backward()

optimizer.step()

if total_iter % 10 == 0:

mean_loss = running_loss / 2000

bit_per_dim = (mean_loss + np.log(256.) * full_dim) \

/ (full_dim * np.log(2.))

print('iter %s:' % total_iter,

'loss = %.3f' % mean_loss,

'bits/dim = %.3f' % bit_per_dim)

running_loss = 0.0

if total_iter % 1000 == 0:

flow.eval()

# set to inference mode

with torch.no_grad():

samples = flow.sample(sample_size).cpu()

samples = utils.prepare_data(                        samples, dataset, zca=zca, mean=mean, reverse=True)

torchvision.utils.save_image(torchvision.utils.make_grid(samples),

'./samples/' + filename +'iter%d.png' % total_iter)

# 保存模型

torch.save({

'total_iter': total_iter,

'model_state_dict': flow.state_dict(),

'optimizer_state_dict': optimizer.state_dict(),

'dataset': dataset,

'batch_size': batch_size,

'latent': latent,

'coupling': coupling,

'mid_dim': mid_dim,

'hidden': hidden,

'mask_config': mask_config},

'./models/mnist/' + filename +'iter%d.tar' % total_iter)

print('Checkpoint Saved')

if __name__ == '__main__':

parser = argparse.ArgumentParser('MNIST NICE PyTorch implementation')

parser.add_argument('--dataset',

help='dataset to be modeled.',

type=str,

default='mnist')    parser.add_argument('--batch_size',

help='number of images in a mini-batch.',

type=int,

default=256)

parser.add_argument('--latent',

help='latent distribution.',

type=str,

default='logistic')

parser.add_argument('--max_iter',

help='maximum number of iterations.',

type=int,

default=20000)

parser.add_argument('--sample_size',

help='number of images to generate.',

type=int,

default=64)

parser.add_argument('--lr',

help='initial learning rate.',

type=float,

default=1e-3)

parser.add_argument('--momentum',

help='beta1 in Adam optimizer.',

type=float,

default=0.9)

parser.add_argument('--decay',

help='beta2 in Adam optimizer.',

type=float,

default=0.999)

args = parser.parse_args()

main(args)

NICE流模型的核心代码:

# 加性耦合层class Coupling(nn.Module):

def __init__(self, in_out_dim, mid_dim, hidden, mask_config):

super(Coupling, self).__init__()

self.mask_config = mask_config

self.in_block = nn.Sequential(

nn.Linear(in_out_dim//2, mid_dim),

nn.ReLU())

self.mid_block = nn.ModuleList([

nn.Sequential(

nn.Linear(mid_dim, mid_dim),

nn.ReLU()) for _ in range(hidden - 1)])

self.out_block = nn.Linear(mid_dim, in_out_dim//2)

def forward(self, x, reverse=False):

[B, W] = list(x.size())

x = x.reshape((B, W//2, 2))

if self.mask_config:

on, off = x[:, :, 0], x[:, :, 1]

else:

off, on = x[:, :, 0], x[:, :, 1]

off_ = self.in_block(off)

for i in range(len(self.mid_block)):

off_ = self.mid_block[i](off_)

shift = self.out_block(off_)

if reverse:

on = on - shift

else:

on = on + shift

if self.mask_config:

x = torch.stack((on, off), dim=2)

else:

x = torch.stack((off, on), dim=2)

return x.reshape((B, W))

# 尺度变换层

class Scaling(nn.Module):

def __init__(self, dim):

super(Scaling, self).__init__()

self.scale = nn.Parameter(

torch.zeros((1, dim)), requires_grad=True)

def forward(self, x, reverse=False):

log_det_J = torch.sum(self.scale)

if reverse:

x = x * torch.exp(-self.scale)

else:

x = x * torch.exp(self.scale)

return x, log_det_J

# NICE模型

class NICE(nn.Module):

def __init__(self, prior, coupling,

in_out_dim, mid_dim, hidden, mask_config):

self.prior = prior

self.in_out_dim = in_out_dim

self.coupling = nn.ModuleList([

Coupling(in_out_dim=in_out_dim,

mid_dim=mid_dim,

hidden=hidden,

mask_config=(mask_config+i)%2) \            for i in range(coupling)])

self.scaling = Scaling(in_out_dim)

def g(self, z):

x, _ = self.scaling(z, reverse=True)

for i in reversed(range(len(self.coupling))):

x = self.coupling[i](x, reverse=True)

return x

def f(self, x):

for i in range(len(self.coupling)):

x = self.coupling[i](x)

return self.scaling(x)

def log_prob(self, x):

z, log_det_J = self.f(x)

log_ll = torch.sum(self.prior.log_prob(z), dim=1)

return log_ll + log_det_J

def sample(self, size):

z = self.prior.sample((size, self.in_out_dim)).cuda()        return self.g(z)

def forward(self, x):

return self.log_prob(x)

[1] Dinh L , Krueger D , Bengio Y . NICE: Non-linear Independent Components Estimation[J]. Computer ence, 2014.

[2]Diederik P. Kingma*†, Prafulla Dhariwal⇤. Glow: Generative Flow with Invertible 1x1 Convolutions[J]. 2018.

总结

本期带大家学习了流模型,流模型的生成效果也非常卓越,但是其网络设计比较复杂,训练难度比较大。下一期将为大家继续讲解其他生成模型。

(0)

相关推荐