Spark推荐实战系列之LR的两种实现方式和多分类LR实战介绍
本文主要包含以下内容:
回归分析
什么是回归分析
回归分析算法分类
逻辑回归介绍
Sigmoid函数
LR为什么使用Sigmoid函数
LR的算法原理
mllib中的LRWithLBFGS
ml中的二分类LR
ml中的多分类LR
逻辑回归(Logistic Regression,LR)是较早应用在推荐排序上的,其属于线性模型,模型简单,可以引入海量离散特征,好处是模型可以考虑更加细节或者说针对具体个体的因素。如果想要引入非线性因素需要做特征交叉,这样很容易产生百亿特征,在很早之前ctr就主要靠堆人力搞特征工程工作来持续优化效果。
虽然目前在工业界单纯使用LR做排序的场景或业务并不多,但是对于初学者,一些中小企业在资源和人力不那么充足的情况下,LR扔不失为一个不错的选择。这样可以遵循先上线再迭代的策略完善排序模型。
在学习LR之前先了解一下什么是回归分析和其分类。
回归分析
回归分析算法(Regression Analysis Algorithm)是机器学习算法中最常见的一类机器学习算法。
什么是回归分析
回归分析就是利用样本(已知数据),产生拟合方程,从而(对未知数据)进行预测。例如有一组随机变量 和另外一组随机变量 ,那么研究变量 与 之间关系的统计学方法就叫作回归分析。因为这里 和 是单一对应的,所以这里是一元线性回归。
回归分析算法分类
回归分析算法分为线性回归算法和非线性回归算法。
1、线性回归
线性回归可以分为一元线性回归和多元线性回归。当然线性回归中自变量的指数都是 1,这里的线性并非真的是指用一条线将数据连起来,也可以用一个二维平面、三维曲面等。
一元线性回归:只有一个自变量的回归。例如房子面积(Area)和房子总价(Money)的关系,随着面积(Area)的增大,房屋价格也是不断增加。这里的自变量只有面积,所以是一元线性回归。
多元线性回归:自变量大于或等于两个的回归。例如房子面积(Area)、楼层(floor)和房屋价格(Money)的关系,这里自变量有两个,所以是二元线性回归。
典型的线性回归方法如下:
在统计意义上,如果一个回归等式是线性的,那么它相对于参数就必须是线性的。如果相对于参数是线性的,那么即使相对于样本变量的特征是二次方或多次方的,这个回归模型也是线性的。例如下面的式子:
甚至可以使用对数或者指数取形式化特征,如下:
2、非线形回归和过去的自己和2020好好道个别吧!
有一类模型,其回归参数不是线性的,也不能通过转换的方法将其变为线性的参数,这类模型称为非线性回归模型。非线性回归可以分为一元回归和多元回归。非线性回归中至少有一个自变量的指数不为 1。回归分析中,当研究的因果关系只涉及因变量和一个自变量时,叫作一元回归分析;当研究的因果关系涉及因变量和两个或两个以上自变量时,叫作多元回归分析。
例如下面的两个回归方程:
与线性回归模型不一样的是,这些非线性回归模型的特征因子对应的参数不止一个。
3、广义线性回归
有些非线性回归也可以用线性回归的方法来进行分析,这样的非线性回归叫作广义线性回归。典型的代表是 Logistic Regression。
这里不做过多介绍,下文会详细介绍。
逻辑回归介绍
逻辑回归与线性回归本质上是一样的,都是通过误差函数求解最优系数,在形式上只不过是在线性回归上增加了一个逻辑函数。与线性回归相比,逻辑回归(Logistic Regression,LR)更适用于因变量为二分变量的模型,Logistic 回归系数可用于估计模型中每个自变量的权重比。
Sigmoid函数
Sigmoid函数(海维赛德阶跃函数)在二分类的情况下输出的值为0和1,其数学表达式如下:
可以通过以下代码来展示 Sigmoid 函数的图像。
import math
import matplotlib.pyplot as plt
def sigmoid(x):
return 1 / (1 + math.exp(-x))
# python2 中 range 生成的是一个数组,python3 中生成的是一个迭代器,可以使用 list 进行转换
X = list(range(-10, 10))
Y = list(map(sigmoid, X))
fig = plt.figure(figsize=(4, 4))
ax = fig.add_subplot(111)
# 隐藏上边和右边
ax.spines['top'].set_color('none')
ax.spines['right'].set_color('none')
# 移动另外两个轴
ax.yaxis.set_ticks_position('left')
ax.spines['left'].set_position(('data', 0))
ax.xaxis.set_ticks_position('bottom')
ax.spines['bottom'].set_position(('data', 0.5))
ax.plot(X, Y)
plt.show()
可以看出,Sigmoid 函数连续、光滑、严格单调,以(0,0.5)为中心对称,是一个非常良好的阈值函数。当 趋近负无穷时, 趋近于 0;当 趋近于正无穷时, 趋近于 1; 时,。当然,在 超出[-6,6]的范围后,函数值基本上没有变化,值非常接近,在应用中一般不考虑。
Sigmoid 函数的值域限制在(0,1)之间,[0,1]与概率值的范围是相对应的,这样 Sigmoid 函数就能与一个概率分布联系起来了。
Sigmoid 函数的导数是其本身的函数,即݂,计算过程非常方便,也非常节省时间。其推导过程如下:
<section role="presentation" data-formula="\begin{aligned} f" (x)="" &="(-1)" (1+e^{-x})^{-2}(0+(-1)e^{-x})="" \="" *="" \frac{e^{-x}}{(1+e^{-x})}="" \end{aligned}="" '="" data-formula-type="block-equation">
LR为什么使用Sigmoid函数
这里只讨论二分类的情况。首先LR的假设只有一个,就是两个类别的特征服从均值不等、方差相等的高斯分布,也就是:
为什么假设它们服从高斯分布?一方面是因为高斯分布比较容易理解;另一方面从信息论的角度看,当均值和方差已知时(尽管并不知道确切的均值和方差,但是根据概率论,当样本量足够大时,样本均值和方差以概率 1 趋向于均值和方差),高斯分布是熵最大的分布。为什么要熵最大?因为熵最大的分布可以平摊风险。想想二分查找中,为什么每次都选取中间点作为查找点?就是为了平摊风险。假设方差相等是为了后面处理起来方便,若不相等则无法消去项。
首先定义“风险”为:
式中,是把样本预测为 0 时的风险,是把样本预测为 1 时的风险,是样本为实际标签 却把它预测为 时所带来的风险。
在 LR 算法中,认为预测正确不会带来风险,因此 和 都为 0,此外,认为标签为 0 而预测为 1 和标签为 1 而预测为 0,两者所带来的风险是一样的,因此 和 相等。方便起见,记为 。
所以上面定义的“风险”可以化简为:
现在问题来了,对于某个样本,应该把它预测为 0 还是预测为 1 好?按照风险最小化的原则,应该选择风险最小的,也就是,当 时,预测为 0 的风险要小于预测为 1 的风险,即 ܲ 时,应该把样本预测为 0。即:比较两个条件概率,并把样本分配到概率最大的那个类中。
式中两边同时除以 可得:
对不等式左边的部分取对数(为什么取对数?因为之前提过,两个类别的特征服从均值不等、方差相等的高斯分布,取对数方便处理高斯分布里的指数),再利用贝叶斯公式进行展开,归一化常数忽略掉,将得到:
方便起见,假设 是一维的(当然也很容易推广到多维的情况),套入高斯分布公式,此外,由于 和 都是常数,第二项简记为常数 继续展开,将得到:
取:
两边取指数,并利用这个概率公式化简,可得到:
其推算过程为:
综上可以知道为什么 LR 算法要用 Sigmoid 函数了。
LR的算法原理
1、算法原理
机器学习模型实际上把决策函数限定在某一组条件下,这组限定条件就决定了模型的假设
空间。当然,还希望这组限定条件简单而合理。
逻辑回归模型所做的假设是:
这里的 就是 Sigmoid 函数,相应的决策函数为:
选择 0.5 作为阈值是一般的做法,实际应用时,特定的情况下可以选择不同的阈值。如果对正例的判别准确性要求高,可以使阈值大一些;如果对正例的召回要求高,则可以使阈值小一些。
在函数的数学形式确定之后,就要求解模型中的参数了。统计学中常用的一种数学方法是最大似然估计,即找到一组参数,使得在这组参数条件下数据的似然度(概率)更大。在逻辑回归算法中,似然函数可以表示为:
取对数,可以得到对数形式的似然函数:
同样这里也使用损失函数来衡量模型预测结果准确的程度,这里采用 损失函数,其在单条数据上的定义为:
如果取整个数据集上的平均 损失,可以得到:
在逻辑回归模型中,最大化似然函数和最小化 lg 损失函数实际上是等价的。对于该优化问题,存在多种求解方法,这里以梯度下降的情况为例说明。基本步骤如下:
(1)选择下降方向(梯度方向:);
(2)选择步长,更新参数 );
(3)重复以上两步直到满足终止条件。
其中损失函数的梯度计算方法为:
<section role="presentation" data-formula="\frac{\partial J}{\partial \theta} = -\frac{1}{n} \sum_{i=1} (y_i - y_i" )x_i="" +="" \lambda\theta="" '="" data-formula-type="block-equation">
沿梯度负方向选择一个较小的步长可以保证损失函数的值是减小的,另外,逻辑回归模型的损失函数是凸函数(加入正则项后是严格凸函数),可以保证找到的局部最优值是全局最优值。
2、正则化
当模型中参数过多时,容易产生过拟合,这时就要控制模型的复杂度,其中最常见的做法是在目标中加入正则项,通过惩罚过大的参数来防止过拟合。
常见的正则化方法包括 正则化和 正则化。其分别对应如下两个公式:
正则化是指权值向量 中各个元素的绝对值之和,通常表示为。
正则化是指权值向量 中各个元素的平方和然后再求平方根(可以看到 Ridge 回归
的 正则化项有平方符号),通常表示为。
mllib中的LRWithLBFGS
在Spark.mllib包中提供了两种LR分类模型,分别是:
mini-batch gradient descent(LogisticRegressionWithLBFGS) L-BFGS(LogisticRegressionWithSGD)
但官方给出的建议是:推荐使用LBFGS,因为基于LBFGS的LR比基于SGD的能更快的收敛。其原话如下:
We implemented two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. We recommend L-BFGS over mini-batch gradient descent for faster convergence.
而且LRWithLBFGS不仅支持二分类还支持多分类,但LRWithSGD只支持二分类。所以后续只介绍下Spark mllib中的LogisticRegressionWithLBFGS相关操作。
设置变量和创建spark对象
val file = 'data/sample_libsvm_data.txt'val model_path = 'model/lr/'val model_param = 'numInterations:5,regParam:0.1,updater:SquaredL2Updater,gradient:LogisticGradient'
val spark = SparkSession.builder() .master('local[5]') .appName('LogisticRegression_Model_Train') .getOrCreate()Logger.getRootLogger.setLevel(Level.WARN)
拆分数据集
// 记载数据集 并拆分成训练集和测试集
val data = MLUtils.loadLibSVMFile(spark.sparkContext,file).randomSplit(Array(0.7,0.3))
val (train, test) = (data(0), data(1))
LRWithLBFGS模型设置参数
// 定义分类的数目,默认为2,是logisticregression的参数private var numClass: Int = 2// 定义是否添加截距,默认值为false,是logisticregression的参数private var isAddIntercept: Option[Boolean] = None// 定义是否在训练模型前进行验证,是logisticregression的参数private var isValidateData: Option[Boolean] = None
// 定义迭代的次数,默认值是100,LBFGS的参数private var numInterations: Option[Int] = None// 定义正则化系数值,默认值是0.0,LBFGS的参数private var regParam: Option[Double] = None// 定义正则化参数,支持:L1Updater[L1]、SquaredL2Updater[L2]、SimpleUpdater[没有正则项],LBFGS的参数private var updater: Option[String] = None// 定义计算梯度的方式,支持:LogisticGradient、LeastSquaresGradient、HingeGradient ,LBFGS的参数private var gradient: Option[String] = None// 人工定义的收敛阈值private var threshold:Option[Double]=None// 定义模型收敛阈值,默认为 10^-6private var convergenceTol: Double= 1.0e-6
创建模型
def createLRModel(model_param: String): LogisticRegressionWithLBFGS={
// 设置模型参数
val optimizer = new LROptimizer()
optimizer.parseString(model_param)
println(s'模型训练参数为:${optimizer.toString}')
// 创建模型并指定相关参数
val LRModel = new LogisticRegressionWithLBFGS()
// 设置分类数目
LRModel.setNumClasses(optimizer.getNumClass)
// 设置是否添加截距
if(optimizer.getIsAddIntercept.nonEmpty) {LRModel.setIntercept(optimizer.getIsAddIntercept.get)}
// 设置是否进行验证模型
if(optimizer.getIsValidateData.nonEmpty){LRModel.setValidateData(optimizer.getIsValidateData.get)}
// 设置迭代次数
if(optimizer.getNumInterations.nonEmpty){LRModel.optimizer.setNumIterations((optimizer.getNumInterations.get))}
// 设置正则项参数
if(optimizer.getRegParam.nonEmpty) { LRModel.optimizer.setRegParam(optimizer.getRegParam.get) }
// 设置正则化参数
if(optimizer.getUpdater.nonEmpty){
optimizer.getUpdater match {
case Some('L1Updater') => LRModel.optimizer.setUpdater( new L1Updater())
case Some('SquaredL2Updater') => LRModel.optimizer.setUpdater(new SquaredL2Updater())
case Some('SimpleUpdater') => LRModel.optimizer.setUpdater(new SimpleUpdater())
case _ => LRModel.optimizer.setUpdater(new SquaredL2Updater())
}
}
// 设置梯度计算方式
if(optimizer.getGradient.nonEmpty){
optimizer.getGradient match {
case Some('LogisticGradient') => LRModel.optimizer.setGradient(new LogisticGradient())
case Some('LeastSquaresGradient') => LRModel.optimizer.setGradient(new LeastSquaresGradient())
case Some('HingeGradient') => LRModel.optimizer.setGradient(new HingeGradient())
case _ => LRModel.optimizer.setGradient(new LogisticGradient())
}
}
// 设置收敛阈值
if(optimizer.getThreshold.nonEmpty){ LRModel.optimizer.setConvergenceTol(optimizer.getThreshold.get)}
else {LRModel.optimizer.setConvergenceTol(optimizer.getConvergenceTol)}
LRModel
}
模型效果评估
def evaluteResult(result: RDD[(Double,Double,Double)]) :Unit = { // MSE val testMSE = result.map{ case(real, pre, _) => math.pow((real - pre), 2)}.mean() println(s'Test Mean Squared Error = $testMSE') // AUC val metrics = new BinaryClassificationMetrics(result.map(x => (x._2,x._1)).sortByKey(ascending = true),numBins = 2) println(s'0-1 label AUC is = ${metrics.areaUnderROC}') val metrics1 = new BinaryClassificationMetrics(result.map(x => (x._3,x._1)).sortByKey(ascending = true),numBins = 2) println(s'score-label AUC is = ${metrics1.areaUnderROC}') // 错误率 val error = result.filter(x => x._1!=x._2).count().toDouble / result.count() println(s'error is = $error') // 准确率 val accuracy = result.filter(x => x._1==x._2).count().toDouble / result.count() println(s'accuracy is = $accuracy') }
保存模型
def saveModel(model: LogisticRegressionModel, model_path: String): Unit = {
// 保存模型文件 obj
val out_obj = new ObjectOutputStream(new FileOutputStream(model_path+'model.obj'))
out_obj.writeObject(model)
// 保存模型信息
val model_info=new BufferedWriter(new FileWriter(model_path+'model_info.txt'))
model_info.write(model.toString())
model_info.flush()
model_info.close()
// 保存模型权重
val model_weights=new BufferedWriter(new FileWriter(model_path+'model_weights.txt'))
model_weights.write(model.weights.toString)
model_weights.flush()
model_weights.close()
println(s'模型信息写入文件完成,路径为:$model_path')
}
加载模型
def loadModel(model_path: String): Option[LogisticRegressionModel] = { try{ val in = new ObjectInputStream( new FileInputStream(model_path) ) val model = Option( in.readObject().asInstanceOf[LogisticRegressionModel] ) in.close() println('Model Load Success') model } catch { case ex: ClassNotFoundException => { println(ex.printStackTrace()) None } case ex: IOException => { println(ex.printStackTrace()) println(ex) None } case _: Throwable => throw new Exception } }
使用加载的模型进行分值计算
// 加载obj文件进行预测
val model_new = loadModel(s'$model_path/model.obj')
// 使用加载的模型进行样例预测
val result_new = test.map(line =>{
val pre_label = model_new.get.predict(line.features)
// blas.ddot(x.length, x,1,y,1) (向量x的长度,向量x,向量x的索引递增间隔,向量y,向量y的索引递增间隔)
val pre_score = blas.ddot(model.numFeatures, line.features.toArray, 1, model.weights.toArray, 1)
val score = Math.pow(1+Math.pow(Math.E, -2 * pre_score), -1)
(line.label, pre_label,score)
} )
result_new.take(2).foreach(println)
ml中的二分类LR
ml包中的LR既可以用来做二分类,也可以用来做多分类。
二分类对应:Binomial logistic regression 多分类对应:multinomial logistic regression
其中二分类可以通过Binomial logistic regression 和 multinomial logistic regression实现。
基于Binomial logistic regression的LR实现:
def BinaryModel(train: Dataset[Row], model_path: String, spark: SparkSession) = { // 创建模型 val LRModel = new LogisticRegression() .setMaxIter(20) .setRegParam(0.3) .setElasticNetParam(0.8) // 训练评估模型 val model = LRModel.fit(train) evalute(model, train, spark)}
def evalute(model: LogisticRegressionModel, train: Dataset[Row], spark: SparkSession):Unit = { // 打印模型参数 println(s'模型参数信息如下:\n ${model.parent.explainParams()} \n') println(s'Coefficients(系数): ${model.coefficients}') println(s'Intercept(截距): ${model.intercept}') // 查看训练集的预测结果 rawPrediction:row 计算的分值,probability:经过sigmoid转换后的概率 val result = model.evaluate(train) result.predictions.show(10) // 将 label,0 值概率,predict label提取出来 result.predictions.select('label','probability','prediction').rdd .map(row => (row.getDouble(0),row.get(1).asInstanceOf[DenseVector].toArray(0),row.getDouble(2))) .take(10).foreach(println) // 模型评估 val trainSummary = model.summary val objectiveHistory = trainSummary.objectiveHistory println('objectiveHistoryLoss:') objectiveHistory.foreach(loss => println(loss))
val binarySummary = trainSummary.asInstanceOf[BinaryLogisticRegressionSummary]
val roc = binarySummary.roc roc.show() println(s'areaUnderROC: ${binarySummary.areaUnderROC}')
// Set the model threshold to maximize F-Measure val fMeasure = binarySummary.fMeasureByThreshold fMeasure.show(10) val maxFMeasure = fMeasure.select(max('F-Measure')).head().getDouble(0) import spark.implicits ._ val bestThreshold = fMeasure.where($'F-Measure'===maxFMeasure).select('threshold').head().getDouble(0) model.setThreshold(bestThreshold) }
基于Multimial logistic regression的LR实现:
def BinaryModelWithMulti(train: Dataset[Row], model_path: String, spark: SparkSession) = {
// 创建模型
val LRModel = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
.setFamily('multinomial')
// 训练模型
val model = LRModel.fit(train)
// 打印模型参数
println(s'模型参数信息如下:\n ${model.parent.explainParams()} \n')
println(s'Coefficients(系数): ${model.coefficientMatrix}')
println(s'Intercept(截距): ${model.interceptVector}')
}
ml中的多分类LR
某条样本属于类别k的概率计算为:
其中K表示类别, 表示特征个数
权重最小化使用的是最大似然函数,其更新公式如下:
使用的数据集形式为:
1 1:-0.222222 2:0.5 3:-0.762712 4:-0.8333331 1:-0.555556 2:0.25 3:-0.864407 4:-0.9166671 1:-0.722222 2:-0.166667 3:-0.864407 4:-0.8333331 1:-0.722222 2:0.166667 3:-0.694915 4:-0.9166670 1:0.166667 2:-0.416667 3:0.457627 4:0.51 1:-0.833333 3:-0.864407 4:-0.9166672 1:-1.32455e-07 2:-0.166667 3:0.220339 4:0.08333332 1:-1.32455e-07 2:-0.333333 3:0.0169491 4:-4.03573e-08
多分类LR模型实现为:
def MultiModel(file_multi: String, spark: SparkSession, model_path: String): Unit = {
val training = spark.read.format('libsvm').load(file_multi)
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// Fit the model
val lrModel = lr.fit(training)
// Print the coefficients and intercept for multinomial logistic regression
println(s'Coefficients: \n${lrModel.coefficientMatrix}')
println(s'Intercepts: ${lrModel.interceptVector}')
}