Spark推荐实战系列之LR的两种实现方式和多分类LR实战介绍

本文主要包含以下内容:

  • 回归分析

    • 什么是回归分析

    • 回归分析算法分类

  • 逻辑回归介绍

    • Sigmoid函数

    • LR为什么使用Sigmoid函数

    • LR的算法原理

  • mllib中的LRWithLBFGS

  • ml中的二分类LR

  • ml中的多分类LR

逻辑回归(Logistic Regression,LR)是较早应用在推荐排序上的,其属于线性模型,模型简单,可以引入海量离散特征,好处是模型可以考虑更加细节或者说针对具体个体的因素。如果想要引入非线性因素需要做特征交叉,这样很容易产生百亿特征,在很早之前ctr就主要靠堆人力搞特征工程工作来持续优化效果。

虽然目前在工业界单纯使用LR做排序的场景或业务并不多,但是对于初学者,一些中小企业在资源和人力不那么充足的情况下,LR扔不失为一个不错的选择。这样可以遵循先上线再迭代的策略完善排序模型。

在学习LR之前先了解一下什么是回归分析和其分类。

回归分析

回归分析算法(Regression Analysis Algorithm)是机器学习算法中最常见的一类机器学习算法。

什么是回归分析

回归分析就是利用样本(已知数据),产生拟合方程,从而(对未知数据)进行预测。例如有一组随机变量 和另外一组随机变量 ,那么研究变量 与 之间关系的统计学方法就叫作回归分析。因为这里 和 是单一对应的,所以这里是一元线性回归。

回归分析算法分类

回归分析算法分为线性回归算法和非线性回归算法。

1、线性回归

线性回归可以分为一元线性回归和多元线性回归。当然线性回归中自变量的指数都是 1,这里的线性并非真的是指用一条线将数据连起来,也可以用一个二维平面、三维曲面等。

一元线性回归:只有一个自变量的回归。例如房子面积(Area)和房子总价(Money)的关系,随着面积(Area)的增大,房屋价格也是不断增加。这里的自变量只有面积,所以是一元线性回归。

多元线性回归:自变量大于或等于两个的回归。例如房子面积(Area)、楼层(floor)和房屋价格(Money)的关系,这里自变量有两个,所以是二元线性回归。

典型的线性回归方法如下:

在统计意义上,如果一个回归等式是线性的,那么它相对于参数就必须是线性的。如果相对于参数是线性的,那么即使相对于样本变量的特征是二次方或多次方的,这个回归模型也是线性的。例如下面的式子:

甚至可以使用对数或者指数取形式化特征,如下:

2、非线形回归和过去的自己和2020好好道个别吧!

有一类模型,其回归参数不是线性的,也不能通过转换的方法将其变为线性的参数,这类模型称为非线性回归模型。非线性回归可以分为一元回归和多元回归。非线性回归中至少有一个自变量的指数不为 1。回归分析中,当研究的因果关系只涉及因变量和一个自变量时,叫作一元回归分析;当研究的因果关系涉及因变量和两个或两个以上自变量时,叫作多元回归分析。

例如下面的两个回归方程:

与线性回归模型不一样的是,这些非线性回归模型的特征因子对应的参数不止一个。

3、广义线性回归

有些非线性回归也可以用线性回归的方法来进行分析,这样的非线性回归叫作广义线性回归。典型的代表是 Logistic Regression。

这里不做过多介绍,下文会详细介绍。

逻辑回归介绍

逻辑回归与线性回归本质上是一样的,都是通过误差函数求解最优系数,在形式上只不过是在线性回归上增加了一个逻辑函数。与线性回归相比,逻辑回归(Logistic Regression,LR)更适用于因变量为二分变量的模型,Logistic 回归系数可用于估计模型中每个自变量的权重比。

Sigmoid函数

Sigmoid函数(海维赛德阶跃函数)在二分类的情况下输出的值为0和1,其数学表达式如下:

可以通过以下代码来展示 Sigmoid 函数的图像。

import math 
import matplotlib.pyplot as plt 
def sigmoid(x): 
    return 1 / (1 + math.exp(-x))

# python2 中 range 生成的是一个数组,python3 中生成的是一个迭代器,可以使用 list 进行转换
X = list(range(-10, 10)) 
Y = list(map(sigmoid, X)) 
fig = plt.figure(figsize=(4, 4)) 
ax = fig.add_subplot(111)

# 隐藏上边和右边
ax.spines['top'].set_color('none') 
ax.spines['right'].set_color('none')

