Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Chapter 26: Network Programming

Project 4: Distributed Key-Value Store with Replication

Problem Statement

Build a distributed key-value database that evolves from a simple in-memory HashMap to a production-ready replicated system with persistence, leader election, and consistency guarantees. You’ll start with basic TCP commands (GET/SET/DELETE), add write-ahead logging for durability, implement async replication to followers, upgrade to synchronous quorum writes for strong consistency, add automatic leader election on failures, and finish with client-side connection pooling and smart routing.

Why It Matters

Real-World Impact: Distributed key-value stores are the foundation of modern infrastructure:

  • Redis: 10M+ deployments, powers caching for Twitter, GitHub, StackOverflow (100K+ ops/sec per instance)
  • etcd: Kubernetes control plane storage, manages cluster state for millions of containers
  • Consul: Service discovery and configuration for HashiCorp Vault, Netflix microservices
  • DynamoDB: AWS’s managed KV store, handles 10+ trillion requests/day
  • Riak: Distributed database for chat systems (WhatsApp used it for 900M users)

Performance Numbers:

  • Single-node: 100K reads/sec, 50K writes/sec (memory-bound)
  • Async replication: 30K writes/sec (3x slower than no replication, but durable)
  • Quorum writes (N=3, W=2): 15K writes/sec (consistency cost), but survives 1 node failure
  • Read scaling: 3 replicas = 300K reads/sec (linear scaling with replicas)
  • Failover time: Manual = minutes, automatic leader election = 2-5 seconds

Rust-Specific Challenge: Distributed systems require careful handling of concurrent mutable state, network failures, and partial failures. Rust’s ownership system prevents many classes of bugs (use-after-free, data races) that plague distributed systems in other languages. This project teaches you to use Arc<RwLock>, async networking, and message passing to build reliable distributed systems that handle failures gracefully.

Use Cases

When you need this pattern:

  1. Caching layer - Speed up database queries, API responses (Redis pattern)
  2. Session storage - Distributed web sessions across servers (sticky sessions without stickiness)
  3. Configuration management - Distribute config to microservices (etcd/Consul pattern)
  4. Service discovery - Track which services are available at which addresses
  5. Distributed locks - Coordinate exclusive access across servers (leader election, cron job deduplication)
  6. Feature flags - Toggle features dynamically across fleet (LaunchDarkly pattern)
  7. Metadata storage - Store file locations in distributed filesystem (HDFS NameNode pattern)

Real Examples:

  • Redis Cluster: Sharded KV store with automatic failover, 1000 nodes max
  • etcd: Raft consensus for strong consistency, used by Kubernetes, CloudFoundry
  • Cassandra: Eventually consistent KV store, Netflix uses it for 2.5 trillion ops/day
  • Memcached: Simple KV cache, Facebook uses 800+ servers with 28 TB of RAM

Learning Goals

  • Master TCP client-server patterns with custom protocols
  • Understand write-ahead logging (WAL) for durability
  • Learn async vs sync replication trade-offs
  • Practice quorum-based consistency (CAP theorem in action)
  • Implement leader election (simplified Raft/Paxos)
  • Build connection pooling and client-side routing
  • Experience distributed systems failure modes

Core Concepts

Before building a distributed key-value store, let’s understand the fundamental concepts that make distributed databases work:

1. Key-Value Store Fundamentals

What is a Key-Value Store? A key-value store is the simplest form of database: a giant HashMap that maps keys to values, accessible over the network.

#![allow(unused)]
fn main() {
// In-memory version
let mut db: HashMap<String, String> = HashMap::new();
db.insert("user:123".to_string(), "Alice".to_string());
let value = db.get("user:123"); // Some("Alice")
}

Network Protocol:

Client → Server: GET user:123\n
Server → Client: VALUE Alice\n

Client → Server: SET user:123 Bob\n
Server → Client: OK\n

Client → Server: DELETE user:123\n
Server → Client: OK\n

Why KV Stores?

  • Simple API: Just GET/SET/DELETE (vs SQL with complex queries)
  • Fast: O(1) lookups with hash table
  • Scalable: Easy to partition data across machines
  • Foundation: Building block for complex databases (Redis, DynamoDB, etcd)

Use Cases:

  • Caching: Store frequently accessed data (avoid DB queries)
  • Sessions: Web session storage across servers
  • Counters: Real-time metrics (page views, likes)
  • Configuration: Distributed config for microservices

2. Write-Ahead Logging (WAL)

The Problem: In-memory data is lost on crash. How do we make it durable?

The Solution: Write-Ahead Log

Before modifying in-memory state, write the operation to an append-only log file on disk. On crash, replay the log to rebuild state.

WAL Pattern:

#![allow(unused)]
fn main() {
// Every write operation:
1. Append operation to log file
2. Call fsync() to force disk write
3. Update in-memory HashMap
4. Return OK to client

// On server restart:
1. Read entire WAL file
2. Replay each operation
3. Rebuild in-memory HashMap
4. Resume normal operation
}

Example:

# wal.log
SET key1 value1
SET key2 value2
DELETE key1
SET key3 value3

# After replay:
HashMap = {key2: value2, key3: value3}

Performance Cost:

#![allow(unused)]
fn main() {
// Without WAL (memory-only)
HashMap.insert(key, value);  // ~0.01ms
→ 50,000 writes/sec

// With WAL (durable)
wal.append(SET key value);   // ~1ms (disk I/O + fsync)
HashMap.insert(key, value);  // ~0.01ms
→ 10,000 writes/sec
}

Why fsync() is Critical:

#![allow(unused)]
fn main() {
// WITHOUT fsync - data can be lost!
file.write(b"SET key value\n")?;  // Writes to OS buffer (not disk!)
// CRASH → Data in buffer is lost

// WITH fsync - guaranteed durability
file.write(b"SET key value\n")?;
file.sync_all()?;  // Forces OS to flush to physical disk
// CRASH → Data is on disk, safe!
}

WAL is used by:

  • PostgreSQL (write-ahead log for ACID transactions)
  • Redis (AOF - Append-Only File)
  • etcd, Consul, Raft implementations

3. Replication: Async vs Synchronous

Why Replicate?

  • Durability: Multiple copies survive disk failures
  • Availability: System continues if one server fails
  • Read Scaling: Distribute reads across replicas

Async Replication (Fire-and-Forget):

#![allow(unused)]
fn main() {
// Master receives write
master.set("key", "value");

// Send to replicas WITHOUT waiting
for replica in replicas {
    tokio::spawn(async move {
        replica.send("REPLICATE SET key value").await;
    });
}

// Immediately return OK to client
return Ok(());
}

Pros: Fast writes (don’t wait for replicas) Cons: Data loss if master crashes before replicas receive write

Synchronous Replication (Wait for ACK):

#![allow(unused)]
fn main() {
// Master receives write
master.set("key", "value");

// Send to replicas and WAIT for acknowledgments
let (tx, mut rx) = mpsc::channel(replicas.len());

for replica in replicas {
    tokio::spawn(async move {
        replica.send("REPLICATE SET key value").await;
        let ack = replica.recv_ack().await;
        tx.send(ack).await;
    });
}

// Wait for W replicas to acknowledge
let mut acks = 0;
while let Some(ack) = rx.recv().await {
    acks += 1;
    if acks >= WRITE_QUORUM {
        break;
    }
}

return Ok();  // Guaranteed on W replicas
}

Pros: No data loss (data on multiple servers before OK) Cons: Slower writes (wait for network + replica processing)

Performance Comparison:

Async Replication:
  Write latency: 1ms (local write only)
  Throughput: 30,000 writes/sec
  Data loss risk: YES (if master crashes)

Sync Replication (W=2 out of N=3):
  Write latency: 5ms (local + network + replica)
  Throughput: 15,000 writes/sec
  Data loss risk: NO (data on 2 servers)

4. Quorum-Based Consistency (CAP Theorem)

CAP Theorem: You can only have 2 out of 3:

  • Consistency: All nodes see the same data
  • Availability: System responds to all requests
  • Partition tolerance: System works despite network failures

In Practice: Network partitions happen, so choose between C and A.

Quorum Writes (Choose Consistency):

N = Total replicas (e.g., 3)
W = Write quorum (e.g., 2) - must ack before success
R = Read quorum (e.g., 2) - must read from this many

Consistency guarantee: If W + R > N, reads see committed writes
Example: N=3, W=2, R=2 → 2+2 > 3 ✓ Strong consistency

How Quorum Works:

Write "key=value" with N=3, W=2:

Client → Master: SET key value
Master → Replica1: REPLICATE SET key value
Master → Replica2: REPLICATE SET key value
Master → Replica3: REPLICATE SET key value

Wait for 2 ACKs:
Replica1 → Master: ACK ✓
Replica2 → Master: ACK ✓
[Replica3 is slow/dead, ignore]

Master → Client: OK (data is durable on 2/3 nodes)

Fault Tolerance:

N=3, W=2, R=2:
  Can survive: 1 node failure
  - Writes: 2 nodes still form quorum
  - Reads: 2 nodes still available

N=5, W=3, R=3:
  Can survive: 2 node failures
  - Writes: 3 nodes still form quorum
  - Reads: 3 nodes still available

Trade-offs:

Higher W (stronger consistency):
  ✓ More durable (data on more nodes)
  ✗ Slower writes (wait for more ACKs)
  ✗ Less available (more nodes must be up)

Lower W (higher availability):
  ✓ Faster writes
  ✓ More available (fewer nodes needed)
  ✗ Less durable

5. Leader Election and Raft Consensus

The Problem: In a replicated system, who decides the order of writes?

The Solution: Elect one node as the leader. Only the leader accepts writes.

Leader Election Algorithm (Simplified Raft):

States:

  • Leader: Accepts writes, sends heartbeats
  • Follower: Redirects writes to leader, receives replication
  • Candidate: Requesting votes to become leader

Normal Operation:

Leader → Followers: HEARTBEAT (every 1 second)
Followers: "Leader is alive, don't start election"

Leader Failure:

Time 0:  Leader sends heartbeats
Time 5:  Leader crashes (no more heartbeats)
Time 6:  Follower times out (no heartbeat for 5s)
Time 6:  Follower becomes Candidate
         - Increment term: 1 → 2
         - Vote for self
         - Request votes from other nodes

