Gaussian Mixture Models (GMM): Idea, Math, the EM Algorithm, and a Python Implementation

The Gaussian mixture model is a popular unsupervised learning algorithm. The GMM approach is similar to the K-means clustering algorithm, but it is more robust — and therefore more useful — at the cost of some additional complexity.

K-means clustering uses a Euclidean distance function to discover clusters in the data. This works well as long as the data is distributed circularly around the centroids. But what if the data is non-linear? What if it has non-zero covariance? What if the clusters have different means and covariances?

This is where Gaussian mixture models come in!

GMM assumes that the data is generated by a mixture of Gaussian distributions. In contrast to K-means, which hard-assigns data points to clusters (and assumes the data is distributed circularly around the centroids), GMM soft-assigns data points to clusters — that is, probabilistically — which is better.

In short, GMM works better because: (a) it captures the uncertainty about which cluster a data point belongs to by using soft assignments, and (b) it has no bias towards circular clusters, so it works well even for non-linear data distributions.
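To make the difference between hard and soft assignment concrete, here is a minimal 1-D sketch; the centers, weights, and spreads below are made-up numbers, purely for illustration:

import numpy as np
from scipy.stats import norm

x = 1.5                             # one 1-D data point
centers = np.array([0.0, 4.0])      # two hypothetical cluster centers
weights = np.array([0.5, 0.5])      # mixing weights
stds = np.array([1.0, 2.0])         # per-cluster spreads

# Hard assignment (K-means style): the index of the closest centroid
hard = np.argmin(np.abs(x - centers))

# Soft assignment (GMM style): the responsibility of each cluster for x
dens = weights * norm.pdf(x, loc=centers, scale=stds)
soft = dens / dens.sum()

print(hard)   # 0
print(soft)   # roughly [0.59, 0.41]: mostly cluster 0, but with real uncertainty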

GMM

The objective of GMM is to maximize the likelihood of the data X, p(X), or equivalently the log-likelihood L (since the logarithm is a monotonically increasing function). By assuming that the data is generated by a mixture of K Gaussians, we can write p(X) as a product over the data points of a marginal probability that sums over the K clusters.

Likelihood:

p(X) = ∏_{i=1}^{N} ∑_{c=1}^{K} π_c · N(x_i | μ_c, Σ_c)

Log-likelihood:

L = ln p(X) = ∑_{i=1}^{N} ln ∑_{c=1}^{K} π_c · N(x_i | μ_c, Σ_c)

Here N(x_i | μ_c, Σ_c) is the Gaussian density of cluster c evaluated at x_i, and π_c is the mixing weight of cluster c.

Because of the sum sitting inside the logarithm above, we cannot obtain a closed-form (analytical) solution for the parameters. It looks nasty, but this problem has a nice solution: the Expectation-Maximization (EM) algorithm.

The Math

The EM algorithm is an iterative algorithm for finding the maximum likelihood estimate (MLE) of a model's parameters when they cannot be found directly. It consists of two steps: the Expectation step and the Maximization step.

1. Expectation step: compute the membership values (responsibilities) r_ic, i.e. the probability that data point x_i belongs to cluster c.

2. Maximization step: compute m_c, the quantity that measures the fraction of points assigned to each cluster c, and use it to update the parameters μ, π, Σ by computing the MLE for each cluster (the exact update equations are given below).

Repeat the E and M steps until the log-likelihood L converges.
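For reference, these are the standard update equations that each EM iteration applies, written in the same notation (N data points, K clusters):

E-step (responsibilities):

r_ic = π_c · N(x_i | μ_c, Σ_c) / ∑_{k=1}^{K} π_k · N(x_i | μ_k, Σ_k)

M-step (parameter updates for each cluster c):

m_c = ∑_{i=1}^{N} r_ic
π_c = m_c / N
μ_c = (1 / m_c) · ∑_{i=1}^{N} r_ic · x_i
Σ_c = (1 / m_c) · ∑_{i=1}^{N} r_ic · (x_i − μ_c)(x_i − μ_c)^T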

Python Implementation

Let's code up a basic implementation of GMM from scratch in Python.

Generate 1-D data.

import numpy as np

x = np.linspace(-5, 5, 20)
x1 = x*np.random.rand(20)
x2 = x*np.random.rand(20) + 10
x3 = x*np.random.rand(20) - 10
xt = np.hstack((x1, x2, x3))

Initialize the GMM parameters μ, π, Σ.

max_iterations = 10
pi = np.array([1/3, 1/3, 1/3])   # mixing weights
mu = np.array([5, 6, -3])        # means
var = np.array([1, 3, 9])        # spread of each Gaussian (passed to norm as its scale)
r = np.zeros((len(xt), 3))       # responsibilities r_ic

Run the first iteration of the EM algorithm.

import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import norm

gauss1 = norm(loc=mu[0], scale=var[0])
gauss2 = norm(loc=mu[1], scale=var[1])
gauss3 = norm(loc=mu[2], scale=var[2])

# E-Step: r_ic = pi_c * N(x_i | mu_c, var_c), then normalize over the clusters
for c, g, p in zip(range(3), [gauss1, gauss2, gauss3], pi):
    r[:, c] = p*g.pdf(xt[:])
for i in range(len(r)):
    r[i, :] /= np.sum(r[i, :])

# Plot the points colored by their responsibilities, together with the three Gaussians
fig = plt.figure(figsize=(10, 10))
ax0 = fig.add_subplot(111)
for i in range(len(r)):
    ax0.scatter(xt[i], 0, c=r[i, :], s=100)
for g, c in zip([gauss1.pdf(np.linspace(-15, 15)),
                 gauss2.pdf(np.linspace(-15, 15)),
                 gauss3.pdf(np.linspace(-15, 15))], ['r', 'g', 'b']):
    ax0.plot(np.linspace(-15, 15), g, c=c, zorder=0)
ax0.set_xlabel('X-axis')
ax0.set_ylabel('Gaussian pdf value')
ax0.legend(['Gaussian 1', 'Gaussian 2', 'Gaussian 3'])
plt.show()

# M-Step: update pi, mu, var from the responsibilities
mc = np.sum(r, axis=0)                                 # "soft" number of points per cluster
pi = mc/len(xt)
mu = np.sum(r*np.vstack((xt, xt, xt)).T, axis=0)/mc
var = []
for c in range(len(pi)):
    var.append(np.sum(r[:, c]*(xt - mu[c])**2)/mc[c])  # responsibility-weighted variance

Put this code inside a for loop and wrap it in a class.

class GMM1D:
    '''Apply GMM to 1-D data'''

    def __init__(self, X, max_iterations):
        '''Initialize the data and the number of EM iterations'''
        self.X = X
        self.max_iterations = max_iterations

    def run(self):
        '''Initialize the parameters mu, var, pi and run the EM iterations'''
        self.pi = np.array([1/3, 1/3, 1/3])
        self.mu = np.array([5, 8, 1])
        self.var = np.array([5, 3, 1])

        r = np.zeros((len(self.X), 3))

        for itr in range(self.max_iterations):
            gauss1 = norm(loc=self.mu[0], scale=self.var[0])
            gauss2 = norm(loc=self.mu[1], scale=self.var[1])
            gauss3 = norm(loc=self.mu[2], scale=self.var[2])

            # E-Step: compute and normalize the responsibilities
            for c, g, p in zip(range(3), [gauss1, gauss2, gauss3], self.pi):
                r[:, c] = p*g.pdf(self.X[:])
            for i in range(len(r)):
                r[i, :] /= np.sum(r[i, :])

            # Plot the points colored by responsibility, together with the current Gaussians
            fig = plt.figure(figsize=(10, 10))
            ax0 = fig.add_subplot(111)
            for i in range(len(r)):
                ax0.scatter(self.X[i], 0, c=r[i, :], s=100)
            for g, c in zip([gauss1.pdf(np.linspace(-15, 15)),
                             gauss2.pdf(np.linspace(-15, 15)),
                             gauss3.pdf(np.linspace(-15, 15))], ['r', 'g', 'b']):
                ax0.plot(np.linspace(-15, 15), g, c=c, zorder=0)
            plt.show()

            # M-Step: update pi, mu, var
            mc = np.sum(r, axis=0)
            self.pi = mc/len(self.X)
            self.mu = np.sum(r*np.vstack((self.X, self.X, self.X)).T, axis=0)/mc
            self.var = [np.sum(r[:, c]*(self.X - self.mu[c])**2)/mc[c] for c in range(len(self.pi))]


gmm = GMM1D(xt, 10)
gmm.run()

We have built and run a model on 1-D data. The same principles apply in higher dimensions (≥ 2-D); the only difference is that we use the multivariate Gaussian distribution. Let's write the Python code for a 2-D model.
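Concretely, the only piece that changes relative to the 1-D code is the density: instead of scipy's norm we evaluate a multivariate normal, which takes a mean vector and a covariance matrix. A tiny sketch — the mean and covariance below are made-up, purely illustrative values:

import numpy as np
from scipy.stats import multivariate_normal

mu = np.array([0.0, 0.0])                  # hypothetical 2-D mean
cov = np.array([[2.0, 0.5],
                [0.5, 1.0]])               # hypothetical covariance with non-zero off-diagonal terms

point = np.array([1.0, -0.5])
print(multivariate_normal(mean=mu, cov=cov).pdf(point))   # density of one 2-D point under this Gaussian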

Let's generate some data and write our model.

from sklearn.datasets import make_blobs
from scipy.stats import multivariate_normal

X, Y = make_blobs(cluster_std=1.5, random_state=20, n_samples=500, centers=3)
X = np.dot(X, np.random.RandomState(0).randn(2, 2))   # stretch the blobs so the clusters are not circular
plt.figure(figsize=(8, 8))
plt.scatter(X[:, 0], X[:, 1])
plt.show()
class GMM2D:
    '''Apply GMM to 2-D data'''

    def __init__(self, num_clusters, max_iterations):
        '''Initialize num_clusters (K) and max_iterations for the model'''
        self.num_clusters = num_clusters
        self.max_iterations = max_iterations

    def run(self, X):
        '''Initialize the parameters, then run the E and M steps, storing the log-likelihood after every iteration'''
        self.pi = np.ones(self.num_clusters)/self.num_clusters
        self.mu = np.random.randint(min(X[:, 0]), max(X[:, 0]), size=(self.num_clusters, len(X[0])))
        self.cov = np.zeros((self.num_clusters, len(X[0]), len(X[0])))
        for n in range(len(self.cov)):
            np.fill_diagonal(self.cov[n], 5)

        # reg_cov is used for numerical stability, i.e. to avoid singular covariance matrices
        self.reg_cov = 1e-6*np.identity(len(X[0]))

        x, y = np.meshgrid(np.sort(X[:, 0]), np.sort(X[:, 1]))
        self.XY = np.array([x.flatten(), y.flatten()]).T

        # Plot the data and the initial model
        fig0 = plt.figure(figsize=(10, 10))
        ax0 = fig0.add_subplot(111)
        ax0.scatter(X[:, 0], X[:, 1])
        ax0.set_title('Initial State')
        for m, c in zip(self.mu, self.cov):
            c += self.reg_cov
            multi_normal = multivariate_normal(mean=m, cov=c)
            ax0.contour(np.sort(X[:, 0]), np.sort(X[:, 1]),
                        multi_normal.pdf(self.XY).reshape(len(X), len(X)),
                        colors='black', alpha=0.3)
            ax0.scatter(m[0], m[1], c='grey', zorder=10, s=100)
        fig0.savefig('GMM2D Initial State.png')
        plt.show()

        self.log_likelihoods = []

        for iters in range(self.max_iterations):
            # E-Step: compute and normalize the responsibilities ric
            self.ric = np.zeros((len(X), len(self.mu)))
            for pic, muc, covc, r in zip(self.pi, self.mu, self.cov, range(len(self.ric[0]))):
                covc += self.reg_cov
                mn = multivariate_normal(mean=muc, cov=covc)
                self.ric[:, r] = pic*mn.pdf(X)
            for r in range(len(self.ric)):
                self.ric[r, :] = self.ric[r, :] / np.sum(self.ric[r, :])

            # M-Step: update pi, mu and cov from the responsibilities
            self.mc = np.sum(self.ric, axis=0)
            self.pi = self.mc/np.sum(self.mc)
            self.mu = np.dot(self.ric.T, X) / self.mc.reshape(self.num_clusters, 1)
            self.cov = []
            for r in range(len(self.pi)):
                covc = 1/self.mc[r] * (np.dot((self.ric[:, r].reshape(len(X), 1)*(X - self.mu[r])).T,
                                              X - self.mu[r]) + self.reg_cov)
                self.cov.append(covc)
            self.cov = np.asarray(self.cov)

            # Log-likelihood: sum over points of the log of the per-point mixture density
            mixture_pdf = np.sum([self.pi[r]*multivariate_normal(self.mu[r], self.cov[r] + self.reg_cov).pdf(X)
                                  for r in range(len(self.pi))], axis=0)
            self.log_likelihoods.append(np.sum(np.log(mixture_pdf)))

            # Plot the data and the model after this iteration
            fig1 = plt.figure(figsize=(10, 10))
            ax1 = fig1.add_subplot(111)
            ax1.scatter(X[:, 0], X[:, 1])
            ax1.set_title('Iteration ' + str(iters))
            for m, c in zip(self.mu, self.cov):
                c += self.reg_cov
                multi_normal = multivariate_normal(mean=m, cov=c)
                ax1.contour(np.sort(X[:, 0]), np.sort(X[:, 1]),
                            multi_normal.pdf(self.XY).reshape(len(X), len(X)),
                            colors='black', alpha=0.3)
                ax1.scatter(m[0], m[1], c='grey', zorder=10, s=100)
            fig1.savefig('GMM2D Iter ' + str(iters) + '.png')
            plt.show()

        # Plot the log-likelihood values over the iterations
        fig2 = plt.figure(figsize=(10, 10))
        ax2 = fig2.add_subplot(111)
        ax2.plot(range(0, iters+1, 1), self.log_likelihoods)
        ax2.set_title('Log Likelihood Values')
        fig2.savefig('GMM2D Log Likelihood.png')
        plt.show()

    def predict(self, Y):
        '''Predict the cluster for the new samples in array Y'''
        predictions = []
        for pic, m, c in zip(self.pi, self.mu, self.cov):
            prob = pic*multivariate_normal(mean=m, cov=c).pdf(Y)
            predictions.append(prob)
        # Shape (num_clusters, len(Y)) -> (len(Y), num_clusters), then pick the most probable component
        predictions = np.asarray(predictions).reshape(self.num_clusters, len(Y)).T
        predictions = np.argmax(predictions, axis=1)

        # Plot the training data (the global X), the fitted Gaussians, and the new points colored by predicted cluster
        fig2 = plt.figure(figsize=(10, 10))
        ax2 = fig2.add_subplot(111)
        ax2.scatter(X[:, 0], X[:, 1], c='c')
        ax2.scatter(Y[:, 0], Y[:, 1], marker='*', c='k', s=150, label='New Data')
        ax2.set_title('Predictions on New Data')
        colors = ['r', 'b', 'g']
        for m, c, col, i in zip(self.mu, self.cov, colors, range(len(colors))):
            multi_normal = multivariate_normal(mean=m, cov=c)
            ax2.contour(np.sort(X[:, 0]), np.sort(X[:, 1]),
                        multi_normal.pdf(self.XY).reshape(len(X), len(X)),
                        colors='black', alpha=0.3)
            ax2.scatter(m[0], m[1], marker='o', c=col, zorder=10, s=150, label='Centroid ' + str(i+1))
        for i in range(len(Y)):
            ax2.scatter(Y[i, 0], Y[i, 1], marker='*', c=colors[predictions[i]], s=150)
        ax2.set_xlabel('X-axis')
        ax2.set_ylabel('Y-axis')
        ax2.legend()
        fig2.savefig('GMM2D Predictions.png')
        plt.show()

        return predictions

Let's make some predictions with this model.

y = np.random.randint(-10, 20, size=(12, 2))
gmm2d = GMM2D(num_clusters=3, max_iterations=10)
gmm2d.run(X)
gmm2d.predict(y)

With sklearn, the same task can be done in a few lines of code.

from sklearn.mixture import GaussianMixture

X, Y = make_blobs(cluster_std=1.5, random_state=20, n_samples=500, centers=3)
X = np.dot(X, np.random.RandomState(0).randn(2, 2))

GMM = GaussianMixture(n_components=3)
GMM.fit(X)

Y = np.random.randint(-10, 20, size=(1, 2))
print(GMM.means_, GMM.predict_proba(Y))

'''
Out:
[[ 19.88168663  17.47097164]
 [-12.83538784   4.89646199]
 [ 11.09673732  18.67548935]]
[[1.91500946e-17 9.30483496e-01 6.95165038e-02]]
'''

GMM assigns the sample to the second component.
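If you want a hard cluster label instead of the probabilities, GaussianMixture also provides predict, which returns the index of the most probable component; for the sample above that is index 1 (the second component), matching the predict_proba output:

print(GMM.predict(Y))   # expected: [1]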

Conclusion

Implementing a Gaussian mixture model is not hard. Once the math is clear, the model finds the maximum likelihood estimates of its parameters, whether the data is one-dimensional or higher-dimensional. The method is robust and very useful for clustering tasks. Now that you are familiar with a Python implementation of GMM, go do some cool things with your own datasets.
