如何使用Python异步编程进行API调用 | 区块链研究实验室
原创 链三丰 区块链研究实验室 今天
收录于话题
#Python1
#区块链技术33
#区块链44
#API1
#区块链应用30
本文中,将向大家介绍如何使用Python异步编程,以便您可以更快地进行更多的API调用。那么让我们开始吧。
请求库
import requestsresponse = requests.get("http://example.com/")print(response)
import requestsfor i in range(10): response = requests.get("http://example.com/") print(response)
将请求发送至example.com。 等待回应。 得到回应。
import requestsimport osapi_key = os.getenv('ALPHAVANTAGE_API_KEY')url = 'https://www.alphavantage.co/query?function=OVERVIEW&symbol={}&apikey={}'symbols = ['AAPL', 'GOOG', 'TSLA', 'MSFT', 'PEP']results = []for symbol in symbols: response = requests.get(url.format(symbol, api_key)) results.append(response.json())
此时必须等待大约1.5秒才能进行5个API调用,然后需要11秒才能进行50个API调用,需要50秒才能进行135个API调用……
异步代码与同步代码
cook_burger()cook_vegetables()
async def cook_meal(): await asyncio.gather(cook_burger(), cook_vegetables())asyncio.run(cook_meal())
事件循环
发出第一个请求。 等待。 得到第一反应。 发出第二个请求。 等待。 得到第二个答复。
发出第一个请求。 发出第二个请求。 等待。 得到第一反应。 得到第二个答复。
然后,我们可以异步运行我们的代码。
输入asyncio和aiohttp
import asyncioimport aiohttpimport osimport timeapi_key = os.getenv('ALPHAVANTAGE_API_KEY')url = 'https://www.alphavantage.co/query?function=OVERVIEW&symbol={}&apikey={}'symbols = ['AAPL', 'GOOG', 'TSLA', 'MSFT', 'PEP']results = []async def get_symbols(): async with aiohttp.ClientSession() as session: for symbol in symbols: response = await session.get(url.format(symbol, api_key), ssl=False)asyncio.run(get_symbols())
分解
loop = asyncio.get_event_loop()results = loop.run_until_complete(get_symbols())loop.close()
async def get_symbols(): async with aiohttp.ClientSession() as session: for symbol in symbols: response = await session.get(url.format(symbol, api_key), ssl=False)
收集任务
import asyncioimport aiohttpimport osimport timeapi_key = os.getenv('ALPHAVANTAGE_API_KEY')url = 'https://www.alphavantage.co/query?function=OVERVIEW&symbol={}&apikey={}'symbols = ['AAPL', 'GOOG', 'TSLA', 'MSFT', 'PEP']results = []def get_tasks(session): tasks = [] for symbol in symbols: tasks.append(session.get(url.format(symbol, api_key), ssl=False)) return tasksasync def get_symbols(): async with aiohttp.ClientSession() as session: tasks = get_tasks(session) responses = await asyncio.gather(*tasks)asyncio.run(get_symbols())
tasks = [session.get(URL.format(symbol, API_KEY), ssl=False) for symbol in symbols]
responses = await asyncio.gather(*tasks)
responses = await asyncio.gather(session.get(URL.format('IBM', API_KEY), ssl=False), session.get(URL.format('AAPL', API_KEY), ssl=False), session.get(URL.format('MSFT', API_KEY), ssl=False))
协程与任务
tasks.append(session.get(url.format(symbol, api_key), ssl=False))
tasks.append(asyncio.create_task(session.get(url.format(symbol, api_key), ssl=False)))
链三丰
你的赞赏是我持续不断的动力,谢
喜欢作者
阅读 30
赞在看
写下你的留言
赞 (0)