# 移动另外两个轴
ax.yaxis.set_ticks_position('left') 
ax.spines['left'].set_position(('data', 0)) 
ax.xaxis.set_ticks_position('bottom') 
ax.spines['bottom'].set_position(('data', 0.5)) 
ax.plot(X, Y) 
plt.show()

Sigmoid 函数展示图

可以看出,Sigmoid 函数连续、光滑、严格单调,以(0,0.5)为中心对称,是一个非常良好的阈值函数。当 趋近负无穷时, 趋近于 0;当 趋近于正无穷时, 趋近于 1; 时,。当然,在 超出[-6,6]的范围后,函数值基本上没有变化,值非常接近,在应用中一般不考虑。

Sigmoid 函数的值域限制在(0,1)之间,[0,1]与概率值的范围是相对应的,这样 Sigmoid 函数就能与一个概率分布联系起来了。

Sigmoid 函数的导数是其本身的函数,即݂,计算过程非常方便,也非常节省时间。其推导过程如下:

<section role="presentation" data-formula="\begin{aligned} f" (x)="" &="(-1)" (1+e^{-x})^{-2}(0+(-1)e^{-x})="" \="" *="" \frac{e^{-x}}{(1+e^{-x})}="" \end{aligned}="" '="" data-formula-type="block-equation">

LR为什么使用Sigmoid函数

这里只讨论二分类的情况。首先LR的假设只有一个,就是两个类别的特征服从均值不等、方差相等的高斯分布,也就是:

为什么假设它们服从高斯分布?一方面是因为高斯分布比较容易理解;另一方面从信息论的角度看,当均值和方差已知时(尽管并不知道确切的均值和方差,但是根据概率论,当样本量足够大时,样本均值和方差以概率 1 趋向于均值和方差),高斯分布是熵最大的分布。为什么要熵最大?因为熵最大的分布可以平摊风险。想想二分查找中,为什么每次都选取中间点作为查找点?就是为了平摊风险。假设方差相等是为了后面处理起来方便,若不相等则无法消去项。

首先定义“风险”为:

式中,是把样本预测为 0 时的风险,是把样本预测为 1 时的风险,是样本为实际标签 却把它预测为 时所带来的风险。

在 LR 算法中,认为预测正确不会带来风险,因此 和 都为 0,此外,认为标签为 0 而预测为 1 和标签为 1 而预测为 0,两者所带来的风险是一样的,因此 和 相等。方便起见,记为 。

所以上面定义的“风险”可以化简为:

现在问题来了,对于某个样本,应该把它预测为 0 还是预测为 1 好?按照风险最小化的原则,应该选择风险最小的,也就是,当 时,预测为 0 的风险要小于预测为 1 的风险,即 ܲ 时,应该把样本预测为 0。即:比较两个条件概率,并把样本分配到概率最大的那个类中。

式中两边同时除以 可得:

对不等式左边的部分取对数(为什么取对数?因为之前提过,两个类别的特征服从均值不等、方差相等的高斯分布,取对数方便处理高斯分布里的指数),再利用贝叶斯公式进行展开,归一化常数忽略掉,将得到:

方便起见,假设 是一维的(当然也很容易推广到多维的情况),套入高斯分布公式,此外,由于 和 都是常数,第二项简记为常数 继续展开,将得到:

取:

两边取指数,并利用这个概率公式化简,可得到:

其推算过程为:

综上可以知道为什么 LR 算法要用 Sigmoid 函数了。

LR的算法原理

1、算法原理

机器学习模型实际上把决策函数限定在某一组条件下,这组限定条件就决定了模型的假设

空间。当然,还希望这组限定条件简单而合理。

逻辑回归模型所做的假设是:

这里的 就是 Sigmoid 函数,相应的决策函数为:

0.5 ' data-formula-type='block-equation'>

选择 0.5 作为阈值是一般的做法,实际应用时,特定的情况下可以选择不同的阈值。如果对正例的判别准确性要求高,可以使阈值大一些;如果对正例的召回要求高,则可以使阈值小一些。

在函数的数学形式确定之后,就要求解模型中的参数了。统计学中常用的一种数学方法是最大似然估计,即找到一组参数,使得在这组参数条件下数据的似然度(概率)更大。在逻辑回归算法中,似然函数可以表示为:

取对数,可以得到对数形式的似然函数:

同样这里也使用损失函数来衡量模型预测结果准确的程度,这里采用 损失函数,其在单条数据上的定义为:

