Bringing multithreading to Python's async event loop


Posted: 15 September 2024






Introduction


I kicked off this project as a bit of a joke after a colleague made the classic mix-up of thinking async in Python magically meant multithreading. It got me wondering whether it was actually possible to make Python's async event loop work with multiple threads. I know that Tokio already has this capability and Zig is trying to make it happen. And with Python 3.13 letting you disable the GIL, it feels like the perfect time for this experiment. Plus, it's a good excuse to dive deeper into Python's async system and understand it better.

The goal here isn’t to create a production-ready or high-performance multithreaded alternative to asyncio. This is just a chance to explore the idea, share what I learn, and hopefully get a basic FastAPI server running with this multithreaded async setup.


How the event loop works


David Beazley’s workshop at PyCon India 2019 gives an excellent deep dive into how the Python event loop works, but here’s a brief overview.

The event loop is like a to-do list of tasks that need to be completed. We go through this list, running each task one by one. Unlike operating systems that may interrupt tasks, Python’s event loop doesn’t preemptively pause tasks. Instead, it waits for tasks to either yield control voluntarily (using the await statement) or complete on their own.
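For example, two coroutines only ever interleave at their await points:

```python
import asyncio

async def task(name):
    for i in range(2):
        print(f"{name}: step {i}")
        await asyncio.sleep(0)  # voluntarily hand control back to the loop

async def main():
    # The loop alternates between the two tasks at each await.
    await asyncio.gather(task("A"), task("B"))

asyncio.run(main())
```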

It's also important to note that in Python every thread has its own separate event loop. This needs to be considered in the design of a multithreaded async approach.

In the asyncio library's source code, you'll find that the event loop is highly customizable. It provides an abstract interface via the BaseEventLoop class. Instead of implementing all the required methods manually, we'll inherit most of them from the existing _UnixSelectorEventLoop and override methods as needed.
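As a rough skeleton of that starting point (ThreadedEventLoop is an illustrative name, not necessarily the class in the project):

```python
from asyncio.unix_events import _UnixSelectorEventLoop

class ThreadedEventLoop(_UnixSelectorEventLoop):
    # Inherit the full selector loop; override only what must change.
    def call_soon(self, callback, *args, context=None):
        # A thread-safe version of scheduling will replace this later.
        return super().call_soon(callback, *args, context=context)
```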

For the purposes of this project, the important mechanisms of the event loop are as follows:

1. Immediate scheduling

Methods like call_soon and run_forever handle immediate scheduling. call_soon adds a task to the ready list to be executed as soon as possible.

run_forever keeps the event loop running, processing tasks and callbacks until it's stopped. This method is indirectly called by asyncio.run, which sets up a loop, runs the coroutine passed as an argument, and then closes the loop.
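A quick example of both in action:

```python
import asyncio

def callback(n):
    print(f"callback ran with {n}")

async def main():
    loop = asyncio.get_running_loop()
    # call_soon puts the callback on the ready list; it runs on the
    # next pass of the loop that asyncio.run started for us.
    loop.call_soon(callback, 42)
    await asyncio.sleep(0)  # yield so the loop can run the callback

asyncio.run(main())
```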

2. Scheduling later

The call_at function schedules tasks to run at a specific time. For instance:

```python
await asyncio.sleep(10)
func()
```

Here, Python suspends the running coroutine at await asyncio.sleep(10) and runs the next one. After the 10 seconds are up, this coroutine is added back to the ready list.

Under the hood, the suspended coroutine is added to a "schedule heap" (BaseEventLoop._scheduled). Items in the heap are sorted by timestamp so that the top of the heap is always the next task that will be ready to run. Every time the event loop completes one cycle of picking a task from the ready list and executing it, it checks the schedule heap for tasks that are ready and adds them to the ready list.
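Here's a toy version of that mechanism (a simplification for illustration, not asyncio's actual code):

```python
import heapq
import time

scheduled = []  # heap of (deadline, callback), soonest on top
ready = []      # callbacks to run this iteration

def call_at(when, callback):
    heapq.heappush(scheduled, (when, callback))

def run_once():
    # Move every timer whose deadline has passed onto the ready list...
    now = time.monotonic()
    while scheduled and scheduled[0][0] <= now:
        ready.append(heapq.heappop(scheduled)[1])
    # ...then run one cycle of ready callbacks.
    for callback in ready:
        callback()
    ready.clear()
```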

3. Handling networking operations

For a simple FastAPI server, the key networking methods of BaseEventLoop we need are:

i. Creating Connections

create_connection: This method opens TCP connections. Libraries like httpx and anyio use it to handle async network operations.
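For instance, opening a raw TCP connection through the loop looks like this (example.com and port 80 are just placeholders):

```python
import asyncio

class Printer(asyncio.Protocol):
    def data_received(self, data):
        print(data.decode(errors="replace"))

async def main():
    loop = asyncio.get_running_loop()
    # create_connection opens the socket and wires it to our Protocol.
    transport, _protocol = await loop.create_connection(Printer, "example.com", 80)
    transport.write(b"HEAD / HTTP/1.0\r\nHost: example.com\r\n\r\n")
    await asyncio.sleep(1)  # give the response a moment to arrive
    transport.close()

asyncio.run(main())
```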

ii. Receiving Data

Methods like sock_accept and sock_recv manage incoming connections and data reception. These will be needed for a FastAPI server to listen for incoming requests.
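A minimal echo server built directly on these methods (a sketch; real servers layer protocols on top):

```python
import asyncio
import socket

async def serve(host="127.0.0.1", port=8080):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    sock.setblocking(False)  # the loop's sock_* methods require this
    loop = asyncio.get_running_loop()
    while True:
        client, _addr = await loop.sock_accept(sock)  # waits for a connection
        data = await loop.sock_recv(client, 1024)     # waits for data
        await loop.sock_sendall(client, data)         # echo it back
        client.close()

asyncio.run(serve())
```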

Bringing it all together

One entire cycle of running event loop tasks happens in the _run_once method of BaseEventLoop. The following diagram illustrates this flow:

Existing run once functionality
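In toy form (a standalone imitation of the cycle, not the real CPython source), it looks like this:

```python
import heapq
import selectors
import time
from collections import deque

class ToyLoop:
    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._scheduled = []    # heap of (deadline, callback)
        self._ready = deque()   # callbacks to run this iteration

    def run_once(self):
        # 1. Poll the selector, waiting at most until the next timer is due.
        timeout = max(0, self._scheduled[0][0] - time.monotonic()) if self._scheduled else 0
        for key, _mask in self._selector.select(timeout):
            self._ready.append(key.data)  # key.data holds the I/O callback
        # 2. Move expired timers from the schedule heap to the ready list.
        now = time.monotonic()
        while self._scheduled and self._scheduled[0][0] <= now:
            self._ready.append(heapq.heappop(self._scheduled)[1])
        # 3. Run everything that is currently ready, one callback at a time.
        while self._ready:
            self._ready.popleft()()
```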


Simple, right? Now, how do we make this thread-safe and prevent our threads from busy waiting?


My approach


While it isn't perfect, here's what I've done:

Ready list management

Where BaseEventLoop uses a simple list for the ready list, I've chosen to use a queue.Queue. This lets worker threads wait (i.e. block) for new tasks when the queue is empty. To run tasks from the ready list in parallel, we'll have a thread pool of worker threads get items from the ready queue and run them.
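A minimal sketch of the worker pool (names are illustrative, not the project's exact code):

```python
import queue
import threading

ready_queue = queue.Queue()

def worker():
    while True:
        handle = ready_queue.get()  # blocks while the queue is empty
        if handle is None:          # sentinel: shut this worker down
            break
        handle()                    # run the callback
        ready_queue.task_done()

workers = [threading.Thread(target=worker, daemon=True) for _ in range(4)]
for t in workers:
    t.start()
```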

Normally, each worker thread would have its own separate event loop, which would cause issues when a running task wants to schedule something. To prevent this and have the worker threads all share the same event loop, I've written a custom policy (the thing that actually creates the event loop) that allows me to reuse an existing event loop, as sketched below.
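A sketch of such a policy (SharedEventLoopPolicy is my illustrative name, not an asyncio API):

```python
import asyncio

class SharedEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    def __init__(self, shared_loop):
        super().__init__()
        self._shared_loop = shared_loop

    def get_event_loop(self):
        # Hand every thread the same loop instead of a per-thread one.
        return self._shared_loop

loop = asyncio.new_event_loop()
asyncio.set_event_loop_policy(SharedEventLoopPolicy(loop))
```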

New run once functionality




Handling scheduled tasks

To manage scheduled tasks without forcing the ready-queue workers to block on two separate queues (the ready queue and the scheduled queue), we will introduce a dedicated "schedule thread."

In the updated implementation, the _scheduled list will be replaced with a queue.Queue. The call_at method will enqueue tasks into this queue, which the schedule thread will then dequeue for processing.

This schedule thread will maintain a local heap that organizes tasks by the timestamp at which they become ready, ensuring that the task due soonest is always at the top. It will block on the _scheduled queue, using a timeout equal to the time remaining until the task at the top of the heap is due. When a task becomes ready, the schedule thread will pop it from the heap and enqueue it onto the ready queue.
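In sketch form (illustrative names; ready_queue and scheduled_queue stand for the two queue.Queue instances described above):

```python
import heapq
import queue
import time

def schedule_worker(scheduled_queue, ready_queue):
    heap = []  # local heap of (deadline, callback), soonest on top
    while True:
        # Block for a new timer, but no longer than the time remaining
        # until the earliest known deadline.
        timeout = max(0, heap[0][0] - time.monotonic()) if heap else None
        try:
            heapq.heappush(heap, scheduled_queue.get(timeout=timeout))
        except queue.Empty:
            pass  # timed out: the task on top of the heap is now due
        # Move every expired timer onto the ready queue.
        now = time.monotonic()
        while heap and heap[0][0] <= now:
            ready_queue.put(heapq.heappop(heap)[1])
```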



Scheduling functionality




Network operations

Network operations is where this approach breaks down a bit because of how the selector interface works. As far as I can tell, the problem is here. Specifically, calling _selector.select doesn’t remove ready I/O events. This means that if you call _selector.select multiple times in a row, you’ll get the same list of ready events each time. This is problematic in a multithreaded environment because different threads could try to process the same event.

To address this, I’ve implemented a solution where a separate thread calls _selector.select and adds the events to the ready queue. While this approach helps, it’s not perfect. The network thread might still see events as ready until they’re processed from the ready queue, which can cause some spurious errors. Despite these issues, the errors raised aren’t catastrophic, so I’m proceeding with this setup. I plan to rewrite the IO methods from scratch in the future to better handle these problems, but that’s for a different project.
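A sketch of that network thread (simplified; error handling omitted):

```python
def network_worker(selector, ready_queue):
    # This thread is the only caller of selector.select, so two workers
    # can never pull the same event straight from the selector. They can
    # still race on events already sitting in the ready queue, which is
    # the imperfection described above.
    while True:
        for key, _mask in selector.select(timeout=1):
            ready_queue.put(key.data)  # key.data holds the I/O callback
```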

Network operations





Changes required


asyncio.tasks.Task prevents a single event loop from executing two tasks at the same time. Specifically, in the __step method, the _enter_task and _leave_task functions throw an error if an event loop tries to execute two tasks in parallel. Rather than removing these functions outright, we simply override their definitions so the error is never thrown. Finally, to use our custom Task definition, we override the create_task method in our custom event loop.
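One way to sketch this (illustrative; the project's actual code may differ): patch asyncio.tasks' bookkeeping hooks so they record the current task without the sanity check, and have the loop hand out the pure-Python Task that uses them:

```python
import asyncio
from asyncio import tasks

def _enter_task(loop, task):
    tasks._current_tasks[loop] = task   # record, but never raise

def _leave_task(loop, task):
    tasks._current_tasks.pop(loop, None)

tasks._enter_task = _enter_task
tasks._leave_task = _leave_task

class ThreadedEventLoop(asyncio.SelectorEventLoop):
    def create_task(self, coro, **kwargs):
        # Force the pure-Python Task so our patched hooks are used.
        return tasks._PyTask(coro, loop=self, **kwargs)
```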


Results


All in all, the custom event loop runs smoothly for normal async tasks like call_soon and call_at: no hiccups there.

Networking, though, brings a few gremlins. The requests go through, but we've got some non-fatal errors from threads stepping on each other while processing the same event. It's that whole _selector.select business I mentioned earlier.

And FastAPI? Well, it's a mixed bag. On Python 3.13-dev, I hit a hard wall with a segfault right on import of pydantic, which was fun. I'm not sure if this is just a me issue; I'll update this section if I do get it working. But on Python 3.12, the multithreaded event loop ran just fine with a simple FastAPI server.

So, in the end: Mission accomplished...mostly!


What are the caveats?


Nothing is ever free, so what are the drawbacks of this approach?

Well, to begin with, people writing async code never had to worry about locking shared resources, since only a single function would run at a time. Allowing tasks to run in parallel brings back this requirement. This doesn't matter too much for FastAPI servers: if you wanted to serve with multiple worker processes anyway, your code was probably already lock-free. But with normal async code you will need to add mutexes over shared resources and be more mindful of race conditions.
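For example, a counter shared between tasks now needs a real thread-level lock (a minimal sketch):

```python
import threading

counter = 0
counter_lock = threading.Lock()  # required now that tasks run in parallel

async def increment():
    global counter
    # Without the lock, two worker threads could interleave this
    # read-modify-write and lose updates.
    with counter_lock:
        counter += 1
```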

Compared to pure async or pure multithreading, this approach achieves better CPU utilization when you have a mix of network calls and CPU-intensive tasks. With a pure multithreaded approach, you would need enough threads to make sure the ones blocking on network calls don't freeze your entire server. Similarly, with a pure asyncio approach, as soon as you hit a CPU-intensive task your server would be unable to process new or other requests in the meantime.


Trying it out


I've got all the code hosted on GitHub, with installation instructions in the README. The tests folder in the repo has scripts to test general scheduling, scheduling of tasks to be executed later (call_at), etc.

The tests/sample_fastapi folder contains a sample ASGI-compatible server, taken from this very helpful tutorial. The reason I wrote an ASGI server from scratch is that many existing ASGI servers (uvicorn, gunicorn, etc.) have their own custom event loops. So to ensure my event loop is actually the one being used with FastAPI, I'm using this simple ASGI server.