· Tutorials  · 4 min read

Why working with asyncio can be tricky in python

When writing a Python script, have you ever felt that your program waits for something to happen, and during this time, it could be doing other things, then come back to the result once it’s ready?

When writing a Python script, have you ever felt that your program waits for something to happen, and during this time, it could be doing other things, then come back to the result once it’s ready?

For example, imagine making 1,000 HTTP requests and collecting the responses. If done in blocking way, you make a request, wait for the response, and repeat till you got all responses. But while waiting, you could be sending the other 999 requests! This is called concurrency, and Python’s asyncio helps with it.

However, writing async code can be tricky. Your code might seem correct with no obvious errors, but it could silently fail to do what it’s supposed to. For instance, tasks might be stuck waiting, or your program might not handle operations concurrently as you intended. These problems can be tricky to spot because everything looks fine on the surface, but the event loop or task sequencing could be going wrong behind the scenes.

My requirement was to make concurrent HTTP requests as I receive a certain message. The goal was to keep the time between request and message to the minimum.

And I wanted to do this at scale. So, I thought, let’s spawn multiple processes and they would listen for messages. As soon as any of the process receive the message, they create an async task using asyncio.create_task. When you do that, python is supposed to start executing the function immediately.

import asyncio
import multiprocessing
import random
 
 
async def make_request(message: str):
    print(f"MAKING REQUEST: {message} || {multiprocessing.current_process().name}")
 
async def listen_for_messages(task_queue: multiprocessing.Queue):
    tasks = set()
    while True:
        message = task_queue.get()
        print(f"MESSAGE RECIEVED: {message} || {multiprocessing.current_process().name}")
        print(message)
        t = asyncio.create_task(make_request(message))
        print(f"TASK CREATED: {t} || {multiprocessing.current_process().name}")
        tasks.add(t)
        t.add_done_callback(tasks.remove)
 
def start_async_loop(task_queue: multiprocessing.Queue):
    asyncio.run(listen_for_messages(task_queue))
 
async def main():
 
    task_queue = multiprocessing.Queue()
    number_of_processes = multiprocessing.cpu_count()
 
    processes = []
    for i in range(number_of_processes):
        p = multiprocessing.Process(target=start_async_loop, args=(task_queue,), name=f"Process {i}", daemon=True)
        p.start()
        processes.append(p)
     
    # test code
    while True:
        task_queue.put(f"Message {hex(random.randint(0, 1000000))}")
        await asyncio.sleep(1)
 
if __name__ == "__main__":
    asyncio.run(main())

But when you run this code, this doesn’t happen. Tasks pile up, but the function handle_message is not getting called.

At first, you don’t see any flaw in the code. We are listening for the message from queue and as soon as we receive it we are creating an async task. But why not the function: make_request are not getting executed?

Pause a minute and think about what’s causing the issue. I spent an hour trying to debug this. ChatGPT couldn’t diagnose the issue. It could later, though, when you give it the correct context.

If you have found the issue, kudos to you 🎉.

The problem was with the blocking wait for messages from multiprocessing.Queue.

You call asyncio.create_task and then immediately block to wait for next message. There is no chance for event loop to run the handle_message task as you are blocking it.

You could usually find non-blocking versions of these functions that do the same job of waiting for some message, but don’t block the event loop.

asyncio library in python provides you a way to run blocking code in event loop without clogging it. run_in_executor. Executor can be ProcessPoolExecutor or ThreadPoolExecutor.

async def main():
    loop = asyncio.get_running_loop()
 
    ## Options:
 
    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)
 
    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)
 
    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)
 
if __name__ == '__main__':
    asyncio.run(main())

These use a process or thread pool to execute code asynchronously without blocking event loop. It’s like promisifying if you come from JavaScript world, but the code is run in different thread or process.

There is a handy function called asyncio.to_thread that achieves the same end result. It creates a thread pool internally and executes a blocking task in threads.

Here is the correct version of my previous code, which achieves the end result I wanted.

import asyncio
import multiprocessing
import random
 
 
async def make_request(message: str):
    print(f"MAKING REQUEST: {message} || {multiprocessing.current_process().name}")
 
async def listen_for_messages(task_queue: multiprocessing.Queue):
    tasks = set()
    while True:
        message = await asyncio.to_thread(task_queue.get)
        print(f"MESSAGE RECIEVED: {message} || {multiprocessing.current_process().name}")
        print(message)
        t = asyncio.create_task(make_request(message))
        print(f"TASK CREATED: {t} || {multiprocessing.current_process().name}")
        tasks.add(t)
        t.add_done_callback(tasks.remove)
 
def start_async_loop(task_queue: multiprocessing.Queue):
    asyncio.run(listen_for_messages(task_queue))
 
async def main():
 
    task_queue = multiprocessing.Queue()
    number_of_processes = multiprocessing.cpu_count()
 
    processes = []
    for i in range(number_of_processes):
        p = multiprocessing.Process(target=start_async_loop, args=(task_queue,), name=f"Process {i}", daemon=True)
        p.start()
        processes.append(p)
     
    # test code
    while True:
        task_queue.put(f"Message {hex(random.randint(0, 1000000))}")
        await asyncio.sleep(1)
 
if __name__ == "__main__":
    asyncio.run(main())
Share:
Back to Blog

Related Posts

View All Posts »