如果取整个数据集上的平均 损失,可以得到:

在逻辑回归模型中,最大化似然函数和最小化 lg 损失函数实际上是等价的。对于该优化问题,存在多种求解方法,这里以梯度下降的情况为例说明。基本步骤如下:

  • (1)选择下降方向(梯度方向:);

  • (2)选择步长,更新参数 );

  • (3)重复以上两步直到满足终止条件。

其中损失函数的梯度计算方法为:

<section role="presentation" data-formula="\frac{\partial J}{\partial \theta} = -\frac{1}{n} \sum_{i=1} (y_i - y_i" )x_i="" +="" \lambda\theta="" '="" data-formula-type="block-equation">

沿梯度负方向选择一个较小的步长可以保证损失函数的值是减小的,另外,逻辑回归模型的损失函数是凸函数(加入正则项后是严格凸函数),可以保证找到的局部最优值是全局最优值。

2、正则化

当模型中参数过多时,容易产生过拟合,这时就要控制模型的复杂度,其中最常见的做法是在目标中加入正则项,通过惩罚过大的参数来防止过拟合。

常见的正则化方法包括 正则化和 正则化。其分别对应如下两个公式:

  • 正则化是指权值向量 中各个元素的绝对值之和,通常表示为。

  • 正则化是指权值向量 中各个元素的平方和然后再求平方根(可以看到 Ridge 回归

的 正则化项有平方符号),通常表示为。

mllib中的LRWithLBFGS

在Spark.mllib包中提供了两种LR分类模型,分别是:

  • mini-batch gradient descent(LogisticRegressionWithLBFGS)
  • L-BFGS(LogisticRegressionWithSGD)

但官方给出的建议是:推荐使用LBFGS,因为基于LBFGS的LR比基于SGD的能更快的收敛。其原话如下:

We implemented two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. We recommend L-BFGS over mini-batch gradient descent for faster convergence.

而且LRWithLBFGS不仅支持二分类还支持多分类,但LRWithSGD只支持二分类。所以后续只介绍下Spark mllib中的LogisticRegressionWithLBFGS相关操作。

设置变量和创建spark对象

val file = 'data/sample_libsvm_data.txt'val model_path = 'model/lr/'val model_param = 'numInterations:5,regParam:0.1,updater:SquaredL2Updater,gradient:LogisticGradient'

val spark = SparkSession.builder() .master('local[5]')  .appName('LogisticRegression_Model_Train') .getOrCreate()Logger.getRootLogger.setLevel(Level.WARN)

拆分数据集

// 记载数据集 并拆分成训练集和测试集
val data = MLUtils.loadLibSVMFile(spark.sparkContext,file).randomSplit(Array(0.7,0.3))
val (train, test) = (data(0), data(1))

LRWithLBFGS模型设置参数

// 定义分类的数目,默认为2,是logisticregression的参数private var numClass: Int = 2// 定义是否添加截距,默认值为false,是logisticregression的参数private var isAddIntercept: Option[Boolean] = None// 定义是否在训练模型前进行验证,是logisticregression的参数private var isValidateData: Option[Boolean] = None

// 定义迭代的次数,默认值是100,LBFGS的参数private var numInterations: Option[Int] = None// 定义正则化系数值,默认值是0.0,LBFGS的参数private var regParam: Option[Double] = None// 定义正则化参数,支持:L1Updater[L1]、SquaredL2Updater[L2]、SimpleUpdater[没有正则项],LBFGS的参数private var updater: Option[String] = None// 定义计算梯度的方式,支持:LogisticGradient、LeastSquaresGradient、HingeGradient ,LBFGS的参数private var gradient: Option[String] = None// 人工定义的收敛阈值private var threshold:Option[Double]=None// 定义模型收敛阈值,默认为 10^-6private var convergenceTol: Double= 1.0e-6

创建模型