Time 6.1: Candidate → Other nodes: VOTE_REQUEST term=2
Time 6.2: Nodes respond: VOTE_GRANTED (if haven't voted)
Time 6.3: Candidate receives majority → becomes Leader
Time 6.4: New Leader → All: HEARTBEAT (establish leadership)

Voting Rules: A node grants a vote if:

  1. Candidate’s term is higher than current term
  2. Node hasn’t voted for anyone else this term

Majority Quorum:

3 nodes: Need 2 votes (majority)
5 nodes: Need 3 votes (majority)
7 nodes: Need 4 votes (majority)

Formula: (N / 2) + 1

Split Vote Handling:

4-node cluster, 2 candidates start election simultaneously:

Candidate A: Votes from A, B (2/4 - not majority)
Candidate B: Votes from C, D (2/4 - not majority)

No majority → Election times out → Retry with higher term
Random timeouts prevent repeated split votes

Why Raft?

  • Safety: At most one leader per term
  • Liveness: Eventually elects a leader (if majority available)
  • Understandable: Simpler than Paxos
  • Production-proven: etcd, Consul, TiKV use it

6. Distributed Systems Failure Modes

Network Partitions (Split Brain):

Before:
  [Leader - Replica1 - Replica2]  (all connected)

After network split:
  [Leader] | [Replica1 - Replica2]  (network partition)

Without quorum:
  - Leader thinks it's still leader (bad!)
  - Replica1 could become new leader (two leaders!)

With quorum (N=3, W=2):
  - Leader can't reach quorum → stops accepting writes ✓
  - Replica1+Replica2 can elect new leader ✓
  - Only one side accepts writes (safe)

Partial Failures:

Scenario: Master receives write, sends to 3 replicas

Replica1: ACK (success)
Replica2: Timeout (network slow)
Replica3: NACK (disk full)

Question: Did the write succeed?
Answer: Depends on quorum!
  - W=2: YES (1 replica + master = 2)
  - W=3: NO (only 1 replica confirmed)

Clock Skew:

Server A: time = 10:00:00.000
Server B: time = 10:00:05.123 (5 seconds ahead!)

Problem: Can't use timestamps for ordering
Solution: Logical clocks (version numbers, Lamport clocks)

Byzantine Failures: Nodes lie or behave maliciously (not covered here, see BFT algorithms)

7. Connection Pooling

The Problem: Creating TCP connections is expensive.

TCP 3-Way Handshake:

Client → Server: SYN (synchronize)
Server → Client: SYN-ACK
Client → Server: ACK
[Connection established - took 1-3ms]

Total overhead: 1-3ms per connection

Without Pooling:

#![allow(unused)]
fn main() {
for _ in 0..1000 {
    let stream = TcpStream::connect("db:6379")?;  // 3ms handshake
    stream.write(b"GET key\n")?;                   // 0.5ms
    stream.read(&mut buf)?;                        // 0.5ms
}
// Total: 1000 * (3ms + 0.5ms + 0.5ms) = 4000ms = 4 seconds
}

With Pooling:

#![allow(unused)]
fn main() {
let pool = ConnectionPool::new("db:6379", 10);

for _ in 0..1000 {
    let stream = pool.acquire()?;  // 0ms (reuse existing)
    stream.write(b"GET key\n")?;   // 0.5ms
    stream.read(&mut buf)?;        // 0.5ms
    // stream returns to pool on drop
}
// Total: 3ms (first conn) + 1000 * 1ms = 1003ms = 1 second
}

Connection Pool Implementation:

#![allow(unused)]
fn main() {
struct ConnectionPool {
    available: Arc<Mutex<VecDeque<TcpStream>>>,
    max_size: usize,
}

impl ConnectionPool {
    fn acquire(&self) -> PooledConnection {
        let mut pool = self.available.lock();

        // Reuse connection if available
        if let Some(stream) = pool.pop_front() {
            return PooledConnection { stream, pool };
        }

        // Otherwise create new
        let stream = TcpStream::connect(addr)?;
        PooledConnection { stream, pool }
    }
}

// Auto-return to pool on drop
impl Drop for PooledConnection {
    fn drop(&mut self) {
        self.pool.lock().push_back(self.stream);
    }
}
}

Benefits:

  • Lower latency: 1-3ms saved per request
  • Higher throughput: 3-10x more requests/sec
  • TCP window tuning: Reused connections have optimized TCP window
  • TLS session resumption: Reuse TLS sessions (if using HTTPS)

8. Smart Client Routing

The Problem: Clients need to find the leader and balance reads.

Naive Client:

#![allow(unused)]
fn main() {
// Always connect to server1
let mut stream = TcpStream::connect("server1:6379")?;
stream.write(b"SET key value\n")?;

// Problems:
// - What if server1 is not the leader?
// - What if server1 is down?
// - All reads go to one server (no load balancing)
}

Smart Client:

#![allow(unused)]
fn main() {
struct KvClient {
    pools: HashMap<String, ConnectionPool>,  // Pool per server
    leader: Arc<RwLock<Option<String>>>,     // Cached leader
    replicas: Vec<String>,                   // All servers
}

impl KvClient {
    async fn set(&self, key: String, value: String) {
        loop {
            // Get leader (cached or discover)
            let leader = self.get_leader().await;

            // Try write
            match self.send_write(&leader, SET, key, value).await {
                Ok(_) => return Ok(()),
                Err(NotLeader(new_leader)) => {
                    // Update cache and retry
                    *self.leader.write().await = Some(new_leader);
                }
                Err(e) => return Err(e),
            }
        }
    }

    async fn get(&self, key: &str) {
        // Pick random replica (load balancing)
        let replica = self.replicas.choose_random();

        // Send read
        self.send_read(&replica, GET, key).await
    }
}
}

Discovery Protocol:

Client → Any Server: WHO_IS_LEADER\n
Server → Client: LEADER server2:6379\n

Client caches: leader = server2:6379

Client → server2: SET key value\n
server2 → Client: OK\n

If server2 crashes:
Client → server2: SET key value\n
(timeout or connection refused)

Client → server1: WHO_IS_LEADER\n
Server1 → Client: LEADER server3:6379\n
(server3 was elected)

Client → server3: SET key value\n
server3 → Client: OK\n

Load Balancing Strategies:

  • Random: Pick random replica
  • Round-robin: Cycle through replicas
  • Least-loaded: Track connection count, pick lowest
  • Geographically closest: Minimize network latency

Connection to This Project

Now let’s see how all these concepts come together in building a distributed KV store:

1. Progressive Complexity: From Local to Distributed

This project takes you from a simple HashMap to a full distributed system:

Milestone 1 (Local KV Store):

  • Start with Arc<RwLock<HashMap>> for thread-safe in-memory storage
  • Learn TCP protocol design (GET/SET/DELETE)
  • Understand concurrent access patterns (many readers, few writers)

Milestone 2 (Persistence):

  • Add durability with Write-Ahead Log
  • Experience the 5-10x slowdown from fsync()
  • Build crash recovery (replay WAL on startup)
  • Understand the durability vs performance trade-off

Milestone 3 (Replication):

  • Scale to multiple servers with async replication
  • Experience the simplicity and speed of fire-and-forget
  • Also experience the risk: data loss if master crashes

2. The CAP Theorem in Practice

Each milestone makes different CAP trade-offs:

Milestone 1-2 (CP: Consistency + Partition tolerance):

Single node:
  ✓ Consistency: One source of truth
  ✓ Partition tolerance: N/A (no network)
  ✗ Availability: Node down = system down

Milestone 3 (AP: Availability + Partition tolerance):

Async replication:
  ✗ Consistency: Replicas lag behind master
  ✓ Availability: Reads work even if master is slow
  ✓ Partition tolerance: System continues during partition

Milestone 4 (CP: Consistency + Partition tolerance):

Quorum writes (N=3, W=2, R=2):
  ✓ Consistency: W+R > N guarantees reads see writes
  ✗ Availability: Can't write if < W nodes available
  ✓ Partition tolerance: Majority side continues

3. Building Consensus from Scratch

Milestone 5 implements simplified Raft:

Core Algorithm:

#![allow(unused)]
fn main() {
// 1. Leader sends heartbeats
async fn run_heartbeat_loop(&self) {
    loop {
        sleep(1s).await;
        for peer in peers {
            send_heartbeat(peer, self.term, self.id).await;
        }
    }
}

// 2. Followers timeout → election
async fn run_election_timeout(&self) {
    if last_heartbeat.elapsed() > 5s {
        self.start_election().await;
    }
}

// 3. Voting
async fn start_election(&self) {
    self.term += 1;
    self.vote_for_self();

    for peer in peers {
        votes += request_vote(peer, self.term).await;
    }

    if votes > majority {
        self.become_leader();
    }
}
}

Why This Matters:

  • etcd (Kubernetes’ brain) uses this exact algorithm
  • Consul (service discovery) uses Raft
  • CockroachDB uses multi-Raft
  • Understanding Raft = understanding modern distributed databases

4. Performance Evolution

Watch performance change across milestones:

Milestone 1: In-Memory
  - Writes: 50,000/sec
  - Latency: 0.02ms
  - Durability: None
  - Availability: None

Milestone 2: WAL
  - Writes: 10,000/sec (-80%)
  - Latency: 1ms (+50x)
  - Durability: Survives crashes
  - Availability: Still single node

Milestone 3: Async Replication
  - Writes: 30,000/sec
  - Latency: 1ms (no wait for replicas)
  - Durability: Multiple copies (but async)
  - Availability: Reads scale 3x

Milestone 4: Quorum Writes
  - Writes: 15,000/sec (-50%)
  - Latency: 5ms (wait for W replicas)
  - Durability: Strong (data on W nodes before OK)
  - Availability: Survives F = W-1 failures

Milestone 6: Connection Pooling
  - Requests: 10,000/sec per client (+10x)
  - Latency: 1ms (no handshake overhead)
  - Client failover: Automatic

5. Failure Handling Throughout

Each milestone adds resilience:

Milestone 2: Crash recovery (WAL replay) Milestone 3: Replica failure (master continues) Milestone 4: Master failure (quorum still works if W nodes up) Milestone 5: Automatic master recovery (leader election) Milestone 6: Network failure (client retries with new leader)

6. Real-World Architecture

This is exactly how production systems work:

Redis:

  • Milestone 1-2: Redis single node with AOF
  • Milestone 3: Redis with async replication to replicas
  • Milestone 5: Redis Sentinel for leader election

etcd (Kubernetes control plane):

  • Milestone 2: WAL for durability
  • Milestone 4: Raft consensus with W=majority
  • Milestone 5: Full Raft leader election
  • Milestone 6: gRPC client with smart routing

Cassandra:

  • Milestone 3: Async replication (eventually consistent)
  • Milestone 4: Tunable quorum (CL=QUORUM)
  • No leader: peer-to-peer (different from this project)

7. Design Decisions You’ll Make

This project forces you to answer real engineering questions:

Q: Async or sync replication?

  • Async: Fast writes, risk of data loss
  • Sync: Slow writes, guaranteed durability
  • Answer: Depends on use case (caching vs financial transactions)

Q: What quorum size (W)?

  • W=1: Fast, no fault tolerance
  • W=majority: Balance of speed and safety
  • W=all: Maximum safety, no availability
  • Answer: Most systems use W=majority (N/2 + 1)

Q: How long to wait for election?

  • Too short: Elections during network hiccups
  • Too long: Long downtime on failure
  • Answer: 5-10 seconds is typical

Q: Connection pool size?

  • Too small: Connection creation overhead
  • Too large: Memory waste, TCP congestion
  • Answer: 10-50 connections per client is common

8. From Learning to Production

After this project, you’ll be able to:

  • Read the etcd/Raft papers and understand them
  • Evaluate Redis vs etcd vs Consul for your use case
  • Tune replication settings (W, R, N) for your needs
  • Debug distributed systems: “Why did this write succeed on 2/3 nodes but the client saw an error?”
  • Build custom distributed systems for specific needs

You’ve built your own etcd/Redis/Consul!

This is the foundation of:

  • etcd: Kubernetes uses this for all cluster state
  • Redis Cluster: Sharded KV store with replication
  • Consul: Service discovery with Raft consensus
  • CockroachDB: Distributed SQL with Raft
  • Every distributed database system

Milestone 1: In-Memory KV Store (TCP Protocol)

Introduction

Starting Point: Before building distribution and replication, we need a functional single-node key-value store. This is the foundation we’ll extend.

What We’re Building: A TCP server that:

  • Stores key-value pairs in a HashMap
  • Implements a simple text protocol: GET key, SET key value, DELETE key
  • Handles multiple concurrent clients
  • Returns responses: OK, VALUE data, NOT_FOUND

Key Limitation: This is an in-memory store with no persistence. If the server crashes, all data is lost. Also, it’s a single point of failure—if the server goes down, the entire system is unavailable.

Key Concepts

Structs/Types:

  • KvStore - Wraps HashMap with thread-safe access
  • Command - Enum representing GET/SET/DELETE operations
  • Response - Enum for OK/VALUE/NOT_FOUND/ERROR

Functions and Their Roles:

#![allow(unused)]
fn main() {
struct KvStore {
    data: Arc<RwLock<HashMap<String, String>>>,
}

enum Command {
    Get { key: String },
    Set { key: String, value: String },
    Delete { key: String },
}

enum Response {
    Ok,
    Value { data: String },
    NotFound,
    Error { msg: String },
}

impl KvStore {
    fn new() -> Self
        // Initialize with empty HashMap

    async fn get(&self, key: &str) -> Option<String>
        // Read lock, lookup key, return value

    async fn set(&self, key: String, value: String)
        // Write lock, insert key-value

    async fn delete(&self, key: &str) -> bool
        // Write lock, remove key, return true if existed
}

fn parse_command(line: &str) -> Result<Command, String>
    // Parse "GET key" or "SET key value" etc.

async fn handle_client(stream: TcpStream, store: Arc<KvStore>)
    // Read commands, execute, send responses
}

Protocol:

  • Client → Server: GET mykey\n
  • Server → Client: VALUE myvalue\n or NOT_FOUND\n
  • Client → Server: SET mykey myvalue\n
  • Server → Client: OK\n

Checkpoint Tests

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_set_and_get() {
        let store = KvStore::new();

        store.set("name".to_string(), "Alice".to_string()).await;
        let value = store.get("name").await;

        assert_eq!(value, Some("Alice".to_string()));
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let store = KvStore::new();
        let value = store.get("missing").await;
        assert_eq!(value, None);
    }

    #[tokio::test]
    async fn test_delete() {
        let store = KvStore::new();

        store.set("temp".to_string(), "value".to_string()).await;
        assert!(store.delete("temp").await);
        assert_eq!(store.get("temp").await, None);
    }

    #[tokio::test]
    async fn test_overwrite() {
        let store = KvStore::new();

        store.set("key".to_string(), "v1".to_string()).await;
        store.set("key".to_string(), "v2".to_string()).await;

        assert_eq!(store.get("key").await, Some("v2".to_string()));
    }

    #[test]
    fn test_parse_get() {
        let cmd = parse_command("GET mykey").unwrap();
        assert!(matches!(cmd, Command::Get { key } if key == "mykey"));
    }

    #[test]
    fn test_parse_set() {
        let cmd = parse_command("SET mykey myvalue").unwrap();
        assert!(matches!(cmd, Command::Set { key, value }
            if key == "mykey" && value == "myvalue"));
    }

    #[test]
    fn test_parse_set_with_spaces() {
        let cmd = parse_command("SET mykey hello world").unwrap();
        assert!(matches!(cmd, Command::Set { key, value }
            if key == "mykey" && value == "hello world"));
    }

    #[tokio::test]
    async fn test_concurrent_clients() {
        tokio::spawn(async {
            run_kv_server("127.0.0.1:9301").await.unwrap();
        });
        sleep(Duration::from_millis(100)).await;

        // Connect multiple clients
        let client1 = TcpStream::connect("127.0.0.1:9301").await.unwrap();
        let client2 = TcpStream::connect("127.0.0.1:9301").await.unwrap();

        let mut writer1 = client1;
        let mut writer2 = client2;

        // Both clients set different keys
        writer1.write_all(b"SET key1 value1\n").await.unwrap();
        writer2.write_all(b"SET key2 value2\n").await.unwrap();

        // Both should succeed
        let mut buf1 = [0u8; 1024];
        let mut buf2 = [0u8; 1024];

        let n1 = writer1.read(&mut buf1).await.unwrap();
        let n2 = writer2.read(&mut buf2).await.unwrap();

        assert!(String::from_utf8_lossy(&buf1[..n1]).contains("OK"));
        assert!(String::from_utf8_lossy(&buf2[..n2]).contains("OK"));
    }
}
}

Starter Code

use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::RwLock;

struct KvStore {
    data: Arc<RwLock<HashMap<String, String>>>,
}

#[derive(Debug)]
enum Command {
    Get { key: String },
    Set { key: String, value: String },
    Delete { key: String },
}

