A Guide to Python asyncio — part 2

Kamil Wasilewski
6 min read · Mar 1, 2021

Some time ago I wrote an article about the asyncio library in Python. It describes the benefits of asynchronous operations over OS threads, as well as how to use the async/await syntax that comes with asyncio. If you have not seen it yet, I strongly recommend having a look at it. Although the article covers the most fundamental concepts, there are still plenty of things you can do with asyncio, and they surely deserve more attention. This is why I am coming back with the second installment of A Guide to Python asyncio. This time, I am going to discuss how to write a client-server application using streams. You are going to learn what asynchronous generators and asynchronous for loops are. Finally, you will see how to perform asynchronous I/O operations on files using the aiofiles library and how to synchronize concurrent code using synchronization primitives.

Echo server

Our goal is to implement a text chat in which you, as a client, can communicate with other clients through a server. I am going to begin with a prototype, which will serve as a good starting point for discussion. Then, step by step, more functionality will be added until we end up with a fully working application.

You can find the whole code in this GitHub repository. If you take a look at the commit history, you can easily follow the process of developing the application.

Let’s start with a server that receives incoming messages from clients and sends them back to the sender. A client can send a message of arbitrary length; the message is sent once the user presses “Enter”.

The application starts by creating a coroutine object and passing it to asyncio.run.

start_server awaits asyncio.start_server, which returns an asyncio.Server object. asyncio.start_server takes a coroutine function, which must accept two arguments: an asyncio.StreamReader instance and an asyncio.StreamWriter instance.
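As a sketch of what this looks like (handle_connection here is a minimal echo handler; the host, port, and exact structure are my assumptions, not the repository’s verbatim code):

```python
import asyncio

HOST, PORT = "127.0.0.1", 8888  # assumed values for the example


async def handle_connection(reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter) -> None:
    # Echo everything the client sends back to it, until it disconnects.
    data = await reader.readline()
    while data:
        writer.write(data)
        await writer.drain()
        data = await reader.readline()
    writer.close()
    await writer.wait_closed()


async def start_server() -> None:
    server = await asyncio.start_server(handle_connection, HOST, PORT)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(start_server())
```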

The program awaits serve_forever() so that the server keeps accepting connections for as long as the application runs. Note that it uses async with server. I have already mentioned asynchronous context managers in the previous article, but I did not explain what they do. You probably know that context managers are the recommended way in Python to allocate and release external resources, e.g. database connections. A context manager is a class that implements two methods: __enter__ and __exit__. But those are regular Python functions, and we cannot use them to interact with asynchronous code. This is where asynchronous context managers (ACMs) come in. Instead of __enter__ and __exit__, an ACM defines __aenter__ and __aexit__. If you take a look at any example implementation, you will notice that these are coroutine functions (defined with async def).
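A minimal asynchronous context manager might look like this (the class name and the simulated delays are made up purely for illustration):

```python
import asyncio


class AsyncResource:
    """A toy ACM that pretends to acquire and release an external resource."""

    async def __aenter__(self):
        await asyncio.sleep(0.01)  # e.g. opening a network connection
        self.opened = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0.01)  # e.g. closing it
        self.opened = False
        return False  # do not suppress exceptions


async def main():
    async with AsyncResource() as res:
        print(res.opened)  # the resource is open inside the block


if __name__ == "__main__":
    asyncio.run(main())
```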

Every time a new client establishes a connection to the server, a new coroutine object is automatically created and scheduled on the event loop. As you can see, handle_connection uses another new piece of syntax — async for. Whenever you iterate over an iterable with a for loop, the Python interpreter obtains an iterator, i.e. an object that implements the __next__ method. __next__ returns the next item in a collection or raises StopIteration when there are no more items. However, once again, __next__ is a regular function, so it needs an asynchronous equivalent. This is provided by an asynchronous iterator with its __anext__ method, which raises StopAsyncIteration when the items are exhausted.
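An asynchronous iterator can be sketched like this (a toy countdown, invented for illustration):

```python
import asyncio


class Countdown:
    """Counts down from `start`, pretending each item takes time to produce."""

    def __init__(self, start: int):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.current == 0:
            # The async counterpart of StopIteration.
            raise StopAsyncIteration
        await asyncio.sleep(0.01)  # simulate waiting on I/O
        self.current -= 1
        return self.current + 1


async def main():
    async for n in Countdown(3):
        print(n)


if __name__ == "__main__":
    asyncio.run(main())
```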

Now let’s focus on common.py:

The server constantly waits for new messages from a client. Each message is terminated by a newline character, so we need a generator that yields lines of text. This is the role of readlines. read_until_eol has been extracted into a separate function because it is also used by the client code for printing echoed messages.
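A sketch of what common.py might contain — readlines and read_until_eol are the names used above, but this implementation is my reconstruction, not the repository’s exact code:

```python
import asyncio


async def read_until_eol(reader: asyncio.StreamReader) -> str:
    # readline() reads up to and including b"\n"; an empty result means EOF.
    data = await reader.readline()
    return data.decode()


async def readlines(reader: asyncio.StreamReader):
    # An asynchronous generator: it awaits on the inside and yields lines
    # to an `async for` loop on the outside.
    while True:
        line = await read_until_eol(reader)
        if not line:  # empty string: the peer disconnected
            break
        yield line
```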

Lastly, here is the client code:
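A hedged reconstruction of what the prototype client might look like (the default host and port are assumptions; the blocking loop over standard input is deliberate — it is the flaw fixed later in this article):

```python
import asyncio
import sys


async def client(host: str = "127.0.0.1", port: int = 8888) -> None:
    reader, writer = await asyncio.open_connection(host, port)
    # NOTE: iterating over sys.stdin blocks the event loop — fixed later
    # with aiofiles.
    for line in sys.stdin:
        writer.write(line.encode())
        await writer.drain()           # wait until the buffer can take more data
        echoed = await reader.readline()
        print(echoed.decode(), end="")  # print the echoed message
    writer.close()
    await writer.wait_closed()


if __name__ == "__main__":
    asyncio.run(client())
```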

Three things are worth mentioning here:
• The client establishes a connection by awaiting asyncio.open_connection. This coroutine returns a reader and a writer, which can be used to read messages from and write messages to the server.
• await writer.drain() halts execution until it is possible to resume writing. This prevents the underlying I/O write buffer from overflowing. Note that the same pattern is also used by the server.
• writer.close() closes the stream and the underlying socket. It should be used together with the wait_closed coroutine, which must be awaited.

Creating a simple chat

It is time to upgrade the application by implementing all the missing parts. But before we start, we have to fix a serious flaw in the client code.

When developing an asynchronous program, it is crucial to avoid blocking calls that can freeze the event loop. The client uses a regular for loop to get text input from the user, so the program spends a huge amount of time just waiting for input. This happens because, by default, file operations are blocking. Unless the problem is solved, the chat will not work: the client must be ready to read incoming messages from other clients at any time.

Luckily, there is an excellent library for handling files in asyncio applications called aiofiles. Here is the client code, fixed using aiofiles:

When it comes to the server, I defined a global users dictionary which maps user addresses to asyncio.StreamWriter objects. Then I made a couple of changes to the handle_connection coroutine: adding a new user to the dictionary, sending each message back to all users, and removing the user once the connection is closed.
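A sketch of the upgraded handler — the users dictionary and broadcasting logic follow the description above, but the details are my reconstruction:

```python
import asyncio

# Maps client address -> StreamWriter for every connected user.
users: dict = {}


async def handle_connection(reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter) -> None:
    addr = writer.get_extra_info("peername")
    users[addr] = writer
    try:
        data = await reader.readline()
        while data:
            # Broadcast the message to every connected user.
            for user_writer in users.values():
                user_writer.write(data)
                await user_writer.drain()
            data = await reader.readline()
    finally:
        # Remove the user once the connection is closed.
        del users[addr]
        writer.close()
        await writer.wait_closed()
```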

Our simple asynchronous chat is now working properly. If you want to give it a try, the link to the repository is provided at the beginning of the Echo server section of this post.

Synchronization primitives

Finally, let’s talk about synchronization primitives. asyncio offers a number of them; the full list can be found in the asyncio documentation. But why do we need synchronization, and when should we use it?

When writing multithreaded applications, one of the most common issues is a race condition. A race condition occurs when more than one thread accesses shared state. A context switch may happen at any time, and the programmer has no control over it, because it is the OS that manages threads. As a result, the order in which threads access the state is unknown. The problem is less severe in asyncio applications, because a context switch may only happen at an await expression, but it still exists.

If you want some part of code to be executed only by a single coroutine at the same time, then you should use asyncio.Lock. Take a look at the following example:

The result of running this example is:

computed value: 10
computed value: 1
counter: 1

Most likely, we will never receive a valid result. As soon as one coroutine suspends at the await asyncio.sleep(0.1) line, the other one is resumed. But the counter has not been updated by the first coroutine yet and is still 0. That is why the final counter reflects only one computed value, not the sum of the two.

If we use asyncio.Lock, things will go much better:

computed value: 3
computed value: 5
counter: 8

Once again, one coroutine halts its execution and the other is resumed, but the second one now waits until the first is done.

When it comes to the other primitives, asyncio.Semaphore is useful if you want some part of the code to be executed by at most a limited number of coroutines at the same time. A perfect example is rate limiting, i.e. controlling the rate of HTTP requests to an external website.
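A small sketch of the idea — the URLs and the fetch body are placeholders standing in for real HTTP requests:

```python
import asyncio

MAX_CONCURRENT = 3  # an arbitrary limit chosen for the example


async def fetch(semaphore: asyncio.Semaphore, url: str) -> str:
    async with semaphore:        # at most MAX_CONCURRENT coroutines enter
        await asyncio.sleep(0.1)  # stands in for a real HTTP request
        return f"response from {url}"


async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    urls = [f"https://example.com/page/{i}" for i in range(10)]
    responses = await asyncio.gather(*(fetch(semaphore, url) for url in urls))
    print(f"fetched {len(responses)} responses")


if __name__ == "__main__":
    asyncio.run(main())
```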

Summary

We have successfully implemented a simple chat application which uses many new asyncio features, such as asynchronous generators and asynchronous for loops. We have also seen how dangerous mixing blocking code with asynchronous code can be. Last but not least, we took an overview of asyncio’s synchronization primitives.

In conclusion, if you are starting a new Python project and considering asyncio, think about which tools you will need. If those tools do not have asyncio-compatible versions, you may be better off without asyncio. Otherwise, asyncio brings many benefits: simplicity, especially when compared to threaded applications, and good performance.
