The DB-API (PEP 249)
Why Databases Matter
The Problem: In-memory data dies when the process restarts. Files don't scale to concurrent writes. Naive CSV handling breaks the moment your data contains commas or newlines.
The Solution: SQL databases provide ACID guarantees, indexes, and concurrent access. SQLAlchemy's Core + ORM give you the right abstraction for every layer of the stack.
Real Impact: Knowing parameterized queries, transactions, and connection pooling is the difference between a toy and a production service.
Real-World Analogy
Think of a database as a vault with a librarian:
- Table = a labelled shelf where similar records live
- Row = one record on the shelf
- Index = the card catalogue that tells the librarian where to find a record fast
- Transaction = borrowing several books at once — all returned together or not at all
- Connection pool = a queue of pre-greeted librarians ready to serve requests
PEP 249 defines Python's standard database interface; almost every relational driver implements the same shapes: `connect()`, `cursor()`, `execute()`, `fetch*()`.
```python
import sqlite3

# The connection context manager commits on success and rolls back on
# error; note that it does NOT close the connection.
with sqlite3.connect("app.db") as conn:
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO users (name, age) VALUES (?, ?)",
        ("Alice", 30),
    )
    cur.execute("SELECT id, name FROM users WHERE age > ?", (18,))
    for row in cur:
        print(row)
```
⚠️ NEVER build SQL with f-strings
`cur.execute(f"SELECT * FROM users WHERE name='{name}'")` is a textbook SQL injection vulnerability. ALWAYS use parameter placeholders (`?` for sqlite3, `%s` for psycopg, named binds for SQLAlchemy).
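To see why this matters, here is a self-contained sqlite3 sketch contrasting the two approaches; the in-memory database and the `payload` string are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))

# A classic injection payload: an always-true predicate.
payload = "nobody' OR '1'='1"

# Unsafe: the payload is spliced into the SQL text, so the predicate
# becomes always-true and matches every row.
unsafe = conn.execute(
    f"SELECT name FROM users WHERE name='{payload}'"
).fetchall()

# Safe: the driver sends the payload as data, so nothing matches.
safe = conn.execute(
    "SELECT name FROM users WHERE name=?", (payload,)
).fetchall()

print(unsafe)  # [('Alice',)] -- injected predicate leaked the row
print(safe)    # [] -- payload treated as a literal string
```

The placeholder version never interpolates user input into the SQL text, so there is nothing for an attacker to escape out of.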
PostgreSQL with psycopg
psycopg (version 3) is the canonical PostgreSQL driver. It implements the DB-API and adds first-class async support.
$ pip install "psycopg[binary]"
```python
import psycopg

with psycopg.connect("postgresql://user:pass@localhost/mydb") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT id, name FROM users WHERE age > %s", (18,))
        for uid, name in cur:
            print(uid, name)
```
Async with psycopg
```python
import asyncio
import psycopg

DSN = "postgresql://user:pass@localhost/mydb"

async def main():
    async with await psycopg.AsyncConnection.connect(DSN) as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 1")
            print(await cur.fetchone())

asyncio.run(main())
```
SQLAlchemy — The Heavyweight
SQLAlchemy is the de facto standard Python SQL toolkit and ORM. It has two layers: Core (a SQL expression language) and the ORM (object mapping). Both can run sync or async.
$ pip install "sqlalchemy[asyncio]" asyncpg
Declarative ORM
```python
from sqlalchemy import create_engine, String, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(100), unique=True)

engine = create_engine("sqlite:///app.db", echo=True)
Base.metadata.create_all(engine)

with Session(engine) as sess:
    sess.add(User(name="Alice", email="[email protected]"))
    sess.commit()
    stmt = select(User).where(User.name == "Alice")
    user = sess.scalars(stmt).first()
    print(user.email)
```
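The Core layer expresses the same operations without mapped classes. A minimal sketch against an in-memory SQLite engine (the table and column names are illustrative):

```python
from sqlalchemy import (
    Column, Integer, MetaData, String, Table, create_engine, insert, select,
)

metadata = MetaData()
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)

engine = create_engine("sqlite://")  # in-memory database
metadata.create_all(engine)

# engine.begin() opens a transaction and commits on success.
with engine.begin() as conn:
    conn.execute(insert(users), [{"name": "Alice"}, {"name": "Bob"}])

with engine.connect() as conn:
    rows = conn.execute(select(users).order_by(users.c.name))
    names = [row.name for row in rows]
print(names)  # ['Alice', 'Bob']
```

Core is the better fit for bulk operations and reporting queries; the ORM shines when you want identity-mapped objects and relationships.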
Transactions and Connection Pooling
Transactions
```python
with Session(engine) as sess:
    try:
        sess.add(User(...))
        sess.add(Order(...))
        sess.commit()
    except Exception:
        sess.rollback()
        raise

# Or use begin(), which commits on success and rolls back on error:
with Session(engine) as sess, sess.begin():
    sess.add(User(...))
    sess.add(Order(...))
```
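The all-or-nothing guarantee is easy to verify with plain sqlite3; a minimal sketch using an in-memory database and a hypothetical accounts table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (name TEXT PRIMARY KEY, balance INTEGER)")
conn.execute("INSERT INTO accounts VALUES ('alice', 100), ('bob', 0)")
conn.commit()

try:
    # Move 50 from alice to bob, but fail between the two statements.
    conn.execute(
        "UPDATE accounts SET balance = balance - 50 WHERE name = 'alice'"
    )
    raise RuntimeError("simulated crash mid-transaction")
    conn.execute(
        "UPDATE accounts SET balance = balance + 50 WHERE name = 'bob'"
    )
    conn.commit()
except RuntimeError:
    conn.rollback()

# The partial update was undone: no money vanished.
balances = dict(conn.execute("SELECT name, balance FROM accounts"))
print(balances)  # {'alice': 100, 'bob': 0}
```

Without the rollback, alice would have been debited 50 with no matching credit to bob; the transaction boundary is what keeps the two updates atomic.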
Connection Pooling
SQLAlchemy pools connections by default. Configure size and overflow:
```python
engine = create_engine(
    DSN,
    pool_size=10,        # steady-state connections kept open
    max_overflow=20,     # extra connections allowed under load
    pool_pre_ping=True,  # detect stale connections before use
    pool_recycle=3600,   # recycle connections older than one hour
)
```
Async SQLAlchemy
```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:pw@host/db")

async def get_user(uid):
    # User is the mapped class declared in the ORM example above.
    async with AsyncSession(engine) as sess:
        return await sess.scalar(select(User).where(User.id == uid))
```
Redis
Redis is an in-memory data store used for caches, queues, rate limits, and pub/sub. The redis-py package is the standard client.
```python
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

r.set("user:42:name", "Alice", ex=3600)  # string with a 1-hour TTL
r.get("user:42:name")                    # 'Alice'

r.hset("user:42", mapping={"name": "Alice", "age": 30})
r.hgetall("user:42")                     # {'name': 'Alice', 'age': '30'}

r.incr("page:home:views")                # atomic counter
r.lpush("queue", "job1", "job2")         # push jobs onto a list
job = r.brpop("queue", timeout=5)        # blocking pop, 5 s timeout
```
🎯 Practice Exercises
Exercise 1: CRUD with sqlite3
Build a small tasks table. Insert, list, update done, delete. All via parameterized queries.
Exercise 2: SQLAlchemy models
Define User and Post with a relationship. Add a few rows. Query all posts by a user.
Exercise 3: Transaction safety
Wrap a multi-statement update in a transaction. Trigger a failure halfway through and verify rollback.
Exercise 4: Redis cache
Memoize the result of an expensive function in Redis with a 5-minute TTL. Add a cache-hit counter.