enum Response {
    Ok,
    Value { data: String },
    NotFound,
    Error { msg: String },
}

impl KvStore {
    fn new() -> Self {
        KvStore {
            data: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    async fn get(&self, key: &str) -> Option<String> {
        // TODO: Acquire read lock and get value
        let data = todo!(); // self.data.read().await
        data.get(key).cloned()
    }

    async fn set(&self, key: String, value: String) {
        // TODO: Acquire write lock and insert
        let mut data = todo!(); // self.data.write().await
        data.insert(key, value);
    }

    async fn delete(&self, key: &str) -> bool {
        // TODO: Acquire write lock and remove
        let mut data = todo!();
        data.remove(key).is_some()
    }
}

impl Response {
    fn to_string(&self) -> String {
        match self {
            Response::Ok => "OK\n".to_string(),
            Response::Value { data } => format!("VALUE {}\n", data),
            Response::NotFound => "NOT_FOUND\n".to_string(),
            Response::Error { msg } => format!("ERROR {}\n", msg),
        }
    }
}

fn parse_command(line: &str) -> Result<Command, String> {
    let parts: Vec<&str> = line.trim().splitn(3, ' ').collect();

    match parts.as_slice() {
        ["GET", key] => Ok(Command::Get {
            key: key.to_string(),
        }),
        ["SET", key, value] => Ok(Command::Set {
            key: key.to_string(),
            value: value.to_string(),
        }),
        ["DELETE", key] => Ok(Command::Delete {
            key: key.to_string(),
        }),
        _ => Err("Invalid command".to_string()),
    }
}

#[tokio::main]
async fn main() {
    if let Err(e) = run_kv_server("127.0.0.1:6379").await {
        eprintln!("Server error: {}", e);
    }
}

async fn run_kv_server(addr: &str) -> tokio::io::Result<()> {
    let store = Arc::new(KvStore::new());
    let listener = TcpListener::bind(addr).await?;

    println!("KV store listening on {}", addr);

    loop {
        let (stream, addr) = listener.accept().await?;
        let store = store.clone();

        tokio::spawn(async move {
            if let Err(e) = handle_client(stream, store).await {
                eprintln!("Client {} error: {}", addr, e);
            }
        });
    }
}

async fn handle_client(stream: TcpStream, store: Arc<KvStore>) -> tokio::io::Result<()> {
    let (reader, mut writer) = stream.into_split();
    let mut reader = BufReader::new(reader);
    let mut line = String::new();

    loop {
        line.clear();

        // TODO: Read command from client
        let bytes_read = todo!(); // reader.read_line(&mut line).await?

        if bytes_read == 0 {
            break; // EOF
        }

        // TODO: Parse command
        let command = match parse_command(&line) {
            Ok(cmd) => cmd,
            Err(e) => {
                // Send error response
                writer.write_all(Response::Error { msg: e }.to_string().as_bytes()).await?;
                continue;
            }
        };

        // TODO: Execute command
        let response = match command {
            Command::Get { key } => {
                // store.get(&key).await
                todo!();
            }
            Command::Set { key, value } => {
                // store.set(key, value).await
                todo!();
            }
            Command::Delete { key } => {
                // store.delete(&key).await
                todo!();
            }
        };

        // TODO: Send response
        // writer.write_all(response.to_string().as_bytes()).await?;
        todo!();
    }

    Ok(())
}

Check Your Understanding

  • Why use Arc<RwLock<HashMap>>? Arc for shared ownership across tasks, RwLock for concurrent read/write access.
  • What’s the advantage of RwLock over Mutex? Multiple concurrent readers (GET operations) don’t block each other.
  • Why parse commands as an enum? Type-safe representation, pattern matching for execution.
  • What happens if the server crashes? All data is lost (no persistence yet).
  • How many concurrent readers can access the store? Unlimited (RwLock allows multiple readers).

Why Milestone 1 Isn’t Enough → Moving to Milestone 2

Limitation: No Durability

  • All data is in RAM only
  • Server crash = total data loss
  • Unacceptable for production databases
  • Restart = empty database

What We’re Adding:

  • Write-Ahead Log (WAL): Append-only log of all writes
  • Durability: Writes persisted to disk before acknowledging
  • Recovery: Replay WAL on startup to restore state
  • Crash resistance: Can recover from power failures

Improvement:

  • Durability: Volatile → persistent (survives crashes)
  • Recovery: Empty on restart → full state restored
  • Reliability: Data loss risk eliminated (at cost of ~2x slower writes)
  • Production-ready: Foundation for real databases

Performance Impact:

  • Write latency: 0.01ms (memory) → 1-5ms (with fsync to disk)
  • Throughput: 50K writes/sec → 10K writes/sec (disk I/O bound)
  • Trade-off: Speed vs durability (can batch writes for better throughput)

Milestone 2: Persistence with Write-Ahead Log (WAL)

Introduction

The Problem: In-memory data is volatile. Crash = data loss.

The Solution: Write-Ahead Logging

  1. Before modifying in-memory state, append operation to log file
  2. Sync log to disk (fsync)
  3. Then modify in-memory HashMap
  4. On restart: replay log to rebuild state

WAL Pattern (used by PostgreSQL, Redis, etcd):

Time 0: SET key1 value1  → Write to log, fsync, update HashMap
Time 1: SET key2 value2  → Write to log, fsync, update HashMap
Time 2: DELETE key1      → Write to log, fsync, update HashMap
--- CRASH ---
Time 3: Restart → Replay log: SET key1, SET key2, DELETE key1
        → State: {key2: value2}

Key Concepts

Structs:

#![allow(unused)]
fn main() {
struct WalEntry {
    command: Command,
    timestamp: u64,
}

struct KvStore {
    data: Arc<RwLock<HashMap<String, String>>>,
    wal: Arc<RwLock<WriteAheadLog>>,
}

struct WriteAheadLog {
    file: File,
    path: PathBuf,
}
}

Functions:

#![allow(unused)]
fn main() {
impl WriteAheadLog {
    async fn new(path: PathBuf) -> io::Result<Self>
        // Open or create WAL file in append mode

    async fn append(&mut self, entry: &WalEntry) -> io::Result<()>
        // Serialize entry to bytes
        // Write to file
        // fsync to ensure durability

    async fn replay(&self) -> io::Result<Vec<WalEntry>>
        // Read entire file
        // Deserialize all entries
        // Return for playback
}

impl KvStore {
    async fn new_with_wal(wal_path: PathBuf) -> io::Result<Self>
        // Create or open WAL
        // Replay WAL to rebuild state
        // Return initialized store

    async fn set_durable(&self, key: String, value: String) -> io::Result<()>
        // 1. Append to WAL
        // 2. Sync to disk
        // 3. Update in-memory HashMap
}
}

Serialization Format (simple text format):

SET key1 value1
SET key2 value2
DELETE key1

Checkpoint Tests

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_wal_append_and_replay() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Create WAL and append entries
        {
            let mut wal = WriteAheadLog::new(wal_path.clone()).await.unwrap();

            wal.append(&WalEntry {
                command: Command::Set {
                    key: "key1".to_string(),
                    value: "value1".to_string(),
                },
                timestamp: 1,
            }).await.unwrap();

            wal.append(&WalEntry {
                command: Command::Set {
                    key: "key2".to_string(),
                    value: "value2".to_string(),
                },
                timestamp: 2,
            }).await.unwrap();
        }

        // Replay WAL
        let wal = WriteAheadLog::new(wal_path).await.unwrap();
        let entries = wal.replay().await.unwrap();

        assert_eq!(entries.len(), 2);
    }

    #[tokio::test]
    async fn test_persistence_across_restart() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("store.wal");

        // First run: set some data
        {
            let store = KvStore::new_with_wal(wal_path.clone()).await.unwrap();
            store.set_durable("name".to_string(), "Alice".to_string()).await.unwrap();
            store.set_durable("age".to_string(), "30".to_string()).await.unwrap();
        }

        // Second run: reload from WAL
        {
            let store = KvStore::new_with_wal(wal_path).await.unwrap();
            assert_eq!(store.get("name").await, Some("Alice".to_string()));
            assert_eq!(store.get("age").await, Some("30".to_string()));
        }
    }

    #[tokio::test]
    async fn test_delete_persistence() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("delete.wal");

        {
            let store = KvStore::new_with_wal(wal_path.clone()).await.unwrap();
            store.set_durable("temp".to_string(), "value".to_string()).await.unwrap();
            store.delete_durable("temp").await.unwrap();
        }

        {
            let store = KvStore::new_with_wal(wal_path).await.unwrap();
            assert_eq!(store.get("temp").await, None);
        }
    }

    #[tokio::test]
    async fn test_wal_file_size_grows() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("grow.wal");

        let store = KvStore::new_with_wal(wal_path.clone()).await.unwrap();

        let initial_size = tokio::fs::metadata(&wal_path).await.unwrap().len();

        for i in 0..10 {
            store.set_durable(format!("key{}", i), format!("value{}", i))
                .await
                .unwrap();
        }

        let final_size = tokio::fs::metadata(&wal_path).await.unwrap().len();
        assert!(final_size > initial_size);
    }
}
}

Starter Code

#![allow(unused)]
fn main() {
use std::path::PathBuf;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[derive(Debug, Clone)]
struct WalEntry {
    command: Command,
    timestamp: u64,
}

struct WriteAheadLog {
    file: File,
    path: PathBuf,
}

impl WriteAheadLog {
    async fn new(path: PathBuf) -> io::Result<Self> {
        // TODO: Open file in append mode, create if doesn't exist
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .read(true)
            .open(&path)
            .await?;

        Ok(WriteAheadLog { file, path })
    }

    async fn append(&mut self, entry: &WalEntry) -> io::Result<()> {
        // TODO: Serialize command to text format
        let line = match &entry.command {
            Command::Set { key, value } => format!("SET {} {}\n", key, value),
            Command::Delete { key } => format!("DELETE {}\n", key),
            Command::Get { .. } => return Ok(()), // Don't log reads
        };

        // TODO: Write to file
        // self.file.write_all(line.as_bytes()).await?;
        todo!();

        // TODO: Sync to disk (ensure durability)
        // self.file.sync_all().await?;
        todo!();

        Ok(())
    }

    async fn replay(&self) -> io::Result<Vec<WalEntry>> {
        // TODO: Read entire file
        let mut file = File::open(&self.path).await?;
        let mut contents = String::new();
        file.read_to_string(&mut contents).await?;

        // TODO: Parse each line into WalEntry
        let mut entries = Vec::new();
        for (idx, line) in contents.lines().enumerate() {
            if let Ok(command) = parse_command(line) {
                entries.push(WalEntry {
                    command,
                    timestamp: idx as u64,
                });
            }
        }

        Ok(entries)
    }
}

impl KvStore {
    async fn new_with_wal(wal_path: PathBuf) -> io::Result<Self> {
        let wal = WriteAheadLog::new(wal_path).await?;

        // TODO: Replay WAL to rebuild state
        let entries = wal.replay().await?;

        let data = HashMap::new();
        // TODO: Apply each entry to rebuild state
        // for entry in entries {
        //     match entry.command {
        //         Command::Set { key, value } => data.insert(key, value),
        //         Command::Delete { key } => data.remove(&key),
        //         _ => {}
        //     }
        // }
        todo!();

        Ok(KvStore {
            data: Arc::new(RwLock::new(data)),
            wal: Arc::new(RwLock::new(wal)),
        })
    }

    async fn set_durable(&self, key: String, value: String) -> io::Result<()> {
        // TODO: 1. Append to WAL
        let mut wal = self.wal.write().await;
        wal.append(&WalEntry {
            command: Command::Set {
                key: key.clone(),
                value: value.clone(),
            },
            timestamp: 0, // Use current time in production
        }).await?;
        drop(wal);

        // TODO: 2. Update in-memory HashMap
        // self.data.write().await.insert(key, value);
        todo!();

        Ok(())
    }

    async fn delete_durable(&self, key: &str) -> io::Result<bool> {
        // TODO: Similar to set_durable but for delete
        todo!();
    }
}
}

Check Your Understanding

  • What is a Write-Ahead Log? Append-only log of operations written before applying them to in-memory state.
  • Why write to WAL before updating HashMap? Ensures we can recover operations even if we crash before updating memory.
  • What does fsync do? Forces OS to flush data to physical disk (ensures durability).
  • How do we recover from a crash? Replay entire WAL on startup to rebuild HashMap.
  • What’s the performance cost of fsync? ~1-5ms per write (vs 0.01ms in-memory), limits to ~1K writes/sec.

Why Milestone 2 Isn’t Enough → Moving to Milestone 3

Limitation: Single Point of Failure

  • Only one server holds the data
  • Server crash = system unavailable until restart
  • No redundancy if disk fails
  • Cannot scale reads

What We’re Adding:

  • Replication: Copy data to multiple servers (master + replicas)
  • Async replication: Master sends writes to replicas without waiting
  • Fault tolerance: System stays available if 1 replica fails
  • Read scaling: Distribute reads across replicas

Improvement:

  • Availability: Single failure point → N-1 fault tolerance
  • Durability: 1 copy → N copies (survive disk failures)
  • Read throughput: 100K reads/sec → 300K reads/sec (3 replicas)
  • Write latency: Unchanged (async replication doesn’t wait)

Architecture:

         Master (read/write)
           /    \
          /      \
    Replica1   Replica2
   (read-only) (read-only)

Milestone 3: Async Replication (Master-Replica)

Introduction

The Problem: Single server = single point of failure and limited read capacity.

The Solution: Master-Replica Replication

  • One master accepts writes
  • Multiple replicas receive replicated writes asynchronously
  • Reads can go to any replica (eventual consistency)
  • Writes only to master

