
Python Async Programming for Data Engineers: Harnessing the Power of Concurrency

Boost Your Python Applications with Async Programming: Learn how to leverage asyncio for efficient, non-blocking I/O operations, handle concurrency, and avoid the pitfalls of traditional threading and multiprocessing.
Photo by Isaac Mitchell / Unsplash

Let me start by saying that I love Python. I truly believe that it is the best programming language in the world, but it is not without faults. The Zen of Python contains the following line:

There should be one-- and preferably only one --obvious way to do it.

If you have been programming in Python for a while, you have probably noticed how far the maintainers and the community as a whole have strayed from this principle. Parallel and concurrent computing is a prime example, but I guarantee it's worth exploring all the different options Python provides.

In this article, we'll explore async programming in Python using the powerful asyncio library. We'll start with the basics and gradually build up to more complex concepts, providing real-life examples along the way. By the end of this tutorial, you'll have a solid understanding of how to write efficient, non-blocking code in Python, and we'll look at some data engineering use cases as well.

We'll cover the following topics:

  • A short comparison of the Threading, Multiprocessing, and Asyncio libraries
  • Introduction to Async programming in Python using Asyncio
  • Some more advanced Asyncio features
  • Async programming in Data Engineering
  • When NOT to use Asyncio

0. Threads? Multiprocessing?

Alright, I know your first question is going to be about the other similar libraries available in Python and how they relate to asyncio. They all have their place in the Python ecosystem, so let's first take a short look at threading and multiprocessing to get the context we need before diving deeper into the world of asyncio.

Threading

Concurrency Model: Threading in Python allows multiple threads to execute concurrently within a single process. However, due to the Global Interpreter Lock (GIL), only one thread can execute Python bytecode at a time, limiting the true parallelism in CPU-bound tasks.

flowchart LR
    subgraph Thread 1
        A(Task A)
        B(Task B)
        C(Task C)
    end
    subgraph Shared Memory
        direction LR
        M1[Data M1]
        M2[Data M2]
    end
    subgraph Thread 2
        D(Task D)
        E(Task E)
        F(Task F)
    end
    A-->B-->C
    D-->E-->F
    A-->|Shared Memory| M1
    B-->|Shared Memory| M2
    D-->|Shared Memory| M1

Use Cases: Threading is well-suited for I/O-bound tasks where threads can efficiently wait for I/O operations to complete without blocking the entire program. Examples include network requests, file I/O, and database queries.

Pros:

  • Simpler programming model than multiprocessing, since threads share the same memory.
  • Efficient for I/O-bound tasks due to Python's built-in I/O operations releasing the GIL.
  • Well-suited for tasks with many blocking I/O operations.

Cons:

  • Limited CPU-bound performance due to the GIL preventing true parallel execution.
  • Debugging can be complex due to potential race conditions and thread synchronization issues.
  • Not ideal for CPU-bound tasks or scenarios requiring full utilization of multi-core CPUs.
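
To make the I/O-bound use case concrete, here is a minimal sketch using the standard library's concurrent.futures.ThreadPoolExecutor. The fetch() function is hypothetical: it simulates a blocking network call with time.sleep(), which, like real I/O, releases the GIL while it waits, so the five calls overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(item_id: int) -> str:
    # Hypothetical blocking I/O call; time.sleep releases the GIL like real I/O does
    time.sleep(0.2)
    return f"item-{item_id}"

def fetch_all(item_ids: list[int]) -> list[str]:
    # Each call runs in a worker thread; executor.map returns results in input order
    with ThreadPoolExecutor(max_workers=8) as executor:
        return list(executor.map(fetch, item_ids))

start = time.perf_counter()
results = fetch_all([1, 2, 3, 4, 5])
elapsed = time.perf_counter() - start
# The five 0.2 s waits overlap, so this takes about 0.2 s rather than 1 s
print(results, f"{elapsed:.2f}s")
```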

Multiprocessing

Concurrency Model: Multiprocessing in Python involves creating multiple processes, each with its own Python interpreter and memory space. These processes can execute in parallel across multiple CPU cores, avoiding the GIL limitations present in threading.

flowchart LR
    subgraph Process 1
        A(Task A)
        B(Task B)
        C(Task C)
    end
    subgraph Process 2
        D(Task D)
        E(Task E)
        F(Task F)
    end
    A-->|Process 1| B
    A-->|Process 1| C
    D-->|Process 2| E
    D-->|Process 2| F
    subgraph Memory
        M1[Data M1]
        M2[Data M2]
        M3[Data M3]
        M4[Data M4]
    end
    A-->|Separate Memory| M1
    B-->|Separate Memory| M2
    D-->|Separate Memory| M4
    E-->|Separate Memory| M3

Use Cases: Multiprocessing is suitable for CPU-bound tasks that can benefit from true parallelism across multiple CPU cores. Examples include intensive mathematical computations, image processing, and simulations.

Pros:

  • True parallelism and improved performance for CPU-bound tasks.
  • Isolation of processes ensures that one process does not impact others in case of an error.
  • Straightforward to debug and manage, as each process operates independently.

Cons:

  • Higher memory overhead compared to threading due to separate Python interpreter instances.
  • A more complex programming model than threading, since sharing data between processes requires explicit mechanisms like queues and shared memory.
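
For comparison, here is a minimal multiprocessing sketch using concurrent.futures.ProcessPoolExecutor. The count_primes() function is a hypothetical CPU-bound workload (naive trial division) and the limits passed to it are arbitrary. Note the if __name__ == "__main__": guard: on platforms that start workers with the "spawn" method, each worker re-imports the module, and the guard keeps it from launching new pools recursively.

```python
from concurrent.futures import ProcessPoolExecutor

def count_primes(limit: int) -> int:
    """Hypothetical CPU-bound work: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

def count_primes_parallel(limits: list[int]) -> list[int]:
    # Each call runs in a separate process with its own interpreter and GIL,
    # so the computations can use several CPU cores at the same time
    with ProcessPoolExecutor() as executor:
        return list(executor.map(count_primes, limits))

if __name__ == "__main__":
    # Required with the "spawn" start method, since workers re-import this module
    print(count_primes_parallel([20_000, 30_000, 40_000, 50_000]))
```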

Here's a small comparison table to help you decide which framework to use.

| Aspect | Asyncio | Threading | Multiprocessing |
|---|---|---|---|
| Concurrency | Cooperative | Preemptive | Preemptive |
| Performance | Efficient for I/O-bound tasks | Limited for CPU-bound tasks | True parallelism for CPU-bound tasks |
| Use Cases | I/O-bound tasks | I/O-bound tasks | CPU-bound tasks |
| Programming | Coroutines | Threads | Processes |
| GIL Impact | GIL not an issue | GIL restricts true parallelism | No GIL constraints |
| Resource Usage | Minimal | Moderate | Higher memory overhead |
| Debugging | Moderate | Complex race conditions | Straightforward |

With that out of the way, let's dive into Async programming and asyncio!

1. Introduction to Async Programming

What is Async Programming?

Async programming allows us to write concurrent, non-blocking code. Instead of waiting for a task to complete, the program can move on to other tasks and come back once the original task is ready. This is particularly useful for I/O-bound operations such as network requests, because the program keeps doing useful work instead of sitting idle while it waits.

Using asyncio in Python provides a safe and efficient approach to concurrency. It simplifies asynchronous code with coroutines and the async/await syntax. Because the event loop runs in a single thread and tasks only switch at explicit await points, tasks can run concurrently with far less need for locks and other synchronization than threads require. The trade-off is that asyncio does not spread work across multiple CPU cores, so it is ideal for I/O-bound tasks, while CPU-bound work is better served by multiprocessing.

Additionally, asyncio's single-threaded nature eases debugging, while its ecosystem offers a range of libraries for various tasks. Overall, it's safe to say that asyncio is the future of concurrent Python development.

The asyncio Library

Python's asyncio library provides the infrastructure for async programming. It introduces two fundamental concepts: coroutines and tasks.

Sync vs Async program flow
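
As a stand-in for the sync vs async flow, here is a small timing sketch. Both versions wait three times: the synchronous one blocks on time.sleep(), while the async one awaits asyncio.sleep() and runs the three waits concurrently with asyncio.gather(), which is covered in more detail below. The 0.2-second DELAY is an arbitrary stand-in for a slow network call.

```python
import asyncio
import time

DELAY = 0.2  # arbitrary stand-in for a slow network call

def fetch_sync(name):
    time.sleep(DELAY)  # blocks the whole thread while "waiting"
    return name

async def fetch_async(name):
    await asyncio.sleep(DELAY)  # hands control back to the event loop while waiting
    return name

def run_sync(names):
    return [fetch_sync(n) for n in names]  # one wait after another

async def run_async(names):
    return await asyncio.gather(*(fetch_async(n) for n in names))  # waits overlap

names = ["a", "b", "c"]

start = time.perf_counter()
sync_results = run_sync(names)
sync_time = time.perf_counter() - start

start = time.perf_counter()
async_results = asyncio.run(run_async(names))
async_time = time.perf_counter() - start

# sync takes about 3 x DELAY, async about 1 x DELAY
print(f"sync: {sync_time:.2f}s, async: {async_time:.2f}s")
```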

Coroutines

Coroutines are special functions defined using the async def syntax. They allow us to write asynchronous code in a more natural way. Coroutines can be paused using the await keyword, which allows other tasks to run in the meantime.

Let's start with a simple example to understand coroutines:

import asyncio

async def greet(name):
    await asyncio.sleep(1)  # Simulate some asynchronous operation
    return f"Hello, {name}!"

async def main():
    result = await greet("Alice")
    print(result)

asyncio.run(main())

Output:

Hello, Alice!

In this example, the greet() coroutine simulates an asynchronous operation using await asyncio.sleep(1), and the main() coroutine waits for the result of greet() using await. While greet() is paused at its await, the event loop is free to run other tasks; in this small example there are none, so main() simply waits about one second for the result.

Notice that we use asyncio.run() to execute the main coroutine of our program.

This function runs the passed coroutine, taking care of managing the asyncio event loop, finalizing asynchronous generators, and closing the threadpool.
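
One detail worth seeing in isolation: calling a coroutine function does not run it. greet("Alice") only creates a coroutine object, and its body executes once something drives it, such as await or asyncio.run(). This sketch reuses the greet() coroutine from above with a shorter sleep so it finishes quickly.

```python
import asyncio
import inspect

async def greet(name):
    await asyncio.sleep(0.1)  # shorter than the 1 s above, to keep the demo quick
    return f"Hello, {name}!"

coro = greet("Alice")             # only creates a coroutine object; nothing runs yet
print(inspect.iscoroutine(coro))  # True
result = asyncio.run(coro)        # the event loop drives the coroutine to completion
print(result)                     # Hello, Alice!
```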

2. Running Multiple Coroutines Concurrently

Creating Tasks

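Tasks are used to run coroutines concurrently. We can create tasks using the asyncio.create_task() function, which schedules the coroutine to run on the event loop as soon as possible. Tasks run in the background, and we can await their results when needed. Keep a reference to every task you create (here, task1, task2, and task3): the event loop only holds weak references to tasks, so an unreferenced task can be garbage-collected before it finishes.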

Let's create tasks for multiple coroutines:

import asyncio

async def greet(name):
    await asyncio.sleep(1)  # Simulate some asynchronous operation
    return f"Hello, {name}!"

async def main():
    task1 = asyncio.create_task(greet("Alice"))
    task2 = asyncio.create_task(greet("Bob"))
    task3 = asyncio.create_task(greet("Charlie"))

    results = await asyncio.gather(task1, task2, task3)
    for result in results:
        print(result)

asyncio.run(main())

Output:

Hello, Alice!
Hello, Bob!
Hello, Charlie!

In this example, we create tasks for three greet() coroutines and use asyncio.gather() to wait for all tasks to complete. This allows us to run multiple coroutines concurrently.
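
A related property of asyncio.gather(), shown in this sketch: results come back in the order the awaitables were passed, not the order in which they finish. The per-name delays are arbitrary values chosen so that the last coroutine finishes first. gather() also accepts bare coroutines and wraps them in tasks for us.

```python
import asyncio

async def greet(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} finished")  # printed in completion order: Charlie, Bob, Alice
    return f"Hello, {name}!"

async def main():
    # Charlie has the shortest delay, so he finishes first...
    return await asyncio.gather(
        greet("Alice", 0.3),
        greet("Bob", 0.2),
        greet("Charlie", 0.1),
    )

results = asyncio.run(main())
# ...but gather() returns results in the order the awaitables were passed
print(results)  # ['Hello, Alice!', 'Hello, Bob!', 'Hello, Charlie!']
```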

3. Task Cancellation and Timeouts

Task Cancellation

We can easily cancel tasks in asyncio. When a task is cancelled, asyncio.CancelledError is raised inside its coroutine at the await where it is currently suspended. We can handle the cancellation gracefully using try/except blocks.

Let's see an example of canceling a task:

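import asyncio

async def long_running_task():
    try:
        print("Task started.")
        await asyncio.sleep(10)
        print("Task completed.")
    except asyncio.CancelledError:
        print("Task was cancelled.")  # A good place for cleanup, e.g. closing connections
        raise  # Re-raise so the task is correctly marked as cancelled

async def main():
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(5)  # Let the task run for 5 seconds
    task.cancel()  # Request cancellation
    try:
        await task  # Wait until the task has actually finished cancelling
    except asyncio.CancelledError:
        print("main(): the task is cancelled now.")

asyncio.run(main())

Output:

Task started.
Task was cancelled.
main(): the task is cancelled now.

In this example, we start the long_running_task() coroutine and cancel it after 5 seconds. The coroutine catches asyncio.CancelledError to run its cleanup, then re-raises it so the task is correctly marked as cancelled. Awaiting the task in main() ensures we don't move on until that cleanup has finished.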

Timeouts

Timeouts can be used to limit the amount of time spent waiting for an awaitable to complete. We can use asyncio.wait_for() to specify a maximum time for a coroutine to finish.

Let's use a timeout to limit the execution time of a coroutine:

import asyncio

async def long_running_task():
    await asyncio.sleep(10)
    return "Task completed!"

async def main():
    try:
        result = await asyncio.wait_for(long_running_task(), timeout=5)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out!")

asyncio.run(main())

Output:

Task timed out!

In this example, we set a timeout of 5 seconds for the long_running_task() coroutine. Since the task would take 10 seconds to complete, asyncio.wait_for() cancels it after 5 seconds and raises asyncio.TimeoutError (an alias of the built-in TimeoutError since Python 3.11).

4. Working with Task Groups

The asyncio.TaskGroup

asyncio.TaskGroup, added in Python 3.11, is a more modern alternative to calling asyncio.create_task() and asyncio.gather() yourself for creating and managing tasks. It provides a convenient and reliable way to wait for all tasks in the group to finish.

Let's use asyncio.TaskGroup to manage tasks:

import asyncio

async def long_running_task(task_name):
    await asyncio.sleep(5)
    print(f"Task {task_name} completed!")

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(long_running_task("A"))
        tg.create_task(long_running_task("B"))
        tg.create_task(long_running_task("C"))

    print("All tasks completed!")

asyncio.run(main())

Output:

Task A completed!
Task B completed!
Task C completed!
All tasks completed!

In this example, we used asyncio.TaskGroup to create and manage tasks for the long_running_task() coroutine. The async with block ensures that we wait for all tasks to complete before proceeding. Because the three tasks sleep concurrently, the whole program takes about 5 seconds rather than 15.

Let's say you want to fetch data from a paginated API but don't want to request the pages one after another. The following example uses the third-party httpx library.

import asyncio
import httpx

async def fetch_data(client, url):
    response = await client.get(url)  # AsyncClient.get() is awaited directly
    response.raise_for_status()  # Raise an error for 4xx/5xx responses
    data = response.json()  # .json() is a regular (synchronous) method in httpx
    print(f"Fetched data from {url}")
    # save to disk
    return data

async def main():
    base_url = "https://api.example.com/data/"
    pages = [1, 2, 3]

    async with httpx.AsyncClient() as client, asyncio.TaskGroup() as tg:
        for page in pages:
            tg.create_task(fetch_data(client, f"{base_url}?page={page}"))

    # No need to use await here, as leaving the TaskGroup block waits for all tasks.
    print("All API calls completed!")

asyncio.run(main())

In this example, we use asyncio.TaskGroup() to create tasks that fetch data from different pages of the same API endpoint concurrently. The async with statement ensures that the tasks are created and managed within the context of the task group.

Each task is created using tg.create_task() with the fetch_data function and the corresponding URL for each page. The tasks run concurrently on the event loop: while one request is waiting for the server to respond, the others can make progress.

Once the async with block is exited, the task group ensures that all tasks are completed before moving to the next line of code. Therefore, the print("All API calls completed!") statement will be executed after all API calls are finished, demonstrating the power of asyncio's concurrency in data engineering tasks.

5. Async programming in Data Engineering

Async programming has several real-life use cases for data engineers, where it can significantly improve performance and efficiency in handling large-scale data processing tasks.

Here are some common scenarios:

  1. Data Ingestion: When ingesting data from various sources, such as APIs, databases, or streaming platforms, data engineers can use async programming to perform concurrent data retrieval. This allows fetching multiple data streams simultaneously, reducing the overall data ingestion time.
  2. Data Loading: When loading large volumes of data into data warehouses or databases, async programming can be utilized to insert data into different tables concurrently, minimizing the overall data loading time.
  3. Web Scraping: In scenarios where data engineers need to collect data from various websites or APIs, async programming can be used to perform multiple requests concurrently, making web scraping more efficient.

Overall, async programming offers data engineers a powerful toolset to optimize data processing, reduce execution times, and build scalable data systems. It lets a single process keep many network and database operations in flight at once, turning time otherwise spent waiting on external systems into useful work.

6. When NOT to use asyncio

While asyncio is a powerful tool for concurrent programming in Python, it may not always be the best choice for every situation. Here are some examples of when NOT to use asyncio:

  1. CPU-Bound Tasks: Asyncio is most suitable for I/O-bound tasks that involve waiting for external resources like network requests or file I/O. For CPU-bound tasks that require heavy computation and involve little waiting, asyncio provides no real benefit, because a long computation blocks the single-threaded event loop until it finishes. In such cases, multiprocessing is usually the better choice; threads won't help much either, since the GIL prevents them from running Python code in parallel.
  2. Blocking Libraries: If you need to work with third-party libraries that are synchronous and block the event loop, using asyncio may not be effective. Some libraries are not designed to work with asynchronous code, and calling them directly from a coroutine stalls every other task. In such cases, consider alternatives like threading or multiprocessing, or offload the blocking calls to worker threads with asyncio.to_thread().
  3. Complex Synchronization: When dealing with complex synchronization and shared resources, asyncio's cooperative multitasking may become challenging to manage. In scenarios with intricate inter-task dependencies or where low-level control over thread execution is required, traditional multithreading or multiprocessing might be more suitable.
  4. Existing Codebase: If you have a large existing synchronous codebase and want to introduce asynchronous features, refactoring the entire codebase to work with asyncio can be time-consuming and error-prone. In such cases, it might be more practical to stick with traditional synchronous programming or gradually introduce asyncio in specific parts of the codebase.
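
For the blocking-library case, one middle ground is asyncio.to_thread() (Python 3.9+), which runs a synchronous function in a worker thread and returns an awaitable. In this sketch, blocking_query() is a hypothetical synchronous driver call simulated with time.sleep().

```python
import asyncio
import time

def blocking_query(query_id: int) -> str:
    # Hypothetical synchronous library call; it blocks whichever thread runs it
    time.sleep(0.3)
    return f"result-{query_id}"

async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Each call runs in the default thread pool, so the event loop itself never blocks
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_query, i) for i in range(3))
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results)  # ['result-0', 'result-1', 'result-2']
```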

Remember, the choice of programming paradigm depends on the specific requirements and characteristics of your application. While asyncio is a powerful tool for handling I/O-bound tasks and asynchronous programming, it's essential to evaluate the needs of your project and consider alternatives when necessary.

7. Conclusion

In this article, we covered the basics of async programming in Python using the asyncio library. We learned about coroutines, tasks, task cancellation, timeouts, and working with task groups. With async programming, we can write efficient and non-blocking code that can handle multiple tasks concurrently.

Async programming is especially beneficial for I/O-bound tasks and network requests, as it keeps the program doing useful work instead of sitting idle while it waits. By mastering async programming in Python, you can build high-performance and scalable applications. Keep exploring and experimenting with async programming to become a proficient async Python developer!