Database Operations

Database Architecture & Theory

Go Database Ecosystem

Go provides excellent support for both SQL and NoSQL databases through its standard library and rich ecosystem. Understanding database patterns, connection pooling, and transaction management is crucial for building robust applications.

[Diagram: Go database architecture. A Go application talks to storage through the database/sql standard-library abstraction via SQL drivers (pq for Postgres; mysql, sqlite3), NoSQL drivers (MongoDB, Elasticsearch), cache drivers (Redis, Memcached), and ORMs (GORM, Ent, sqlx). Backends shown: PostgreSQL and MySQL (ACID, relations), MongoDB (documents), Redis (key-value cache), Elasticsearch (search and analytics), and SQLite (embedded). The connection pool is managed automatically via MaxOpenConns, MaxIdleConns, and ConnMaxLifetime.]

SQL Databases

Relational databases with ACID properties, structured schemas, and powerful query capabilities using SQL.


NoSQL Databases

Document, key-value, graph, and column-family databases designed for scalability and flexible data models.


Connection Pooling

Efficient management of database connections to optimize performance and resource utilization in concurrent applications.


PostgreSQL Connection

Connect to PostgreSQL using the standard database/sql package with the pq driver.

package main

import (
    "database/sql"
    "time"
    
    _ "github.com/lib/pq" // registers the "postgres" driver
)

func connectPostgres() (*sql.DB, error) {
    connStr := "user=username dbname=mydb sslmode=disable"
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        return nil, err
    }
    
    // sql.Open only validates its arguments; Ping verifies a real connection
    if err := db.Ping(); err != nil {
        db.Close()
        return nil, err
    }
    
    // Configure connection pool
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(5)
    db.SetConnMaxLifetime(5 * time.Minute)
    
    return db, nil
}

CRUD Operations

Implement Create, Read, Update, and Delete operations with prepared statements.

type User struct {
    ID        int
    Email     string
    Name      string
    CreatedAt time.Time
}

// Create a new user
func createUser(db *sql.DB, user User) (int64, error) {
    query := `
        INSERT INTO users (email, name, created_at) 
        VALUES ($1, $2, $3) 
        RETURNING id`
    
    var id int64
    err := db.QueryRow(query, user.Email, user.Name, time.Now()).Scan(&id)
    return id, err
}

// Read user by ID
func getUserByID(db *sql.DB, id int) (*User, error) {
    var user User
    query := "SELECT id, email, name, created_at FROM users WHERE id = $1"
    
    err := db.QueryRow(query, id).Scan(
        &user.ID, &user.Email, &user.Name, &user.CreatedAt,
    )
    if errors.Is(err, sql.ErrNoRows) {
        return nil, fmt.Errorf("user not found")
    }
    return &user, err
}

// Update user
func updateUser(db *sql.DB, user User) error {
    query := "UPDATE users SET email = $1, name = $2 WHERE id = $3"
    
    result, err := db.Exec(query, user.Email, user.Name, user.ID)
    if err != nil {
        return err
    }
    
    rows, err := result.RowsAffected()
    if err != nil {
        return err
    }
    if rows == 0 {
        return fmt.Errorf("user not found")
    }
    return nil
}

// Delete user
func deleteUser(db *sql.DB, id int) error {
    query := "DELETE FROM users WHERE id = $1"
    _, err := db.Exec(query, id)
    return err
}

Transactions

Handle database transactions for atomic operations.

func transferFunds(db *sql.DB, fromID, toID int, amount float64) error {
    // Start transaction
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback() // Rollback if not committed
    
    // Debit from source account
    _, err = tx.Exec(`
        UPDATE accounts 
        SET balance = balance - $1 
        WHERE id = $2 AND balance >= $1`,
        amount, fromID,
    )
    if err != nil {
        return err
    }
    
    // Credit to destination account
    _, err = tx.Exec(`
        UPDATE accounts 
        SET balance = balance + $1 
        WHERE id = $2`,
        amount, toID,
    )
    if err != nil {
        return err
    }
    
    // Commit transaction
    return tx.Commit()
}

GORM ORM Usage

Use GORM for object-relational mapping and simpler database operations.

import (
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
)

type Product struct {
    gorm.Model
    Code  string
    Price uint
}

func gormExample() {
    dsn := "host=localhost user=gorm password=gorm dbname=gorm"
    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        panic("failed to connect database")
    }
    
    // Auto Migrate
    db.AutoMigrate(&Product{})
    
    // Create
    db.Create(&Product{Code: "D42", Price: 100})
    
    // Read
    var product Product
    db.First(&product, 1) // find by primary key
    db.First(&product, "code = ?", "D42")
    
    // Update
    db.Model(&product).Update("Price", 200)
    db.Model(&product).Updates(Product{Price: 200, Code: "F42"})
    
    // Delete
    db.Delete(&product, 1)
}

MongoDB Integration

Connect and work with MongoDB using the official Go driver.

import (
    "context"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "go.mongodb.org/mongo-driver/bson"
)

func mongoExample() {
    // Connect to MongoDB
    client, err := mongo.Connect(context.TODO(), 
        options.Client().ApplyURI("mongodb://localhost:27017"))
    if err != nil {
        panic(err)
    }
    defer client.Disconnect(context.TODO())
    
    // Get collection
    collection := client.Database("testdb").Collection("users")
    
    // Insert document
    user := bson.D{
        {"name", "John Doe"},
        {"email", "john@example.com"},
        {"age", 30},
    }
    if _, err := collection.InsertOne(context.TODO(), user); err != nil {
        panic(err)
    }
    
    // Find documents
    cursor, err := collection.Find(context.TODO(), bson.D{{"age", 30}})
    if err != nil {
        panic(err)
    }
    
    var results []bson.M
    if err = cursor.All(context.TODO(), &results); err != nil {
        panic(err)
    }
    
    // Update document
    filter := bson.D{{"name", "John Doe"}}
    update := bson.D{{"$set", bson.D{{"age", 31}}}}
    if _, err = collection.UpdateOne(context.TODO(), filter, update); err != nil {
        panic(err)
    }
}

Redis Caching

Implement caching with Redis to improve performance.

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
    
    "github.com/go-redis/redis/v8"
)

func redisExample() {
    // Connect to Redis
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "", // no password
        DB:       0,  // default DB
    })
    
    ctx := context.Background()
    
    // Set key-value
    err := rdb.Set(ctx, "key", "value", 5*time.Minute).Err()
    if err != nil {
        panic(err)
    }
    
    // Get value
    val, err := rdb.Get(ctx, "key").Result()
    if err == redis.Nil {
        fmt.Println("key does not exist")
    } else if err != nil {
        panic(err)
    }
    
    // Cache struct (don't name the variable "json": it would shadow the package)
    user := User{ID: 1, Name: "Alice", Email: "alice@example.com"}
    data, _ := json.Marshal(user)
    rdb.Set(ctx, "user:1", data, 10*time.Minute)
    
    // Retrieve cached struct
    val, _ = rdb.Get(ctx, "user:1").Result()
    var cachedUser User
    json.Unmarshal([]byte(val), &cachedUser)
}
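The snippet above follows the cache-aside pattern: try the cache, fall back to the database, write the result back. That logic can be factored out behind a small interface so it is testable without a Redis server. This is a minimal sketch; the `Cache` interface, `mapCache` stub, and `getUserCached` helper are illustrative names, not part of any library, and the in-memory map stands in for Redis:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Cache abstracts the two operations cache-aside needs, so Redis
// (or any other store) can be plugged in behind it.
type Cache interface {
	Get(key string) ([]byte, error)
	Set(key string, value []byte, ttl time.Duration) error
}

var ErrCacheMiss = errors.New("cache miss")

// mapCache is an in-memory stand-in for Redis, for illustration only.
type mapCache struct{ data map[string][]byte }

func (c *mapCache) Get(key string) ([]byte, error) {
	v, ok := c.data[key]
	if !ok {
		return nil, ErrCacheMiss
	}
	return v, nil
}

func (c *mapCache) Set(key string, value []byte, ttl time.Duration) error {
	c.data[key] = value // a real Redis client would honor ttl here
	return nil
}

type User struct {
	ID    int
	Name  string
	Email string
}

// getUserCached tries the cache first and falls back to loadUser
// (the database read), writing the result back on a miss.
func getUserCached(cache Cache, id int, loadUser func(int) (User, error)) (User, error) {
	key := fmt.Sprintf("user:%d", id)
	if data, err := cache.Get(key); err == nil {
		var u User
		if err := json.Unmarshal(data, &u); err == nil {
			return u, nil
		}
	}
	u, err := loadUser(id)
	if err != nil {
		return User{}, err
	}
	if data, err := json.Marshal(u); err == nil {
		cache.Set(key, data, 10*time.Minute)
	}
	return u, nil
}

func main() {
	cache := &mapCache{data: make(map[string][]byte)}
	loads := 0
	load := func(id int) (User, error) {
		loads++
		return User{ID: id, Name: "Alice", Email: "alice@example.com"}, nil
	}
	u1, _ := getUserCached(cache, 1, load)
	u2, _ := getUserCached(cache, 1, load) // second call is served from cache
	fmt.Println(u1.Name, u2.Name, loads)   // Alice Alice 1
}
```

Because the loader is injected, the database is hit exactly once per key until the entry expires or is evicted.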

Database Migrations

Manage database schema changes with migration tools.

// Using golang-migrate/migrate
// Install: go get -u github.com/golang-migrate/migrate/v4

// migrations/001_create_users.up.sql
CREATE TABLE IF NOT EXISTS users (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

// migrations/001_create_users.down.sql
DROP TABLE IF EXISTS users;

// Run migrations in Go
import (
    "github.com/golang-migrate/migrate/v4"
    _ "github.com/golang-migrate/migrate/v4/database/postgres"
    _ "github.com/golang-migrate/migrate/v4/source/file"
)

func runMigrations(dbURL string) error {
    m, err := migrate.New(
        "file://migrations",
        dbURL,
    )
    if err != nil {
        return err
    }
    
    // Run all up migrations
    if err := m.Up(); err != nil && err != migrate.ErrNoChange {
        return err
    }
    
    return nil
}

Advanced Database Patterns

Production-ready patterns for scalable and reliable database applications.

// Repository pattern with interface segregation
type UserRepository interface {
    Create(ctx context.Context, user *User) error
    GetByID(ctx context.Context, id string) (*User, error)
    GetByEmail(ctx context.Context, email string) (*User, error)
    Update(ctx context.Context, user *User) error
    Delete(ctx context.Context, id string) error
    List(ctx context.Context, filter UserFilter) ([]*User, error)
}

type PostgresUserRepository struct {
    db *sql.DB
    stmts map[string]*sql.Stmt
}

func NewPostgresUserRepository(db *sql.DB) (*PostgresUserRepository, error) {
    repo := &PostgresUserRepository{
        db: db,
        stmts: make(map[string]*sql.Stmt),
    }
    
    // Prepare statements once
    queries := map[string]string{
        "create": `INSERT INTO users (id, email, name, created_at) VALUES ($1, $2, $3, $4)`,
        "getById": `SELECT id, email, name, created_at, updated_at FROM users WHERE id = $1`,
        "getByEmail": `SELECT id, email, name, created_at, updated_at FROM users WHERE email = $1`,
        "update": `UPDATE users SET email = $2, name = $3, updated_at = $4 WHERE id = $1`,
        "delete": `DELETE FROM users WHERE id = $1`,
        "list": `SELECT id, email, name, created_at, updated_at FROM users WHERE ($1::text IS NULL OR name ILIKE '%' || $1 || '%') ORDER BY created_at DESC LIMIT $2 OFFSET $3`,
    }
    
    for name, query := range queries {
        stmt, err := db.Prepare(query)
        if err != nil {
            return nil, fmt.Errorf("preparing %s statement: %w", name, err)
        }
        repo.stmts[name] = stmt
    }
    
    return repo, nil
}

func (r *PostgresUserRepository) Create(ctx context.Context, user *User) error {
    _, err := r.stmts["create"].ExecContext(ctx, 
        user.ID, user.Email, user.Name, time.Now())
    if err != nil {
        var pqErr *pq.Error
        if errors.As(err, &pqErr) && pqErr.Code == "23505" {
            return ErrUserAlreadyExists
        }
        return fmt.Errorf("creating user: %w", err)
    }
    return nil
}

// Database transaction management
type TxManager struct {
    db *sql.DB
}

// WithTransaction runs fn inside a transaction. The named return value
// is essential: it lets the deferred handler surface the Commit error.
func (tm *TxManager) WithTransaction(ctx context.Context, fn func(*sql.Tx) error) (err error) {
    tx, err := tm.db.BeginTx(ctx, &sql.TxOptions{
        Isolation: sql.LevelReadCommitted,
    })
    if err != nil {
        return fmt.Errorf("beginning transaction: %w", err)
    }
    
    defer func() {
        if p := recover(); p != nil {
            tx.Rollback()
            panic(p) // re-throw panic after Rollback
        } else if err != nil {
            tx.Rollback()
        } else {
            err = tx.Commit()
        }
    }()
    
    err = fn(tx)
    return err
}

// Advanced connection pool configuration
type DatabaseConfig struct {
    MaxOpenConnections    int
    MaxIdleConnections    int
    ConnectionMaxLifetime int // seconds
    ConnectionMaxIdleTime int // seconds
}

func ConfigureConnectionPool(db *sql.DB, config DatabaseConfig) {
    // Maximum number of open connections
    db.SetMaxOpenConns(config.MaxOpenConnections)
    
    // Maximum number of idle connections
    db.SetMaxIdleConns(config.MaxIdleConnections)
    
    // Maximum lifetime of a connection
    db.SetConnMaxLifetime(time.Duration(config.ConnectionMaxLifetime) * time.Second)
    
    // Maximum idle time for a connection
    db.SetConnMaxIdleTime(time.Duration(config.ConnectionMaxIdleTime) * time.Second)
}

// Database health monitoring
type HealthChecker struct {
    db *sql.DB
}

func (hc *HealthChecker) CheckHealth(ctx context.Context) error {
    // Check basic connectivity
    if err := hc.db.PingContext(ctx); err != nil {
        return fmt.Errorf("database ping failed: %w", err)
    }
    
    // Check connection pool stats
    stats := hc.db.Stats()
    if stats.OpenConnections >= stats.MaxOpenConnections {
        return errors.New("connection pool exhausted")
    }
    
    // Perform a simple query
    var result int
    if err := hc.db.QueryRowContext(ctx, "SELECT 1").Scan(&result); err != nil {
        return fmt.Errorf("health check query failed: %w", err)
    }
    
    return nil
}

// Query builder pattern
type QueryBuilder struct {
    table   string
    selects []string
    wheres  []WhereClause
    orders  []OrderClause
    limit   *int
    offset  *int
    args    []interface{}
}

type WhereClause struct {
    Column   string
    Operator string
    Value    interface{}
}

type OrderClause struct {
    Column    string
    Direction string
}

func NewQueryBuilder(table string) *QueryBuilder {
    return &QueryBuilder{
        table: table,
        args:  make([]interface{}, 0),
    }
}

func (qb *QueryBuilder) Select(columns ...string) *QueryBuilder {
    qb.selects = append(qb.selects, columns...)
    return qb
}

func (qb *QueryBuilder) Where(column, operator string, value interface{}) *QueryBuilder {
    qb.wheres = append(qb.wheres, WhereClause{
        Column:   column,
        Operator: operator,
        Value:    value,
    })
    qb.args = append(qb.args, value)
    return qb
}

func (qb *QueryBuilder) OrderBy(column string, direction string) *QueryBuilder {
    qb.orders = append(qb.orders, OrderClause{
        Column:    column,
        Direction: direction,
    })
    return qb
}

func (qb *QueryBuilder) Limit(limit int) *QueryBuilder {
    qb.limit = &limit
    return qb
}

func (qb *QueryBuilder) Offset(offset int) *QueryBuilder {
    qb.offset = &offset
    return qb
}

func (qb *QueryBuilder) Build() (string, []interface{}) {
    var query strings.Builder
    
    // SELECT clause
    if len(qb.selects) > 0 {
        query.WriteString("SELECT ")
        query.WriteString(strings.Join(qb.selects, ", "))
    } else {
        query.WriteString("SELECT *")
    }
    
    // FROM clause
    query.WriteString(" FROM ")
    query.WriteString(qb.table)
    
    // WHERE clause
    if len(qb.wheres) > 0 {
        query.WriteString(" WHERE ")
        for i, where := range qb.wheres {
            if i > 0 {
                query.WriteString(" AND ")
            }
            query.WriteString(fmt.Sprintf("%s %s $%d", where.Column, where.Operator, i+1))
        }
    }
    
    // ORDER BY clause
    if len(qb.orders) > 0 {
        query.WriteString(" ORDER BY ")
        orderStrs := make([]string, len(qb.orders))
        for i, order := range qb.orders {
            orderStrs[i] = fmt.Sprintf("%s %s", order.Column, order.Direction)
        }
        query.WriteString(strings.Join(orderStrs, ", "))
    }
    
    // LIMIT clause
    if qb.limit != nil {
        query.WriteString(fmt.Sprintf(" LIMIT %d", *qb.limit))
    }
    
    // OFFSET clause
    if qb.offset != nil {
        query.WriteString(fmt.Sprintf(" OFFSET %d", *qb.offset))
    }
    
    return query.String(), qb.args
}

Database Technology Comparison

Choosing the Right Database

Different databases excel in different scenarios. Understanding their strengths and trade-offs is crucial for architectural decisions.

PostgreSQL

Best for: Complex queries, ACID compliance

  • Full ACID compliance
  • Advanced SQL features
  • JSON/JSONB support
  • Extensible with plugins
  • Strong consistency

MySQL

Best for: Web applications, read-heavy workloads

  • High performance reads
  • Master-slave replication
  • Wide ecosystem support
  • Storage engine options
  • Easy to scale horizontally

MongoDB

Best for: Document storage, flexible schemas

  • Document-oriented
  • Horizontal scaling
  • Flexible schema
  • Built-in sharding
  • Rich aggregation pipeline

Redis

Best for: Caching, session storage, real-time analytics

  • In-memory performance
  • Rich data structures
  • Pub/Sub messaging
  • Lua scripting support
  • Persistence options
Database       Type        ACID     Scalability  Performance  Best Use Case
PostgreSQL     Relational  Full     Vertical     High         Complex applications, analytics
MySQL          Relational  Full     Horizontal   Very High    Web applications, CMS
MongoDB        Document    Limited  Horizontal   High         Content management, IoT
Redis          Key-Value   No       Horizontal   Ultra High   Caching, real-time features
Elasticsearch  Search      No       Horizontal   High         Search, log analysis

Production Best Practices

Connection Management

  • Configure appropriate pool sizes
  • Set connection lifetimes
  • Monitor connection usage
  • Implement health checks
  • Handle connection failures gracefully

Query Optimization

  • Use prepared statements
  • Add appropriate indexes
  • Implement query timeouts
  • Monitor slow queries
  • Use connection context cancellation

Security & Reliability

  • Prevent SQL injection attacks
  • Use transactions for atomic operations
  • Implement proper error handling
  • Use database migrations
  • Backup and recovery strategies

Database Performance Checklist

  • Connection pooling: Optimize MaxOpenConns, MaxIdleConns, ConnMaxLifetime
  • Prepared statements: Cache and reuse for better performance
  • Context timeouts: Prevent long-running queries from blocking
  • Transaction boundaries: Keep transactions short and focused
  • Index optimization: Create indexes for frequently queried columns
  • Query monitoring: Log and analyze slow queries regularly

Common Database Anti-Patterns

  • N+1 queries: Loading related data in loops instead of joins
  • Unbounded queries: SELECT without LIMIT on large tables
  • Long transactions: Holding locks for extended periods
  • Connection leaks: Not properly closing database connections
  • SQL injection vulnerabilities: Not using parameterized queries
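The fix for N+1 queries is to batch the per-row lookups into a single query. One way is to build an IN clause with positional placeholders; since this is pure string construction it runs without a database. `buildUsersByIDQuery` is an illustrative helper (Postgres-style `$n` placeholders, matching the examples above):

```go
package main

import (
	"fmt"
	"strings"
)

// buildUsersByIDQuery turns a slice of IDs into one parameterized query
// ("WHERE id IN ($1, $2, ...)") instead of issuing one query per ID.
func buildUsersByIDQuery(ids []int) (string, []interface{}) {
	placeholders := make([]string, len(ids))
	args := make([]interface{}, len(ids))
	for i, id := range ids {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
		args[i] = id
	}
	query := fmt.Sprintf(
		"SELECT id, email, name FROM users WHERE id IN (%s)",
		strings.Join(placeholders, ", "))
	return query, args
}

func main() {
	q, args := buildUsersByIDQuery([]int{3, 7, 9})
	fmt.Println(q) // SELECT id, email, name FROM users WHERE id IN ($1, $2, $3)
	fmt.Println(len(args)) // 3
}
```

The result feeds straight into `db.QueryContext(ctx, q, args...)`; because the values stay in `args`, the query remains parameterized and safe from injection.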

Database Challenges

Hands-On Database Projects

Master database operations with these practical challenges:

1. Multi-Tenant SaaS Database

Design and implement a multi-tenant database architecture with proper data isolation and efficient querying.

Difficulty: Beginner

2. Event Sourcing System

Build an event-sourced application with PostgreSQL, implementing event store, snapshots, and projections.

Difficulty: Intermediate

3. Real-time Analytics Pipeline

Create a high-throughput data pipeline combining PostgreSQL, Redis, and Elasticsearch for real-time analytics.

Difficulty: Advanced

4. Distributed Database System

Implement a distributed database with sharding, replication, and consistency guarantees across multiple nodes.

Difficulty: Expert