def createLRModel(model_param: String): LogisticRegressionWithLBFGS={
 // 设置模型参数
 val optimizer = new LROptimizer()
 optimizer.parseString(model_param)
 println(s'模型训练参数为:${optimizer.toString}')

// 创建模型并指定相关参数
 val LRModel = new LogisticRegressionWithLBFGS()
 // 设置分类数目
 LRModel.setNumClasses(optimizer.getNumClass)
 // 设置是否添加截距
 if(optimizer.getIsAddIntercept.nonEmpty) {LRModel.setIntercept(optimizer.getIsAddIntercept.get)}
 // 设置是否进行验证模型
 if(optimizer.getIsValidateData.nonEmpty){LRModel.setValidateData(optimizer.getIsValidateData.get)}
 // 设置迭代次数
 if(optimizer.getNumInterations.nonEmpty){LRModel.optimizer.setNumIterations((optimizer.getNumInterations.get))}
 // 设置正则项参数
 if(optimizer.getRegParam.nonEmpty) { LRModel.optimizer.setRegParam(optimizer.getRegParam.get) }
 // 设置正则化参数
 if(optimizer.getUpdater.nonEmpty){
  optimizer.getUpdater match {
   case Some('L1Updater') => LRModel.optimizer.setUpdater( new L1Updater())
   case Some('SquaredL2Updater') => LRModel.optimizer.setUpdater(new SquaredL2Updater())
   case Some('SimpleUpdater') => LRModel.optimizer.setUpdater(new SimpleUpdater())
   case _ => LRModel.optimizer.setUpdater(new SquaredL2Updater())
  }
 }
 // 设置梯度计算方式
 if(optimizer.getGradient.nonEmpty){
  optimizer.getGradient match {
   case Some('LogisticGradient') => LRModel.optimizer.setGradient(new LogisticGradient())
   case Some('LeastSquaresGradient') => LRModel.optimizer.setGradient(new LeastSquaresGradient())
   case Some('HingeGradient') => LRModel.optimizer.setGradient(new HingeGradient())
   case _ => LRModel.optimizer.setGradient(new LogisticGradient())
  }
 }
 // 设置收敛阈值
 if(optimizer.getThreshold.nonEmpty){ LRModel.optimizer.setConvergenceTol(optimizer.getThreshold.get)}
 else {LRModel.optimizer.setConvergenceTol(optimizer.getConvergenceTol)}

LRModel
}