Replication Flow:

Client → SET key value → Master
                           ↓ (async)
                        Replica1, Replica2, Replica3
                           ↓ (eventually)
                        All replicas have key=value

Key Concepts

Structs:

#![allow(unused)]
fn main() {
struct ReplicaInfo {
    address: String,
    client: TcpStream,
}

struct KvStore {
    data: Arc<RwLock<HashMap<String, String>>>,
    wal: Arc<RwLock<WriteAheadLog>>,
    replicas: Arc<RwLock<Vec<ReplicaInfo>>>,
    is_master: bool,
}
}

Functions:

#![allow(unused)]
fn main() {
impl KvStore {
    async fn add_replica(&self, address: String) -> io::Result<()>
        // Connect to replica
        // Add to replicas list
        // Send current snapshot

    async fn replicate_to_all(&self, command: &Command)
        // For each replica: send command (don't wait for ack)

    async fn set_with_replication(&self, key: String, value: String) -> io::Result<()>
        // 1. Append to WAL
        // 2. Update HashMap
        // 3. Replicate to followers (async, fire-and-forget)
}

// Replica server
async fn run_replica(master_addr: &str, listen_addr: &str)
    // Connect to master
    // Receive replicated commands
    // Apply to local store
}

Replication Protocol:

  • Master → Replica: REPLICATE SET key value\n
  • Replica → Master: (no ack in async mode)

Checkpoint Tests

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_add_replica() {
        // Start replica server
        tokio::spawn(async {
            run_replica_server("127.0.0.1:9401").await.unwrap();
        });
        sleep(Duration::from_millis(100)).await;

        let store = KvStore::new();
        store.add_replica("127.0.0.1:9401".to_string()).await.unwrap();

        assert_eq!(store.replicas.read().await.len(), 1);
    }

    #[tokio::test]
    async fn test_replication_propagates() {
        // Start master
        let master = Arc::new(KvStore::new());
        tokio::spawn({
            let master = master.clone();
            async move {
                run_master_server("127.0.0.1:9402", master).await.unwrap();
            }
        });

        // Start replica
        let replica = Arc::new(KvStore::new());
        tokio::spawn({
            let replica = replica.clone();
            async move {
                run_replica_server_with_store("127.0.0.1:9403", replica).await.unwrap();
            }
        });

        sleep(Duration::from_millis(100)).await;

        // Connect master to replica
        master.add_replica("127.0.0.1:9403".to_string()).await.unwrap();

        // Write to master
        master.set_with_replication("key1".to_string(), "value1".to_string())
            .await
            .unwrap();

        // Wait for async replication
        sleep(Duration::from_millis(100)).await;

        // Read from replica
        let replica_value = replica.get("key1").await;
        assert_eq!(replica_value, Some("value1".to_string()));
    }

    #[tokio::test]
    async fn test_multiple_replicas() {
        let master = Arc::new(KvStore::new());

        // Start 3 replicas
        let replica1 = Arc::new(KvStore::new());
        let replica2 = Arc::new(KvStore::new());
        let replica3 = Arc::new(KvStore::new());

        // ... (start servers and connect)

        master.add_replica("127.0.0.1:9404".to_string()).await.unwrap();
        master.add_replica("127.0.0.1:9405".to_string()).await.unwrap();
        master.add_replica("127.0.0.1:9406".to_string()).await.unwrap();

        master.set_with_replication("shared".to_string(), "data".to_string())
            .await
            .unwrap();

        sleep(Duration::from_millis(100)).await;

        // All replicas should have the data
        assert_eq!(replica1.get("shared").await, Some("data".to_string()));
        assert_eq!(replica2.get("shared").await, Some("data".to_string()));
        assert_eq!(replica3.get("shared").await, Some("data".to_string()));
    }

    #[tokio::test]
    async fn test_replica_failure_doesnt_block_master() {
        let master = Arc::new(KvStore::new());

        // Add a replica that will fail
        master.add_replica("127.0.0.1:9999".to_string()).await.ok(); // Nonexistent

        // Master should still accept writes
        let result = master.set_with_replication("key".to_string(), "value".to_string()).await;
        assert!(result.is_ok());
    }
}
}

Starter Code

#![allow(unused)]
fn main() {
use tokio::net::TcpStream;

struct ReplicaInfo {
    address: String,
    stream: TcpStream,
}

impl KvStore {
    async fn add_replica(&self, address: String) -> io::Result<()> {
        // TODO: Connect to replica
        let stream = TcpStream::connect(&address).await?;

        // TODO: Add to replicas list
        let mut replicas = self.replicas.write().await;
        replicas.push(ReplicaInfo {
            address,
            stream,
        });

        Ok(())
    }

    async fn replicate_to_all(&self, command: &Command) {
        // TODO: For each replica, send command
        let replicas = self.replicas.read().await;

        for replica in replicas.iter() {
            // Serialize command
            let msg = match command {
                Command::Set { key, value } => format!("REPLICATE SET {} {}\n", key, value),
                Command::Delete { key } => format!("REPLICATE DELETE {}\n", key),
                _ => continue,
            };

            // TODO: Send to replica (ignore errors - fire and forget)
            // replica.stream.write_all(msg.as_bytes()).await.ok();
            todo!();
        }
    }

    async fn set_with_replication(&self, key: String, value: String) -> io::Result<()> {
        // TODO: 1. Append to WAL (if enabled)
        if let Some(wal) = &self.wal {
            // wal.write().await.append(...).await?;
            todo!();
        }

        // TODO: 2. Update HashMap
        self.data.write().await.insert(key.clone(), value.clone());

        // TODO: 3. Replicate to followers (async, spawn task)
        let command = Command::Set { key, value };
        let store = self.clone();
        tokio::spawn(async move {
            store.replicate_to_all(&command).await;
        });

        Ok(())
    }
}

async fn run_replica_server(listen_addr: &str) -> io::Result<()> {
    let store = Arc::new(KvStore::new());
    let listener = TcpListener::bind(listen_addr).await?;

    println!("Replica listening on {}", listen_addr);

    loop {
        let (stream, _) = listener.accept().await?;
        let store = store.clone();

        tokio::spawn(async move {
            handle_replica_client(stream, store).await.ok();
        });
    }
}

async fn handle_replica_client(stream: TcpStream, store: Arc<KvStore>) -> io::Result<()> {
    let (reader, _writer) = stream.into_split();
    let mut reader = BufReader::new(reader);
    let mut line = String::new();

    loop {
        line.clear();
        let n = reader.read_line(&mut line).await?;
        if n == 0 {
            break;
        }

        // TODO: Parse REPLICATE commands
        if let Some(cmd_str) = line.strip_prefix("REPLICATE ") {
            if let Ok(command) = parse_command(cmd_str) {
                // TODO: Apply to local store
                match command {
                    Command::Set { key, value } => {
                        store.set(key, value).await;
                    }
                    Command::Delete { key } => {
                        store.delete(&key).await;
                    }
                    _ => {}
                }
            }
        }
    }

    Ok(())
}
}

Check Your Understanding

  • What is async replication? Master sends writes to replicas but doesn’t wait for acknowledgment.
  • Why is async replication faster than sync? Master doesn’t wait for replicas, so write latency is just local write time.
  • What’s the downside of async replication? Data loss if master crashes before replicas receive the write.
  • Can replicas serve reads? Yes, but data may be slightly stale (eventual consistency).
  • What happens if a replica is down? Master continues operating (fire-and-forget pattern).

Why Milestone 3 Isn’t Enough → Moving to Milestone 4

Limitation: Data Loss Window

  • Async replication = master can crash before replicas receive write
  • Example: Master receives write, crashes before replicating → data lost
  • Eventual consistency = replicas lag behind master
  • No guarantee writes are durable

What We’re Adding:

  • Synchronous replication: Wait for quorum before acknowledging
  • Quorum writes: W=2 out of N=3 replicas must acknowledge
  • Strong consistency: Guaranteed durability (data on ≥2 nodes)
  • Configurable consistency: Trade latency for durability

Improvement:

  • Durability: Async (data loss possible) → Sync quorum (guaranteed durability)
  • Consistency: Eventual → Strong (reads see committed writes)
  • Fault tolerance: Survive F=W-1 failures (W=2 → survive 1 failure)
  • Latency cost: Write time increases (wait for slowest replica in quorum)

Quorum Example (N=3, W=2):

Client → SET key value → Master
                           ↓ (wait for 2 acks)
                    Replica1 ✓, Replica2 ✓, Replica3 ✗
                           ↓
Client ← OK (write durable on 2 nodes)

Milestone 4: Synchronous Replication with Quorum Writes

Introduction

The Problem: Async replication can lose data on master crash.

The Solution: Quorum Writes

  • Configure N (total replicas) and W (write quorum)
  • Master waits for W replicas to acknowledge before returning OK
  • Common: N=3, W=2 (majority quorum, survive 1 failure)
  • Trade-off: Higher latency for guaranteed durability

Consistency Guarantee:

If W + R > N (where R = read quorum), reads see committed writes
Example: N=3, W=2, R=2 → 2+2 > 3 → strong consistency

Key Concepts

Structs:

#![allow(unused)]
fn main() {
struct ReplicationConfig {
    total_replicas: usize,  // N
    write_quorum: usize,     // W
    read_quorum: usize,      // R
}

struct ReplicaInfo {
    address: String,
    stream: TcpStream,
    healthy: bool,
}

struct WriteAck {
    replica_id: usize,
    success: bool,
}
}

Functions:

#![allow(unused)]
fn main() {
impl KvStore {
    async fn set_with_quorum(&self, key: String, value: String) -> io::Result<()>
        // 1. Write to WAL locally
        // 2. Send to all replicas
        // 3. Wait for W acknowledgments (with timeout)
        // 4. If quorum reached: commit, return OK
        // 5. If quorum failed: rollback, return error

    async fn wait_for_quorum(&self, write_id: u64) -> Result<(), QuorumError>
        // Wait for W replicas to acknowledge
        // Timeout after 5 seconds
        // Return Ok if quorum reached, Err otherwise
}

// Replica acknowledges writes
async fn handle_replica_sync_write(stream: TcpStream, store: Arc<KvStore>)
    // Receive REPLICATE_SYNC command
    // Apply to local store
    // Send ACK back to master
}

Protocol:

  • Master → Replica: REPLICATE_SYNC <write_id> SET key value\n
  • Replica → Master: ACK <write_id>\n or NACK <write_id> <error>\n

