Python asyncio doesn't show any errors
I am trying to get some data from thousands of URLs using asyncio. Here is a brief overview of the design:

Queue
- A single URLProducer populates the Queue with a batch of URLs

Consumers
- Each Consumer keeps fetching URLs from the Queue and sending GET requests asynchronously
- Do some post-processing on the results
- Combine all processed results and return them
Problem: asyncio almost never shows any errors; it just hangs silently. I put print statements everywhere to locate the problem myself, but that didn't help much.

Depending on the number of URLs and the number/limit of consumers, I may encounter the following errors:
Task was destroyed but it is pending!
task exception was never retrieved future: <Task finished coro=<consumer()
aiohttp.client_exceptions.ServerDisconnectedError
aiohttp.client_exceptions.ClientOSError: [WinError 10053] An established connection was aborted by the software in your host machine
Question: How do I detect and handle exceptions in asyncio? How do I retry without disturbing the Queue?
Below is the code I wrote after looking at various asynchronous code examples. Currently there is an intentional error at the end of the get_video_title function. When it runs, nothing is displayed.
import asyncio
import aiohttp
import json
import re

import nest_asyncio
nest_asyncio.apply()  # jupyter notebook throws errors without this

user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36"

def get_video_title(data):
    match = re.search(r'window\[["\']ytInitialPlayerResponse["\']\]\s*=\s*(.*)', data)
    string = match[1].strip()[:-1]
    result = json.loads(string)
    return result['videoDetails']['TEST_ERROR']  # <---- should be 'title'

async def fetch(session, url, c):
    async with session.get(url, headers={"user-agent": user_agent}, raise_for_status=True, timeout=60) as r:
        print('---------Fetching', c)
        if r.status != 200:
            r.raise_for_status()
        return await r.text()

async def consumer(queue, session, responses):
    while True:
        try:
            i, url = await queue.get()
            print("Fetching from a queue", i)
            html_page = await fetch(session, url, i)

            print('+++Processing', i)
            result = get_video_title(html_page)  # should raise an error here!
            responses.append(result)
            queue.task_done()
            print('+++Task Done', i)

        except (aiohttp.http_exceptions.HttpProcessingError, asyncio.TimeoutError) as e:
            print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Error', i, type(e))
            await asyncio.sleep(1)
            queue.task_done()

async def produce(queue, urls):
    for i, url in enumerate(urls):
        print('Putting in a queue', i)
        await queue.put((i, url))

async def run(session, urls, consumer_num):
    queue, responses = asyncio.Queue(maxsize=2000), []

    print('[Making Consumers]')
    consumers = [asyncio.ensure_future(consumer(queue, session, responses))
                 for _ in range(consumer_num)]

    print('[Making Producer]')
    producer = await produce(queue=queue, urls=urls)

    print('[Joining queue]')
    await queue.join()

    print('[Cancelling]')
    for consumer_future in consumers:
        consumer_future.cancel()

    print('[Returning results]')
    return responses

async def main(loop, urls):
    print('Starting a Session')
    async with aiohttp.ClientSession(loop=loop, connector=aiohttp.TCPConnector(limit=300)) as session:
        print('Calling main function')
        posts = await run(session, urls, 100)
        print('Done')
        return posts

if __name__ == '__main__':
    urls = ['https://www.youtube.com/watch?v=dNQs_Bef_V8'] * 100
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(loop, urls))
The problem is that your consumer only catches two very specific exceptions and, in their case, marks the task as done. If any other exception occurs (such as a network-related one), it terminates the consumer. But this is not detected by run, which is awaiting queue.join() while the consumers run (effectively) in the background. That's why your program hangs - the queued items are never accounted for and the queue is never fully processed.

There are two ways to fix this, depending on what you want your program to do when it encounters an unexpected exception. If you want it to keep running, you can add a catch-all except clause to the consumer, such as:
        except Exception as e:
            print('other error', e)
            queue.task_done()
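To see why the catch-all variant keeps the program alive, here is a minimal, self-contained sketch. It uses a toy worker (not the original YouTube-fetching code) that deliberately fails on every third item; because task_done() runs in a finally clause whether or not the item blew up, queue.join() still completes:

```python
import asyncio

async def worker(queue, results, errors):
    # Catch-all except: any failure is recorded and the item is still
    # marked done, so queue.join() completes even when some items fail.
    while True:
        item = await queue.get()
        try:
            if item % 3 == 0:
                raise ValueError(f"bad item {item}")
            results.append(item * 2)
        except Exception as e:
            errors.append((item, repr(e)))
        finally:
            queue.task_done()  # always runs, success or failure

async def main():
    queue = asyncio.Queue()
    results, errors = [], []
    workers = [asyncio.ensure_future(worker(queue, results, errors))
               for _ in range(3)]
    for i in range(1, 10):
        await queue.put(i)
    await queue.join()  # finishes because task_done() is always called
    for w in workers:
        w.cancel()
    return results, errors

results, errors = asyncio.run(main())
print(sorted(results))  # doubled values of the items that survived
print(len(errors))      # items 3, 6 and 9 each failed once
```

The downside, as discussed next, is that truly unexpected errors only show up in the log output (here, the errors list) instead of stopping the program.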
The alternative is to propagate the unhandled consumer exception up to run. This must be arranged explicitly, but has the advantage of never letting an exception pass silently. (See this article for details on the topic.) One way to achieve it is to await queue.join() and the consumers at the same time. Since the consumers are in an infinite loop, they will only complete if an exception is raised:
    print('[Joining queue]')
    # wait for either `queue.join()` to complete or a consumer to raise
    done, _ = await asyncio.wait([queue.join(), *consumers],
                                 return_when=asyncio.FIRST_COMPLETED)
    consumers_raised = set(done) & set(consumers)
    if consumers_raised:
        await consumers_raised.pop()  # propagate the exception
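Here is a runnable illustration of that pattern, reduced to a toy consumer and a hypothetical "poison" item instead of the original fetch code. One detail worth noting: queue.join() is wrapped in a task here, because recent Python versions (3.11+) require asyncio.wait() to receive tasks rather than bare coroutines:

```python
import asyncio

async def consumer(queue):
    while True:
        item = await queue.get()
        if item == "poison":
            raise RuntimeError("unexpected failure")  # not caught inside consumer
        queue.task_done()

async def run():
    queue = asyncio.Queue()
    consumers = [asyncio.ensure_future(consumer(queue)) for _ in range(2)]
    for item in ["a", "b", "poison", "c"]:
        await queue.put(item)

    # Wait for either the queue to drain or a consumer to die with an exception.
    join_task = asyncio.ensure_future(queue.join())
    done, _ = await asyncio.wait([join_task, *consumers],
                                 return_when=asyncio.FIRST_COMPLETED)
    try:
        consumers_raised = set(done) & set(consumers)
        if consumers_raised:
            await consumers_raised.pop()  # re-raises the consumer's exception
    finally:
        for c in consumers:
            c.cancel()
        join_task.cancel()

try:
    asyncio.run(run())
    outcome = "completed"
except RuntimeError as e:
    outcome = str(e)
print(outcome)
```

Because the "poison" item raises inside a consumer and is never marked done, queue.join() can never finish; asyncio.wait() returns as soon as the dead consumer completes, and awaiting it re-raises RuntimeError inside run, where it is visible instead of being swallowed.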
Question: How do I detect and handle exceptions in asyncio?

Exceptions are propagated across await like in any other code, and are detected and handled in the usual way. Special handling is only needed to capture exceptions that leak out of a "background" task, such as consumer.
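A tiny demonstration of that first point, with a made-up coroutine that raises: an exception thrown inside an awaited coroutine surfaces at the await, so an ordinary try/except around it works exactly as it does for a synchronous call.

```python
import asyncio

async def flaky():
    await asyncio.sleep(0)       # yield to the event loop once
    raise ValueError("boom")     # raised inside the coroutine

async def caller():
    # The exception propagates through `await` like a normal function
    # call, so a plain try/except catches it.
    try:
        await flaky()
    except ValueError as e:
        return f"caught: {e}"

result = asyncio.run(caller())
print(result)  # caught: boom
```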
How can I retry without disturbing the queue?

You can call await queue.put((i, url)) in the except block. The item will be added to the back of the queue, to be picked up and retried by a consumer. In that case you only need the first snippet, and don't need to bother with propagating the exception from consumer into run.
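A minimal sketch of that re-queueing approach. It simulates a transient failure (every item fails on its first attempt) and carries an attempt counter in the queued tuple, which is an addition to the original (i, url) pair; the key point is that task_done() in the finally block balances the get(), while the re-queued copy adds a fresh slot for queue.join() to wait on:

```python
import asyncio

async def consumer(queue, results):
    while True:
        attempts, item = await queue.get()
        try:
            if attempts == 0:  # simulated transient failure on first try
                raise ConnectionError(f"transient error on {item}")
            results.append(item)
        except ConnectionError:
            # Re-queue for another attempt. task_done() below balances
            # this get(); the re-queued copy keeps queue.join() waiting
            # until the retry has actually been processed.
            await queue.put((attempts + 1, item))
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.ensure_future(consumer(queue, results))
               for _ in range(2)]
    for item in ["a", "b", "c"]:
        await queue.put((0, item))
    await queue.join()  # returns only after all retries succeeded
    for w in workers:
        w.cancel()
    return results

results = asyncio.run(main())
print(sorted(results))  # every item eventually succeeds on its retry
```

In real code you would also cap the attempt count (e.g. give up after N retries and call task_done() without re-queueing), otherwise a permanently broken URL keeps the queue from ever joining.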