Async Programming
When your program spends most of its time waiting (for a network response, a database query, a file), async lets it do other work during that wait. A single thread can keep many I/O operations in flight at once.
Why Async? The Waiting Problem
Traditional (synchronous) code is like a chef who puts pasta on to boil, then stands staring at it for 10 minutes before doing anything else. Async code is a chef who starts the pasta, then chops vegetables while they wait.
import asyncio
import time

# Synchronous: takes 3 seconds (1 + 1 + 1)
def sync_task(name, delay):
    time.sleep(delay)
    return f"{name} done"

start = time.perf_counter()
r1 = sync_task("A", 1)
r2 = sync_task("B", 1)
r3 = sync_task("C", 1)
print(f"Sync: {time.perf_counter()-start:.2f}s")

# Asynchronous: takes ~1 second (all run concurrently)
async def async_task(name, delay):
    await asyncio.sleep(delay)  # non-blocking wait
    return f"{name} done"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        async_task("A", 1),
        async_task("B", 1),
        async_task("C", 1),
    )
    print(f"Async: {time.perf_counter()-start:.2f}s")
    print(results)

asyncio.run(main())
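One caveat to the comparison above: the speedup only appears when the wait itself is awaitable. The following is a minimal sketch, using hypothetical helper names (blocking_task, cooperative_task, time_three) that are not part of the original example, of what happens when time.sleep is called inside a coroutine instead of asyncio.sleep.

```python
import asyncio
import time


async def blocking_task(delay: float) -> None:
    # time.sleep blocks the whole thread, so the event loop cannot switch tasks
    time.sleep(delay)


async def cooperative_task(delay: float) -> None:
    # asyncio.sleep suspends only this coroutine and yields to the event loop
    await asyncio.sleep(delay)


async def time_three(task_factory, delay: float) -> float:
    """Run three copies of a task with gather and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(task_factory(delay) for _ in range(3)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(time_three(blocking_task, 0.2))
cooperative_elapsed = asyncio.run(time_three(cooperative_task, 0.2))
print(f"time.sleep inside coroutines:    {blocking_elapsed:.2f}s")
print(f"asyncio.sleep inside coroutines: {cooperative_elapsed:.2f}s")
```

The blocking version should take roughly three times the delay even though it goes through gather, because no coroutine ever hands control back to the loop. The cooperative version should take roughly one delay.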
async def and await
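async def defines a coroutine function; calling it returns a coroutine object, which does not run until it is awaited or scheduled on the event loop. await suspends the current coroutine until the awaitable completes, letting the event loop run other tasks in the meantime.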
import asyncio

async def fetch_data(url: str, delay: float):
    """Simulates an HTTP request."""
    print(f" Fetching {url}...")
    await asyncio.sleep(delay)  # simulate network latency
    return {"url": url, "data": f"response from {url}"}

async def main():
    # Sequential: waits for each request before starting the next
    print("Sequential:")
    r1 = await fetch_data("api/users", 0.5)
    r2 = await fetch_data("api/posts", 0.5)
    print(f" Got: {r1['url']}, {r2['url']}")

    # Concurrent with gather: runs all three at the same time
    print("Concurrent:")
    r1, r2, r3 = await asyncio.gather(
        fetch_data("api/users", 0.5),
        fetch_data("api/posts", 0.3),
        fetch_data("api/comments", 0.4),
    )
    print(f" Got: {r1['url']}, {r2['url']}, {r3['url']}")

asyncio.run(main())
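To make the suspension at each await visible, here is a small sketch that records the order in which two coroutines actually execute. The worker function and the log list are illustrative names, not part of the examples above, and asyncio.sleep(0) is used only as a way to yield to the event loop without a real delay.

```python
import asyncio

log = []  # records the order in which steps actually run


async def worker(name: str, steps: int) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        # sleep(0) suspends this coroutine for one loop iteration,
        # giving other ready tasks a turn
        await asyncio.sleep(0)


async def main() -> None:
    # gather schedules both workers on the same single-threaded loop
    await asyncio.gather(worker("A", 3), worker("B", 3))


asyncio.run(main())
print(log)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
```

The steps alternate because each worker gives up control at its await and the loop resumes the other one. Without the await inside the loop, worker A would finish all its steps before B started.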
The event loop
asyncio runs a single-threaded event loop. When a coroutine awaits, it hands control back to the loop, which runs another coroutine. No extra threads, just cooperative multitasking.
Async Context Managers and Error Handling
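For the async context manager half of this topic, a minimal sketch may help. It assumes a hypothetical Connection class standing in for a database or network resource; the class, its query method, and the events list are illustrative only. The point it tries to show is that async with awaits __aenter__ on entry and __aexit__ on exit, including when the body raises.

```python
import asyncio

events = []  # records what happened, in order


class Connection:
    """Hypothetical resource; stands in for a database or network connection."""

    async def __aenter__(self):
        await asyncio.sleep(0.01)  # simulate asynchronous setup
        events.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0.01)  # simulate asynchronous teardown
        events.append("close")
        return False  # do not suppress exceptions raised in the body

    async def query(self, n):
        if n == 0:
            raise ValueError("Can't process zero")
        return n * 10


async def main():
    # Normal path: setup, body, teardown
    async with Connection() as conn:
        events.append(await conn.query(1))

    # Error path: teardown still runs before the exception reaches except
    try:
        async with Connection() as conn:
            await conn.query(0)
    except ValueError as e:
        events.append(f"caught: {e}")


asyncio.run(main())
print(events)
```

In both paths "close" is recorded before control leaves the async with block, so cleanup does not depend on the body succeeding.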
import asyncio

async def risky(n):
    if n == 0:
        raise ValueError("Can't process zero")
    await asyncio.sleep(0.1)
    return n * 10

async def main():
    # Error handling in async code uses the same try/except
    try:
        result = await risky(0)
    except ValueError as e:
        print(f"Caught: {e}")

    # gather with return_exceptions=True: exceptions are returned
    # as results instead of being raised
    results = await asyncio.gather(
        risky(1),
        risky(0),  # will fail
        risky(3),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f" Error: {r}")
        else:
            print(f" Result: {r}")

    # asyncio.create_task: schedule the coroutine now, await its result later
    task = asyncio.create_task(risky(5))
    print("Task created, doing other work...")
    result = await task
    print(f"Task result: {result}")

asyncio.run(main())
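For contrast with return_exceptions=True, the sketch below shows gather's default behavior, where the first exception propagates to the caller. It redefines risky so the block is self-contained; the outcome variable is an illustrative name.

```python
import asyncio


async def risky(n):
    if n == 0:
        raise ValueError("Can't process zero")
    await asyncio.sleep(0.1)
    return n * 10


async def main():
    try:
        # Default return_exceptions=False: the first exception is raised here
        results = await asyncio.gather(risky(1), risky(0), risky(3))
    except ValueError as e:
        return f"gather raised: {e}"
    return results


outcome = asyncio.run(main())
print(outcome)  # gather raised: Can't process zero
```

In this mode the successful results are not delivered to the caller, and gather does not cancel the other awaitables when one fails. That is one reason to prefer return_exceptions=True when partial results matter.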