Checkpoint Tests

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_quorum_write_success() {
        let config = ReplicationConfig {
            total_replicas: 3,
            write_quorum: 2,
            read_quorum: 2,
        };

        let master = Arc::new(KvStore::new_with_config(config));

        // Start 3 replicas
        let replicas = start_replicas(3).await;

        // Connect master to replicas
        for (i, addr) in replicas.iter().enumerate() {
            master.add_replica(addr.clone()).await.unwrap();
        }

        // Write with quorum (should succeed with 2/3 acks)
        let result = master.set_with_quorum("key".to_string(), "value".to_string()).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_quorum_write_failure() {
        let config = ReplicationConfig {
            total_replicas: 3,
            write_quorum: 3, // Require all 3
            read_quorum: 2,
        };

        let master = Arc::new(KvStore::new_with_config(config));

        // Only connect 2 replicas (1 is down)
        master.add_replica("127.0.0.1:9501".to_string()).await.unwrap();
        master.add_replica("127.0.0.1:9502".to_string()).await.unwrap();

        // Write should fail (need 3, have 2)
        let result = master.set_with_quorum("key".to_string(), "value".to_string()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_quorum_with_slow_replica() {
        let config = ReplicationConfig {
            total_replicas: 3,
            write_quorum: 2,
            read_quorum: 2,
        };

        let master = Arc::new(KvStore::new_with_config(config));

        // 2 fast replicas, 1 slow replica
        master.add_replica("127.0.0.1:9503".to_string()).await.unwrap();
        master.add_replica("127.0.0.1:9504".to_string()).await.unwrap();
        master.add_replica("127.0.0.1:9999".to_string()).await.ok(); // Slow/dead

        // Should succeed (2 fast replicas = quorum)
        let start = Instant::now();
        let result = master.set_with_quorum("key".to_string(), "value".to_string()).await;
        let elapsed = start.elapsed();

        assert!(result.is_ok());
        assert!(elapsed < Duration::from_secs(1)); // Doesn't wait for slow replica
    }

    #[tokio::test]
    async fn test_majority_quorum() {
        // N=5, W=3 (majority)
        let config = ReplicationConfig {
            total_replicas: 5,
            write_quorum: 3,
            read_quorum: 3,
        };

        let master = Arc::new(KvStore::new_with_config(config));

        // Connect 5 replicas
        for i in 0..5 {
            let addr = format!("127.0.0.1:{}", 9505 + i);
            start_replica_on(&addr).await;
            master.add_replica(addr).await.unwrap();
        }

        // Should succeed with 3/5 acks
        let result = master.set_with_quorum("data".to_string(), "value".to_string()).await;
        assert!(result.is_ok());
    }
}
}

Starter Code

#![allow(unused)]
fn main() {
use tokio::sync::oneshot;
use tokio::time::{timeout, Duration};
use std::collections::HashMap;

struct ReplicationConfig {
    total_replicas: usize,
    write_quorum: usize,
    read_quorum: usize,
}

struct WriteAck {
    replica_id: usize,
    success: bool,
}

impl KvStore {
    async fn set_with_quorum(&self, key: String, value: String) -> io::Result<()> {
        // Generate unique write ID
        let write_id = generate_write_id();

        // TODO: 1. Write to local WAL
        if let Some(wal) = &self.wal {
            // wal.write().await.append(...).await?;
            todo!();
        }

        // TODO: 2. Send to all replicas
        let replicas = self.replicas.read().await;
        let msg = format!("REPLICATE_SYNC {} SET {} {}\n", write_id, key, value);

        let (ack_tx, mut ack_rx) = tokio::sync::mpsc::channel(replicas.len());

        for (replica_id, replica) in replicas.iter().enumerate() {
            let msg = msg.clone();
            let ack_tx = ack_tx.clone();
            let mut stream = replica.stream.clone();

            tokio::spawn(async move {
                // Send command
                if stream.write_all(msg.as_bytes()).await.is_err() {
                    ack_tx.send(WriteAck {
                        replica_id,
                        success: false,
                    }).await.ok();
                    return;
                }

                // Wait for ACK with timeout
                let mut buf = [0u8; 1024];
                match timeout(Duration::from_secs(5), stream.read(&mut buf)).await {
                    Ok(Ok(n)) if n > 0 => {
                        let response = String::from_utf8_lossy(&buf[..n]);
                        let success = response.starts_with("ACK");
                        ack_tx.send(WriteAck { replica_id, success }).await.ok();
                    }
                    _ => {
                        ack_tx.send(WriteAck {
                            replica_id,
                            success: false,
                        }).await.ok();
                    }
                }
            });
        }
        drop(ack_tx);
        drop(replicas);

        // TODO: 3. Wait for quorum acknowledgments
        let mut acks = 1; // Master counts as 1
        while let Some(ack) = ack_rx.recv().await {
            if ack.success {
                acks += 1;
            }

            if acks >= self.config.write_quorum {
                break;
            }
        }

        // TODO: 4. Check if quorum reached
        if acks >= self.config.write_quorum {
            // Quorum reached: commit to local store
            self.data.write().await.insert(key, value);
            Ok(())
        } else {
            // Quorum failed: return error
            Err(io::Error::new(
                io::ErrorKind::Other,
                "Failed to reach write quorum",
            ))
        }
    }
}

async fn handle_replica_sync_write(
    stream: TcpStream,
    store: Arc<KvStore>,
) -> io::Result<()> {
    let (reader, mut writer) = stream.into_split();
    let mut reader = BufReader::new(reader);
    let mut line = String::new();

    loop {
        line.clear();
        let n = reader.read_line(&mut line).await?;
        if n == 0 {
            break;
        }

        // TODO: Parse REPLICATE_SYNC command
        // Format: REPLICATE_SYNC <write_id> SET key value
        if let Some(cmd_str) = line.strip_prefix("REPLICATE_SYNC ") {
            let parts: Vec<&str> = cmd_str.splitn(2, ' ').collect();
            if parts.len() != 2 {
                continue;
            }

            let write_id = parts[0];
            let command_str = parts[1];

            // TODO: Parse and apply command
            if let Ok(command) = parse_command(command_str) {
                match command {
                    Command::Set { key, value } => {
                        store.set(key, value).await;

                        // TODO: Send ACK
                        writer.write_all(format!("ACK {}\n", write_id).as_bytes()).await?;
                    }
                    Command::Delete { key } => {
                        store.delete(&key).await;
                        writer.write_all(format!("ACK {}\n", write_id).as_bytes()).await?;
                    }
                    _ => {}
                }
            } else {
                // Send NACK on parse error
                writer.write_all(format!("NACK {} parse_error\n", write_id).as_bytes()).await?;
            }
        }
    }

    Ok(())
}

fn generate_write_id() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos() as u64
}
}

Check Your Understanding

  • What is a write quorum? Minimum number of replicas that must acknowledge a write.
  • Why use quorum writes? Guarantee durability—data is on multiple nodes before acknowledging.
  • What’s the trade-off? Higher write latency (wait for W replicas) vs stronger consistency.
  • With N=3, W=2, how many failures can we tolerate? 1 failure (2 nodes still form quorum).
  • What if quorum is not reached? Write fails, client receives error, should retry.

Why Milestone 4 Isn’t Enough → Moving to Milestone 5

Limitation: Manual Failover

  • Master crashes → system unavailable until manual intervention
  • Need operator to promote replica to master
  • Downtime = minutes to hours (human in the loop)
  • No automatic recovery

What We’re Adding:

  • Leader election: Replicas automatically elect new master on failure
  • Heartbeats: Detect master failure quickly (2-5 seconds)
  • Automatic promotion: Replica becomes master without human intervention
  • Simplified Raft: Voting-based consensus for leader election

Improvement:

  • Availability: Manual failover (minutes) → automatic (seconds)
  • Recovery: Human required → fully automated
  • Downtime: Minutes → 2-5 seconds
  • Production-ready: Can deploy without 24/7 on-call

Leader Election Algorithm (simplified Raft):

  1. Nodes send heartbeats to leader
  2. If no heartbeat for N seconds → start election
  3. Candidate increments term, votes for self
  4. Requests votes from other nodes
  5. Node grants vote if: term is newer, haven’t voted this term
  6. Candidate with majority becomes leader

Milestone 5: Leader Election (Simplified Raft)

Introduction

The Problem: Master failure requires manual intervention (downtime).

The Solution: Automated Leader Election

  • All nodes monitor leader via heartbeats
  • On timeout: start election
  • Majority vote determines new leader
  • New leader starts replicating to followers

Simplified Raft Election:

Time 0: Master sends heartbeat every 1s
Time 5: Master crashes (no heartbeat)
Time 7: Replica timeout → starts election (term 2, votes for self)
Time 7.1: Requests votes from other replicas
Time 7.2: Receives majority votes → becomes master
Time 7.3: Sends heartbeat to establish leadership

Key Concepts

Structs:

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Copy, PartialEq)]
enum NodeRole {
    Leader,
    Follower,
    Candidate,
}

struct NodeState {
    role: NodeRole,
    current_term: u64,
    voted_for: Option<String>, // Node ID
    leader_id: Option<String>,
}

struct KvStore {
    // ... existing fields ...
    node_id: String,
    state: Arc<RwLock<NodeState>>,
    peers: Arc<RwLock<Vec<String>>>, // Other node addresses
}
}

Functions:

#![allow(unused)]
fn main() {
impl KvStore {
    async fn start_election(&self)
        // Increment term
        // Change role to Candidate
        // Vote for self
        // Request votes from all peers
        // If majority: become Leader

    async fn send_heartbeat(&self)
        // Send heartbeat to all peers
        // Maintain leadership

    async fn handle_vote_request(&self, term: u64, candidate_id: String) -> bool
        // Grant vote if:
        //   1. Term is greater than current term
        //   2. Haven't voted for anyone else this term

    async fn handle_heartbeat(&self, term: u64, leader_id: String)
        // Reset election timeout
        // Update leader_id

    async fn run_election_timeout(&self)
        // Background task
        // If no heartbeat for N seconds: start election
}
}

Messages:

  • HEARTBEAT term=5 leader=node1
  • VOTE_REQUEST term=6 candidate=node2
  • VOTE_RESPONSE term=6 granted=true

Checkpoint Tests

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_leader_sends_heartbeats() {
        let node = KvStore::new_with_role(NodeRole::Leader, "node1".to_string());

        // Start heartbeat task
        let node_clone = node.clone();
        tokio::spawn(async move {
            node_clone.run_heartbeat_loop().await;
        });

        sleep(Duration::from_secs(2)).await;

        // Verify heartbeats were sent (check logs or mock peers)
    }

    #[tokio::test]
    async fn test_follower_starts_election_on_timeout() {
        let node = KvStore::new_with_role(NodeRole::Follower, "node1".to_string());

        // Start election timeout task
        let node_clone = node.clone();
        tokio::spawn(async move {
            node_clone.run_election_timeout().await;
        });

        sleep(Duration::from_secs(6)).await; // Timeout is 5s

        // Should have started election
        let state = node.state.read().await;
        assert_eq!(state.role, NodeRole::Candidate);
    }

    #[tokio::test]
    async fn test_vote_granting() {
        let node = KvStore::new_with_role(NodeRole::Follower, "node1".to_string());

        // First vote request (term 2)
        let granted = node.handle_vote_request(2, "candidate1".to_string()).await;
        assert!(granted);

        // Second vote request same term (should reject)
        let granted = node.handle_vote_request(2, "candidate2".to_string()).await;
        assert!(!granted);

        // Higher term (should grant)
        let granted = node.handle_vote_request(3, "candidate2".to_string()).await;
        assert!(granted);
    }

    #[tokio::test]
    async fn test_majority_election() {
        // Create 3-node cluster
        let node1 = Arc::new(KvStore::new_with_id("node1".to_string()));
        let node2 = Arc::new(KvStore::new_with_id("node2".to_string()));
        let node3 = Arc::new(KvStore::new_with_id("node3".to_string()));

        // node1 starts election
        node1.start_election().await;

        // node2 and node3 grant votes
        let vote2 = node2.handle_vote_request(1, "node1".to_string()).await;
        let vote3 = node3.handle_vote_request(1, "node1".to_string()).await;

        assert!(vote2);
        assert!(vote3);

        // node1 should become leader (has majority: 3/3)
        let state = node1.state.read().await;
        assert_eq!(state.role, NodeRole::Leader);
    }

    #[tokio::test]
    async fn test_split_vote_retry() {
        // Create 4-node cluster
        let nodes = vec![
            Arc::new(KvStore::new_with_id("node1".to_string())),
            Arc::new(KvStore::new_with_id("node2".to_string())),
            Arc::new(KvStore::new_with_id("node3".to_string())),
            Arc::new(KvStore::new_with_id("node4".to_string())),
        ];

        // node1 and node2 both start election simultaneously
        tokio::join!(
            nodes[0].start_election(),
            nodes[1].start_election(),
        );

        // Split vote: each gets 2 votes (self + 1 other)
        // No majority (need 3/4)

        // Should timeout and retry with higher term
        sleep(Duration::from_secs(6)).await;

        // Eventually one should become leader
    }
}
}

Starter Code

#![allow(unused)]
fn main() {
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum NodeRole {
    Leader,
    Follower,
    Candidate,
}

struct NodeState {
    role: NodeRole,
    current_term: u64,
    voted_for: Option<String>,
    leader_id: Option<String>,
    last_heartbeat: Instant,
}

impl KvStore {
    async fn start_election(&self) {
        println!("[{}] Starting election", self.node_id);

        // TODO: Increment term and vote for self
        let mut state = self.state.write().await;
        state.current_term += 1;
        state.role = NodeRole::Candidate;
        state.voted_for = Some(self.node_id.clone());
        let term = state.current_term;
        drop(state);

        // TODO: Request votes from all peers
        let peers = self.peers.read().await.clone();
        let mut votes = 1; // Vote for self

        for peer in peers.iter() {
            // TODO: Send VOTE_REQUEST to peer
            if let Ok(granted) = send_vote_request(peer, term, &self.node_id).await {
                if granted {
                    votes += 1;
                }
            }
        }

        // TODO: Check if we have majority
        let total_nodes = peers.len() + 1;
        let majority = total_nodes / 2 + 1;

        if votes >= majority {
            // TODO: Become leader
            let mut state = self.state.write().await;
            state.role = NodeRole::Leader;
            state.leader_id = Some(self.node_id.clone());
            println!("[{}] Became leader (term {})", self.node_id, term);
        } else {
            // TODO: Revert to follower
            let mut state = self.state.write().await;
            state.role = NodeRole::Follower;
            state.voted_for = None;
        }
    }

    async fn run_heartbeat_loop(&self) {
        let mut interval = tokio::time::interval(Duration::from_secs(1));

        loop {
            interval.tick().await;

            let state = self.state.read().await;
            if state.role != NodeRole::Leader {
                break; // Stop if no longer leader
            }
            let term = state.current_term;
            drop(state);

            // TODO: Send heartbeat to all peers
            let peers = self.peers.read().await;
            for peer in peers.iter() {
                send_heartbeat(peer, term, &self.node_id).await.ok();
            }
        }
    }

    async fn run_election_timeout(&self) {
        let timeout_duration = Duration::from_secs(5);

        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;

            let state = self.state.read().await;
            if state.role == NodeRole::Leader {
                continue; // Leaders don't timeout
            }

            let elapsed = state.last_heartbeat.elapsed();
            drop(state);

            // TODO: If timeout, start election
            if elapsed > timeout_duration {
                self.start_election().await;
            }
        }
    }

    async fn handle_vote_request(&self, term: u64, candidate_id: String) -> bool {
        let mut state = self.state.write().await;

        // TODO: Grant vote if term is newer and haven't voted
        if term > state.current_term {
            state.current_term = term;
            state.voted_for = Some(candidate_id.clone());
            state.role = NodeRole::Follower;
            return true;
        }

        if term == state.current_term && state.voted_for.is_none() {
            state.voted_for = Some(candidate_id);
            return true;
        }

        false
    }

    async fn handle_heartbeat(&self, term: u64, leader_id: String) {
        let mut state = self.state.write().await;

        // TODO: Update term and reset timeout
        if term >= state.current_term {
            state.current_term = term;
            state.role = NodeRole::Follower;
            state.leader_id = Some(leader_id);
            state.last_heartbeat = Instant::now();
        }
    }
}

async fn send_vote_request(peer: &str, term: u64, candidate_id: &str) -> io::Result<bool> {
    // TODO: Connect to peer and send VOTE_REQUEST
    // Format: VOTE_REQUEST term=X candidate=Y
    todo!();
}

async fn send_heartbeat(peer: &str, term: u64, leader_id: &str) -> io::Result<()> {
    // TODO: Connect to peer and send HEARTBEAT
    // Format: HEARTBEAT term=X leader=Y
    todo!();
}
}

