Concurrency in Python is one of the most complex topics to grasp, let alone implement. It also doesn’t help that there are multiple ways to produce concurrent programs: Should I spin up multiple threads? Use multiple processes? Use asynchronous programming?
Well, the answer is to use whichever one best serves your use case. When in doubt, a good rule of thumb is: use async IO when you can; use threading when you must.
This post breaks down asynchronous programming in both older versions of Python (in case you’re dealing with legacy code) and the “newer” versions of Python.
Asyncio stands for asynchronous input output and refers to a programming paradigm that achieves high concurrency using a single thread running an event loop. The model isn’t unique to Python. It is implemented in other languages and frameworks too, the most prominent being JavaScript’s NodeJS.
Understanding asyncio with an example:
To understand the concept behind asyncio, let’s consider a restaurant with a single waiter. Suddenly, three customers, Kohli, Amir and John, show up. The three of them take varying amounts of time to decide what to eat once they receive the menu from the waiter.
Let’s assume Kohli takes 5 minutes, Amir 10 minutes and John 1 minute to decide. Suppose the waiter starts with Amir and stands by for 10 minutes while he decides. He then serves Kohli and spends 5 minutes noting down his order. Finally, he spends 1 minute finding out what John wants to eat. In total, he spends 10 + 5 + 1 = 16 minutes taking their orders. Notice that in this sequence of events, John ends up waiting 15 minutes before the waiter gets to him, Kohli waits 10 minutes and Amir waits 0 minutes.
Now suppose the waiter hands each customer a menu and, instead of standing by while one of them decides, knows the time each customer will take. He can take John’s order first (after 1 minute), then Kohli’s (after 5 minutes) and finally Amir’s (after 10 minutes). This way each customer experiences a 0 minute wait once they are ready to order. An illusion of three waiters, one dedicated to each customer, is created even though there’s only one. Lastly, the total time it takes the waiter to take all three orders is 10 minutes, much less than the 16 minutes in the other scenario.
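The waiter scenario maps onto asyncio quite directly. Each customer is a coroutine that waits, without blocking, while deciding, and the single waiter plays the role of the event loop. The sketch below is illustrative only. The names take_order, single_waiter and the MINUTE scale factor are hypothetical, and one story-minute is shortened to 0.01 seconds so the example runs quickly.

```python
import asyncio
import time

# Hypothetical scale: one "minute" of the story lasts 0.01 seconds of real time.
MINUTE = 0.01

async def take_order(customer, minutes_to_decide, log):
    # The waiter has handed over the menu and is free while the customer decides.
    await asyncio.sleep(minutes_to_decide * MINUTE)
    # Once the customer is ready, the waiter notes down the order.
    log.append(customer)

async def single_waiter():
    log = []
    # All three customers decide at the same time on one thread.
    await asyncio.gather(
        take_order("Kohli", 5, log),
        take_order("Amir", 10, log),
        take_order("John", 1, log),
    )
    return log

if __name__ == "__main__":
    start = time.perf_counter()
    order_log = asyncio.run(single_waiter())
    elapsed = time.perf_counter() - start
    print(order_log)
    print("took about {0:.0f} story-minutes".format(elapsed / MINUTE))
```

The log should list John, Kohli and Amir in that order. The elapsed time should come out close to 10 story-minutes rather than 16, because the three waits overlap.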
Those with a JavaScript background would find asyncio very similar to how NodeJS works. NodeJS under the hood has a single-threaded event loop that serves all incoming requests.
It’s very difficult to write code that is thread safe. With asynchronous code, you know exactly where the code will shift from one task to the next and race conditions are much harder to come by.
Threads consume a fair amount of memory, since each thread needs its own stack. With async code, all the code shares the same stack, and the stack is kept small because it is continuously unwound between tasks.
Threads are OS structures and therefore require more memory for the platform to support. There is no such problem with asynchronous tasks.
So you’ve started a new job and find that the codebase is riddled with legacy Python code. This section will get you up to speed on the old way of creating asynchronous programs.
There is a lot to cover here, so let’s just dive in. The first concepts you’ll want to know are iterables and iterators, because they serve as the basis for generators, which opened the door to asynchronous programming.
In Python, an iterable is an object whose member elements can be looped over using a for loop. An iterable is capable of returning its members one by one. The most common iterables in Python are sequences, which include lists, strings and tuples.
A sequence’s __getitem__() method can be invoked to return the member at a specified index. Remember that not every iterable in Python is a sequence. Dictionaries, sets, file objects and generators are iterable but aren’t sequences, so they can’t be indexed by position. Python also allows us to create iterables, including infinite ones, using generators.
In order to qualify as an iterable, an object must define at least one of these two methods:
__iter__()
__getitem__()
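To see the second route in action, here is a minimal sketch of a class that defines only __getitem__(). The class name Squares is hypothetical. Python’s for loop falls back to calling __getitem__() with 0, 1, 2 and so on until an IndexError is raised.

```python
class Squares:
    """Hypothetical iterable that defines only __getitem__()."""

    def __init__(self, count):
        self.count = count

    def __getitem__(self, index):
        # Raising IndexError tells the for loop (and iter()) to stop.
        if index >= self.count:
            raise IndexError(index)
        return index * index

if __name__ == "__main__":
    for value in Squares(4):
        print(value)
    print(list(Squares(4)))  # [0, 1, 4, 9]
```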
An iterator is an object that can be used to sequentially access the elements of an iterable object. The iterator exposes the __next__() method in Python 3 and next() in Python 2. Both methods fetch the next element in the sequence of the iterable object. Note: an iterator must support the following methods:
__iter__()
__next__()
The iterator object returns itself from its __iter__() method. This allows us to use both the iterator and the iterable in a for loop.
When the end of an iterable object’s iteration is reached, next() raises a StopIteration exception. Put together, these rules are called the iterator protocol. The __iter__() method of a container can also return something called a generator, which is also an iterator.
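A minimal sketch of the iterator protocol follows, assuming a hypothetical Countdown class. Its __iter__() returns the iterator itself, and its __next__() raises StopIteration when the values run out.

```python
class Countdown:
    """Hypothetical iterator that counts down from start to 1."""

    def __init__(self, start):
        self.current = start

    def __iter__(self):
        # An iterator returns itself, so it can be used directly in a for loop.
        return self

    def __next__(self):
        if self.current <= 0:
            # Signal the end of iteration, as the iterator protocol requires.
            raise StopIteration
        value = self.current
        self.current -= 1
        return value

if __name__ == "__main__":
    it = Countdown(3)
    print(next(it), next(it), next(it))  # 3 2 1
    try:
        next(it)
    except StopIteration:
        print("exhausted")
```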
Python’s use of yield can both produce values and also give way to them. This becomes particularly important when creating generator functions.
Consider the program below, which returns a string:
def keep_learning_synchronous():
    return "Educative"

if __name__ == "__main__":
    result = keep_learning_synchronous()
    print(result)
By replacing return with yield, you’ll notice that what’s returned is a generator object. In fact, our function, which we now call keep_learning_asynchronous(), has become a generator function. Generator functions are called generators because they generate values. In order for the generator object to produce, or yield, the string from the code snippet above, you can invoke next() on it.
In summary, we can use yield in a function as yield <expression>.
The yield expression lets a function hand back a value and then suspend, with its state preserved, until next() is invoked again on the associated generator object.
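A minimal sketch of the yield version described above follows. It uses inspect to confirm that keep_learning_asynchronous() is now a generator function, and that calling it returns a generator object rather than the string.

```python
import inspect

def keep_learning_asynchronous():
    yield "Educative"

if __name__ == "__main__":
    print(inspect.isgeneratorfunction(keep_learning_asynchronous))  # True
    gen = keep_learning_asynchronous()
    print(type(gen).__name__)  # generator
    print(next(gen))           # Educative
```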
This should provide a solid grounding in how the yield keyword is used and prepare you to look at generator functions in more depth.
Functions containing a yield statement are compiled as generators: using a yield expression in a function’s body causes that function to be a generator function. These functions return an object that supports the iterator protocol methods. The generator object created automatically receives a __next__() method. Going back to the example from the previous section, we can invoke __next__() directly on the generator object instead of using next():
def keep_learning_asynchronous():
    yield "Educative"

if __name__ == "__main__":
    gen = keep_learning_asynchronous()
    result = gen.__next__()
    print(result)
Remember the following about generators:
Generator functions allow you to procrastinate computing expensive values. You only compute the next value when required. This makes generators memory and compute efficient; they refrain from saving long sequences in memory or doing all expensive computations upfront.
Generators, when suspended, retain the code location, which is the last yield statement executed, and their entire local scope. This allows them to resume execution from where they left off.
Generator objects are nothing more than iterators.
Remember to distinguish between a generator function and the associated generator object, two terms that are often used interchangeably. A generator function, when invoked, returns a generator object, and next() is invoked on the generator object to run the code within the generator function.
A generator goes through the following states:
GEN_CREATED: a generator object has been returned from a generator function and iteration hasn’t started yet.
GEN_RUNNING: next() has been invoked on the generator object and it is currently being executed by the Python interpreter.
GEN_SUSPENDED: the generator is suspended at a yield expression.
GEN_CLOSED: the generator has completed execution or has been closed.
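The standard library’s inspect.getgeneratorstate() reports these states. In the sketch below, the generator functions one_value and watch_self are hypothetical. watch_self receives a reference to itself via send(). This lets it observe GEN_RUNNING from the inside, the only place that state is visible.

```python
import inspect

def one_value():
    yield 1

def watch_self():
    # Receives its own generator object through send(), then reports its state
    # while it is executing (the only moment GEN_RUNNING can be observed).
    me = yield
    yield inspect.getgeneratorstate(me)

if __name__ == "__main__":
    gen = one_value()
    print(inspect.getgeneratorstate(gen))  # GEN_CREATED
    next(gen)
    print(inspect.getgeneratorstate(gen))  # GEN_SUSPENDED
    gen.close()
    print(inspect.getgeneratorstate(gen))  # GEN_CLOSED

    watcher = watch_self()
    next(watcher)                 # advance to the first (bare) yield
    print(watcher.send(watcher))  # GEN_RUNNING
```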
A generator object exposes different methods that can be invoked to manipulate the generator. These are:
throw()
send()
close()
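Here is a short sketch of all three methods on a hypothetical echo() generator. send() resumes the generator with a value. throw() raises an exception at the paused yield. close() raises GeneratorExit there so that cleanup code can run.

```python
def echo():
    """Hypothetical generator that yields back whatever is sent into it."""
    received = None
    try:
        while True:
            try:
                received = yield received
            except ValueError as exc:
                # throw() raises the exception at the paused yield.
                received = "handled {0}".format(exc)
    finally:
        # close() raises GeneratorExit at the paused yield; cleanup runs here.
        print("echo closed")

if __name__ == "__main__":
    gen = echo()
    next(gen)                              # prime: run to the first yield
    print(gen.send("hello"))               # hello
    print(gen.throw(ValueError("oops")))   # handled oops
    gen.close()                            # prints "echo closed"
```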
Python draws a distinction between ordinary generators and generators that are meant to be used as coroutines. The latter are called generator-based coroutines. They require the @asyncio.coroutine decorator to be added to the function definition, though this isn’t strictly enforced. (@asyncio.coroutine was deprecated in Python 3.8 and removed in Python 3.11; @types.coroutine is still available.)
Generator-based coroutines use the yield from syntax instead of yield.
Coroutines in Python make cooperative multitasking possible. Cooperative multitasking is the approach in which the running process voluntarily gives up the CPU to other processes. A process may do so when it is logically blocked, say while waiting for user input or when it has initiated a network request and will be idle for a while.
A coroutine can be defined as a special function that can give up control to its caller without losing its state.
So what’s the difference between coroutines and generators?
Generators are essentially iterators though they look like functions. The distinction between generators and coroutines, in general, is that:
Generators yield back a value to the invoker whereas a coroutine yields control to another coroutine and can resume execution from the point it gives up control.
A generator can’t accept arguments once started whereas a coroutine can.
Generators are primarily used to simplify writing iterators. They are a type of coroutine and are sometimes also called semicoroutines.
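The ability to accept values after starting comes from send(). As a minimal sketch, here is a hypothetical running_average() generator used as a coroutine. It consumes each number sent to it and yields the average so far.

```python
def running_average():
    """A generator used as a coroutine: it consumes values sent to it."""
    total = 0.0
    count = 0
    average = None
    while True:
        # Pause here until the caller sends a value in.
        value = yield average
        total += value
        count += 1
        average = total / count

if __name__ == "__main__":
    avg = running_average()
    next(avg)            # prime the coroutine so it reaches the first yield
    print(avg.send(10))  # 10.0
    print(avg.send(20))  # 15.0
    print(avg.send(60))  # 30.0
```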
The simplest generator based coroutine we can write is as follows:
import asyncio

# @asyncio.coroutine exists only up to Python 3.10
@asyncio.coroutine
def do_something_important():
    yield from asyncio.sleep(1)
The coroutine sleeps for one second. Note the decorator and the use of yield from. Without either of them, you wouldn’t be able to use the coroutine with asyncio. The yield from statement gives control back to the event loop and resumes execution after the coroutine asyncio.sleep() has completed. Note that asyncio.sleep() is itself a coroutine. Let us modify this coroutine to call another coroutine that performs the sleep. The changes are shown below:
import asyncio

@asyncio.coroutine
def go_to_sleep(sleep):
    print("sleeping for " + str(sleep) + " seconds")
    yield from asyncio.sleep(sleep)

@asyncio.coroutine
def do_something_important(sleep):
    # what is more important than getting
    # enough sleep!
    yield from go_to_sleep(sleep)
Now imagine you invoke the coroutine do_something_important() three times, with the values 1, 2 and 3 respectively. Run one after another without threads or multiprocessing, the calls take 1 + 2 + 3 = 6 seconds. However, if you schedule all three on the asyncio event loop at once, the same code can complete in roughly 3 seconds, even though all of the invocations run in the same thread.
The intuition is that whenever a blocking operation is encountered the control is passed back to the event loop and execution is only resumed when the blocking operation has completed.
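@asyncio.coroutine no longer exists in Python 3.11 and later. The sketch below therefore restates go_to_sleep() and do_something_important() as native coroutines, which are covered in the next section. It compares running the three calls serially against running them concurrently with asyncio.gather(). The SCALE factor and the serially(), concurrently() and timed() helpers are hypothetical additions that shrink each second to 0.1 seconds.

```python
import asyncio
import time

# Hypothetical scale factor so the demo runs quickly: 1 "second" lasts 0.1 s.
SCALE = 0.1

async def go_to_sleep(sleep):
    print("sleeping for " + str(sleep) + " seconds")
    await asyncio.sleep(sleep * SCALE)

async def do_something_important(sleep):
    await go_to_sleep(sleep)

async def serially():
    # Each call finishes before the next one starts: about 1 + 2 + 3 units.
    for sleep in (1, 2, 3):
        await do_something_important(sleep)

async def concurrently():
    # All three calls are scheduled together on one event loop: about 3 units.
    await asyncio.gather(*(do_something_important(s) for s in (1, 2, 3)))

def timed(runner):
    # Run the given coroutine function on a fresh loop and return scaled units.
    start = time.perf_counter()
    asyncio.run(runner())
    return (time.perf_counter() - start) / SCALE

if __name__ == "__main__":
    print("serial: {0:.1f} units".format(timed(serially)))
    print("concurrent: {0:.1f} units".format(timed(concurrently)))
```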
In the case of Python, generators are used as producers of data and coroutines as consumers of data. Before support for native coroutines was introduced in Python 3.5, coroutines were implemented using generators, so objects of both kinds were of type generator. Since version 3.5, however, Python makes a distinction between coroutines and generators.
There are three main elements to creating asynchronous programs in Python: Native coroutines, event loops, and futures. Let’s dive in and examine each.
In Python 3.5, the language introduced support for native coroutines. By native, we mean that the language introduced syntax specifically for defining coroutines, making them first-class citizens in the language. Native coroutines can be defined using the async/await syntax. Before getting into further details, here is an example of a very simple native coroutine:
import asyncio

async def coro():
    await asyncio.sleep(1)
The above coroutine can be run with an event loop as follows:
loop = asyncio.new_event_loop()
loop.run_until_complete(coro())
loop.close()
Async
We can create a native coroutine by using async def. A function defined with async def automatically becomes a native coroutine function.
async def useless_native_coroutine():
    pass
The inspect.iscoroutine() function returns True for a coroutine object returned from the above coroutine function. Note that yield from can’t appear in the body of an async def function; any occurrence is flagged as a syntax error. (Since Python 3.6, a plain yield is allowed there, but it turns the function into an asynchronous generator rather than a coroutine.)
import inspect
import asyncio

async def useless_native_coroutine():
    pass

if __name__ == "__main__":
    coro = useless_native_coroutine()
    print(inspect.iscoroutine(coro))  # prints True
    coro.close()  # close the never-awaited coroutine to avoid a RuntimeWarning
Await
await can be used to obtain the result of a coroutine object’s execution. You use await as:
await <expr>
where <expr> must be an awaitable object. Awaitable objects must implement the __await__() method, which should return an iterator. If you recall, yield from also expects its argument to be an iterable from which an iterator can be obtained. Under the hood, await borrows its implementation from yield from, with an additional check that its argument is indeed an awaitable. The following objects are awaitable:
A native coroutine object returned from calling a native coroutine function.
A generator-based coroutine object returned from a generator decorated with @types.coroutine or @asyncio.coroutine. Decorated generator-based coroutines are awaitables, even though they do not have an __await__() method.
Future objects are awaitable.
Task objects are awaitable and Task is a subclass of Future.
Objects defined with the CPython C API with a tp_as_async.am_await() function returning an iterator (similar to the __await__() method).
Additionally, await must appear inside an async def function; otherwise it’s a syntax error.
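Any class can join this list by implementing __await__(). The sketch below uses a hypothetical ReadyValue class. Its __await__() is a generator, and therefore returns an iterator. It delegates to asyncio.sleep(0, result=...), which yields control to the event loop once before producing the value.

```python
import asyncio

class ReadyValue:
    """Hypothetical awaitable: __await__() returns an iterator (a generator)."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # Delegate to a coroutine's own iterator; asyncio.sleep(0, result=...)
        # gives up control once and then returns `result`.
        return (yield from asyncio.sleep(0, result=self.value).__await__())

async def main():
    return await ReadyValue(42)

if __name__ == "__main__":
    print(asyncio.run(main()))  # 42
```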
As things stand now:
- the term generator refers to functions that only produce values;
- vanilla coroutines only receive values;
- generator-based coroutines are identified by the presence of yield from in the function body;
- native coroutines are defined using the async/await syntax.
Another way to summarize this discussion is:
Generators return values to their invokers using yield
Generators that can receive values from outside are coroutines
Generators with yield from in their function bodies are generator-based coroutines, and functions defined using async def are native coroutines
Use the @asyncio.coroutine or @types.coroutine decorators on generator-based coroutines to make them compatible with native coroutines.
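Here is a minimal sketch of that compatibility, assuming a hypothetical legacy_pause() generator. Unlike @asyncio.coroutine, @types.coroutine still exists in current Python. It lets the generator yield from a native coroutine, and it lets async def code await the generator.

```python
import asyncio
import types

@types.coroutine
def legacy_pause(delay):
    # A generator-based coroutine: the decorator allows `yield from` on a
    # native coroutine and makes this generator awaitable from async code.
    yield from asyncio.sleep(delay)
    return "legacy done"

async def modern_caller():
    # A native coroutine awaiting the generator-based one.
    return await legacy_pause(0.01)

if __name__ == "__main__":
    print(asyncio.run(modern_caller()))  # legacy done
```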
The event loop is a programming construct that waits for events to happen and then dispatches them to an event handler. An event can be a user clicking on a UI button or a process initiating a file download. At the core of asynchronous programming, sits the event loop.
The concept isn’t novel to Python. In fact, many programming languages enable asynchronous programming with event loops. In Python, event loops run asynchronous tasks and callbacks, perform network IO operations, run subprocesses and delegate costly function calls to a pool of threads.
One of the most common use cases you’ll find in the wild is webservers implemented using an asynchronous design. A webserver waits for an HTTP request to arrive and returns the matching resource. Those familiar with JavaScript will recall that NodeJS works on the same principle: it is a webserver that runs an event loop to receive web requests in a single thread.
Contrast that with webservers that create a new thread, or worse, fork a new process, to handle each web request. In some benchmarks, asynchronous event loop based webservers outperformed multithreaded ones, which may seem counterintuitive.
With Python 3.7+, the preferred way to run the event loop is the asyncio.run() function. It is a blocking call that returns once the passed-in coroutine finishes. A sample program appears below:
import asyncio

async def do_something_important():
    await asyncio.sleep(10)

if __name__ == "__main__":
    asyncio.run(do_something_important())
Note: If you are working with Python 3.5 or 3.6, the asyncio.run() API isn’t available. In that case, you explicitly create an event loop using asyncio.new_event_loop() and run your desired coroutine using the run_until_complete() method defined on the loop object.
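A minimal sketch of that pre-3.7 pattern follows. run_legacy_style() is a hypothetical helper name. Unlike asyncio.run(), it doesn’t cancel leftover tasks or shut down asynchronous generators.

```python
import asyncio

async def do_something_important():
    await asyncio.sleep(0.1)
    return "done"

def run_legacy_style(coro):
    # Pre-3.7 style: create a loop, run the coroutine to completion, close the loop.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()

if __name__ == "__main__":
    print(run_legacy_style(do_something_important()))  # done
```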
You should rarely need to create or start an event loop yourself. Instead, use higher-level APIs such as asyncio.run() to submit coroutines. For instructional purposes, we’ll demonstrate launching an event loop per thread. The code sample below uses the asyncio.new_event_loop() API to get a new event loop, and then uses it to run a coroutine.
import asyncio
import random
from threading import Thread, current_thread

def loop_is_running():
    # get_running_loop() raises RuntimeError when no loop runs in this thread
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False

async def do_something_important(sleep_for):
    print("Is event loop running in thread {0} = {1}\n".format(current_thread().name,
                                                             loop_is_running()))
    await asyncio.sleep(sleep_for)

def launch_event_loops():
    # get a new event loop
    loop = asyncio.new_event_loop()
    # set the event loop for the current thread
    asyncio.set_event_loop(loop)
    # run a coroutine on the event loop
    loop.run_until_complete(do_something_important(random.randint(1, 5)))
    # remember to close the loop
    loop.close()

if __name__ == "__main__":
    t1 = Thread(target=launch_event_loops)
    t2 = Thread(target=launch_event_loops)
    t1.start()
    t2.start()
    print("Is event loop running in thread {0} = {1}\n".format(current_thread().name,
                                                             loop_is_running()))
    t1.join()
    t2.join()
Try it out yourself and examine the output. You’ll see that each spawned thread runs its own event loop, while no loop is running in the main thread.
There are two types of event loops:
SelectorEventLoop is based on the selectors module. It is the default loop on Unix-like platforms, and it was also the default on Windows before Python 3.8. The selectors module wraps OS-level readiness APIs such as select() and poll() (and epoll or kqueue where available), which form the secret sauce behind the event loop.
ProactorEventLoop, on the other hand, uses Windows’ I/O Completion Ports. It is only supported on Windows, where it has been the default since Python 3.8.
We won’t go into the finer implementation details of the two types. Note, though, that both the loop type and the associated event loop policy control the behavior of the event loop.
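A quick way to see which loop and selector your platform uses is to ask the running loop for its class, and the selectors module for DefaultSelector. The exact class names depend on the operating system and Python version, so treat them as examples rather than guaranteed output.

```python
import asyncio
import selectors

async def show_loop():
    # Report the class of the loop that is running this coroutine,
    # e.g. a selector-based loop on Linux/macOS or ProactorEventLoop on Windows.
    loop = asyncio.get_running_loop()
    return type(loop).__name__

if __name__ == "__main__":
    print("running loop class:", asyncio.run(show_loop()))
    # DefaultSelector is an alias for the most efficient selector available here.
    print("default selector:", selectors.DefaultSelector.__name__)
```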
A Future represents a computation that is either in progress or will get scheduled in the future. It is a special low-level awaitable object that represents the eventual result of an asynchronous operation. Don’t confuse concurrent.futures.Future and asyncio.Future.
The former is part of the concurrent.futures module and isn’t awaitable: it defines neither an __await__() nor an __iter__() method. asyncio.Future is an awaitable and can be used with the yield from statement. In general, you shouldn’t need to deal with futures directly. They are usually exposed by libraries or asyncio APIs.
For instructional purposes we’ll show an example that creates a future that is awaited by a coroutine. Study the snippet below:
import asyncio

async def bar(future):
    print("bar will sleep for 3 seconds")
    await asyncio.sleep(3)
    print("bar resolving the future")
    future.set_result("future is resolved")

async def foo(future):
    print("foo will await the future")
    await future
    print("foo finds the future resolved")

async def main():
    # create the future on the running event loop
    future = asyncio.get_running_loop().create_future()
    await asyncio.gather(foo(future), bar(future))

if __name__ == "__main__":
    asyncio.run(main())
    print("main exiting")
Both coroutines are passed the same future. The foo() coroutine awaits the future’s resolution, while the bar() coroutine resolves the future after three seconds.
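The sketch below makes the difference between concurrent.futures.Future and asyncio.Future concrete. It submits a blocking call to a thread pool and checks that the resulting concurrent.futures.Future has no __await__() method. It then uses asyncio.wrap_future() to turn that future into something a coroutine can await. The function blocking_square() is a hypothetical stand-in for real blocking work.

```python
import asyncio
import concurrent.futures

def blocking_square(n):
    # Hypothetical stand-in for blocking work done in a worker thread.
    return n * n

async def main():
    with concurrent.futures.ThreadPoolExecutor() as pool:
        cf_future = pool.submit(blocking_square, 7)  # a concurrent.futures.Future
        print(hasattr(cf_future, "__await__"))        # False: not awaitable
        # wrap_future() bridges it into an awaitable asyncio.Future.
        return await asyncio.wrap_future(cf_future)

if __name__ == "__main__":
    print(asyncio.run(main()))  # 49
```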
Tasks are like futures. In fact, Task is a subclass of Future, and tasks can be created using the following methods:
asyncio.create_task(): introduced in Python 3.7 and the preferred way of creating tasks. It accepts coroutines and wraps them as tasks.
loop.create_task(): only accepts coroutines.
asyncio.ensure_future(): accepts futures, coroutines and any awaitable objects.
Tasks wrap coroutines and run them in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the Future to complete. When the Future is done, the execution of the wrapped coroutine resumes.
Event loops use cooperative scheduling, meaning the event loop runs one Task at a time. While a Task awaits the completion of a Future, the event loop runs other tasks, runs callbacks or performs IO operations. Tasks can also be cancelled.
We rewrite the future example using tasks as follows:
import asyncio

async def bar(future):
    print("bar will sleep for 3 seconds")
    await asyncio.sleep(3)
    print("bar resolving the future")
    future.set_result("future is resolved")

async def foo(future):
    print("foo will await the future")
    await future
    print("foo finds the future resolved")

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    t1 = loop.create_task(bar(future))
    t2 = loop.create_task(foo(future))
    # wait for both tasks to finish
    await t1
    await t2

if __name__ == "__main__":
    asyncio.run(main())
    print("main exiting")
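Tasks can also be cancelled. The following minimal sketch assumes a hypothetical long_job() coroutine. It creates a task with asyncio.create_task(), cancels it and confirms the cancellation. The coroutine sees asyncio.CancelledError at the await where it is currently suspended.

```python
import asyncio

async def long_job():
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        print("long_job was cancelled")
        raise  # re-raise so the task ends in the cancelled state

async def main():
    task = asyncio.create_task(long_job())
    await asyncio.sleep(0.1)  # let the task start and suspend on its sleep
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()

if __name__ == "__main__":
    print(asyncio.run(main()))  # True
```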
One of the most prominent uses of coroutines is to chain them to process data pipelines. You can chain coroutines in a fashion similar to how you pipe Unix commands in a shell.
The idea is that the input passes through the first coroutine, which may perform some actions on the input and then passes on the modified data to the second coroutine which may perform additional operations on the input.
The input travels through the chain of coroutines with each coroutine applying some operation on the input until the input reaches the last coroutine from where it is yielded to the original caller.
Let’s consider the following example, which computes the value of the expression x^2 + 3 for the first hundred non-negative integers (0 through 99).
You manually work the data pipeline using next(), so you can set up a chain without worrying about the changes required to make it work with asyncio’s event loop. The setup is as follows:
The first coroutine produces integers starting from 0.
The second coroutine computes the square of each passed in input.
The last function is a generator and adds 3 to the value passed into it and yields the result.
def coro3(k):
    yield (k + 3)

def coro2(j):
    j = j * j
    yield from coro3(j)

def coro1():
    i = 0
    while True:
        yield from coro2(i)
        i += 1

if __name__ == "__main__":
    # The integers 0 to 99 evaluated for the following expression
    # x^2 + 3
    cr = coro1()
    for v in range(100):
        print("f({0}) = {1}".format(v, next(cr)))
In the example above, the end of the chain consists of a generator. However, this chain wouldn’t run on asyncio’s event loop, because asyncio can’t drive plain generators. One way to fix this is to change the last generator into an ordinary function that returns a future with the result already computed. The function coro3() would change to:
from asyncio import Future

def coro3(k):
    # an ordinary function returning an already-resolved future
    f = Future()
    f.set_result(k + 3)
    return f
Yet another way is to tack the @asyncio.coroutine decorator onto coro3() and return from it instead of yielding. The change would look as follows:
import asyncio

@asyncio.coroutine  # removed in Python 3.11
def coro3(k):
    return k + 3
An important caveat is that if we used the @types.coroutine decorator instead, the program would fail. This is because @asyncio.coroutine can convert an ordinary function into a coroutine, but @types.coroutine can’t.
Note that in the previous examples we didn’t decorate coro1() and coro2() with @asyncio.coroutine. Both functions are generator-based coroutine functions because of the presence of yield from in their function bodies. The decorator isn’t strictly enforced, but if you add it, the program still works correctly.
Similar to generators and generator-based coroutines we can also chain native coroutines.
import asyncio

async def coro3(k):
    return k + 3

async def coro2(j):
    j = j * j
    res = await coro3(j)
    return res

async def coro1():
    i = 0
    while i < 100:
        res = await coro2(i)
        print("f({0}) = {1}".format(i, res))
        i += 1

if __name__ == "__main__":
    # The integers 0 to 99 evaluated for the following expression
    # x^2 + 3
    asyncio.run(coro1())
Problem
The problem at hand is to implement your own coroutine that sleeps asynchronously. The signature of the coroutine is as follows.
# Implement the following coroutine where
# sleep_for is defined in seconds
async def asleep(sleep_for):
    pass
Solution
The first thought to cross your mind may be to use the time.sleep() API to wait out the requested sleeping time. However, the API is a blocking one and will block the thread that executes it. Obviously, this rules out invoking it on the thread that runs the event loop. But it doesn’t preclude us from executing it on a different thread.
This insight leads us to a possible solution. We can create a Future object and await it in the asleep() coroutine. The only remaining requirement is to have another thread resolve the future after sleep_for seconds have elapsed. The partial solution looks as follows:
async def asleep(sleep_for):
    future = Future()
    Thread(target=sync_sleep, args=(sleep_for, future)).start()
    await future

def sync_sleep(sleep_for, future):
    # sleep synchronously
    time.sleep(sleep_for)
    # resolve the future
    future.set_result(None)
Let’s add the rest and see what we get:
from threading import Thread
from threading import current_thread
from asyncio import Future
import asyncio
import time

async def asleep(sleep_for):
    future = Future()
    Thread(target=sync_sleep, args=(sleep_for, future)).start()
    await future

def sync_sleep(sleep_for, future):
    # sleep synchronously
    time.sleep(sleep_for)
    # resolve the future directly from this worker thread (not thread-safe)
    future.set_result(None)
    print("Sleeping completed in {0}".format(current_thread().name), flush=True)

async def main():
    await asyncio.gather(asleep(1))

if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    print("main program exiting after running for {0}".format(time.time() - start))
Surprisingly, the above program hangs and doesn’t complete, even though the message from sync_sleep() is printed. Somehow the coroutine asleep() is never resumed after the future it is awaiting has been resolved. The reason is that Future isn’t thread-safe: setting its result from another thread doesn’t wake up the event loop. Fortunately, asyncio provides a way to execute a coroutine on a given loop in a thread-safe manner: the run_coroutine_threadsafe() API.
So we have a way to resolve the future in a thread-safe manner. However, we need to do that in yet another coroutine, since run_coroutine_threadsafe() only accepts coroutines. This requires us to slightly modify our sync_sleep() function as follows:
def sync_sleep(sleep_for, future, loop):
    # sleep synchronously
    time.sleep(sleep_for)

    # define a nested coroutine to resolve the future
    async def sleep_future_resolver():
        # resolve the future
        future.set_result(None)

    asyncio.run_coroutine_threadsafe(sleep_future_resolver(), loop)
We define a nested coroutine, sleep_future_resolver(), that resolves the Future object. Also note that sync_sleep() now takes the event loop as a parameter. This should be the same event loop that executed the asleep() coroutine in the first place. The changes to the asleep() coroutine are shown below:
async def asleep(sleep_for):
    future = Future()
    # get the current event loop
    current_loop = asyncio.get_running_loop()
    Thread(target=sync_sleep, args=(sleep_for, future, current_loop)).start()
    await future
Here’s what we have so far:
from threading import Thread
from threading import current_thread
from asyncio import Future
import asyncio
import time

async def asleep(sleep_for):
    future = Future()
    # get the event loop running this coroutine
    current_loop = asyncio.get_running_loop()
    Thread(target=sync_sleep, args=(sleep_for, future, current_loop)).start()
    await future

def sync_sleep(sleep_for, future, loop):
    # sleep synchronously
    time.sleep(sleep_for)

    # define a nested coroutine to resolve the future
    async def sleep_future_resolver():
        # resolve the future
        future.set_result(None)

    # schedule the resolver on the event loop from this worker thread
    asyncio.run_coroutine_threadsafe(sleep_future_resolver(), loop)
    print("Sleeping completed in {0}\n".format(current_thread().name), flush=True)

async def main():
    # submit asleep() five times, five seconds each
    await asyncio.gather(*(asleep(5) for _ in range(5)))

if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    print("main program exiting after running for {0}".format(time.time() - start))
The output shows that sleeping takes place in the threads we spawn and not in the main thread. Furthermore, we submit the asleep() coroutine five times to sleep for five seconds each, yet the total runtime of the program is roughly five seconds, as it should be if we implemented the solution correctly.
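As an alternative to the nested coroutine, asyncio’s loop.call_soon_threadsafe() can schedule future.set_result() directly on the loop’s own thread. The sketch below swaps in that technique, which is not the approach used above. It shortens the sleep to a hypothetical 0.5 seconds so it runs quickly, and the worker is a closure rather than a separate sync_sleep() function.

```python
import asyncio
import threading
import time

async def asleep(sleep_for):
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def sync_sleep():
        # Block only this worker thread.
        time.sleep(sleep_for)
        # Ask the loop's own thread to resolve the future; this also wakes the loop.
        loop.call_soon_threadsafe(future.set_result, None)

    threading.Thread(target=sync_sleep).start()
    await future

async def main():
    await asyncio.gather(*(asleep(0.5) for _ in range(5)))

if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    print("elapsed: {0:.1f}s".format(time.time() - start))
```

On Python 3.9+, await asyncio.to_thread(time.sleep, sleep_for) achieves a similar effect. It hands the blocking call to a worker thread in a single line.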
As an exercise, consider what would happen if we created five threads and had each thread invoke time.sleep(). Will the program take five or twenty-five seconds to complete? Try it out and observe the time taken by the program to complete.
from threading import Thread
from threading import current_thread
import time

def sync_sleep(sleep_for):
    time.sleep(sleep_for)
    print("Sleeping completed in {0}".format(current_thread().name))

if __name__ == "__main__":
    start = time.time()
    threads = list()
    for _ in range(0, 5):
        threads.append(Thread(target=sync_sleep, args=(5,)))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("main program exiting after running for {0}".format(time.time() - start))
The synchronous sleep test also takes five seconds to complete! You may wonder what the difference is between our asynchronous and synchronous sleep programs. The answer is that the asynchronous sleep call is non-blocking, whereas the synchronous sleep call blocks the thread that makes it.
Internally, though, the operating system scheduler sees that the thread is about to block on a five-second sleep call. It switches that thread out for another one and only resumes executing it after at least five seconds have elapsed.
If you’d like to dive even deeper into Python concurrency, you can check out Python Concurrency for Senior Engineering Interviews. In this course, you’ll gain the foundations of advanced concurrency and multithreading. You’ll learn or revisit concepts such as Monitors, Event Loops and Deferred Callbacks in depth.
By the end, you’ll be prepared for any advanced Python interview question.
Happy learning!