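It has been a long time since I last updated this blog.
Today my company handed me this requirement: fetch the previous day's order numbers from Taobao, use each order number to query a second interface for the order details, and then display the results on a web page.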
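The technical points involved are:
- Simulated login
- Simulated download
- Parsing the Excel file data stream
- Reading the Excel file and extracting the order numbers
- Finally, calling the order-detail interface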
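I will go through these one by one. When I first received the requirement it felt vague, because I had never done any of these things before. Once I calmed down and worked through it, though, it turned out not to be hard at all; it was actually quite simple.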
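Simulated login
I. Analyzing the page request
The login page for this task is https://huoche.alitrip.com/hello.htm
1. I logged in once by hand and inspected the request. It carries only three things: a hidden token, the username, and the password.
That makes things clear at a glance: it is just a back-office page, so the login is comparatively simple. The next step is to take care of the cookies and then post the token, username, and password to log in.
A tip: with Python's requests module you do not have to manage cookies by hand. Create a session object and it stores the cookies the server sets and sends them back on later requests, so there is no need to try cookies one by one. That saves a lot of code and time.
2. The code is as follows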
    import base64
    import requests

    class TbTomas(object):
        def __init__(self):
            # Initialise configuration: one session shared by every request
            self.session_obj = requests.session()

        # Named login_thomas here to match the full source at the end of the post
        def login_thomas(self, thomas_username, thomas_password):
            hello_url = 'https://huoche.alitrip.com/hello.htm'
            # Fetch the login page
            hello_response = self.session_obj.get(hello_url)
            # Extract the hidden token with a regex
            # (re_search is a small helper defined in the full source at the end)
            h_u_s = re_search("<input type='hidden' id='h_u_s' name='h_u_s' value='(.*?)'>", hello_response.text)
            h_u_s = base64.b64encode(h_u_s)
            headers = {
                'Accept': 'text/html, application/xhtml+xml, image/jxr, */*',
                'Referer': 'https://huoche.alitrip.com/hello.htm',
                'Accept-Language': 'zh-CN',
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10586',
                'Content-Type': 'application/x-www-form-urlencoded',
                'Accept-Encoding': 'gzip, deflate',
                'Host': 'huoche.alitrip.com',
                'Content-Length': '73',
                'Connection': 'Keep-Alive',
                'Cache-Control': 'no-cache'
            }
            post_data = {
                # The token is base64-encoded a second time here, as in the original code
                'h_u_s': base64.b64encode(h_u_s),
                'h_u_n': thomas_username,
                'h_u_p': base64.b64encode(thomas_password)
            }
            index_url = 'https://huoche.alitrip.com/index.htm'
            index_response = self.session_obj.post(index_url, headers=headers, data=post_data)
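Once the POST request has been submitted, you can tell whether the login succeeded. Simple, isn't it?
Data download
Downloading works on the same principle as logging in: the download is triggered by sending a POST request to the site, which responds with the Excel file. Python has a module called xlrd for reading Excel files, and it is very convenient.
1. The page asks for a date and downloads that day's data. My assignment was to download the previous day's data, so a few lines of code are needed to compute yesterday's date.
The code is as follows: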
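The token-extraction step can be tried on its own without touching the network. The following is a minimal sketch, not the exact code used in this post: `extract_hidden_token` and the sample HTML are made up for illustration, and the pattern assumes the hidden field is named `h_u_s` (as in the code above) while accepting either quote style, since the real page markup is not shown here.

```python
import re

# Hypothetical helper: pull the value of the hidden h_u_s input out of a page.
# The attribute order and quoting are assumptions about the page markup.
TOKEN_PATTERN = re.compile(
    r"""<input[^>]*\bname=["']h_u_s["'][^>]*\bvalue=["'](.*?)["']""",
    re.IGNORECASE,
)


def extract_hidden_token(html):
    """Return the hidden token, or an empty string when it is absent."""
    match = TOKEN_PATTERN.search(html)
    return match.group(1) if match else ''


# Made-up sample page, only for demonstration
sample_page = '<form><input type="hidden" id="h_u_s" name="h_u_s" value="abc123"></form>'
print(extract_hidden_token(sample_page))
```

Returning an empty string for a missing token mirrors the behaviour of the `re_search` helper in the full source, so the caller can treat an empty result as a sign that the page layout changed.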
    import datetime

    today = datetime.datetime.now()
    yesterday = today + datetime.timedelta(days=-1)
    trade_date = yesterday.strftime('%Y-%m-%d')
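To make the date logic easy to check, the same computation can be wrapped in a function that takes the reference date as a parameter. This is only a small sketch; the function name `previous_day_string` is hypothetical and not part of the original script.

```python
import datetime


def previous_day_string(today=None):
    """Return the day before `today` formatted as YYYY-MM-DD."""
    if today is None:
        today = datetime.date.today()
    yesterday = today - datetime.timedelta(days=1)
    return yesterday.strftime('%Y-%m-%d')


# timedelta takes care of month boundaries
print(previous_day_string(datetime.date(2017, 3, 1)))  # 2017-02-28
```

Passing the date in explicitly keeps the function deterministic, which makes it straightforward to test month and year boundaries.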
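2. Look at the URL the download request is sent to and the data it submits; one screenshot explains everything.
The screenshot shows the URL the request is sent to, the request method, the request headers, and the returned data.
3. Simulate the download request. Submitting the date is all it takes. Once the file has been downloaded, the next step is to read it and pull out what we need.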
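    # Download URL, taken from the full source at the end of the post
    download_url = 'https://huoche.alitrip.com/orderlistexp.do'
    post_data = {
        'orderExportDate': trade_date
    }
    sheet_content = ''
    for _ in xrange(3):
        try:
            # Fetch the Excel file as a byte stream
            download_response = self.session_obj.post(download_url, data=post_data)
            # Open the Excel workbook directly from the downloaded bytes
            xls_content = xlrd.open_workbook(file_contents=download_response.content)
            sheet_content = xls_content.sheets()[0]
            break
        except Exception as e:
            continue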
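The three-attempt loop is a pattern worth isolating. Below is a hedged sketch of a generic retry helper; `retry` and `flaky_download` are hypothetical names, and the fake download function merely stands in for the real POST request so that the example runs without network access.

```python
def retry(func, attempts=3):
    """Call func() up to `attempts` times and return its first successful result.

    Returns None when every attempt raises an exception.
    """
    for _ in range(attempts):
        try:
            return func()
        except Exception:
            continue
    return None


# Stand-in for the download call: fails twice, then succeeds
calls = {'count': 0}


def flaky_download():
    calls['count'] += 1
    if calls['count'] < 3:
        raise IOError('simulated timeout')
    return b'file-bytes'


result = retry(flaky_download)
print(result, calls['count'])
```

Returning `None` after the last failure lets the caller decide what to do next, for example logging in again before another round of attempts, as the full source does.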
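4. This part is familiar to everyone. It works just like reading an ordinary file: loop over the sheet row by row and append each order number to a list.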
    order_item = []
    for line_num in range(sheet_content.nrows):
        line_item = sheet_content.row_values(line_num)
        if line_item[2]:
            order_item.append(line_item[2])  # order number (order_no)
    # All order numbers collected; drop the first entry, which is presumably the header cell
    order_item = order_item[1:]
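The row loop can be exercised on plain lists, which is what `row_values` returns for each row. The sketch below is an illustration under assumptions: `collect_order_numbers` is a hypothetical helper, the sample rows are invented, and it assumes, as the code above does, that the order number sits in the third column and that the first row is a header. It also skips duplicates, which the full source at the end of the post does as well.

```python
def collect_order_numbers(rows, column=2):
    """Collect unique, non-empty values from `column`, skipping the header row."""
    order_numbers = []
    for row in rows[1:]:  # rows[0] is assumed to be the header
        value = row[column] if len(row) > column else ''
        if value and value not in order_numbers:
            order_numbers.append(value)
    return order_numbers


# Invented sample data shaped like sheet.row_values() output
rows = [
    ['Date', 'Buyer', 'Order No'],
    ['2017-03-01', 'A', 'T1001'],
    ['2017-03-01', 'B', ''],
    ['2017-03-01', 'C', 'T1002'],
    ['2017-03-01', 'A', 'T1001'],
]
print(collect_order_numbers(rows))
```

Skipping the header row before filtering, rather than slicing the result afterwards, avoids dropping a real order number if the header cell ever happens to be empty.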
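With the order numbers in hand, the next step is to fetch the order details. My manager told me a colleague had already written that code and I only needed to call the interface, so I will not show someone else's code here. The principle is simple:
use the requests module to send a GET request to the URL with the order number as a parameter, and the data comes back. How to display it on the web page depends on the requirement, which differs from company to company; store the data in a database and pull out whatever you need.
That is all for this post. If you learned something, please give it a like.
Finally, here is the full source code. The username and password stay secret, of course.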
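The colleague's interface is not shown in this post, so the following is only a sketch of the general shape of such a call. The endpoint `https://example.com/order/detail` and the parameter name `order_no` are placeholders, not the real interface. The sketch uses only the Python 3 standard library to build the GET URL (the script in this post targets Python 2); with requests, passing the same dictionary as `params=` to `session.get` produces an equivalent query string.

```python
from urllib.parse import urlencode


def build_detail_url(base_url, order_no):
    """Build the GET URL that carries the order number as a query parameter."""
    return base_url + '?' + urlencode({'order_no': order_no})


# Placeholder endpoint, for illustration only
url = build_detail_url('https://example.com/order/detail', 'T1001')
print(url)
```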
    #!/usr/bin/python
    # coding:utf-8
    import sys
    import os
    import django
    reload(sys)
    sys.setdefaultencoding('utf8')
    # Add the directory that contains manage.py to the module search path
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    os.environ['DJANGO_SETTINGS_MODULE'] = 'business.settings'  # select the settings module
    django.setup()  # initialise the Django environment
    import requests
    import re
    import logging
    import base64
    import xlrd
    import datetime
    import time
    import MySQLdb
    import threadpool
    from business import settings
    from train.depends.platform import Platform
    from train.models import TbTomasOrder, TbTomasEpay, TtTicketThomas, TbTomasLinkman
    from train import utils
    from train.status import OrderStatus
    from django.core.mail import EmailMultiAlternatives
    from train.busi import insert_order, insert_ticket, insert_epay, insert_linkman

    logger = logging.getLogger('django')


    class TbTomas(object):
        succ_number = 0
        fail_number = 0
        fail_order = []

        def __init__(self, thread_num=3):
            # Initialise configuration
            self.session_obj = requests.session()
            self.fail_order = []
            self.succ_number = 0
            self.fail_number = 0
            self.thread_num = thread_num
            self.start_date = ''
            self.end_date = ''
            self.trade_date = utils.now()

        def login_thomas(self, thomas_username, thomas_password):
            hello_url = 'https://huoche.alitrip.com/hello.htm'
            hello_response = self.session_obj.get(hello_url)
            h_u_s = re_search("<input type='hidden' id='h_u_s' name='h_u_s' value='(.*?)'>", hello_response.text)
            h_u_s = base64.b64encode(h_u_s)
            headers = {
                'Accept': 'text/html, application/xhtml+xml, image/jxr, */*',
                'Referer': 'https://huoche.alitrip.com/hello.htm',
                'Accept-Language': 'zh-CN',
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10586',
                'Content-Type': 'application/x-www-form-urlencoded',
                'Accept-Encoding': 'gzip, deflate',
                'Host': 'huoche.alitrip.com',
                'Content-Length': '73',
                'Connection': 'Keep-Alive',
                'Cache-Control': 'no-cache'
            }
            post_data = {
                # The token is base64-encoded a second time here, as in the original code
                'h_u_s': base64.b64encode(h_u_s),
                'h_u_n': thomas_username,
                'h_u_p': base64.b64encode(thomas_password)
            }
            index_url = 'https://huoche.alitrip.com/index.htm'
            index_response = self.session_obj.post(index_url, headers=headers, data=post_data)
            logger.info(u'Login succeeded, waiting to download the file...')

        def download_file(self, thomas_username, thomas_password, args):
            for _ in xrange(3):
                try:
                    self.login_thomas(thomas_username, thomas_password)
                    break
                except Exception as e:
                    logger.error(e)
                    continue
            # Work out which dates to process
            all_time = self.date_time_handle(args)
            if not all_time:
                logger.error(u'Invalid date format!')
                return
            for trade_date in all_time:
                try:
                    self.trade_date = trade_date
                    post_data = {
                        'orderExportDate': trade_date
                    }
                    download_url = 'https://huoche.alitrip.com/orderlistexp.do'
                    sheet_content = ''
                    for _ in xrange(3):
                        try:
                            # Fetch the Excel file as a byte stream
                            download_response = self.session_obj.post(download_url, data=post_data)
                            # Open the Excel workbook from the downloaded bytes
                            xls_content = xlrd.open_workbook(file_contents=download_response.content)
                            sheet_content = xls_content.sheets()[0]
                            logger.info(u'File downloaded, extracting order numbers')
                            break
                        except Exception as e:
                            logger.error(u'Download timed out, logging in again before retrying...')
                            self.login_thomas(thomas_username, thomas_password)
                            continue
                    order_item = []
                    if not sheet_content:
                        logger.error(u'File download failed, logging in again...')
                        continue
                    for line_num in range(sheet_content.nrows):
                        line_item = sheet_content.row_values(line_num)
                        if line_item[2] and line_item[2] not in order_item:
                            order_item.append(line_item[2])  # order number (order_no)
                    # All order numbers collected; drop the first entry (presumably the header cell)
                    order_item = order_item[1:]
                    # Fetch the details for each order number
                    logger.info(u'Writing to the database')
                    # Run in multiple threads
                    pool = threadpool.ThreadPool(self.thread_num)
                    reqs = threadpool.makeRequests(self.create_order_info, order_item)
                    for req in reqs:
                        pool.putRequest(req)
                    pool.wait()
                    logger.info(u'Write finished, date: %s' % self.trade_date)
                    content = self.add_content(len(order_item), self.succ_number, self.fail_number, self.fail_order)
                    self.send_mail(content=content)
                    # Reset the counters for the next date
                    # (the original reset fail_order here instead of fail_number)
                    self.succ_number, self.fail_number = 0, 0
                    self.fail_order = []
                    # self.create_order_info(order_item)
                except Exception as e:
                    logger.error(e)

        def date_time_handle(self, args):
            all_time = []
            if args:
                if len(args) == 1:
                    self.start_date = datetime.datetime.strptime(args[0], '%Y-%m-%d').date()
                    self.end_date = datetime.datetime.strptime(datetime.datetime.now().strftime('%Y-%m-%d'), '%Y-%m-%d').date()
                elif len(args) == 2:
                    self.start_date = datetime.datetime.strptime(args[0], '%Y-%m-%d').date()
                    self.end_date = datetime.datetime.strptime(args[1], '%Y-%m-%d').date()
                elif len(args) == 3:
                    self.start_date = datetime.datetime.strptime(args[0], '%Y-%m-%d').date()
                    self.end_date = datetime.datetime.strptime(args[1], '%Y-%m-%d').date()
                    self.thread_num = int(args[2])
                else:
                    logger.error(u'Invalid arguments, please run again')
                    return
                i = 0
                while True:
                    tomoary = self.start_date + datetime.timedelta(days=i)
                    trade_date = tomoary.strftime('%Y-%m-%d')
                    all_time.append(trade_date)
                    i += 1
                    # >= rather than == so a start date after the end date cannot loop forever
                    if tomoary >= self.end_date:
                        break
            else:
                today = datetime.datetime.now()
                yesterday = today + datetime.timedelta(days=-1)
                trade_date = yesterday.strftime('%Y-%m-%d')
                all_time.append(trade_date)
            return all_time

        def create_order_info(self, order):
            platform_obj = Platform()
            order_info = platform_obj.get_order(order)
            if not order_info:
                self.fail_order.append(order)
                self.fail_number += 1
                logger.error('Failed to fetch order [%s]' % order)
                return
            try:
                # Insert into the order table
                if TbTomasOrder.objects.filter(order_no=order).exists():
                    logger.error('Order [%s] already exists in TbTomasOrder' % order)
                    self.fail_order.append(order)
                    self.fail_number += 1
                    return
                else:
                    insert_order(order_info, order, self.trade_date)
                    self.succ_number += 1
                # Insert into the ticket table
                insert_ticket(order_info, order, self.trade_date)
                # Insert the contact person
                if TbTomasLinkman.objects.filter(order_no=order).exists():
                    logger.error('Order [%s] already exists in TbTomasLinkman' % order)
                else:
                    insert_linkman(order_info, order, self.trade_date)
                # Insert into the epay table
                if TbTomasEpay.objects.filter(order_no=order).exists():
                    logger.error('Order [%s] already exists in TbTomasEpay' % order)
                else:
                    insert_epay(order_info, order, self.trade_date)
            except Exception as e:
                logger.error(e)
                self.fail_number += 1

        def add_content(self, total, succ_number, fail_number, fail_order):
            content = u'''
            <h3>Thomas order import report</h3>
            <div class='col-xs-12'>
            <table border='1' cellpadding='3' cellspacing='1'>
            <tr>
            <td>Date</td> <td>Total orders</td> <td>Succeeded</td> <td>Failed</td> <td>Failed order numbers</td>
            </tr>
            <tr>
            <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td>
            </tr>
            </table>
            </div>
            ''' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M'), total, succ_number, fail_number, fail_order)
            return content

        def send_mail(self, content):
            time_target = self.trade_date
            subject = u'Thomas data fetch report %s' % (time_target)
            logger.info(u'Preparing to send mail....%s', subject)
            mail_address = settings.mail_address_thomas
            to_addr = []
            if isinstance(mail_address, list):
                to_addr += mail_address
            elif isinstance(mail_address, str):
                to_addr.append(mail_address)
            logger.debug(to_addr)
            from_email = settings.DEFAULT_FROM_EMAIL
            msg = EmailMultiAlternatives(subject, 'result', from_email, to_addr)
            msg.attach_alternative(content, 'text/html')
            flag = msg.send()
            if flag:
                logger.info(u'%s sent successfully', subject)
            else:
                logger.error(u'%s failed to send', subject)
            return

        def run(self, username, passwd, args):
            # Log in to the Thomas back office and run the download
            for _ in xrange(3):
                try:
                    self.download_file(username, passwd, args)
                    break
                except Exception as e:
                    logger.error(e)
                    continue


    def re_search(regex, subject):
        # Return the first capture group of the match, or '' when nothing matches
        subject = str(subject)
        obj = re.compile(regex)
        match = obj.search(subject)
        if match:
            result = match.group(1)
        else:
            result = ''
        return result


    def main():
        username = base64.b64decode(settings.THOMAS_USERNAME)
        passwd = base64.b64decode(settings.THOMAS_PASSWORD)
        args = sys.argv[1:] if sys.argv[1:] else ''
        TbTomas().run(username, passwd, args)


    if __name__ == '__main__':
        main()