Check Your Understanding

  • What triggers a leader election? Follower doesn’t receive heartbeat within timeout period.
  • How does a node decide who to vote for? Grants vote if term is newer and hasn’t voted this term yet.
  • What is split brain? Two nodes think they’re leader (prevented by majority quorum).
  • Why send heartbeats? Maintain leadership and prevent unnecessary elections.
  • What happens if no majority? Election times out, nodes retry with higher term.

Why Milestone 5 Isn’t Enough → Moving to Milestone 6

Limitation: Client Complexity

  • Clients must manually track which node is leader
  • Need to retry on different node if leader changes
  • Connection overhead (new TCP connection per request)
  • No load balancing across replicas for reads

What We’re Adding:

  • Client connection pool: Reuse TCP connections (avoid handshake overhead)
  • Smart routing: Automatically send writes to leader, reads to any node
  • Automatic failover: Retry on different node if leader changes
  • Read load balancing: Distribute reads across all replicas

Improvement:

  • Performance: New connection (3-way handshake) → pooled connection (instant)
  • Throughput: 1K req/sec → 10K req/sec (connection reuse)
  • Availability: Manual retry → automatic failover
  • Read scaling: All reads to leader → distributed across N replicas

Client Architecture:

Client → Pool[Leader, Replica1, Replica2]
           ├─ GET key → Replica2 (load balanced)
           └─ SET key → Leader (routed)

Milestone 6: Client Connection Pool and Smart Routing

Introduction

The Problem: Creating new TCP connections is expensive (3-way handshake = 1-3ms).

The Solution: Connection Pooling

  • Maintain pool of open connections to each node
  • Reuse connections for multiple requests
  • Route writes to leader, reads to any replica
  • Automatically detect leader changes and re-route

Connection Pool Benefits:

  • Latency: 3ms (new connection) → 0.1ms (pooled)
  • Throughput: 300 req/sec → 10K req/sec per client
  • Efficiency: No handshake overhead, TCP window already tuned

Key Concepts

Structs:

#![allow(unused)]
fn main() {
struct KvClient {
    pools: HashMap<String, ConnectionPool>,
    leader_addr: Arc<RwLock<Option<String>>>,
    replica_addrs: Vec<String>,
}

struct ConnectionPool {
    address: String,
    available: Arc<Mutex<VecDeque<TcpStream>>>,
    max_size: usize,
}

struct PooledConnection {
    stream: Option<TcpStream>,
    pool: Arc<Mutex<VecDeque<TcpStream>>>,
}
}

Functions:

#![allow(unused)]
fn main() {
impl ConnectionPool {
    async fn acquire(&self) -> io::Result<PooledConnection>
        // Try to reuse connection from pool
        // If none available: create new connection
        // Return PooledConnection (returns to pool on drop)

    async fn release(&self, stream: TcpStream)
        // Return connection to pool
}

impl KvClient {
    async fn get(&self, key: &str) -> io::Result<Option<String>>
        // Pick random replica (load balancing)
        // Acquire connection from pool
        // Send GET command
        // Return value

    async fn set(&self, key: String, value: String) -> io::Result<()>
        // Get leader address
        // Acquire connection to leader
        // Send SET command
        // Handle NOT_LEADER error (retry on new leader)

    async fn discover_leader(&self) -> io::Result<String>
        // Ask any node who the leader is
        // Update cached leader address
}
}

Protocol Extensions:

  • WHO_IS_LEADERLEADER node1.example.com:6379
  • SET key valueNOT_LEADER leader=node2:6379 (redirect)

Checkpoint Tests

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_connection_pool_reuse() {
        let pool = ConnectionPool::new("127.0.0.1:9601".to_string(), 10);

        // Acquire connection
        let conn1 = pool.acquire().await.unwrap();
        let stream_addr = format!("{:p}", &conn1.stream);
        drop(conn1); // Return to pool

        // Acquire again - should be same connection
        let conn2 = pool.acquire().await.unwrap();
        let stream_addr2 = format!("{:p}", &conn2.stream);

        assert_eq!(stream_addr, stream_addr2); // Same connection reused
    }

    #[tokio::test]
    async fn test_pool_max_size() {
        let pool = ConnectionPool::new("127.0.0.1:9602".to_string(), 2);

        let _conn1 = pool.acquire().await.unwrap();
        let _conn2 = pool.acquire().await.unwrap();

        // Pool is at max size, should create new connection (not pool it on return)
        let _conn3 = pool.acquire().await.unwrap();
    }

    #[tokio::test]
    async fn test_client_get_with_pool() {
        // Start server
        tokio::spawn(async {
            run_kv_server("127.0.0.1:9603").await.unwrap();
        });
        sleep(Duration::from_millis(100)).await;

        let client = KvClient::new(vec!["127.0.0.1:9603".to_string()]);

        // First GET (creates connection)
        let start = Instant::now();
        client.get("key1").await.unwrap();
        let first_duration = start.elapsed();

        // Second GET (reuses connection)
        let start = Instant::now();
        client.get("key2").await.unwrap();
        let second_duration = start.elapsed();

        // Second should be faster (no handshake)
        println!("First: {:?}, Second: {:?}", first_duration, second_duration);
    }

    #[tokio::test]
    async fn test_smart_routing_to_leader() {
        // Start 3-node cluster
        let leader = start_node_as_leader("127.0.0.1:9604").await;
        let replica1 = start_node_as_replica("127.0.0.1:9605").await;
        let replica2 = start_node_as_replica("127.0.0.1:9606").await;

        let client = KvClient::new(vec![
            "127.0.0.1:9604".to_string(),
            "127.0.0.1:9605".to_string(),
            "127.0.0.1:9606".to_string(),
        ]);

        // SET should go to leader
        client.set("key".to_string(), "value".to_string()).await.unwrap();

        // Verify write reached leader
        assert_eq!(leader.get("key").await, Some("value".to_string()));
    }

    #[tokio::test]
    async fn test_read_load_balancing() {
        let client = KvClient::new(vec![
            "127.0.0.1:9607".to_string(),
            "127.0.0.1:9608".to_string(),
            "127.0.0.1:9609".to_string(),
        ]);

        // Track which replicas were used
        let mut replica_usage = HashMap::new();

        for _ in 0..30 {
            let replica = client.pick_read_replica().await;
            *replica_usage.entry(replica).or_insert(0) += 1;
        }

        // Should have distributed reads across multiple replicas
        assert!(replica_usage.len() > 1);
    }

    #[tokio::test]
    async fn test_automatic_leader_failover() {
        // Start 3-node cluster
        let leader = start_node_as_leader("127.0.0.1:9610").await;
        let replica1 = start_node_as_replica("127.0.0.1:9611").await;

        let client = KvClient::new(vec![
            "127.0.0.1:9610".to_string(),
            "127.0.0.1:9611".to_string(),
        ]);

        // Write succeeds to leader
        client.set("key1".to_string(), "value1".to_string()).await.unwrap();

        // Simulate leader crash
        drop(leader);

        // replica1 should become new leader
        sleep(Duration::from_secs(6)).await; // Election timeout

        // Client should discover new leader and succeed
        client.set("key2".to_string(), "value2".to_string()).await.unwrap();
    }
}
}

Starter Code

#![allow(unused)]
fn main() {
use std::collections::{HashMap, VecDeque};
use tokio::sync::Mutex;
use rand::Rng;

struct ConnectionPool {
    address: String,
    available: Arc<Mutex<VecDeque<TcpStream>>>,
    max_size: usize,
}

impl ConnectionPool {
    fn new(address: String, max_size: usize) -> Self {
        ConnectionPool {
            address,
            available: Arc::new(Mutex::new(VecDeque::new())),
            max_size,
        }
    }

    async fn acquire(&self) -> io::Result<PooledConnection> {
        // TODO: Try to get connection from pool
        let mut pool = self.available.lock().await;

        if let Some(stream) = pool.pop_front() {
            return Ok(PooledConnection {
                stream: Some(stream),
                pool: self.available.clone(),
            });
        }
        drop(pool);

        // TODO: No available connection - create new one
        let stream = TcpStream::connect(&self.address).await?;

        Ok(PooledConnection {
            stream: Some(stream),
            pool: self.available.clone(),
        })
    }
}

struct PooledConnection {
    stream: Option<TcpStream>,
    pool: Arc<Mutex<VecDeque<TcpStream>>>,
}

impl Drop for PooledConnection {
    fn drop(&mut self) {
        // TODO: Return connection to pool
        if let Some(stream) = self.stream.take() {
            let pool = self.pool.clone();
            tokio::spawn(async move {
                let mut pool = pool.lock().await;
                pool.push_back(stream);
            });
        }
    }
}

impl std::ops::Deref for PooledConnection {
    type Target = TcpStream;
    fn deref(&self) -> &Self::Target {
        self.stream.as_ref().unwrap()
    }
}

impl std::ops::DerefMut for PooledConnection {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.stream.as_mut().unwrap()
    }
}

struct KvClient {
    pools: HashMap<String, ConnectionPool>,
    leader_addr: Arc<RwLock<Option<String>>>,
    replica_addrs: Vec<String>,
}

impl KvClient {
    fn new(addrs: Vec<String>) -> Self {
        let mut pools = HashMap::new();
        for addr in &addrs {
            pools.insert(addr.clone(), ConnectionPool::new(addr.clone(), 10));
        }

        KvClient {
            pools,
            leader_addr: Arc::new(RwLock::new(None)),
            replica_addrs: addrs,
        }
    }

    async fn get(&self, key: &str) -> io::Result<Option<String>> {
        // TODO: Pick random replica for read load balancing
        let replica = self.pick_read_replica().await;

        // TODO: Acquire connection from pool
        let pool = self.pools.get(&replica).unwrap();
        let mut conn = pool.acquire().await?;

        // TODO: Send GET command
        conn.write_all(format!("GET {}\n", key).as_bytes()).await?;

        // TODO: Read response
        let mut buf = [0u8; 4096];
        let n = conn.read(&mut buf).await?;
        let response = String::from_utf8_lossy(&buf[..n]);

        // TODO: Parse response
        if let Some(value) = response.strip_prefix("VALUE ") {
            Ok(Some(value.trim().to_string()))
        } else {
            Ok(None)
        }
    }

    async fn set(&self, key: String, value: String) -> io::Result<()> {
        // TODO: Discover leader if not known
        let leader = match self.leader_addr.read().await.clone() {
            Some(addr) => addr,
            None => self.discover_leader().await?,
        };

        // TODO: Acquire connection to leader
        let pool = self.pools.get(&leader).unwrap();
        let mut conn = pool.acquire().await?;

        // TODO: Send SET command
        conn.write_all(format!("SET {} {}\n", key, value).as_bytes()).await?;

        // TODO: Read response
        let mut buf = [0u8; 1024];
        let n = conn.read(&mut buf).await?;
        let response = String::from_utf8_lossy(&buf[..n]);

        // TODO: Handle NOT_LEADER redirect
        if response.contains("NOT_LEADER") {
            // Extract new leader address and retry
            // self.leader_addr.write().await = Some(new_leader);
            // return self.set(key, value).await;
            todo!();
        }

        if response.starts_with("OK") {
            Ok(())
        } else {
            Err(io::Error::new(io::ErrorKind::Other, "SET failed"))
        }
    }

    async fn discover_leader(&self) -> io::Result<String> {
        // TODO: Ask any replica who the leader is
        for addr in &self.replica_addrs {
            if let Ok(leader) = self.query_leader(addr).await {
                *self.leader_addr.write().await = Some(leader.clone());
                return Ok(leader);
            }
        }

        Err(io::Error::new(io::ErrorKind::Other, "No leader found"))
    }

    async fn query_leader(&self, addr: &str) -> io::Result<String> {
        // TODO: Send WHO_IS_LEADER command
        let pool = self.pools.get(addr).unwrap();
        let mut conn = pool.acquire().await?;

        conn.write_all(b"WHO_IS_LEADER\n").await?;

        let mut buf = [0u8; 1024];
        let n = conn.read(&mut buf).await?;
        let response = String::from_utf8_lossy(&buf[..n]);

        if let Some(leader) = response.strip_prefix("LEADER ") {
            Ok(leader.trim().to_string())
        } else {
            Err(io::Error::new(io::ErrorKind::Other, "Unknown leader"))
        }
    }

    async fn pick_read_replica(&self) -> String {
        // TODO: Random load balancing
        let mut rng = rand::thread_rng();
        let idx = rng.gen_range(0..self.replica_addrs.len());
        self.replica_addrs[idx].clone()
    }
}
}

Check Your Understanding

  • What is connection pooling? Reusing TCP connections across multiple requests instead of creating new ones.
  • Why is pooling faster? Avoids TCP handshake (SYN, SYN-ACK, ACK) which takes 1-3ms.
  • How does smart routing work? Writes go to leader, reads go to any replica (load balanced).
  • What happens if leader changes? Client receives NOT_LEADER redirect, updates cached leader address, retries.
  • How much faster is pooling? ~10-30x for small requests (handshake overhead eliminated).

Complete Working Example

Below is a simplified but functional distributed key-value store with replication and leader election:

use std::collections::HashMap;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, RwLock};
use tokio::time::{interval, sleep};

//================================================
// Milestone 1: In-Memory KV Store (TCP Protocol)
//================================================

struct KvStore {
    data: Arc<RwLock<HashMap<String, String>>>,
}

#[derive(Debug, Clone)]
enum Command {
    Get { key: String },
    Set { key: String, value: String },
    Delete { key: String },
}

enum Response {
    Ok,
    Value { data: String },
    NotFound,
    Error { msg: String },
}

