python模拟登陆之下载

好长时间没有更新博客了,哈哈。

今天公司给了这么一个需求,现在我们需要去淘宝获取上一天的订单号,然后再根据订单号去另一个接口去获取订单详情,然后再给我展示到web!

中间涉及到的技术点有:

  • 模拟登陆
  • 模拟下载
  • 解析exal文件数据流
  • 读取exal文件,拿出订单号
  • 还有最后一点请求接口

下面就给大家挨个说一下,刚拿到需求其实还是很模糊的,因为一个都没做过,等静下心来去理解的时候,发现并没有那么难,反而很简单

模拟登陆

一、分析页面请求头

本次登陆地址是https://huoche.alitrip.com/hello.htm

1、先登陆了一遍查看了一下请求头,发现就携带了三个东西,隐藏token,用户名,密码

一看一目了然,就一个后台页面,可想而知相对来说还是很简单,哈哈,下一步我只需要封装一下cookie,然后带上tocken,username,passwd去登陆咯

给大家说下,python的requests模块可以忽略cookie,自己创建一个session对象,他自己去给咱们匹配cookie,不用去挨个试cookie,这样就节省了好多代码和时间

2、代码如下

class TbTomas(object): def __init__(self): # 配置初始化 self.session_obj = requests.session() def download_file(self,thomas_username,thomas_password,): hello_url = 'https://huoche.alitrip.com/hello.htm' # 获取原文 hello_response = self.session_obj.get(hello_url) # 正则匹配原文 h_u_s = re_search('<input type='hidden' id='h_u_s' name='h_u_s' value='(.*?)'>', hello_response.text) h_u_s = base64.b64encode(h_u_s) headers = { 'Accept': 'text/html, application/xhtml+xml, image/jxr, */*', 'Referer': 'https://huoche.alitrip.com/hello.htm', 'Accept-Language': 'zh-CN', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10586', 'Content-Type': 'application/x-www-form-urlencoded', 'Accept-Encoding': 'gzip, deflate', 'Host': 'huoche.alitrip.com', 'Content-Length': '73', 'Connection': 'Keep-Alive', 'Cache-Control': 'no-cache' } post_data = { 'h_u_s': base64.b64encode(h_u_s), 'h_u_n': thomas_username, 'h_u_p': base64.b64encode(thomas_password) } index_url = 'https://huoche.alitrip.com/index.htm' index_response = self.session_obj.post(index_url, headers=headers, data=post_data)

最后一提交post请求,就可以判断有没有登录成功了,是不是很简单,哈哈!

数据下载

下载也是和登录是一样的道理,下载的时候肯定也是像网页发一个post请求,然后就回去下载exal文件咯,python有这么一个模块xlrd,可以去操作exal文件,非常方便

1、原文是让我们输入时间看,下载那一天的数据,领导给的任务是下载前一天的,所以上一天时间要写几行代码来实现

代码如下:

today = datetime.datetime.now()yesterday = today + datetime.timedelta(days=-1)trade_date = yesterday.strftime('%Y-%m-%d')

2、查看下载文件请求的url,以及提交的数据,一张图一切都明白了

从图中可以看到,该文发送的url,请求方式,请求头,和返回的数据

3、模拟请求下载,只需用提交一下日期就OK搞定,文件下载完毕,接下开要读文件拿自己想要的东西啦

post_data = { 'orderExportDate': trade_date } sheet_content = '' for _ in xrange(3): try: # 得到exal文件流 download_response = self.session_obj.post(download_url, data=post_data) # 打开exal文件 xls_content = xlrd.open_workbook(file_contents=download_response.content) sheet_content = xls_content.sheets()[0] break except Exception as e: continue

4、这个就众所周知,和读取文件一样,for循环一行一行读取,然后把订单号挨个添加给一个列表啥啦乱七八糟的

order_item = []        for line_num in range(sheet_content.nrows):            line_item = sheet_content.row_values(line_num)            if line_item[2]:                order_item.append(line_item[2], )  # 订单号 order_no        # 获取到所有订单号        order_item = order_item[1:]

