Database Programming

The DB-API (PEP 249)

Why Databases Matter

The Problem: In-memory data dies when the process restarts. Files don't scale to concurrent writes. Naive CSV handling breaks the moment your data contains embedded commas.

The Solution: SQL databases provide ACID guarantees, indexes, and concurrent access. SQLAlchemy's Core + ORM give you the right abstraction for every layer of the stack.

Real Impact: Knowing parameterized queries, transactions, and connection pooling is the difference between a toy and a production service.

Real-World Analogy

Think of a database as a vault with a librarian:

  • Table = a labelled shelf where similar records live
  • Row = one record on the shelf
  • Index = the card catalogue that tells the librarian where to find a record fast
  • Transaction = borrowing several books at once — all returned together or not at all
  • Connection pool = a team of librarians already on duty, ready to serve the next request

The DB-API (PEP 249) is Python's standard database interface; almost every driver implements it the same way.

import sqlite3

# The connection context manager commits on success and rolls back on
# error -- but it does NOT close the connection.
with sqlite3.connect("app.db") as conn:
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO users (name, age) VALUES (?, ?)",   # ? = positional placeholder
        ("Alice", 30),
    )
    cur.execute("SELECT id, name FROM users WHERE age > ?", (18,))
    for row in cur:        # cursors are iterable
        print(row)
conn.close()
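For bulk inserts, executemany runs the same parameterized statement once per row. A minimal sketch using an in-memory database and a hypothetical users table standing in for the app.db schema above:

```python
import sqlite3

# In-memory database; the users table mirrors the schema assumed above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")

rows = [("Alice", 30), ("Bob", 25), ("Carol", 41)]
conn.executemany("INSERT INTO users (name, age) VALUES (?, ?)", rows)
conn.commit()

adults = conn.execute("SELECT name FROM users WHERE age > ?", (18,)).fetchall()
print(adults)   # [('Alice',), ('Bob',), ('Carol',)]
conn.close()
```

Each row tuple is bound to the same placeholders, so the driver handles quoting for every row.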

⚠️ NEVER build SQL with f-strings

cur.execute(f"SELECT * FROM users WHERE name='{name}'") is a textbook SQL injection. ALWAYS use parameter placeholders (? for sqlite, %s for psycopg, named binds for SQLAlchemy).
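To see why placeholders defuse injection, here is a small sqlite3 sketch (in-memory table created just for illustration) using named binds. A classic payload like `' OR 1=1 --` arrives as inert data, not SQL:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")
conn.execute("INSERT INTO users VALUES (?, ?)", ("Alice", 30))

# Named placeholders: the driver quotes the value, so the hostile
# string is compared literally against the name column.
name = "' OR 1=1 --"
hit = conn.execute(
    "SELECT * FROM users WHERE name = :name", {"name": name}
).fetchall()
print(hit)    # [] -- the injection attempt matched nothing

safe = conn.execute(
    "SELECT age FROM users WHERE name = :name", {"name": "Alice"}
).fetchone()
print(safe)   # (30,)
conn.close()
```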

PostgreSQL with psycopg

psycopg (version 3) is the canonical PostgreSQL driver. It implements the DB-API and adds native async support.

$ pip install "psycopg[binary]"
import psycopg

with psycopg.connect("postgresql://user:pass@localhost/mydb") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT id, name FROM users WHERE age > %s", (18,))
        for uid, name in cur:
            print(uid, name)

Async with psycopg

import asyncio
import psycopg

DSN = "postgresql://user:pass@localhost/mydb"

async def main():
    # AsyncConnection.connect() is awaitable, hence "async with await"
    async with await psycopg.AsyncConnection.connect(DSN) as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 1")
            print(await cur.fetchone())

asyncio.run(main())

SQLAlchemy — The Heavyweight

SQLAlchemy is the de facto standard Python ORM. It has two layers: Core (a SQL expression language) and the ORM (object mapping). Both can run sync or async.

$ pip install "sqlalchemy[asyncio]" asyncpg

Declarative ORM

from sqlalchemy import create_engine, String, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(100), unique=True)

engine = create_engine("sqlite:///app.db", echo=True)
Base.metadata.create_all(engine)

with Session(engine) as sess:
    sess.add(User(name="Alice", email="[email protected]"))
    sess.commit()

    stmt = select(User).where(User.name == "Alice")
    user = sess.scalars(stmt).first()
    print(user.email)

Transactions and Connection Pooling

Transactions

with Session(engine) as sess:
    try:
        sess.add(User(...))
        sess.add(Order(...))
        sess.commit()
    except Exception:
        sess.rollback()
        raise

# Or use begin(), which commits on success and rolls back on error
with Session(engine) as sess, sess.begin():
    sess.add(User(...))
    sess.add(Order(...))
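The all-or-nothing guarantee is easy to verify with stdlib sqlite3 (hypothetical accounts table; a CHECK constraint forces the second leg of the transfer to fail):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts (name TEXT PRIMARY KEY, "
    "balance INTEGER CHECK (balance >= 0))"
)
conn.executemany("INSERT INTO accounts VALUES (?, ?)",
                 [("alice", 100), ("bob", 0)])
conn.commit()

try:
    with conn:   # transaction scope: commit on success, rollback on exception
        conn.execute("UPDATE accounts SET balance = balance + 150 WHERE name = 'bob'")
        # alice only has 100, so this violates the CHECK constraint...
        conn.execute("UPDATE accounts SET balance = balance - 150 WHERE name = 'alice'")
except sqlite3.IntegrityError:
    pass   # ...and the whole transaction rolls back, including bob's credit

balances = dict(conn.execute("SELECT name, balance FROM accounts"))
print(balances)   # alice still 100, bob still 0
conn.close()
```

Bob's credit succeeded in isolation, but because the debit failed inside the same transaction, neither change survives.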

Connection Pooling

SQLAlchemy pools connections by default. Configure size and overflow:

engine = create_engine(
    DSN,
    pool_size=10,              # connections kept open in the pool
    max_overflow=20,           # extra connections allowed under load
    pool_pre_ping=True,        # detect stale connections before use
    pool_recycle=3600,         # recycle connections older than 1 hour
)

With these settings the engine opens at most pool_size + max_overflow = 30 concurrent connections.

Async SQLAlchemy

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

engine = create_async_engine("postgresql+asyncpg://user:pw@host/db")

async def get_user(uid):
    async with AsyncSession(engine) as sess:
        result = await sess.scalar(select(User).where(User.id == uid))
        return result

Redis

Redis is the go-to store for caches, queues, rate limits, and pub/sub. The redis-py package is the standard client.

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

r.set("user:42:name", "Alice", ex=3600)    # with 1h TTL
r.get("user:42:name")                       # 'Alice'

r.hset("user:42", mapping={"name": "Alice", "age": 30})
r.hgetall("user:42")

r.incr("page:home:views")                  # atomic counter

r.lpush("queue", "job1", "job2")           # push jobs onto a list
job = r.brpop("queue", timeout=5)          # blocking pop: (key, value), or None on timeout

🎯 Practice Exercises

Exercise 1: CRUD with sqlite3

Build a small tasks table. Insert, list, update done, delete. All via parameterized queries.

Exercise 2: SQLAlchemy models

Define User and Post with a relationship. Add a few rows. Query all posts by a user.

Exercise 3: Transaction safety

Wrap a multi-statement update in a transaction. Trigger a failure halfway through and verify rollback.

Exercise 4: Redis cache

Memoize the result of an expensive function in Redis with a 5-minute TTL. Add a cache-hit counter.