impl KvStore {
    fn new() -> Self {
        KvStore {
            data: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    async fn get(&self, key: &str) -> Option<String> {
        let data = self.data.read().await;
        data.get(key).cloned()
    }

    async fn set(&self, key: String, value: String) {
        let mut data = self.data.write().await;
        data.insert(key, value);
    }

    async fn delete(&self, key: &str) -> bool {
        let mut data = self.data.write().await;
        data.remove(key).is_some()
    }
}

impl Response {
    fn to_string(&self) -> String {
        match self {
            Response::Ok => "OK\n".to_string(),
            Response::Value { data } => format!("VALUE {}\n", data),
            Response::NotFound => "NOT_FOUND\n".to_string(),
            Response::Error { msg } => format!("ERROR {}\n", msg),
        }
    }
}

fn parse_command(line: &str) -> Result<Command, String> {
    let parts: Vec<&str> = line.trim().splitn(3, ' ').collect();

    match parts.as_slice() {
        ["GET", key] => Ok(Command::Get {
            key: key.to_string(),
        }),
        ["SET", key, value] => Ok(Command::Set {
            key: key.to_string(),
            value: value.to_string(),
        }),
        ["DELETE", key] => Ok(Command::Delete {
            key: key.to_string(),
        }),
        _ => Err("Invalid command".to_string()),
    }
}

async fn run_kv_server(addr: &str) -> tokio::io::Result<()> {
    let store = Arc::new(KvStore::new());
    let listener = TcpListener::bind(addr).await?;

    println!("KV store listening on {}", addr);

    loop {
        let (stream, addr) = listener.accept().await?;
        let store = store.clone();

        tokio::spawn(async move {
            if let Err(e) = handle_client(stream, store).await {
                eprintln!("Client {} error: {}", addr, e);
            }
        });
    }
}

async fn handle_client(stream: TcpStream, store: Arc<KvStore>) -> tokio::io::Result<()> {
    let (reader, mut writer) = stream.into_split();
    let mut reader = BufReader::new(reader);
    let mut line = String::new();

    loop {
        line.clear();

        let bytes_read = reader.read_line(&mut line).await?;

        if bytes_read == 0 {
            break;
        }

        let command = match parse_command(&line) {
            Ok(cmd) => cmd,
            Err(e) => {
                writer
                    .write_all(Response::Error { msg: e }.to_string().as_bytes())
                    .await?;
                continue;
            }
        };

        let response = match command {
            Command::Get { key } => {
                if let Some(value) = store.get(&key).await {
                    Response::Value { data: value }
                } else {
                    Response::NotFound
                }
            }
            Command::Set { key, value } => {
                store.set(key, value).await;
                Response::Ok
            }
            Command::Delete { key } => {
                if store.delete(&key).await {
                    Response::Ok
                } else {
                    Response::NotFound
                }
            }
        };

        writer.write_all(response.to_string().as_bytes()).await?;
    }

    Ok(())
}

//================================================
// Milestone 2: Persistence with Write-Ahead Log (WAL)
//================================================

#[derive(Debug, Clone)]
struct WalEntry {
    command: Command,
    timestamp: u64,
}

struct WriteAheadLog {
    file: File,
    path: PathBuf,
}

impl WriteAheadLog {
    async fn new(path: PathBuf) -> io::Result<Self> {
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .read(true)
            .open(&path)
            .await?;

        Ok(WriteAheadLog { file, path })
    }

    async fn append(&mut self, entry: &WalEntry) -> io::Result<()> {
        let line = match &entry.command {
            Command::Set { key, value } => format!("SET {} {}\n", key, value),
            Command::Delete { key } => format!("DELETE {}\n", key),
            Command::Get { .. } => return Ok(()),
        };

        self.file.write_all(line.as_bytes()).await?;
        self.file.sync_all().await?;

        Ok(())
    }

    async fn replay(&self) -> io::Result<Vec<WalEntry>> {
        let mut file = File::open(&self.path).await?;
        let mut contents = String::new();
        file.read_to_string(&mut contents).await?;

        let mut entries = Vec::new();
        for (idx, line) in contents.lines().enumerate() {
            if let Ok(command) = parse_command(line) {
                entries.push(WalEntry {
                    command,
                    timestamp: idx as u64,
                });
            }
        }

        Ok(entries)
    }
}

struct KvStoreWithWal {
    data: Arc<RwLock<HashMap<String, String>>>,
    wal: Arc<RwLock<WriteAheadLog>>,
}

impl KvStoreWithWal {
    async fn new_with_wal(wal_path: PathBuf) -> io::Result<Self> {
        let wal = WriteAheadLog::new(wal_path).await?;

        let entries = wal.replay().await?;

        let mut data = HashMap::new();
        for entry in entries {
            match entry.command {
                Command::Set { key, value } => {
                    data.insert(key, value);
                }
                Command::Delete { key } => {
                    data.remove(&key);
                }
                _ => {}
            }
        }

        Ok(KvStoreWithWal {
            data: Arc::new(RwLock::new(data)),
            wal: Arc::new(RwLock::new(wal)),
        })
    }

    async fn get(&self, key: &str) -> Option<String> {
        let data = self.data.read().await;
        data.get(key).cloned()
    }

    async fn set_durable(&self, key: String, value: String) -> io::Result<()> {
        let mut wal = self.wal.write().await;
        wal.append(&WalEntry {
            command: Command::Set {
                key: key.clone(),
                value: value.clone(),
            },
            timestamp: 0,
        })
        .await?;
        drop(wal);

        self.data.write().await.insert(key, value);

        Ok(())
    }

    async fn delete_durable(&self, key: &str) -> io::Result<bool> {
        let mut wal = self.wal.write().await;
        wal.append(&WalEntry {
            command: Command::Delete {
                key: key.to_string(),
            },
            timestamp: 0,
        })
        .await?;
        drop(wal);

        let existed = self.data.write().await.remove(key).is_some();
        Ok(existed)
    }
}

//================================================
// Milestone 3: Async Replication (Master-Replica)
//================================================

struct ReplicatedKvStore {
    data: Arc<RwLock<HashMap<String, String>>>,
    wal: Arc<RwLock<WriteAheadLog>>,
    replicas: Vec<String>,
}

impl ReplicatedKvStore {
    async fn new_replicated(wal_path: PathBuf, replicas: Vec<String>) -> io::Result<Self> {
        let wal = WriteAheadLog::new(wal_path).await?;
        let entries = wal.replay().await?;

        let mut data = HashMap::new();
        for entry in entries {
            match entry.command {
                Command::Set { key, value } => {
                    data.insert(key, value);
                }
                Command::Delete { key } => {
                    data.remove(&key);
                }
                _ => {}
            }
        }

        Ok(ReplicatedKvStore {
            data: Arc::new(RwLock::new(data)),
            wal: Arc::new(RwLock::new(wal)),
            replicas,
        })
    }

    async fn get(&self, key: &str) -> Option<String> {
        self.data.read().await.get(key).cloned()
    }

    async fn set_replicated(&self, key: String, value: String) -> io::Result<()> {
        // Write to WAL
        let mut wal = self.wal.write().await;
        wal.append(&WalEntry {
            command: Command::Set {
                key: key.clone(),
                value: value.clone(),
            },
            timestamp: 0,
        })
        .await?;
        drop(wal);

        // Update in-memory
        self.data.write().await.insert(key.clone(), value.clone());

        // Async replicate (fire and forget)
        let replicas = self.replicas.clone();
        tokio::spawn(async move {
            for replica in replicas {
                if let Ok(mut stream) = TcpStream::connect(&replica).await {
                    let cmd = format!("REPLICATE SET {} {}\n", key, value);
                    let _ = stream.write_all(cmd.as_bytes()).await;
                }
            }
        });

        Ok(())
    }

    async fn delete_replicated(&self, key: &str) -> io::Result<bool> {
        // Write to WAL
        let mut wal = self.wal.write().await;
        wal.append(&WalEntry {
            command: Command::Delete {
                key: key.to_string(),
            },
            timestamp: 0,
        })
        .await?;
        drop(wal);

        // Update in-memory
        let existed = self.data.write().await.remove(key).is_some();

        // Async replicate
        let replicas = self.replicas.clone();
        let key = key.to_string();
        tokio::spawn(async move {
            for replica in replicas {
                if let Ok(mut stream) = TcpStream::connect(&replica).await {
                    let cmd = format!("REPLICATE DELETE {}\n", key);
                    let _ = stream.write_all(cmd.as_bytes()).await;
                }
            }
        });

        Ok(existed)
    }
}

//================================================
// Milestone 4: Synchronous Replication with Quorum Writes
//================================================

struct QuorumKvStore {
    data: Arc<RwLock<HashMap<String, String>>>,
    wal: Arc<RwLock<WriteAheadLog>>,
    replicas: Vec<String>,
    write_quorum: usize,
}

impl QuorumKvStore {
    async fn new_quorum(
        wal_path: PathBuf,
        replicas: Vec<String>,
        write_quorum: usize,
    ) -> io::Result<Self> {
        let wal = WriteAheadLog::new(wal_path).await?;
        let entries = wal.replay().await?;

        let mut data = HashMap::new();
        for entry in entries {
            match entry.command {
                Command::Set { key, value } => {
                    data.insert(key, value);
                }
                Command::Delete { key } => {
                    data.remove(&key);
                }
                _ => {}
            }
        }

        Ok(QuorumKvStore {
            data: Arc::new(RwLock::new(data)),
            wal: Arc::new(RwLock::new(wal)),
            replicas,
            write_quorum,
        })
    }

    async fn get(&self, key: &str) -> Option<String> {
        self.data.read().await.get(key).cloned()
    }

    async fn set_quorum(&self, key: String, value: String) -> io::Result<()> {
        // Write to WAL
        let mut wal = self.wal.write().await;
        wal.append(&WalEntry {
            command: Command::Set {
                key: key.clone(),
                value: value.clone(),
            },
            timestamp: 0,
        })
        .await?;
        drop(wal);

        // Update in-memory
        self.data.write().await.insert(key.clone(), value.clone());

        // Synchronous replication with quorum
        let (tx, mut rx) = mpsc::channel(self.replicas.len());

        for replica in &self.replicas {
            let replica = replica.clone();
            let key = key.clone();
            let value = value.clone();
            let tx = tx.clone();

            tokio::spawn(async move {
                let result = async {
                    let mut stream = TcpStream::connect(&replica).await?;
                    let cmd = format!("REPLICATE SET {} {}\n", key, value);
                    stream.write_all(cmd.as_bytes()).await?;

                    // Wait for ACK
                    let mut buf = [0u8; 1024];
                    let n = stream.read(&mut buf).await?;
                    let response = String::from_utf8_lossy(&buf[..n]);

                    if response.contains("OK") {
                        Ok(())
                    } else {
                        Err(io::Error::new(io::ErrorKind::Other, "Replica NACK"))
                    }
                }
                .await;

                let _ = tx.send(result).await;
            });
        }

        drop(tx);

        // Wait for quorum
        let mut acks = 1; // Master itself counts
        while let Some(result) = rx.recv().await {
            if result.is_ok() {
                acks += 1;
                if acks >= self.write_quorum {
                    return Ok(());
                }
            }
        }

        if acks >= self.write_quorum {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                format!("Failed to reach quorum: {} < {}", acks, self.write_quorum),
            ))
        }
    }

    async fn delete_quorum(&self, key: &str) -> io::Result<bool> {
        // Write to WAL
        let mut wal = self.wal.write().await;
        wal.append(&WalEntry {
            command: Command::Delete {
                key: key.to_string(),
            },
            timestamp: 0,
        })
        .await?;
        drop(wal);

        // Update in-memory
        let existed = self.data.write().await.remove(key).is_some();

        // Synchronous replication with quorum
        let (tx, mut rx) = mpsc::channel(self.replicas.len());

        for replica in &self.replicas {
            let replica = replica.clone();
            let key = key.to_string();
            let tx = tx.clone();

            tokio::spawn(async move {
                let result = async {
                    let mut stream = TcpStream::connect(&replica).await?;
                    let cmd = format!("REPLICATE DELETE {}\n", key);
                    stream.write_all(cmd.as_bytes()).await?;

                    let mut buf = [0u8; 1024];
                    let n = stream.read(&mut buf).await?;
                    let response = String::from_utf8_lossy(&buf[..n]);

                    if response.contains("OK") {
                        Ok(())
                    } else {
                        Err(io::Error::new(io::ErrorKind::Other, "Replica NACK"))
                    }
                }
                .await;

                let _ = tx.send(result).await;
            });
        }

        drop(tx);

        let mut acks = 1;
        while let Some(result) = rx.recv().await {
            if result.is_ok() {
                acks += 1;
                if acks >= self.write_quorum {
                    return Ok(existed);
                }
            }
        }

        if acks >= self.write_quorum {
            Ok(existed)
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                format!("Failed to reach quorum: {} < {}", acks, self.write_quorum),
            ))
        }
    }
}

//================================================
// Milestone 5: Leader Election (Simplified Raft)
//================================================

#[derive(Debug, Clone, Copy, PartialEq)]
enum NodeRole {
    Follower,
    Candidate,
    Leader,
}

struct RaftNode {
    data: Arc<RwLock<HashMap<String, String>>>,
    role: Arc<RwLock<NodeRole>>,
    term: Arc<RwLock<u64>>,
    voted_for: Arc<RwLock<Option<String>>>,
    peers: Vec<String>,
    node_id: String,
}