拿到订单号要去获取订单详情了,但是领导给我说这个已经有同事写好代码了,只需要调用那个接口就好,所以别人的代码我就不往上面展示了,原理很简单

requests模块,请求url,get传入订单号,发送请求,就可以返回数据咯,web页面展示,那个需求,每个公司都不一样,存入数据库,自己取自己想要的吧。

本文就到这里吧,学到一点东西的请点赞,哈哈

最后附带源码,用户名和密码就不告诉大家啦,啊哈哈

#!/usr/bin/python# coding:utf-8import sysimport osimport djangoreload(sys)sys.setdefaultencoding('utf8')sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # 把manage.py所在目录添加到系统目录os.environ['DJANGO_SETTINGS_MODULE'] = 'business.settings' # 设置setting文件django.setup() # 初始化Django环境import requestsimport reimport loggingimport base64import xlrdimport datetimeimport timeimport MySQLdbimport threadpoolfrom business import settingsfrom train.depends.platform import Platformfrom train.models import TbTomasOrder,TbTomasEpay,TtTicketThomas,TbTomasLinkmanfrom train import utilsfrom train.status import OrderStatusfrom django.core.mail import EmailMultiAlternativesfrom train.busi import insert_order,insert_ticket,insert_epay,insert_linkmanlogger = logging.getLogger('django')class TbTomas(object): succ_number = 0 fail_number = 0 fail_order = [] def __init__(self,thread_num = 3): # 配置初始化 self.session_obj = requests.session() self.fail_order = [] self.succ_number = 0 self.fail_number = 0 self.thread_num = thread_num self.start_date = '' self.end_date = '' self.trade_date = utils.now() def login_thomas(self,thomas_username,thomas_password): hello_url = 'https://huoche.alitrip.com/hello.htm' hello_response = self.session_obj.get(hello_url) h_u_s = re_search('<input type='hidden' id='h_u_s' name='h_u_s' value='(.*?)'>', hello_response.text) h_u_s = base64.b64encode(h_u_s) headers = { 'Accept': 'text/html, application/xhtml+xml, image/jxr, */*', 'Referer': 'https://huoche.alitrip.com/hello.htm', 'Accept-Language': 'zh-CN', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10586', 'Content-Type': 'application/x-www-form-urlencoded', 'Accept-Encoding': 'gzip, deflate', 'Host': 'huoche.alitrip.com', 'Content-Length': '73', 'Connection': 'Keep-Alive', 'Cache-Control': 'no-cache' } post_data = { 'h_u_s': base64.b64encode(h_u_s), 'h_u_n': thomas_username, 'h_u_p': base64.b64encode(thomas_password) } index_url = 'https://huoche.alitrip.com/index.htm' index_response = self.session_obj.post(index_url, headers=headers, data=post_data) logger.info(u'登陆成功,等待下载文件...') def download_file(self,thomas_username,thomas_password,args): for _ in xrange(3): try: self.login_thomas(thomas_username,thomas_password) break except Exception as e: logger.error(e) continue # 处理时间 all_time = self.date_time_handle(args) if not all_time: logger.error(u'日期格式错误!!') return for trade_date in all_time: try: self.trade_date = trade_date post_data = { 'orderExportDate': trade_date } download_url = 'https://huoche.alitrip.com/orderlistexp.do' sheet_content = '' for _ in xrange(3): try: # 得到exal文件流 download_response = self.session_obj.post(download_url, data=post_data) # 打开exal文件 xls_content = xlrd.open_workbook(file_contents=download_response.content) sheet_content = xls_content.sheets()[0] logger.info(u'下载文件成功,正在拿取订单号') break except Exception as e: logger.error(u'下载文件超时,正在等待重新登录后下载...') self.login_thomas(thomas_username, thomas_password) continue order_item = [] if not sheet_content: logger.error(u'下载文件失败,正在重新登录...') continue for line_num in range(sheet_content.nrows): line_item = sheet_content.row_values(line_num) if line_item[2] and line_item[2] not in order_item: order_item.append(line_item[2], ) # 订单号 order_no # 获取到所有订单号 order_item = order_item[1:] # 根据订单号去拿订单详情 logger.info(u'正在写入数据库') # 多线程去执行 pool = threadpool.ThreadPool(self.thread_num) reqs = threadpool.makeRequests(self.create_order_info, order_item) [pool.putRequest(req) for req in reqs] pool.wait() logger.info(u'写入完成,完成时间为:%s'% self.trade_date) content = self.add_content(len(order_item), self.succ_number, self.fail_number, self.fail_order) self.send_mail(content=content) self.succ_number,self.fail_order = 0,0 self.fail_order = [] # self.create_order_info(order_item) except Exception as e: logger.error(e) def date_time_handle(self,args): all_time = [] if args: if len(args) == 1: self.start_date = datetime.datetime.strptime(args[0], '%Y-%m-%d').date() self.end_date = datetime.datetime.strptime(datetime.datetime.now().strftime('%Y-%m-%d'), '%Y-%m-%d').date() elif len(args) == 2: self.start_date = datetime.datetime.strptime(args[0], '%Y-%m-%d').date() self.end_date = datetime.datetime.strptime(args[1], '%Y-%m-%d').date() elif len(args) == 3: self.start_date = datetime.datetime.strptime(args[0], '%Y-%m-%d').date() self.end_date = datetime.datetime.strptime(args[1], '%Y-%m-%d').date() self.thread_num = int(args[2]) else: logger.error(u'传入参数错误,请重新执行') return i = 0 while True: tomoary = self.start_date + datetime.timedelta(days=i) trade_date = tomoary.strftime('%Y-%m-%d') all_time.append(trade_date) i += 1 if tomoary == self.end_date: break else: today = datetime.datetime.now() yesterday = today + datetime.timedelta(days=-1) trade_date = yesterday.strftime('%Y-%m-%d') all_time.append(trade_date) return all_time def create_order_info(self, order): platform_obj = Platform() order_info = platform_obj.get_order(order) if not order_info: self.fail_order.append(order) self.fail_number += 1 logger.error('获取订单号:[%s]失败'%order) return try: # 插入order表 if TbTomasOrder.objects.filter(order_no=order).exists(): logger.error('订单号:[%s]已经存在于TbTomasOrder'%order) self.fail_order.append(order) self.fail_number += 1 return else: insert_order(order_info,order,self.trade_date) self.succ_number += 1 # 插入ticket表 insert_ticket(order_info,order,self.trade_date) # 插入联系人 if TbTomasLinkman.objects.filter(order_no=order).exists(): logger.error('订单号:[%s]已经存在于TbTomasLinkman'%order) else: insert_linkman(order_info,order,self.trade_date) # 插入epay表 if TbTomasEpay.objects.filter(order_no=order).exists(): logger.error('订单号:[%s]已经存在于TbTomasEpay'%order) else: insert_epay(order_info,order,self.trade_date) except Exception as e: logger.error(e) self.fail_number +=1 def add_content(self,total,succ_number,fail_number,fail_order): content = u''' <h3>托马斯导入订单报表</h3> <div class='col-xs-12'> <table border='1' cellpadding='3' cellspacing='1'> <tr> <td>日期</td> <td>总单数</td> <td>成功单数</td> <td>失败单数</td> <td>失败订单号</td> </tr> <tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> </tr> </table> </div> '''%(datetime.datetime.now().strftime('%Y-%m-%d %H:%M'),total,succ_number,fail_number,fail_order) return content def send_mail(self, content): time_target = self.trade_date subject = u'托马斯数据抓取邮件 %s' % (time_target) logger.info(u'准备发送邮件....%s', subject) mail_address = settings.mail_address_thomas to_addr = [] if isinstance(mail_address, list): to_addr += mail_address elif isinstance(mail_address, str): to_addr.append(mail_address) logger.debug(to_addr) from_email = settings.DEFAULT_FROM_EMAIL msg = EmailMultiAlternatives(subject, 'result', from_email, to_addr) msg.attach_alternative(content, 'text/html') flag = msg.send() if flag: logger.info(u'%s发送成功', subject) else: logger.error(u'%s发送失败', subject) return def run(self, username,passwd,args): # 登陆托马斯后台 for _ in xrange(3): try: self.download_file(username,passwd,args) break except Exception as e: logger.error(e) continuedef re_search(regex, subject): subject = str(subject) obj = re.compile(regex) match = obj.search(subject) if match: result = match.group(1) else: result = '' return resultdef main(): username = base64.b64decode(settings.THOMAS_USERNAME) passwd = base64.b64decode(settings.THOMAS_PASSWORD) args = sys.argv[1:] if sys.argv[1:] else '' TbTomas().run(username,passwd,args)if __name__ == '__main__': main()