模型效果评估

 def evaluteResult(result: RDD[(Double,Double,Double)]) :Unit = {  // MSE  val testMSE = result.map{ case(real, pre, _) => math.pow((real - pre), 2)}.mean()  println(s'Test Mean Squared Error = $testMSE')  // AUC  val metrics = new BinaryClassificationMetrics(result.map(x => (x._2,x._1)).sortByKey(ascending = true),numBins = 2)  println(s'0-1 label AUC is = ${metrics.areaUnderROC}')  val metrics1 = new BinaryClassificationMetrics(result.map(x => (x._3,x._1)).sortByKey(ascending = true),numBins = 2)  println(s'score-label AUC is = ${metrics1.areaUnderROC}')  // 错误率  val error = result.filter(x => x._1!=x._2).count().toDouble / result.count()  println(s'error is = $error')  // 准确率  val accuracy = result.filter(x => x._1==x._2).count().toDouble / result.count()  println(s'accuracy is = $accuracy') }

保存模型

 def saveModel(model: LogisticRegressionModel, model_path: String): Unit = {
  // 保存模型文件 obj
  val out_obj = new ObjectOutputStream(new FileOutputStream(model_path+'model.obj'))
  out_obj.writeObject(model)

// 保存模型信息
  val model_info=new BufferedWriter(new FileWriter(model_path+'model_info.txt'))
  model_info.write(model.toString())
  model_info.flush()
  model_info.close()

// 保存模型权重
  val model_weights=new BufferedWriter(new FileWriter(model_path+'model_weights.txt'))
  model_weights.write(model.weights.toString)
  model_weights.flush()
  model_weights.close()

println(s'模型信息写入文件完成,路径为:$model_path')
 }

加载模型

 def loadModel(model_path: String): Option[LogisticRegressionModel] = {  try{   val in = new ObjectInputStream( new FileInputStream(model_path) )   val model = Option( in.readObject().asInstanceOf[LogisticRegressionModel] )   in.close()   println('Model Load Success')   model  }  catch {   case ex: ClassNotFoundException => {    println(ex.printStackTrace())    None   }   case ex: IOException => {    println(ex.printStackTrace())    println(ex)    None   }   case _: Throwable => throw new Exception  } }

使用加载的模型进行分值计算

 // 加载obj文件进行预测
 val model_new = loadModel(s'$model_path/model.obj')
 // 使用加载的模型进行样例预测
 val result_new = test.map(line =>{
  val pre_label = model_new.get.predict(line.features)
  // blas.ddot(x.length, x,1,y,1) (向量x的长度,向量x,向量x的索引递增间隔,向量y,向量y的索引递增间隔)
  val pre_score = blas.ddot(model.numFeatures, line.features.toArray, 1, model.weights.toArray, 1)
  val score = Math.pow(1+Math.pow(Math.E, -2 * pre_score), -1)
  (line.label, pre_label,score)
 } )
 result_new.take(2).foreach(println)

ml中的二分类LR

ml包中的LR既可以用来做二分类,也可以用来做多分类。

  • 二分类对应:Binomial logistic regression
  • 多分类对应:multinomial logistic regression

其中二分类可以通过Binomial logistic regression 和 multinomial logistic regression实现。

基于Binomial logistic regression的LR实现:

def BinaryModel(train: Dataset[Row], model_path: String, spark: SparkSession) = { // 创建模型 val LRModel = new LogisticRegression()  .setMaxIter(20)  .setRegParam(0.3)  .setElasticNetParam(0.8) // 训练评估模型 val model = LRModel.fit(train) evalute(model, train, spark)}

def evalute(model: LogisticRegressionModel, train: Dataset[Row], spark: SparkSession):Unit = {  // 打印模型参数  println(s'模型参数信息如下:\n ${model.parent.explainParams()} \n')  println(s'Coefficients(系数): ${model.coefficients}')  println(s'Intercept(截距): ${model.intercept}')  // 查看训练集的预测结果 rawPrediction:row 计算的分值,probability:经过sigmoid转换后的概率  val result = model.evaluate(train)  result.predictions.show(10)  // 将 label,0 值概率,predict label提取出来  result.predictions.select('label','probability','prediction').rdd   .map(row => (row.getDouble(0),row.get(1).asInstanceOf[DenseVector].toArray(0),row.getDouble(2)))   .take(10).foreach(println)  // 模型评估  val trainSummary = model.summary  val objectiveHistory = trainSummary.objectiveHistory  println('objectiveHistoryLoss:')  objectiveHistory.foreach(loss => println(loss))

  val binarySummary = trainSummary.asInstanceOf[BinaryLogisticRegressionSummary]

  val roc = binarySummary.roc  roc.show()  println(s'areaUnderROC: ${binarySummary.areaUnderROC}')

  // Set the model threshold to maximize F-Measure  val fMeasure = binarySummary.fMeasureByThreshold  fMeasure.show(10)  val maxFMeasure = fMeasure.select(max('F-Measure')).head().getDouble(0)  import spark.implicits ._  val bestThreshold = fMeasure.where($'F-Measure'===maxFMeasure).select('threshold').head().getDouble(0)  model.setThreshold(bestThreshold) }

基于Multimial logistic regression的LR实现:

def BinaryModelWithMulti(train: Dataset[Row], model_path: String, spark: SparkSession) = {
 // 创建模型
 val LRModel = new LogisticRegression()
     .setMaxIter(10)
     .setRegParam(0.3)
     .setElasticNetParam(0.8)
     .setFamily('multinomial')
 // 训练模型
 val model = LRModel.fit(train)
 // 打印模型参数
 println(s'模型参数信息如下:\n ${model.parent.explainParams()} \n')
 println(s'Coefficients(系数): ${model.coefficientMatrix}')
 println(s'Intercept(截距): ${model.interceptVector}')
}

ml中的多分类LR

某条样本属于类别k的概率计算为:

其中K表示类别, 表示特征个数

权重最小化使用的是最大似然函数,其更新公式如下:

使用的数据集形式为:

1 1:-0.222222 2:0.5 3:-0.762712 4:-0.8333331 1:-0.555556 2:0.25 3:-0.864407 4:-0.9166671 1:-0.722222 2:-0.166667 3:-0.864407 4:-0.8333331 1:-0.722222 2:0.166667 3:-0.694915 4:-0.9166670 1:0.166667 2:-0.416667 3:0.457627 4:0.51 1:-0.833333 3:-0.864407 4:-0.9166672 1:-1.32455e-07 2:-0.166667 3:0.220339 4:0.08333332 1:-1.32455e-07 2:-0.333333 3:0.0169491 4:-4.03573e-08

多分类LR模型实现为:

def MultiModel(file_multi: String, spark: SparkSession, model_path: String): Unit = {
 val training = spark.read.format('libsvm').load(file_multi)
 val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
 val lrModel = lr.fit(training)

// Print the coefficients and intercept for multinomial logistic regression
 println(s'Coefficients: \n${lrModel.coefficientMatrix}')
 println(s'Intercepts: ${lrModel.interceptVector}')
}

关注我们不错过每一篇精彩
(0)

相关推荐