impl RaftNode {
    fn new(node_id: String, peers: Vec<String>) -> Self {
        RaftNode {
            data: Arc::new(RwLock::new(HashMap::new())),
            role: Arc::new(RwLock::new(NodeRole::Follower)),
            term: Arc::new(RwLock::new(0)),
            voted_for: Arc::new(RwLock::new(None)),
            peers,
            node_id,
        }
    }

    async fn start_election(&self) -> bool {
        // Become candidate
        *self.role.write().await = NodeRole::Candidate;
        let mut current_term = self.term.write().await;
        *current_term += 1;
        let term = *current_term;
        drop(current_term);

        // Vote for self
        *self.voted_for.write().await = Some(self.node_id.clone());
        let mut votes = 1;

        // If no peers, we have majority
        if self.peers.is_empty() {
            *self.role.write().await = NodeRole::Leader;
            return true;
        }

        // Request votes from peers
        let (tx, mut rx) = mpsc::channel(self.peers.len());

        for peer in &self.peers {
            let peer = peer.clone();
            let node_id = self.node_id.clone();
            let tx = tx.clone();

            tokio::spawn(async move {
                let vote_granted = async {
                    let mut stream = TcpStream::connect(&peer).await.ok()?;
                    let request = format!("VOTE_REQUEST {} {}\n", node_id, term);
                    stream.write_all(request.as_bytes()).await.ok()?;

                    let mut buf = [0u8; 1024];
                    let n = stream.read(&mut buf).await.ok()?;
                    let response = String::from_utf8_lossy(&buf[..n]);

                    if response.contains("VOTE_GRANTED") {
                        Some(true)
                    } else {
                        Some(false)
                    }
                }
                .await;

                let _ = tx.send(vote_granted.unwrap_or(false)).await;
            });
        }

        drop(tx);

        // Count votes
        let majority = (self.peers.len() + 1) / 2 + 1;

        while let Some(vote) = rx.recv().await {
            if vote {
                votes += 1;
                if votes >= majority {
                    *self.role.write().await = NodeRole::Leader;
                    return true;
                }
            }
        }

        votes >= majority
    }

    async fn is_leader(&self) -> bool {
        *self.role.read().await == NodeRole::Leader
    }

    async fn send_heartbeat(&self) {
        let term = *self.term.read().await;

        for peer in &self.peers {
            let peer = peer.clone();
            let node_id = self.node_id.clone();

            tokio::spawn(async move {
                if let Ok(mut stream) = TcpStream::connect(&peer).await {
                    let heartbeat = format!("HEARTBEAT {} {}\n", node_id, term);
                    let _ = stream.write_all(heartbeat.as_bytes()).await;
                }
            });
        }
    }

    async fn get(&self, key: &str) -> Option<String> {
        self.data.read().await.get(key).cloned()
    }

    async fn set(&self, key: String, value: String) -> Result<(), String> {
        if !self.is_leader().await {
            return Err("Not leader".to_string());
        }

        self.data.write().await.insert(key, value);
        Ok(())
    }
}

async fn run_raft_node(node: Arc<RaftNode>) {
    let heartbeat_interval = Duration::from_secs(1);
    let election_timeout = Duration::from_secs(5);

    tokio::spawn({
        let node = node.clone();
        async move {
            let mut heartbeat_timer = interval(heartbeat_interval);
            loop {
                heartbeat_timer.tick().await;
                if node.is_leader().await {
                    node.send_heartbeat().await;
                }
            }
        }
    });

    tokio::spawn({
        let node = node.clone();
        async move {
            sleep(election_timeout).await;
            if !node.is_leader().await {
                node.start_election().await;
            }
        }
    });
}

//================================================
// Milestone 6: Client Connection Pool and Smart Routing
//================================================

struct ConnectionPool {
    addr: String,
    pool: Arc<RwLock<Vec<TcpStream>>>,
    max_size: usize,
}

impl ConnectionPool {
    fn new(addr: String, max_size: usize) -> Self {
        ConnectionPool {
            addr,
            pool: Arc::new(RwLock::new(Vec::new())),
            max_size,
        }
    }

    async fn acquire(&self) -> io::Result<TcpStream> {
        let mut pool = self.pool.write().await;

        if let Some(stream) = pool.pop() {
            Ok(stream)
        } else {
            TcpStream::connect(&self.addr).await
        }
    }

    async fn release(&self, stream: TcpStream) {
        let mut pool = self.pool.write().await;
        if pool.len() < self.max_size {
            pool.push(stream);
        }
    }
}

struct SmartKvClient {
    pools: HashMap<String, ConnectionPool>,
    leader: Arc<RwLock<Option<String>>>,
    replicas: Vec<String>,
}

impl SmartKvClient {
    fn new(servers: Vec<String>) -> Self {
        let mut pools = HashMap::new();
        for server in &servers {
            pools.insert(server.clone(), ConnectionPool::new(server.clone(), 10));
        }

        SmartKvClient {
            pools,
            leader: Arc::new(RwLock::new(None)),
            replicas: servers,
        }
    }

    async fn discover_leader(&self) -> Option<String> {
        for server in &self.replicas {
            if let Ok(mut stream) = TcpStream::connect(server).await {
                if stream.write_all(b"WHO_IS_LEADER\n").await.is_ok() {
                    let mut buf = [0u8; 1024];
                    if let Ok(n) = stream.read(&mut buf).await {
                        let response = String::from_utf8_lossy(&buf[..n]);
                        if let Some(leader) = response.strip_prefix("LEADER ") {
                            return Some(leader.trim().to_string());
                        }
                    }
                }
            }
        }
        None
    }

    async fn get(&self, key: &str) -> io::Result<Option<String>> {
        // Pick random replica for read
        use rand::seq::SliceRandom;
        let replica = self
            .replicas
            .choose(&mut rand::thread_rng())
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "No replicas"))?;

        let pool = self
            .pools
            .get(replica)
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "Pool not found"))?;

        let mut stream = pool.acquire().await?;
        stream
            .write_all(format!("GET {}\n", key).as_bytes())
            .await?;

        let mut buf = [0u8; 1024];
        let n = stream.read(&mut buf).await?;
        let response = String::from_utf8_lossy(&buf[..n]);

        pool.release(stream).await;

        if let Some(value) = response.strip_prefix("VALUE ") {
            Ok(Some(value.trim().to_string()))
        } else if response.contains("NOT_FOUND") {
            Ok(None)
        } else {
            Err(io::Error::new(io::ErrorKind::Other, "Invalid response"))
        }
    }

    async fn set(&self, key: String, value: String) -> io::Result<()> {
        loop {
            // Get or discover leader
            let leader = {
                let cached = self.leader.read().await;
                if let Some(l) = cached.as_ref() {
                    Some(l.clone())
                } else {
                    None
                }
            };

            let leader = match leader {
                Some(l) => l,
                None => {
                    if let Some(l) = self.discover_leader().await {
                        *self.leader.write().await = Some(l.clone());
                        l
                    } else {
                        return Err(io::Error::new(io::ErrorKind::NotFound, "No leader"));
                    }
                }
            };

            let pool = self
                .pools
                .get(&leader)
                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "Pool not found"))?;

            let mut stream = pool.acquire().await?;
            stream
                .write_all(format!("SET {} {}\n", key, value).as_bytes())
                .await?;

            let mut buf = [0u8; 1024];
            let n = stream.read(&mut buf).await?;
            let response = String::from_utf8_lossy(&buf[..n]);

            pool.release(stream).await;

            if response.contains("OK") {
                return Ok(());
            } else if response.contains("NOT_LEADER") {
                // Invalidate cached leader and retry
                *self.leader.write().await = None;
                continue;
            } else {
                return Err(io::Error::new(io::ErrorKind::Other, "Write failed"));
            }
        }
    }
}

//================================================
// Tests
//================================================

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    use tokio::time::sleep;

    // Milestone 1 Tests
    #[tokio::test]
    async fn test_set_and_get() {
        let store = KvStore::new();
        store.set("name".to_string(), "Alice".to_string()).await;
        let value = store.get("name").await;
        assert_eq!(value, Some("Alice".to_string()));
    }

    #[tokio::test]
    async fn test_get_nonexistent() {
        let store = KvStore::new();
        let value = store.get("missing").await;
        assert_eq!(value, None);
    }

    #[tokio::test]
    async fn test_delete() {
        let store = KvStore::new();
        store.set("temp".to_string(), "value".to_string()).await;
        assert!(store.delete("temp").await);
        assert_eq!(store.get("temp").await, None);
    }

    #[tokio::test]
    async fn test_overwrite() {
        let store = KvStore::new();
        store.set("key".to_string(), "v1".to_string()).await;
        store.set("key".to_string(), "v2".to_string()).await;
        assert_eq!(store.get("key").await, Some("v2".to_string()));
    }

    #[test]
    fn test_parse_get() {
        let cmd = parse_command("GET mykey").unwrap();
        assert!(matches!(cmd, Command::Get { key } if key == "mykey"));
    }

    #[test]
    fn test_parse_set() {
        let cmd = parse_command("SET mykey myvalue").unwrap();
        assert!(matches!(cmd, Command::Set { key, value }
            if key == "mykey" && value == "myvalue"));
    }

    #[test]
    fn test_parse_set_with_spaces() {
        let cmd = parse_command("SET mykey hello world").unwrap();
        assert!(matches!(cmd, Command::Set { key, value }
            if key == "mykey" && value == "hello world"));
    }

    // Milestone 2 Tests
    #[tokio::test]
    async fn test_wal_append_and_replay() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let mut wal = WriteAheadLog::new(wal_path.clone()).await.unwrap();

            wal.append(&WalEntry {
                command: Command::Set {
                    key: "key1".to_string(),
                    value: "value1".to_string(),
                },
                timestamp: 1,
            })
            .await
            .unwrap();

            wal.append(&WalEntry {
                command: Command::Set {
                    key: "key2".to_string(),
                    value: "value2".to_string(),
                },
                timestamp: 2,
            })
            .await
            .unwrap();
        }

        let wal = WriteAheadLog::new(wal_path).await.unwrap();
        let entries = wal.replay().await.unwrap();

        assert_eq!(entries.len(), 2);
    }

    #[tokio::test]
    async fn test_persistence_across_restart() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("store.wal");

        {
            let store = KvStoreWithWal::new_with_wal(wal_path.clone())
                .await
                .unwrap();
            store
                .set_durable("name".to_string(), "Alice".to_string())
                .await
                .unwrap();
            store
                .set_durable("age".to_string(), "30".to_string())
                .await
                .unwrap();
        }

        {
            let store = KvStoreWithWal::new_with_wal(wal_path).await.unwrap();
            assert_eq!(store.get("name").await, Some("Alice".to_string()));
            assert_eq!(store.get("age").await, Some("30".to_string()));
        }
    }

    #[tokio::test]
    async fn test_delete_persistence() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("delete.wal");

        {
            let store = KvStoreWithWal::new_with_wal(wal_path.clone())
                .await
                .unwrap();
            store
                .set_durable("temp".to_string(), "value".to_string())
                .await
                .unwrap();
            store.delete_durable("temp").await.unwrap();
        }

        {
            let store = KvStoreWithWal::new_with_wal(wal_path).await.unwrap();
            assert_eq!(store.get("temp").await, None);
        }
    }

    // Milestone 3 Tests
    #[tokio::test]
    async fn test_replicated_store_creation() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("replicated.wal");
        let replicas = vec!["127.0.0.1:9401".to_string()];

        let store = ReplicatedKvStore::new_replicated(wal_path, replicas)
            .await
            .unwrap();
        assert!(store.get("nonexistent").await.is_none());
    }

    // Milestone 4 Tests
    #[tokio::test]
    async fn test_quorum_store_creation() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("quorum.wal");
        let replicas = vec!["127.0.0.1:9501".to_string(), "127.0.0.1:9502".to_string()];

        let store = QuorumKvStore::new_quorum(wal_path, replicas, 2)
            .await
            .unwrap();
        assert!(store.get("nonexistent").await.is_none());
    }

    // Milestone 5 Tests
    #[tokio::test]
    async fn test_raft_node_creation() {
        let peers = vec!["127.0.0.1:9601".to_string()];
        let node = RaftNode::new("node1".to_string(), peers);
        assert!(!node.is_leader().await);
    }

    #[tokio::test]
    async fn test_raft_node_election() {
        let peers = vec![];
        let node = RaftNode::new("node1".to_string(), peers);

        // With no peers, should become leader
        let elected = node.start_election().await;
        assert!(elected);
        assert!(node.is_leader().await);
    }

    // Milestone 6 Tests
    #[tokio::test]
    async fn test_connection_pool() {
        let pool = ConnectionPool::new("127.0.0.1:9999".to_string(), 5);
        // Pool operations would require a running server
        assert_eq!(pool.max_size, 5);
    }

    #[tokio::test]
    async fn test_smart_client_creation() {
        let servers = vec!["127.0.0.1:9701".to_string()];
        let client = SmartKvClient::new(servers.clone());
        assert_eq!(client.replicas.len(), 1);
    }
}

#[tokio::main]
async fn main() {
    println!("Distributed Key-Value Store - All Milestones");
    println!("=============================================");
    println!("Run with `cargo test --bin complete_26_network_kv_store` to test all milestones");
    println!("\nMilestones implemented:");
    println!("  1. In-Memory KV Store (TCP Protocol)");
    println!("  2. Persistence with Write-Ahead Log (WAL)");
    println!("  3. Async Replication (Master-Replica)");
    println!("  4. Synchronous Replication with Quorum Writes");
    println!("  5. Leader Election (Simplified Raft)");
    println!("  6. Client Connection Pool and Smart Routing");

    if let Err(e) = run_kv_server("127.0.0.1:6379").await {
        eprintln!("Server error: {}", e);
    }
}