thread_code

(0)

相关推荐

  • 手把手教你训练自己的Mask R

    近来在学习图像分割的相关算法,准备试试看Mask R-CNN的效果. 关于Mask R-CNN的详细理论说明,可以参见原作论文https://arxiv.org/abs/1703.06870,网上也有 ...

  • Python爬虫模拟登陆哔哩哔哩(bilibili)并突破点选验证码功能

    写在前面   今天带给大家一个突破点选验证码的案例,利用爬虫模拟登陆哔哩哔哩,并且把一些采坑的地方给大家强调一下! 一.需求分析   模拟登陆哔哩哔哩   网站链接: https://passport ...

  • 会玩,有人用 Python 模拟导弹防御!

    作者:半壶砂  https://www.cnblogs.com/halfsand/p/7976636.html 最近中|东闹得凶,除了对某色列强烈谴责,最吸引眼球的要是他们的铁穹防御系统. 那如何用P ...

  • selenium+python自动化79-文件下载(SendKeys)

    前言 文件下载时候会弹出一个下载选项框,这个弹框是定位不到的,有些元素注定定位不到也没关系,就当没有鼠标,我们可以通过键盘的快捷键完成操作. SendKeys库是专业的处理键盘快捷事件的,所以这里需要 ...

  • selenium+python自动化80-文件下载(不弹询问框)

    前言 上一篇是点弹出框上的按钮去保存文件,本篇介绍一种更加优雅的方法,加载Firefox和Chrome的配置文件,不弹出询问框后台下载. 一.FirefoxProfile 1.点下载的时候,如下图,如 ...

  • selenium+python自动化99--文件下载弹窗处理(PyKeyboard)

    前言 在web自动化下载操作时,有时候会弹出下载框,这种下载框不属于web的页面,是没办法去定位的(有些同学一说到点击,脑袋里面就是定位!定位!定位!) 有时候我们并不是非要去定位到这个按钮再去点击, ...

  • 192页PPT | 半导体器件模拟仿真 (可下载)

    192页PPT | 半导体器件模拟仿真 (可下载)

  • Python模拟​:不在996中爆发,就在996中灭亡!

    996和955,哪种人生更成功? 为了能够进行定量模拟,我们必须假设一些标准:人生的主要目标是事业成就(每天用工作时间换取成就积分),8小时之外的工作时间有概率获得额外成就,且成就积分的积累有复利逻辑 ...

  • 第76天:Scrapy 模拟登陆

    想爬取网站数据?先登录网站!对于大多数大型网站来说,想要爬取他们的数据,第一道门槛就是登录网站.下面请跟随我的步伐来学习如何模拟登陆网站. 为什么进行模拟登陆? 互联网上的网站分两种:需要登录和不需要 ...

  • 用 Python 制作音乐聚合下载器

    来源:Python 技术「ID: pythonall」 现在的音乐APP有很多,为了不下载很多的APP,所以咱用python做了一个聚合的音乐下载器,现在聚合了咪咕音乐.QQ音乐,下面是效果图 安装 ...