Machine Coding Systems — Complete Practice Reference

Multi-level machine coding drills with resilient, concurrent, and pattern-driven solutions
Machine Coding - Parking Lot (Levels 1-3)
parking platform - strategy - resiliency
Scope: evolve a single-floor lot into a resilient campus
Themes: strategy, concurrency, circuit breaker

Incrementally build a parking lot platform: begin with a clean single-floor design, extend to multi-entry concurrent operations, and finally wrap it with resiliency patterns that protect external integrations.

Parking Campus
 ├─ Level 1: Floor F0 → Slots S1..Sn
 ├─ Level 2: Entry Gates → Queued Event Bus → Assignments
 └─ Level 3: Entry Service → CircuitBreaker(Payment) → Ticketing Fallback

Level 1 — Core Design

Level 1 lays the foundations by modelling spots, vehicles, and allocation requests as explicit dataclasses so the domain vocabulary is clear. We compose a repository abstraction, a guard-chain for validations, and a selector strategy before a single state mutation happens, which keeps business rules honest. Finishing with EventBus notifications proves the baseline service can already integrate outward and sets up later extensions without rewiring the core.

from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Optional, Protocol


class SpotRepository(Protocol):
    def available(self, *, level: Optional[int] = None) -> Iterable["Spot"]:
        ...

    def get(self, spot_id: str) -> Optional["Spot"]:
        ...

    def update(self, spot_id: str, spot: "Spot") -> None:
        ...


@dataclass
class Spot:
    spot_id: str
    level: int
    size: str
    state: str = "VACANT"
    vehicle_id: Optional[str] = None


@dataclass
class AllocationRequest:
    vehicle_id: str
    size: str
    duration_hours: int
    preferred_level: Optional[int] = None


@dataclass
class AllocationReceipt:
    spot_id: str
    fee: float


class EventBus:
    def __init__(self) -> None:
        self._subscribers: Dict[str, list[Callable[[dict], None]]] = {}

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        self._subscribers.setdefault(event, []).append(handler)

    def publish(self, event: str, payload: dict) -> None:
        for handler in self._subscribers.get(event, []):
            handler(payload)


class BaseGuard:
    def __init__(self) -> None:
        self._next: Optional["BaseGuard"] = None

    def set_next(self, nxt: "BaseGuard") -> "BaseGuard":
        self._next = nxt
        return nxt

    def check(self, context: dict) -> Optional[str]:
        return None

    def handle(self, context: dict) -> Optional[str]:
        failure = self.check(context)
        if failure or not self._next:
            return failure
        return self._next.handle(context)


class VehicleSizeGuard(BaseGuard):
    ORDER = {"compact": 0, "regular": 1, "large": 2}

    def check(self, context: dict) -> Optional[str]:
        request: AllocationRequest = context["request"]
        spot: Spot = context["spot"]
        try:
            if self.ORDER[spot.size] < self.ORDER[request.size]:
                return "SIZE_MISMATCH"
        except KeyError:
            return "UNKNOWN_SIZE"
        return None


class SpotAvailabilityGuard(BaseGuard):
    def check(self, context: dict) -> Optional[str]:
        spot: Spot = context["spot"]
        if spot.state != "VACANT":
            return "TAKEN"
        return None


class SpotSelector(Protocol):
    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        ...


class NearestLevelSelector:
    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        preferred_level = request.preferred_level
        best: Optional[Spot] = None
        best_distance = float("inf")
        for spot in candidates:
            # Without a stated preference every spot scores distance 0, so the first candidate wins.
            target = preferred_level if preferred_level is not None else spot.level
            distance = abs(target - spot.level)
            if distance < best_distance:
                best = spot
                best_distance = distance
        return best


class PricingStrategy(Protocol):
    def compute(self, request: AllocationRequest) -> float:
        ...


class FlatRatePricing(PricingStrategy):
    RATES = {"compact": 5, "regular": 8, "large": 12}

    def compute(self, request: AllocationRequest) -> float:
        return self.RATES[request.size] * max(1, request.duration_hours)


class SpotLifecycle:
    def __init__(self, spot: Spot) -> None:
        self.spot = spot

    def reserve(self) -> bool:
        if self.spot.state != "VACANT":
            return False
        self.spot.state = "HELD"
        return True

    def occupy(self, vehicle_id: str) -> bool:
        if self.spot.state != "HELD":
            return False
        self.spot.state = "OCCUPIED"
        self.spot.vehicle_id = vehicle_id
        return True

    def vacate(self) -> bool:
        if self.spot.state != "OCCUPIED":
            return False
        self.spot.state = "VACANT"
        self.spot.vehicle_id = None
        return True


class InMemorySpotRepository(SpotRepository):
    def __init__(self, spots: Iterable[Spot]):
        self._spots: Dict[str, Spot] = {spot.spot_id: spot for spot in spots}

    def available(self, *, level: Optional[int] = None) -> Iterable[Spot]:
        return [
            spot
            for spot in self._spots.values()
            if spot.state == "VACANT" and (level is None or spot.level == level)
        ]

    def get(self, spot_id: str) -> Optional[Spot]:
        return self._spots.get(spot_id)

    def update(self, spot_id: str, spot: Spot) -> None:
        self._spots[spot_id] = spot

    def all(self) -> Iterable[Spot]:
        return self._spots.values()


class ParkingLotService:
    def __init__(self, repo: SpotRepository, selector: SpotSelector, pricing: PricingStrategy, bus: EventBus) -> None:
        self.repo = repo
        self.selector = selector
        self.pricing = pricing
        self.bus = bus
        self.guard = VehicleSizeGuard()
        self.guard.set_next(SpotAvailabilityGuard())

    def park(self, request: AllocationRequest) -> Optional[AllocationReceipt]:
        candidates = self.repo.available(level=request.preferred_level)
        spot = self.selector.choose(request, candidates)
        if not spot:
            return None
        failure = self.guard.handle({"request": request, "spot": spot})
        if failure:
            return None
        lifecycle = SpotLifecycle(spot)
        if not lifecycle.reserve() or not lifecycle.occupy(request.vehicle_id):
            return None
        self.repo.update(spot.spot_id, spot)
        fee = self.pricing.compute(request)
        receipt = AllocationReceipt(spot_id=spot.spot_id, fee=fee)
        self.bus.publish("parking.allocated", {"vehicle": request.vehicle_id, "spot": spot.spot_id, "fee": fee})
        return receipt

    def release(self, spot_id: str) -> bool:
        spot = self.repo.get(spot_id)
        if not spot:
            return False
        lifecycle = SpotLifecycle(spot)
        if not lifecycle.vacate():
            return False
        self.repo.update(spot.spot_id, spot)
        self.bus.publish("parking.released", {"spot": spot.spot_id})
        return True


class StdoutBus(EventBus):
    def publish(self, event: str, payload: dict) -> None:
        super().publish(event, payload)
        print(f"[event] {event}: {payload}")


def main() -> None:
    repo = InMemorySpotRepository(
        [
            Spot("C1", level=1, size="compact"),
            Spot("R1", level=1, size="regular"),
            Spot("L2", level=2, size="large"),
        ]
    )
    service = ParkingLotService(repo, NearestLevelSelector(), FlatRatePricing(), StdoutBus())

    request = AllocationRequest(vehicle_id="CAR-42", size="regular", duration_hours=3, preferred_level=1)
    receipt = service.park(request)
    print("Receipt:", receipt)
    if receipt:
        service.release(receipt.spot_id)
    print("Inventory:", [(spot.spot_id, spot.state, spot.vehicle_id) for spot in repo.all()])


if __name__ == "__main__":
    main()
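
Because pricing sits behind a protocol, swapping the billing rule is a composition-time change only. Below is a minimal sketch under the assumption that the Level 1 classes and the demo's repo are in scope; TieredPricing is a hypothetical strategy invented for illustration, not part of the exercise.

class TieredPricing:
    """Hypothetical strategy: full price for the first hour, discounted extra hours."""

    BASE = {"compact": 5, "regular": 8, "large": 12}

    def compute(self, request: AllocationRequest) -> float:
        hours = max(1, request.duration_hours)
        base = self.BASE[request.size]
        # First hour at the base rate, each additional hour at 60% of it.
        return base + base * 0.6 * (hours - 1)


# Only the composition root changes; ParkingLotService itself is untouched.
service = ParkingLotService(repo, NearestLevelSelector(), TieredPricing(), StdoutBus())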

Level 2 — Event-Driven Campus

Level 2 grows the design into a campus by inserting entry gates that publish intents onto a queued EventBus instead of mutating state directly. We add command objects, and the bus pump serialises handling so each doorway can enqueue work without colliding, while a round-robin selector keeps allocation fair across levels. Release commands and an allocation projection make recovery explicit so missed acknowledgements do not strand occupied spots.

from __future__ import annotations

from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Optional, Protocol, Tuple


class EventBus:
    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self._queue: Deque[Tuple[str, dict]] = deque()

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[event].append(handler)

    def publish(self, event: str, payload: dict) -> None:
        self._queue.append((event, payload))

    def pump(self) -> None:
        while self._queue:
            event, payload = self._queue.popleft()
            for handler in list(self._subscribers.get(event, [])):
                handler(payload)


class Command(Protocol):
    def execute(self) -> None:
        ...


class PublishCommand:
    def __init__(self, bus: EventBus, event: str, payload: dict) -> None:
        self.bus = bus
        self.event = event
        self.payload = payload

    def execute(self) -> None:
        self.bus.publish(self.event, self.payload)


class SpotRepository(Protocol):
    def available(self, *, level: Optional[int] = None) -> Iterable["Spot"]:
        ...

    def get(self, spot_id: str) -> Optional["Spot"]:
        ...

    def update(self, spot_id: str, spot: "Spot") -> None:
        ...

    def levels(self) -> Iterable[int]:
        ...


@dataclass
class Spot:
    spot_id: str
    level: int
    size: str
    state: str = "VACANT"
    vehicle_id: Optional[str] = None


@dataclass
class AllocationRequest:
    vehicle_id: str
    size: str
    duration_hours: int
    preferred_level: Optional[int] = None


@dataclass
class AllocationReceipt:
    spot_id: str
    fee: float


class BaseGuard:
    def __init__(self) -> None:
        self._next: Optional["BaseGuard"] = None

    def set_next(self, nxt: "BaseGuard") -> "BaseGuard":
        self._next = nxt
        return nxt

    def check(self, context: dict) -> Optional[str]:
        return None

    def handle(self, context: dict) -> Optional[str]:
        failure = self.check(context)
        if failure or not self._next:
            return failure
        return self._next.handle(context)


class VehicleSizeGuard(BaseGuard):
    ORDER = {"compact": 0, "regular": 1, "large": 2}

    def check(self, context: dict) -> Optional[str]:
        request: AllocationRequest = context["request"]
        spot: Spot = context["spot"]
        try:
            if self.ORDER[spot.size] < self.ORDER[request.size]:
                return "SIZE_MISMATCH"
        except KeyError:
            return "UNKNOWN_SIZE"
        return None


class SpotAvailabilityGuard(BaseGuard):
    def check(self, context: dict) -> Optional[str]:
        spot: Spot = context["spot"]
        if spot.state != "VACANT":
            return "TAKEN"
        return None


class SpotLifecycle:
    def __init__(self, spot: Spot) -> None:
        self.spot = spot

    def reserve(self) -> bool:
        if self.spot.state != "VACANT":
            return False
        self.spot.state = "HELD"
        return True

    def occupy(self, vehicle_id: str) -> bool:
        if self.spot.state != "HELD":
            return False
        self.spot.state = "OCCUPIED"
        self.spot.vehicle_id = vehicle_id
        return True

    def vacate(self) -> bool:
        if self.spot.state != "OCCUPIED":
            return False
        self.spot.state = "VACANT"
        self.spot.vehicle_id = None
        return True


class InMemorySpotRepository(SpotRepository):
    def __init__(self, spots: Iterable[Spot]):
        self._spots: Dict[str, Spot] = {spot.spot_id: spot for spot in spots}

    def available(self, *, level: Optional[int] = None) -> Iterable[Spot]:
        return [
            spot
            for spot in self._spots.values()
            if spot.state == "VACANT" and (level is None or spot.level == level)
        ]

    def get(self, spot_id: str) -> Optional[Spot]:
        return self._spots.get(spot_id)

    def update(self, spot_id: str, spot: Spot) -> None:
        self._spots[spot_id] = spot

    def levels(self) -> Iterable[int]:
        return sorted({spot.level for spot in self._spots.values()})


class SpotSelector(Protocol):
    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        ...


class RoundRobinLevelSelector:
    def __init__(self, levels: Iterable[int]):
        self.order = list(levels)
        self._cursor = 0

    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        pools: DefaultDict[int, List[Spot]] = defaultdict(list)
        for spot in candidates:
            pools[spot.level].append(spot)
        if not pools:
            return None
        if not self.order:
            self.order = sorted(pools.keys())
        # Rotate the cursor across levels so successive requests spread out fairly.
        for _ in range(len(self.order)):
            level = self.order[self._cursor % len(self.order)]
            self._cursor += 1
            bucket = pools.get(level)
            if bucket:
                return bucket[0]
        # Reached only if candidates sit on levels missing from self.order:
        # fall back to the spot closest to the preferred (or first known) level.
        fallback_origin = request.preferred_level if request.preferred_level is not None else self.order[0]
        fallback = sorted(
            (spot for level_spots in pools.values() for spot in level_spots),
            key=lambda s: abs(fallback_origin - s.level),
        )
        return fallback[0] if fallback else None


class PricingStrategy(Protocol):
    def compute(self, request: AllocationRequest) -> float:
        ...


class FlatRatePricing(PricingStrategy):
    RATES = {"compact": 5, "regular": 8, "large": 12}

    def compute(self, request: AllocationRequest) -> float:
        return self.RATES[request.size] * max(1, request.duration_hours)


class ParkingCampusService:
    def __init__(self, repo: SpotRepository, selector: SpotSelector, pricing: PricingStrategy, bus: EventBus):
        self.repo = repo
        self.selector = selector
        self.pricing = pricing
        self.bus = bus
        self.guard = VehicleSizeGuard()
        self.guard.set_next(SpotAvailabilityGuard())
        bus.subscribe("parking.requested", self._handle_request)
        bus.subscribe("parking.checkout", self._handle_checkout)

    def _handle_request(self, payload: dict) -> None:
        request: AllocationRequest = payload["request"]
        candidates = list(self.repo.available(level=request.preferred_level))
        spot = self.selector.choose(request, candidates)
        if not spot:
            self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": "NO_SPOT"})
            return
        failure = self.guard.handle({"request": request, "spot": spot})
        if failure:
            self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": failure})
            return
        lifecycle = SpotLifecycle(spot)
        if not lifecycle.reserve() or not lifecycle.occupy(request.vehicle_id):
            self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": "STATE"})
            return
        self.repo.update(spot.spot_id, spot)
        fee = self.pricing.compute(request)
        receipt = AllocationReceipt(spot_id=spot.spot_id, fee=fee)
        self.bus.publish(
            "parking.allocated",
            {"vehicle": request.vehicle_id, "spot": spot.spot_id, "fee": fee, "gate": payload.get("gate"), "receipt": receipt},
        )

    def _handle_checkout(self, payload: dict) -> None:
        spot = self.repo.get(payload["spot_id"])
        if not spot:
            return
        lifecycle = SpotLifecycle(spot)
        if lifecycle.vacate():
            self.repo.update(spot.spot_id, spot)
            self.bus.publish("parking.released", {"spot": spot.spot_id})


class EntryGate:
    def __init__(self, gate_id: str, bus: EventBus) -> None:
        self.gate_id = gate_id
        self.bus = bus

    def enqueue(self, request: AllocationRequest) -> Command:
        payload = {"request": request, "gate": self.gate_id}
        return PublishCommand(self.bus, "parking.requested", payload)


class ReleaseCommand(Command):
    def __init__(self, bus: EventBus, spot_id: str):
        self.bus = bus
        self.spot_id = spot_id

    def execute(self) -> None:
        self.bus.publish("parking.checkout", {"spot_id": self.spot_id})


class AllocationProjector:
    def __init__(self, bus: EventBus) -> None:
        self.assignments: Dict[str, str] = {}
        self.rejections: List[Tuple[str, str]] = []
        bus.subscribe("parking.allocated", self._on_allocated)
        bus.subscribe("parking.rejected", self._on_rejected)
        bus.subscribe("parking.released", self._on_released)

    def _on_allocated(self, payload: dict) -> None:
        self.assignments[payload["vehicle"]] = payload["spot"]

    def _on_rejected(self, payload: dict) -> None:
        self.rejections.append((payload["vehicle"], payload["reason"]))

    def _on_released(self, payload: dict) -> None:
        released_spot = payload["spot"]
        for vehicle, spot in list(self.assignments.items()):
            if spot == released_spot:
                del self.assignments[vehicle]
                break


def main() -> None:
    spots = [
        Spot("C1", level=0, size="compact"),
        Spot("C2", level=0, size="compact"),
        Spot("R1", level=1, size="regular"),
        Spot("R2", level=1, size="regular"),
        Spot("L1", level=2, size="large"),
    ]
    repo = InMemorySpotRepository(spots)
    bus = EventBus()
    selector = RoundRobinLevelSelector(repo.levels())
    campus = ParkingCampusService(repo, selector, FlatRatePricing(), bus)
    projector = AllocationProjector(bus)
    gates = [EntryGate("G1", bus), EntryGate("G2", bus), EntryGate("G3", bus)]

    requests = [
        AllocationRequest(vehicle_id="CAR-1", size="regular", duration_hours=2, preferred_level=1),
        AllocationRequest(vehicle_id="CAR-2", size="regular", duration_hours=1),
        AllocationRequest(vehicle_id="BIKE-3", size="compact", duration_hours=3, preferred_level=0),
        AllocationRequest(vehicle_id="SUV-4", size="large", duration_hours=4, preferred_level=2),
    ]
    for gate, request in zip(gates * 2, requests):
        gate.enqueue(request).execute()

    bus.pump()

    allocated = list(projector.assignments.items())
    if allocated:
        _, spot_id = allocated[0]
        ReleaseCommand(bus, spot_id).execute()
        bus.pump()

    print("Assignments:", projector.assignments)
    print("Rejections:", projector.rejections)


if __name__ == "__main__":
    main()
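
The queued bus decouples publication from handling: publish only enqueues, and nothing runs until pump drains the queue, so handlers never fire mid-mutation. A minimal sketch of that contract, assuming the Level 2 EventBus above:

bus = EventBus()
seen: List[dict] = []
bus.subscribe("demo.event", seen.append)

bus.publish("demo.event", {"n": 1})
assert seen == []            # publish only enqueues; nothing delivered yet
bus.pump()
assert seen == [{"n": 1}]    # pump drains the queue and invokes handlers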

Level 3 — Resilient Orchestration

Level 3 stress-tests the system by assuming the external payment gateway can fail, so allocations flow into a saga handler that compensates failed charges. We guard the payment call with a circuit breaker and a bounded retry policy, and on persistent failure we issue a manual-ticket fallback plus a compensating release command that unwinds the reservation. This level highlights how the same domain events and explicit state machines let us recover deterministically even when part of the campus is degraded.

from __future__ import annotations

import random
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Optional, Protocol, Tuple


class EventBus:
    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self._queue: Deque[Tuple[str, dict]] = deque()

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[event].append(handler)

    def publish(self, event: str, payload: dict) -> None:
        self._queue.append((event, payload))

    def pump(self) -> None:
        while self._queue:
            event, payload = self._queue.popleft()
            for handler in list(self._subscribers.get(event, [])):
                handler(payload)


class Command(Protocol):
    def execute(self) -> None:
        ...


class PublishCommand:
    def __init__(self, bus: EventBus, event: str, payload: dict) -> None:
        self.bus = bus
        self.event = event
        self.payload = payload

    def execute(self) -> None:
        self.bus.publish(self.event, self.payload)


class ReleaseCommand(Command):
    def __init__(self, bus: EventBus, spot_id: str):
        self.bus = bus
        self.spot_id = spot_id

    def execute(self) -> None:
        self.bus.publish("parking.checkout", {"spot_id": self.spot_id})


class SpotRepository(Protocol):
    def available(self, *, level: Optional[int] = None) -> Iterable["Spot"]:
        ...

    def get(self, spot_id: str) -> Optional["Spot"]:
        ...

    def update(self, spot_id: str, spot: "Spot") -> None:
        ...

    def levels(self) -> Iterable[int]:
        ...


@dataclass
class Spot:
    spot_id: str
    level: int
    size: str
    state: str = "VACANT"
    vehicle_id: Optional[str] = None


@dataclass
class AllocationRequest:
    vehicle_id: str
    size: str
    duration_hours: int
    preferred_level: Optional[int] = None


@dataclass
class AllocationReceipt:
    spot_id: str
    fee: float


class BaseGuard:
    def __init__(self) -> None:
        self._next: Optional["BaseGuard"] = None

    def set_next(self, nxt: "BaseGuard") -> "BaseGuard":
        self._next = nxt
        return nxt

    def check(self, context: dict) -> Optional[str]:
        return None

    def handle(self, context: dict) -> Optional[str]:
        failure = self.check(context)
        if failure or not self._next:
            return failure
        return self._next.handle(context)


class VehicleSizeGuard(BaseGuard):
    ORDER = {"compact": 0, "regular": 1, "large": 2}

    def check(self, context: dict) -> Optional[str]:
        request: AllocationRequest = context["request"]
        spot: Spot = context["spot"]
        try:
            if self.ORDER[spot.size] < self.ORDER[request.size]:
                return "SIZE_MISMATCH"
        except KeyError:
            return "UNKNOWN_SIZE"
        return None


class SpotAvailabilityGuard(BaseGuard):
    def check(self, context: dict) -> Optional[str]:
        spot: Spot = context["spot"]
        if spot.state != "VACANT":
            return "TAKEN"
        return None


class SpotLifecycle:
    def __init__(self, spot: Spot) -> None:
        self.spot = spot

    def reserve(self) -> bool:
        if self.spot.state != "VACANT":
            return False
        self.spot.state = "HELD"
        return True

    def occupy(self, vehicle_id: str) -> bool:
        if self.spot.state != "HELD":
            return False
        self.spot.state = "OCCUPIED"
        self.spot.vehicle_id = vehicle_id
        return True

    def vacate(self) -> bool:
        if self.spot.state != "OCCUPIED":
            return False
        self.spot.state = "VACANT"
        self.spot.vehicle_id = None
        return True


class InMemorySpotRepository(SpotRepository):
    def __init__(self, spots: Iterable[Spot]):
        self._spots: Dict[str, Spot] = {spot.spot_id: spot for spot in spots}

    def available(self, *, level: Optional[int] = None) -> Iterable[Spot]:
        return [
            spot
            for spot in self._spots.values()
            if spot.state == "VACANT" and (level is None or spot.level == level)
        ]

    def get(self, spot_id: str) -> Optional[Spot]:
        return self._spots.get(spot_id)

    def update(self, spot_id: str, spot: Spot) -> None:
        self._spots[spot_id] = spot

    def levels(self) -> Iterable[int]:
        return sorted({spot.level for spot in self._spots.values()})


class SpotSelector(Protocol):
    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        ...


class RoundRobinLevelSelector:
    def __init__(self, levels: Iterable[int]):
        self.order = list(levels)
        self._cursor = 0

    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        pools: DefaultDict[int, List[Spot]] = defaultdict(list)
        for spot in candidates:
            pools[spot.level].append(spot)
        if not pools:
            return None
        if not self.order:
            self.order = sorted(pools.keys())
        for _ in range(len(self.order)):
            level = self.order[self._cursor % len(self.order)]
            self._cursor += 1
            bucket = pools.get(level)
            if bucket:
                return bucket[0]
        fallback_origin = request.preferred_level if request.preferred_level is not None else self.order[0]
        fallback = sorted(
            (spot for level_spots in pools.values() for spot in level_spots),
            key=lambda s: abs(fallback_origin - s.level),
        )
        return fallback[0] if fallback else None


class PricingStrategy(Protocol):
    def compute(self, request: AllocationRequest) -> float:
        ...


class FlatRatePricing(PricingStrategy):
    RATES = {"compact": 5, "regular": 8, "large": 12}

    def compute(self, request: AllocationRequest) -> float:
        return self.RATES[request.size] * max(1, request.duration_hours)


class ParkingCampusService:
    def __init__(self, repo: SpotRepository, selector: SpotSelector, pricing: PricingStrategy, bus: EventBus):
        self.repo = repo
        self.selector = selector
        self.pricing = pricing
        self.bus = bus
        self.guard = VehicleSizeGuard()
        self.guard.set_next(SpotAvailabilityGuard())
        bus.subscribe("parking.requested", self._handle_request)
        bus.subscribe("parking.checkout", self._handle_checkout)

    def _handle_request(self, payload: dict) -> None:
        request: AllocationRequest = payload["request"]
        candidates = list(self.repo.available(level=request.preferred_level))
        spot = self.selector.choose(request, candidates)
        if not spot:
            self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": "NO_SPOT"})
            return
        failure = self.guard.handle({"request": request, "spot": spot})
        if failure:
            self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": failure})
            return
        lifecycle = SpotLifecycle(spot)
        if not lifecycle.reserve() or not lifecycle.occupy(request.vehicle_id):
            self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": "STATE"})
            return
        self.repo.update(spot.spot_id, spot)
        fee = self.pricing.compute(request)
        receipt = AllocationReceipt(spot_id=spot.spot_id, fee=fee)
        self.bus.publish(
            "parking.allocated",
            {"vehicle": request.vehicle_id, "spot": spot.spot_id, "fee": fee, "gate": payload.get("gate"), "receipt": receipt},
        )

    def _handle_checkout(self, payload: dict) -> None:
        spot = self.repo.get(payload["spot_id"])
        if not spot:
            return
        lifecycle = SpotLifecycle(spot)
        if lifecycle.vacate():
            self.repo.update(spot.spot_id, spot)
            self.bus.publish("parking.released", {"spot": spot.spot_id})


class PaymentGateway:
    def __init__(self, failure_rate: float = 0.35):
        self.failure_rate = failure_rate

    def charge(self, vehicle_id: str, amount: float) -> str:
        if random.random() < self.failure_rate:
            raise RuntimeError("payment timeout")
        return f"receipt::{vehicle_id}::{int(time.time() * 1000)}::{amount:.2f}"


class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"

    def call(self, func: Callable[[], str]) -> str:
        now = time.time()
        if self.state == "OPEN":
            # Fail fast until the recovery window elapses, then allow one probe.
            if now - self.last_failure_time < self.recovery_timeout:
                raise RuntimeError("Circuit open")
            self.state = "HALF_OPEN"
        try:
            result = func()
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
            raise
        else:
            # Any success (including the half-open probe) resets the breaker.
            self.failure_count = 0
            self.state = "CLOSED"
            return result


class RetryPolicy:
    def __init__(self, attempts: int, backoff: float):
        self.attempts = attempts
        self.backoff = backoff

    def call(self, func: Callable[[], str]) -> str:
        last_error: Optional[Exception] = None
        for attempt in range(self.attempts):
            try:
                return func()
            except Exception as exc:
                last_error = exc
                # Linear backoff; skip the sleep once the final attempt has failed.
                if attempt < self.attempts - 1:
                    time.sleep(self.backoff * (attempt + 1))
        if last_error:
            raise last_error
        raise RuntimeError("retry exhausted")


class PaymentProcessor:
    def __init__(self, bus: EventBus, gateway: PaymentGateway, breaker: CircuitBreaker, retry: RetryPolicy):
        self.bus = bus
        self.gateway = gateway
        self.breaker = breaker
        self.retry = retry
        bus.subscribe("parking.allocated", self._handle_allocated)

    def _handle_allocated(self, payload: dict) -> None:
        vehicle = payload["vehicle"]
        spot_id = payload["spot"]
        amount = payload["fee"]

        def attempt() -> str:
            # Each retry re-enters the breaker, so an open circuit fails fast instead of hammering the gateway.
            return self.breaker.call(lambda: self.gateway.charge(vehicle, amount))

        try:
            receipt = self.retry.call(attempt)
            self.bus.publish("parking.confirmed", {"vehicle": vehicle, "spot": spot_id, "receipt": receipt})
        except Exception as exc:
            self.bus.publish("parking.payment_failed", {"vehicle": vehicle, "spot": spot_id, "reason": str(exc)})


class ManualTicketCommand(Command):
    def __init__(self, vehicle_id: str, spot_id: str):
        self.vehicle_id = vehicle_id
        self.spot_id = spot_id
        self.ticket: Optional[str] = None

    def execute(self) -> None:
        self.ticket = f"manual-ticket::{self.vehicle_id}@{self.spot_id}"


class ParkingSaga:
    def __init__(self, bus: EventBus):
        self.bus = bus
        bus.subscribe("parking.payment_failed", self._compensate)

    def _compensate(self, payload: dict) -> None:
        vehicle = payload["vehicle"]
        spot_id = payload["spot"]
        ticket_cmd = ManualTicketCommand(vehicle, spot_id)
        ticket_cmd.execute()
        ReleaseCommand(self.bus, spot_id).execute()
        self.bus.publish(
            "parking.fallback",
            {"vehicle": vehicle, "spot": spot_id, "ticket": ticket_cmd.ticket, "reason": payload["reason"]},
        )


class EntryGate:
    def __init__(self, gate_id: str, bus: EventBus) -> None:
        self.gate_id = gate_id
        self.bus = bus

    def enqueue(self, request: AllocationRequest) -> Command:
        payload = {"request": request, "gate": self.gate_id}
        return PublishCommand(self.bus, "parking.requested", payload)


class AllocationProjection:
    def __init__(self, bus: EventBus) -> None:
        self.pending: Dict[str, str] = {}
        self.confirmed: Dict[str, str] = {}
        self.fallbacks: Dict[str, str] = {}
        self.rejections: List[Tuple[str, str]] = []
        bus.subscribe("parking.allocated", self._on_allocated)
        bus.subscribe("parking.confirmed", self._on_confirmed)
        bus.subscribe("parking.fallback", self._on_fallback)
        bus.subscribe("parking.released", self._on_released)
        bus.subscribe("parking.rejected", self._on_rejected)

    def _on_allocated(self, payload: dict) -> None:
        self.pending[payload["vehicle"]] = payload["spot"]

    def _on_confirmed(self, payload: dict) -> None:
        vehicle = payload["vehicle"]
        spot = payload["spot"]
        self.confirmed[vehicle] = spot
        self.pending.pop(vehicle, None)

    def _on_fallback(self, payload: dict) -> None:
        vehicle = payload["vehicle"]
        self.fallbacks[vehicle] = payload["ticket"]
        self.pending.pop(vehicle, None)

    def _on_released(self, payload: dict) -> None:
        released_spot = payload["spot"]
        for bucket in (self.pending, self.confirmed):
            for vehicle, spot in list(bucket.items()):
                if spot == released_spot:
                    del bucket[vehicle]

    def _on_rejected(self, payload: dict) -> None:
        self.rejections.append((payload["vehicle"], payload["reason"]))


class ParkingMetrics:
    def __init__(self, bus: EventBus) -> None:
        self.snapshot: Dict[str, int] = {"confirmed": 0, "fallback": 0, "rejected": 0}
        bus.subscribe("parking.confirmed", self._on_confirmed)
        bus.subscribe("parking.fallback", self._on_fallback)
        bus.subscribe("parking.rejected", self._on_rejected)

    def _on_confirmed(self, _: dict) -> None:
        self.snapshot["confirmed"] += 1

    def _on_fallback(self, _: dict) -> None:
        self.snapshot["fallback"] += 1

    def _on_rejected(self, _: dict) -> None:
        self.snapshot["rejected"] += 1


def main() -> None:
    random.seed(42)
    spots = [
        Spot("C1", level=0, size="compact"),
        Spot("R1", level=1, size="regular"),
        Spot("R2", level=1, size="regular"),
        Spot("L1", level=2, size="large"),
        Spot("L2", level=2, size="large"),
    ]
    repo = InMemorySpotRepository(spots)
    bus = EventBus()
    selector = RoundRobinLevelSelector(repo.levels())
    pricing = FlatRatePricing()
    ParkingCampusService(repo, selector, pricing, bus)
    PaymentProcessor(bus, PaymentGateway(failure_rate=0.4), CircuitBreaker(3, 1.5), RetryPolicy(3, 0.05))
    ParkingSaga(bus)
    projection = AllocationProjection(bus)
    metrics = ParkingMetrics(bus)
    gates = [EntryGate("Gate-A", bus), EntryGate("Gate-B", bus)]

    requests = [
        AllocationRequest("CAR-101", "regular", 2, preferred_level=1),
        AllocationRequest("SUV-202", "large", 4, preferred_level=2),
        AllocationRequest("BIKE-303", "compact", 1, preferred_level=0),
        AllocationRequest("CAR-404", "regular", 3),
        AllocationRequest("SUV-505", "large", 5, preferred_level=2),
        AllocationRequest("TRUCK-606", "large", 2),
    ]

    for gate, request in zip(gates * 3, requests):
        gate.enqueue(request).execute()

    bus.pump()

    confirmed_snapshot = dict(projection.confirmed)
    for _, spot in list(projection.confirmed.items())[:1]:
        ReleaseCommand(bus, spot).execute()
    bus.pump()

    print("Confirmed:", confirmed_snapshot)
    print("Fallback:", projection.fallbacks)
    print("Pending:", projection.pending)
    print("Rejections:", projection.rejections)
    print("Metrics:", metrics.snapshot)


if __name__ == "__main__":
    main()
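
The breaker trips after failure_threshold consecutive errors and then fails fast until recovery_timeout elapses, at which point one half-open probe decides whether to close again. A small sketch of that lifecycle, assuming the CircuitBreaker above:

breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.1)

def boom() -> str:
    raise RuntimeError("downstream down")

for _ in range(2):
    try:
        breaker.call(boom)
    except RuntimeError:
        pass
assert breaker.state == "OPEN"       # threshold reached: calls now fail fast

time.sleep(0.15)                     # wait out the recovery window
result = breaker.call(lambda: "ok")  # half-open probe succeeds...
assert result == "ok" and breaker.state == "CLOSED"  # ...and closes the breaker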
Machine Coding - Food Delivery (Levels 1-3)
food delivery - observers - saga
Scope: evolve ordering into concurrent, resilient fulfillment
Themes: observer, queues, saga

Incrementally deliver a food ordering platform: start with observer-driven notifications, scale to prioritized concurrent dispatch, and finally orchestrate resilient fulfillment with sagas and circuit breakers.

Food Delivery Platform
 ├─ Level 1: OrderService → Observers
 ├─ Level 2: Priority Queue Dispatchers → Partner Utilization
 └─ Level 3: Saga Orchestrator → Payment Breaker → Courier Assignment

Level 1 — Core Ordering Flow

The introductory food-delivery level focuses on getting the happy path crisp: capture orders in a repository abstraction and broadcast updates through observer callbacks. We track status transitions such as PLACED and PICKED explicitly, which keeps restaurant and courier notifications loosely coupled. Because dependencies sit behind protocols, the core service stays easy to test and primes us for scheduling upgrades later.

from dataclasses import dataclass, field
from typing import Dict, List, Protocol

class DeliveryObserver(Protocol):
    def notify(self, order_id: str, details: str) -> None:
        ...

@dataclass
class DeliveryPartner:
    partner_id: str
    notifications: List[str] = field(default_factory=list)

    def notify(self, order_id: str, details: str) -> None:
        message = f"Order {order_id}: {details}"
        self.notifications.append(message)
        print(f"Notify {self.partner_id}: {message}")

@dataclass
class Order:
    order_id: str
    restaurant: str
    status: str = "PLACED"

class OrderRepository:
    def __init__(self):
        self.orders: Dict[str, Order] = {}

    def save(self, order: Order) -> None:
        self.orders[order.order_id] = order

    def update_status(self, order_id: str, status: str) -> None:
        self.orders[order_id].status = status

class OrderService:
    def __init__(self, repository: OrderRepository):
        self.repo = repository
        self.observers: List[DeliveryObserver] = []

    def register_partner(self, partner: DeliveryObserver) -> None:
        self.observers.append(partner)

    def place_order(self, order_id: str, restaurant: str) -> Order:
        order = Order(order_id, restaurant)
        self.repo.save(order)
        for observer in self.observers:
            observer.notify(order_id, f"from {restaurant} awaiting pickup")
        return order

    def mark_picked(self, order_id: str) -> None:
        self.repo.update_status(order_id, "PICKED")
        for observer in self.observers:
            observer.notify(order_id, "picked up")

if __name__ == "__main__":
    repo = OrderRepository()
    service = OrderService(repo)
    p1, p2 = DeliveryPartner("DP1"), DeliveryPartner("DP2")
    service.register_partner(p1)
    service.register_partner(p2)
    service.place_order("O1", "Spicy Bites")
    service.mark_picked("O1")

Level 2 — Concurrent Dispatch

Level 2 keeps the same order service but introduces a dispatcher that accepts prioritized jobs via a heap so urgent tickets pre-empt routine ones. Worker threads pull from the queue while a least-loaded strategy spreads work across available partners, demonstrating how to add concurrency without violating encapsulation. A condition variable guards the queue and a dedicated lock protects partner load counts, so assignment and acknowledgement races are surfaced during practice.

from __future__ import annotations

import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Protocol

class DeliveryObserver(Protocol):
    def notify(self, order_id: str, details: str) -> None:
        ...

@dataclass
class DeliveryPartner:
    partner_id: str
    notifications: List[str] = field(default_factory=list)

    def notify(self, order_id: str, details: str) -> None:
        message = f"Order {order_id}: {details}"
        self.notifications.append(message)
        print(f"Notify {self.partner_id}: {message}")

@dataclass
class Order:
    order_id: str
    restaurant: str
    priority: int = 1
    status: str = "PLACED"

@dataclass(order=True)
class DispatchItem:
    sort_key: int
    created_at: float
    order: Order = field(compare=False)

class OrderRepository:
    def __init__(self) -> None:
        self.orders: Dict[str, Order] = {}

    def save(self, order: Order) -> None:
        self.orders[order.order_id] = order

    def update_status(self, order_id: str, status: str) -> None:
        self.orders[order_id].status = status

    def get(self, order_id: str) -> Order:
        return self.orders[order_id]

class OrderService:
    def __init__(self, repository: OrderRepository) -> None:
        self.repo = repository
        self.observers: List[DeliveryObserver] = []
        self.dispatcher: Optional["PriorityDispatchCoordinator"] = None

    def register_partner(self, partner: DeliveryObserver) -> None:
        self.observers.append(partner)

    def attach_dispatcher(self, dispatcher: "PriorityDispatchCoordinator") -> None:
        self.dispatcher = dispatcher

    def place_order(self, order_id: str, restaurant: str, priority: int = 1) -> Order:
        order = Order(order_id, restaurant, priority)
        self.repo.save(order)
        for observer in self.observers:
            observer.notify(order_id, f"from {restaurant} awaiting pickup")
        if self.dispatcher:
            self.dispatcher.enqueue(order)
        return order

    def mark_picked(self, order_id: str) -> None:
        self.repo.update_status(order_id, "PICKED")
        for observer in self.observers:
            observer.notify(order_id, "picked up")
        if self.dispatcher:
            self.dispatcher.complete(order_id)

    def cancel_order(self, order_id: str, reason: str = "CANCELLED") -> None:
        self.repo.update_status(order_id, reason)
        for observer in self.observers:
            observer.notify(order_id, f"cancelled: {reason}")
        if self.dispatcher:
            self.dispatcher.complete(order_id)

class PriorityDispatchCoordinator:
    def __init__(self, service: OrderService, partners: Dict[str, DeliveryPartner], worker_count: int = 3):
        self.service = service
        self.partners = partners
        self.partner_load: Dict[str, int] = {pid: 0 for pid in partners}
        self.queue: List[DispatchItem] = []
        self.queue_cond = threading.Condition()
        self.load_lock = threading.Lock()
        self.assignments: Dict[str, str] = {}
        self.running = True
        self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(worker_count)]
        for worker in self.workers:
            worker.start()

    def enqueue(self, order: Order) -> None:
        with self.queue_cond:
            # Negate priority: heapq is a min-heap, so higher priorities pop first.
            sort_key = -order.priority
            heapq.heappush(self.queue, DispatchItem(sort_key, time.time(), order))
            self.queue_cond.notify()

    def _pick_partner(self) -> DeliveryPartner:
        with self.load_lock:
            partner_id = min(self.partner_load, key=self.partner_load.get)
            self.partner_load[partner_id] += 1
        return self.partners[partner_id]

    def complete(self, order_id: str) -> None:
        partner_id = self.assignments.pop(order_id, None)
        if partner_id:
            with self.load_lock:
                self.partner_load[partner_id] -= 1

    def _worker(self) -> None:
        while self.running:
            with self.queue_cond:
                while not self.queue and self.running:
                    self.queue_cond.wait()
                if not self.running:
                    return
                item = heapq.heappop(self.queue)
            partner = self._pick_partner()
            self.assignments[item.order.order_id] = partner.partner_id
            partner.notify(item.order.order_id, f"pickup ready from {item.order.restaurant}")
            time.sleep(0.1)

    def shutdown(self) -> None:
        self.running = False
        with self.queue_cond:
            self.queue_cond.notify_all()
        for worker in self.workers:
            worker.join(timeout=0.2)

def main() -> None:
    repo = OrderRepository()
    service = OrderService(repo)
    partners = {
        "DP1": DeliveryPartner("DP1"),
        "DP2": DeliveryPartner("DP2"),
        "DP3": DeliveryPartner("DP3"),
    }
    for partner in partners.values():
        service.register_partner(partner)
    dispatcher = PriorityDispatchCoordinator(service, partners, worker_count=2)
    service.attach_dispatcher(dispatcher)

    for priority, order_id in enumerate(["O1", "O2", "O3", "O4"], start=1):
        service.place_order(order_id, "Fusion Kitchen", priority)

    time.sleep(0.4)
    for order_id in ["O1", "O2", "O3", "O4"]:
        service.mark_picked(order_id)

    dispatcher.shutdown()

if __name__ == "__main__":
    main()
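
DispatchItem sorts by negated priority first and enqueue time second, so urgent orders pre-empt routine ones while equal priorities stay FIFO. A quick check of that invariant, assuming the Level 2 classes above (explicit timestamps stand in for time.time()):

import heapq

q: List[DispatchItem] = []
heapq.heappush(q, DispatchItem(-1, 1.0, Order("A", "R", priority=1)))
heapq.heappush(q, DispatchItem(-3, 2.0, Order("B", "R", priority=3)))
heapq.heappush(q, DispatchItem(-1, 3.0, Order("C", "R", priority=1)))

assert heapq.heappop(q).order.order_id == "B"  # highest priority wins
assert heapq.heappop(q).order.order_id == "A"  # then FIFO among equals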

Level 3 — Resilient Fulfillment

The final food-delivery level treats external partners as unreliable, so each fulfillment runs as a saga that records every step alongside its compensation. We wrap the payment integration with a circuit breaker and bounded retries, and orders that still fail land in a dead-letter list so they can be replayed once the flow has been unwound. Learners can observe how the same domain events from earlier levels now feed resilience concerns without rewriting the booking pipeline.

from __future__ import annotations

import heapq
import random
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Deque, Dict, List, Optional, Protocol

class DeliveryObserver(Protocol):
    def notify(self, order_id: str, details: str) -> None:
        ...

@dataclass
class DeliveryPartner:
    partner_id: str
    notifications: List[str] = field(default_factory=list)

    def notify(self, order_id: str, details: str) -> None:
        message = f"Order {order_id}: {details}"
        self.notifications.append(message)
        print(f"Notify {self.partner_id}: {message}")

@dataclass
class Order:
    order_id: str
    restaurant: str
    priority: int = 1
    status: str = "PLACED"

@dataclass(order=True)
class DispatchItem:
    sort_key: int
    created_at: float
    order: Order = field(compare=False)

class OrderRepository:
    def __init__(self) -> None:
        self.orders: Dict[str, Order] = {}

    def save(self, order: Order) -> None:
        self.orders[order.order_id] = order

    def update_status(self, order_id: str, status: str) -> None:
        self.orders[order_id].status = status

    def get(self, order_id: str) -> Order:
        return self.orders[order_id]

class OrderService:
    def __init__(self, repository: OrderRepository) -> None:
        self.repo = repository
        self.observers: List[DeliveryObserver] = []
        self.dispatcher: Optional["PriorityDispatchCoordinator"] = None

    def register_partner(self, partner: DeliveryObserver) -> None:
        self.observers.append(partner)

    def attach_dispatcher(self, dispatcher: "PriorityDispatchCoordinator") -> None:
        self.dispatcher = dispatcher

    def place_order(self, order_id: str, restaurant: str, priority: int = 1) -> Order:
        order = Order(order_id, restaurant, priority)
        self.repo.save(order)
        for observer in self.observers:
            observer.notify(order_id, f"from {restaurant} awaiting pickup")
        if self.dispatcher:
            self.dispatcher.enqueue(order)
        return order

    def mark_picked(self, order_id: str) -> None:
        self.repo.update_status(order_id, "PICKED")
        for observer in self.observers:
            observer.notify(order_id, "picked up")
        if self.dispatcher:
            self.dispatcher.complete(order_id)

    def cancel_order(self, order_id: str, reason: str = "CANCELLED") -> None:
        self.repo.update_status(order_id, reason)
        for observer in self.observers:
            observer.notify(order_id, f"cancelled: {reason}")
        if self.dispatcher:
            self.dispatcher.complete(order_id)

class PriorityDispatchCoordinator:
    def __init__(self, service: OrderService, partners: Dict[str, DeliveryPartner], worker_count: int = 3):
        self.service = service
        self.partners = partners
        self.partner_load: Dict[str, int] = {pid: 0 for pid in partners}
        self.queue: List[DispatchItem] = []
        self.queue_cond = threading.Condition()
        self.load_lock = threading.Lock()
        self.assignments: Dict[str, str] = {}
        self.running = True
        self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(worker_count)]
        for worker in self.workers:
            worker.start()

    def enqueue(self, order: Order) -> None:
        with self.queue_cond:
            sort_key = -order.priority
            heapq.heappush(self.queue, DispatchItem(sort_key, time.time(), order))
            self.queue_cond.notify()

    def _pick_partner(self) -> DeliveryPartner:
        with self.load_lock:
            partner_id = min(self.partner_load, key=self.partner_load.get)
            self.partner_load[partner_id] += 1
        return self.partners[partner_id]

    def complete(self, order_id: str) -> None:
        partner_id = self.assignments.pop(order_id, None)
        if partner_id:
            with self.load_lock:
                self.partner_load[partner_id] -= 1

    def _worker(self) -> None:
        while self.running:
            with self.queue_cond:
                while not self.queue and self.running:
                    self.queue_cond.wait()
                if not self.running:
                    return
                item = heapq.heappop(self.queue)
            partner = self._pick_partner()
            self.assignments[item.order.order_id] = partner.partner_id
            partner.notify(item.order.order_id, f"pickup ready from {item.order.restaurant}")
            time.sleep(0.1)

    def shutdown(self) -> None:
        self.running = False
        with self.queue_cond:
            self.queue_cond.notify_all()
        for worker in self.workers:
            worker.join(timeout=0.2)

class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def call(self, func: Callable[[], str]) -> str:
        with self.lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure_time >= self.recovery_timeout:
                    self.state = "HALF_OPEN"
                else:
                    raise RuntimeError("Payment service unavailable (circuit open)")
        try:
            result = func()
        except Exception:
            with self.lock:
                self.failure_count += 1
                self.last_failure_time = time.time()
                if self.failure_count >= self.failure_threshold:
                    self.state = "OPEN"
            raise
        else:
            with self.lock:
                self.failure_count = 0
                self.state = "CLOSED"
            return result

class RetryPolicy:
    def __init__(self, attempts: int, base_delay: float):
        self.attempts = attempts
        self.base_delay = base_delay

    def call(self, func: Callable[[], str]) -> str:
        last_err: Optional[Exception] = None
        for attempt in range(self.attempts):
            try:
                return func()
            except Exception as exc:
                last_err = exc
                # Exponential backoff; skip the sleep once the final attempt has failed.
                if attempt < self.attempts - 1:
                    time.sleep(self.base_delay * (2 ** attempt))
        raise last_err if last_err else RuntimeError("Unknown failure")

class SagaStep:
    def __init__(self, action: Callable[[], str], compensate: Callable[[], None]):
        self.action = action
        self.compensate = compensate

class SagaOrchestrator:
    def __init__(self, steps: List[SagaStep]):
        self.steps = steps

    def execute(self) -> str:
        executed: List[SagaStep] = []
        try:
            for step in self.steps:
                result = step.action()
                executed.append(step)
                print(result)
            return "SAGA_COMPLETED"
        except Exception as exc:
            print(f"Saga failed: {exc}. Rolling back…")
            for step in reversed(executed):
                step.compensate()
            return "SAGA_ROLLED_BACK"

class PaymentGateway:
    def __init__(self, failure_rate: float = 0.5):
        self.failure_rate = failure_rate

    def charge(self, amount: float) -> str:
        if random.random() < self.failure_rate:
            raise RuntimeError("Payment declined")
        return f"receipt-{int(time.time() * 1000)}"

class CourierPool:
    def __init__(self, couriers: List[str]):
        self.available: Deque[str] = deque(couriers)
        self.lock = threading.Lock()

    def acquire(self) -> str:
        with self.lock:
            if not self.available:
                raise RuntimeError("No courier available")
            return self.available.popleft()

    def release(self, courier_id: str) -> None:
        with self.lock:
            self.available.append(courier_id)

@dataclass
class FulfillmentContext:
    order_id: str
    restaurant_reserved: bool = False
    courier_id: Optional[str] = None
    payment_receipt: Optional[str] = None

class ResilientFoodCoordinator:
    def __init__(self,
                 service: OrderService,
                 dispatcher: PriorityDispatchCoordinator,
                 payment_gateway: PaymentGateway,
                 breaker: CircuitBreaker,
                 retry_policy: RetryPolicy,
                 couriers: CourierPool):
        self.service = service
        self.dispatcher = dispatcher
        self.payment_gateway = payment_gateway
        self.breaker = breaker
        self.retry_policy = retry_policy
        self.couriers = couriers
        self.metrics_lock = threading.Lock()
        self.metrics = {"success": 0, "rolled_back": 0}
        self.dead_letter: List[str] = []

    def fulfill(self, order_id: str, restaurant: str, priority: int, amount: float) -> str:
        order = self.service.place_order(order_id, restaurant, priority)
        ctx = FulfillmentContext(order_id=order_id)

        def reserve_restaurant() -> str:
            ctx.restaurant_reserved = True
            return f"Restaurant reserved for {order_id}"

        def cancel_restaurant() -> None:
            if ctx.restaurant_reserved:
                print(f"Compensate: release restaurant for {order_id}")
                ctx.restaurant_reserved = False

        def charge_payment() -> str:
            receipt = self.retry_policy.call(
                lambda: self.breaker.call(lambda: self.payment_gateway.charge(amount))
            )
            ctx.payment_receipt = receipt
            return f"Payment captured {receipt}"

        def refund_payment() -> None:
            if ctx.payment_receipt:
                print(f"Compensate: refund {ctx.payment_receipt} for {order_id}")
                ctx.payment_receipt = None

        def assign_courier() -> str:
            courier_id = self.couriers.acquire()
            ctx.courier_id = courier_id
            return f"Courier {courier_id} assigned to {order_id}"

        def release_courier() -> None:
            if ctx.courier_id:
                print(f"Compensate: release courier {ctx.courier_id} for {order_id}")
                self.couriers.release(ctx.courier_id)
                ctx.courier_id = None

        saga = SagaOrchestrator([
            SagaStep(reserve_restaurant, cancel_restaurant),
            SagaStep(charge_payment, refund_payment),
            SagaStep(assign_courier, release_courier),
        ])

        outcome = saga.execute()
        if outcome == "SAGA_COMPLETED":
            self.service.mark_picked(order_id)
            if ctx.courier_id:
                self.couriers.release(ctx.courier_id)
                ctx.courier_id = None
            with self.metrics_lock:
                self.metrics["success"] += 1
        else:
            self.service.cancel_order(order_id, "FAILED")
            self.dead_letter.append(order_id)
            with self.metrics_lock:
                self.metrics["rolled_back"] += 1
        return outcome

    def snapshot(self) -> Dict[str, int]:
        with self.metrics_lock:
            return dict(self.metrics)

def main() -> None:
    random.seed(9)
    repo = OrderRepository()
    service = OrderService(repo)
    partners = {
        "DP1": DeliveryPartner("DP1"),
        "DP2": DeliveryPartner("DP2"),
    }
    for partner in partners.values():
        service.register_partner(partner)
    dispatcher = PriorityDispatchCoordinator(service, partners, worker_count=2)
    service.attach_dispatcher(dispatcher)

    coordinator = ResilientFoodCoordinator(
        service=service,
        dispatcher=dispatcher,
        payment_gateway=PaymentGateway(failure_rate=0.4),
        breaker=CircuitBreaker(failure_threshold=2, recovery_timeout=0.5),
        retry_policy=RetryPolicy(attempts=3, base_delay=0.05),
        couriers=CourierPool(["C1", "C2"])
    )

    orders = [("FO-101", "Spicy Bites", 3, 25.0), ("FO-102", "Fusion Kitchen", 1, 18.0), ("FO-103", "Veg Delight", 2, 22.0)]
    for order_id, restaurant, priority, amount in orders:
        outcome = coordinator.fulfill(order_id, restaurant, priority, amount)
        print(f"Outcome for {order_id}: {outcome}")
        time.sleep(0.1)

    print("Metrics:", coordinator.snapshot())
    if coordinator.dead_letter:
        print("Dead letters:", coordinator.dead_letter)

    dispatcher.shutdown()

if __name__ == "__main__":
    main()
Machine Coding - Ride Sharing (Levels 1-3)
ride sharing - surge - resiliency
Scope: evolve matching to surge-aware resilient mobility
Themes: strategy, locks, fallbacks

Evolve the ride sharing platform from nearest-driver matching to surge-aware concurrent dispatch and finally wrap it with resilience patterns for mobility services.

Ride Sharing Platform
 ├─ Level 1: Matching Strategy → Driver Repository
 ├─ Level 2: Zone Locks → Surge Manager
 └─ Level 3: Resilient Dispatcher → Cache Fallback

Level 1 — Matching Engine

Level 1 for ride sharing emphasises a clean separation between the driver store and the matching heuristic so we can experiment without rewriting infrastructure. We compute nearest drivers by delegating to a pluggable strategy and return driver identifiers rather than raw storage objects, which keeps the boundary clear. By practising with deterministic in-memory state, learners experience how to unit test the matching flow before concurrency enters the picture; a sample test follows the code.

from dataclasses import dataclass
from typing import Dict, Protocol, Tuple, Optional
import math

@dataclass
class Driver:
    driver_id: str
    location: Tuple[float, float]
    available: bool = True
    zone: str = "default"

class MatchStrategy(Protocol):
    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        ...

class NearestMatch(MatchStrategy):
    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        best_driver = None
        best_distance = float('inf')
        rx, ry = rider_loc
        for driver in drivers.values():
            if not driver.available:
                continue
            dx, dy = driver.location
            dist = math.hypot(rx - dx, ry - dy)
            if dist < best_distance:
                best_distance, best_driver = dist, driver
        return best_driver

class RideSharingService:
    def __init__(self, strategy: MatchStrategy):
        self.strategy = strategy
        self.drivers: Dict[str, Driver] = {}
        self.assignments: Dict[str, str] = {}

    def add_driver(self, driver: Driver) -> None:
        self.drivers[driver.driver_id] = driver

    def request_ride(self, rider_id: str, location: Tuple[float, float]) -> str:
        driver = self.strategy.pick(self.drivers, location)
        if not driver:
            raise RuntimeError('No drivers available')
        driver.available = False
        self.assignments[rider_id] = driver.driver_id
        return driver.driver_id

    def complete_ride(self, rider_id: str) -> None:
        driver_id = self.assignments.pop(rider_id)
        self.drivers[driver_id].available = True

    def cancel_ride(self, rider_id: str) -> None:
        driver_id = self.assignments.pop(rider_id, None)
        if driver_id:
            self.drivers[driver_id].available = True

if __name__ == "__main__":
    service = RideSharingService(NearestMatch())
    service.add_driver(Driver('D1', (12.9, 77.6), zone="north"))
    service.add_driver(Driver('D2', (12.95, 77.58), zone="south"))
    assigned = service.request_ride('R1', (12.92, 77.59))
    print('Assigned driver:', assigned)
    service.complete_ride('R1')
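
Since the write-up stresses unit testing the matching flow, here is a minimal deterministic check, assuming only the Level 1 classes above (the test name and fixture values are illustrative):

def test_nearest_match_prefers_closest_available() -> None:
    drivers = {
        "D1": Driver("D1", (0.0, 0.0)),
        "D2": Driver("D2", (0.1, 0.1), available=False),  # closest, but busy
        "D3": Driver("D3", (1.0, 1.0)),
    }
    picked = NearestMatch().pick(drivers, (0.2, 0.2))
    assert picked is not None and picked.driver_id == "D1"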

Level 2 — Concurrent Surge Dispatch

Level 2 acknowledges that requests arrive simultaneously, so we partition the city into zones guarded by locks and audit the flow for potential races. A surge calculator feeds into the assignment process, demonstrating how to enrich pricing while still respecting isolation between zones. The result shows how to stabilise throughput by combining contention control with clear separation of immutable ride requests and mutable driver state.

from __future__ import annotations

import math
import threading
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

@dataclass
class Driver:
    driver_id: str
    location: Tuple[float, float]
    available: bool = True
    zone: str = "default"

class MatchStrategy:
    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        raise NotImplementedError

class NearestMatch(MatchStrategy):
    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        best_driver = None
        best_distance = float("inf")
        rx, ry = rider_loc
        for driver in drivers.values():
            if not driver.available:
                continue
            dx, dy = driver.location
            dist = math.hypot(rx - dx, ry - dy)
            if dist < best_distance:
                best_distance, best_driver = dist, driver
        return best_driver

class RideSharingService:
    def __init__(self, strategy: MatchStrategy):
        self.strategy = strategy
        self.drivers: Dict[str, Driver] = {}
        self.assignments: Dict[str, str] = {}

    def add_driver(self, driver: Driver) -> None:
        self.drivers[driver.driver_id] = driver

    def request_ride(self, rider_id: str, location: Tuple[float, float]) -> str:
        driver = self.strategy.pick(self.drivers, location)
        if not driver:
            raise RuntimeError("No drivers available")
        driver.available = False
        self.assignments[rider_id] = driver.driver_id
        return driver.driver_id

    def complete_ride(self, rider_id: str) -> None:
        driver_id = self.assignments.pop(rider_id)
        self.drivers[driver_id].available = True

class ZoneLockManager:
    def __init__(self) -> None:
        self._locks: Dict[str, threading.RLock] = defaultdict(threading.RLock)

    def lock(self, zone: str) -> threading.RLock:
        return self._locks[zone]

class SurgeManager:
    def __init__(self, base_multiplier: float = 1.0, step: float = 0.2) -> None:
        self.base_multiplier = base_multiplier
        self.step = step
        self.multipliers: Dict[str, float] = defaultdict(lambda: self.base_multiplier)
        self.lock = threading.Lock()

    def bump(self, zone: str) -> float:
        with self.lock:
            self.multipliers[zone] += self.step
            return round(self.multipliers[zone], 2)

    def relax(self, zone: str) -> float:
        with self.lock:
            self.multipliers[zone] = max(self.base_multiplier, self.multipliers[zone] - self.step)
            return round(self.multipliers[zone], 2)

    def current(self, zone: str) -> float:
        with self.lock:
            return round(self.multipliers[zone], 2)

class ConcurrentRideSharingService(RideSharingService):
    def __init__(self, strategy: MatchStrategy, zone_locks: ZoneLockManager, surge: SurgeManager):
        super().__init__(strategy)
        self.zone_locks = zone_locks
        self.surge = surge
        self.assignment_zones: Dict[str, str] = {}
        self.metrics_lock = threading.Lock()
        self.metrics = {"completed": 0, "contention": 0}

    def request_ride(self, rider_id: str, location: Tuple[float, float], zone: str) -> Tuple[str, float]:
        lock = self.zone_locks.lock(zone)
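        # Try a bounded acquire first so contention is visible in metrics; on
        # timeout, fall back to a blocking acquire so the request still proceeds.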
        acquired = lock.acquire(timeout=0.1)
        if not acquired:
            with self.metrics_lock:
                self.metrics["contention"] += 1
            lock.acquire()
        try:
            driver_id = super().request_ride(rider_id, location)
            self.assignment_zones[rider_id] = zone
            surge_multiplier = self.surge.bump(zone)
            return driver_id, surge_multiplier
        finally:
            lock.release()

    def complete_ride(self, rider_id: str) -> None:
        zone = self.assignment_zones.pop(rider_id, "default")
        super().complete_ride(rider_id)
        self.surge.relax(zone)
        with self.metrics_lock:
            self.metrics["completed"] += 1

    def snapshot(self) -> Dict[str, int]:
        with self.metrics_lock:
            return dict(self.metrics)

def main() -> None:
    zone_locks = ZoneLockManager()
    surge = SurgeManager()
    service = ConcurrentRideSharingService(NearestMatch(), zone_locks, surge)
    service.add_driver(Driver("D1", (12.9, 77.6), zone="central"))
    service.add_driver(Driver("D2", (12.91, 77.61), zone="central"))
    service.add_driver(Driver("D3", (12.95, 77.58), zone="north"))

    def rider_task(rider_id: str) -> None:
        try:
            driver_id, surge_multiplier = service.request_ride(rider_id, (12.9, 77.6), "central")
            print(f"{rider_id} → {driver_id} at surge {surge_multiplier}x")
            time.sleep(0.05)
            service.complete_ride(rider_id)
        except RuntimeError as exc:
            print(f"{rider_id} failed: {exc}")

    threads = [threading.Thread(target=rider_task, args=(f"R{i}",)) for i in range(1, 6)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    print("Metrics:", service.snapshot())
    print("Current surge (central):", surge.current("central"))

if __name__ == "__main__":
    main()

Level 3 — Resilient Mobility

Level 3 wraps the concurrent matcher with caching layers and health probes because live systems must ride out partial outages. We front-load driver availability into a read-through cache, invalidate cautiously, and fall back to a slower, durable store when the primary path is unhealthy; a staleness probe for detecting stale regions is sketched after the code. Telemetry hooks and explicit failure modes encourage learners to think about observability alongside correctness.

from __future__ import annotations

import math
import random
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict, Optional, Tuple

@dataclass
class Driver:
    driver_id: str
    location: Tuple[float, float]
    available: bool = True
    zone: str = "default"

class MatchStrategy:
    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        raise NotImplementedError

class NearestMatch(MatchStrategy):
    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        best_driver = None
        best_distance = float("inf")
        rx, ry = rider_loc
        for driver in drivers.values():
            if not driver.available:
                continue
            dx, dy = driver.location
            dist = math.hypot(rx - dx, ry - dy)
            if dist < best_distance:
                best_distance, best_driver = dist, driver
        return best_driver

class RideSharingService:
    def __init__(self, strategy: MatchStrategy):
        self.strategy = strategy
        self.drivers: Dict[str, Driver] = {}
        self.assignments: Dict[str, str] = {}

    def add_driver(self, driver: Driver) -> None:
        self.drivers[driver.driver_id] = driver

    def request_ride(self, rider_id: str, location: Tuple[float, float]) -> str:
        driver = self.strategy.pick(self.drivers, location)
        if not driver:
            raise RuntimeError("No drivers available")
        driver.available = False
        self.assignments[rider_id] = driver.driver_id
        return driver.driver_id

    def complete_ride(self, rider_id: str) -> None:
        driver_id = self.assignments.pop(rider_id)
        self.drivers[driver_id].available = True

    def cancel_ride(self, rider_id: str) -> None:
        driver_id = self.assignments.pop(rider_id, None)
        if driver_id:
            self.drivers[driver_id].available = True

class ZoneLockManager:
    def __init__(self) -> None:
        self._locks: Dict[str, threading.RLock] = defaultdict(threading.RLock)

    def lock(self, zone: str) -> threading.RLock:
        return self._locks[zone]

class SurgeManager:
    def __init__(self, base_multiplier: float = 1.0, step: float = 0.2) -> None:
        self.base_multiplier = base_multiplier
        self.step = step
        self.multipliers: Dict[str, float] = defaultdict(lambda: self.base_multiplier)
        self.lock = threading.Lock()

    def bump(self, zone: str) -> float:
        with self.lock:
            self.multipliers[zone] += self.step
            return round(self.multipliers[zone], 2)

    def relax(self, zone: str) -> float:
        with self.lock:
            self.multipliers[zone] = max(self.base_multiplier, self.multipliers[zone] - self.step)
            return round(self.multipliers[zone], 2)

    def current(self, zone: str) -> float:
        with self.lock:
            return round(self.multipliers[zone], 2)

class ConcurrentRideSharingService(RideSharingService):
    def __init__(self, strategy: MatchStrategy, zone_locks: ZoneLockManager, surge: SurgeManager):
        super().__init__(strategy)
        self.zone_locks = zone_locks
        self.surge = surge
        self.assignment_zones: Dict[str, str] = {}

    def request_ride(self, rider_id: str, location: Tuple[float, float], zone: str) -> Tuple[str, float]:
        lock = self.zone_locks.lock(zone)
        with lock:
            driver_id = super().request_ride(rider_id, location)
            self.assignment_zones[rider_id] = zone
            surge_multiplier = self.surge.bump(zone)
            return driver_id, surge_multiplier

    def complete_ride(self, rider_id: str) -> None:
        zone = self.assignment_zones.pop(rider_id, "default")
        super().complete_ride(rider_id)
        self.surge.relax(zone)

    def cancel_ride(self, rider_id: str) -> None:
        zone = self.assignment_zones.pop(rider_id, None)
        super().cancel_ride(rider_id)
        if zone:
            self.surge.relax(zone)

class AvailabilityCache:
    def __init__(self) -> None:
        self.store: Dict[str, Driver] = {}
        self.lock = threading.Lock()

    def upsert(self, driver: Driver) -> None:
        with self.lock:
            self.store[driver.driver_id] = Driver(driver.driver_id, driver.location, driver.available, driver.zone)

    def reserve_by_id(self, driver_id: str) -> None:
        with self.lock:
            if driver_id in self.store:
                self.store[driver_id].available = False

    def reserve_best(self, zone: str, rider_loc: Tuple[float, float]) -> Optional[Driver]:
        with self.lock:
            best_id = None
            best_distance = float("inf")
            rx, ry = rider_loc
            for driver in self.store.values():
                if driver.zone != zone or not driver.available:
                    continue
                dx, dy = driver.location
                dist = math.hypot(rx - dx, ry - dy)
                if dist < best_distance:
                    best_distance, best_id = dist, driver.driver_id
            if best_id is None:
                return None
            driver = self.store[best_id]
            driver.available = False
            return Driver(driver.driver_id, driver.location, False, driver.zone)

    def release(self, driver_id: str) -> None:
        with self.lock:
            if driver_id in self.store:
                self.store[driver_id].available = True

class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def allow(self) -> bool:
        with self.lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure_time >= self.recovery_timeout:
                    self.state = "HALF_OPEN"
                    return True
                return False
            return True

    def record_failure(self) -> None:
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                self.failure_count = 0

    def record_success(self) -> None:
        with self.lock:
            self.failure_count = 0
            self.state = "CLOSED"

class MobilityGateway:
    def __init__(self, failure_rate: float = 0.3) -> None:
        self.failure_rate = failure_rate

    def reserve(self, zone: str) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError(f"Mobility gateway timeout for zone {zone}")

class ResilientRideOrchestrator:
    def __init__(self,
                 service: ConcurrentRideSharingService,
                 primary_cache: AvailabilityCache,
                 fallback_cache: AvailabilityCache,
                 breaker: CircuitBreaker,
                 gateway: MobilityGateway):
        self.service = service
        self.primary_cache = primary_cache
        self.fallback_cache = fallback_cache
        self.breaker = breaker
        self.gateway = gateway
        self.assignments: Dict[str, Tuple[str, str, str]] = {}  # rider -> (source, driver_id, zone)
        self.metrics_lock = threading.Lock()
        self.metrics = {"primary": 0, "fallback": 0, "rejected": 0}
        self.dead_letters: Deque[str] = deque()

    def register_driver(self, driver: Driver, *, primary: bool = True) -> None:
        if primary:
            self.service.add_driver(driver)
            self.primary_cache.upsert(driver)
        self.fallback_cache.upsert(driver)

    def request_ride(self, rider_id: str, location: Tuple[float, float], zone: str) -> Tuple[str, float, str]:
        if self.breaker.allow():
            try:
                self.gateway.reserve(zone)
                driver_id, surge = self.service.request_ride(rider_id, location, zone)
                self.primary_cache.reserve_by_id(driver_id)
                self.breaker.record_success()
                with self.metrics_lock:
                    self.metrics["primary"] += 1
                self.assignments[rider_id] = ("primary", driver_id, zone)
                return driver_id, surge, "primary"
            except Exception:
                self.breaker.record_failure()
                self.service.cancel_ride(rider_id)

        fallback_driver = self.fallback_cache.reserve_best(zone, location)
        if fallback_driver:
            with self.metrics_lock:
                self.metrics["fallback"] += 1
            self.assignments[rider_id] = ("fallback", fallback_driver.driver_id, zone)
            return fallback_driver.driver_id, self.service.surge.current(zone), "fallback"

        with self.metrics_lock:
            self.metrics["rejected"] += 1
        self.dead_letters.append(rider_id)
        raise RuntimeError("No drivers available in any pool")

    def complete_ride(self, rider_id: str) -> None:
        source, driver_id, zone = self.assignments.pop(rider_id)
        if source == "primary":
            self.service.complete_ride(rider_id)
            self.primary_cache.release(driver_id)
        else:
            self.fallback_cache.release(driver_id)
            self.service.surge.relax(zone)

    def cancel_ride(self, rider_id: str) -> None:
        record = self.assignments.pop(rider_id, None)
        if not record:
            return
        source, driver_id, zone = record
        if source == "primary":
            self.service.cancel_ride(rider_id)
            self.primary_cache.release(driver_id)
        else:
            self.fallback_cache.release(driver_id)
        self.service.surge.relax(zone)

    def snapshot(self) -> Dict[str, int]:
        with self.metrics_lock:
            return dict(self.metrics)

def main() -> None:
    random.seed(7)
    zone_locks = ZoneLockManager()
    surge = SurgeManager()
    service = ConcurrentRideSharingService(NearestMatch(), zone_locks, surge)
    primary_cache = AvailabilityCache()
    fallback_cache = AvailabilityCache()
    breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.4)
    gateway = MobilityGateway(failure_rate=0.35)
    orchestrator = ResilientRideOrchestrator(service, primary_cache, fallback_cache, breaker, gateway)

    orchestrator.register_driver(Driver("D1", (12.9, 77.6), zone="central"))
    orchestrator.register_driver(Driver("D2", (12.91, 77.61), zone="central"))
    orchestrator.register_driver(Driver("FD1", (12.88, 77.58), zone="central"), primary=False)

    def rider_task(rider_id: str) -> None:
        try:
            driver_id, surge_multiplier, source = orchestrator.request_ride(rider_id, (12.9, 77.6), "central")
            print(f"{rider_id} → {driver_id} via {source} at {surge_multiplier}x")
            time.sleep(random.uniform(0.05, 0.15))
            orchestrator.complete_ride(rider_id)
        except RuntimeError as exc:
            print(f"{rider_id} failed: {exc}")

    threads = [threading.Thread(target=rider_task, args=(f"R{i}",)) for i in range(1, 7)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    print("Metrics:", orchestrator.snapshot())
    if orchestrator.dead_letters:
        print("Dead letters:", list(orchestrator.dead_letters))

if __name__ == "__main__":
    main()
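
The Level 3 write-up mentions detecting stale regions; a lightweight probe is one way to make that concrete. This is a hedged sketch, and the ZoneFreshnessProbe name, the heartbeat API, and the freshness window are assumptions rather than part of the code above:

import time
from typing import Dict

class ZoneFreshnessProbe:
    """Tracks the last data heartbeat per zone and flags zones as stale."""

    def __init__(self, max_age_seconds: float = 5.0) -> None:
        self.max_age_seconds = max_age_seconds
        self._last_seen: Dict[str, float] = {}

    def heartbeat(self, zone: str) -> None:
        # Call whenever fresh driver data for the zone arrives.
        self._last_seen[zone] = time.time()

    def is_stale(self, zone: str) -> bool:
        last = self._last_seen.get(zone)
        return last is None or time.time() - last > self.max_age_seconds

A dispatcher could consult is_stale(zone) before trusting the primary path and route straight to the fallback cache when a zone's data has gone quiet.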
Machine Coding - Notification Service (Levels 1-3)
notifications - concurrency - resiliency
Scope: evolve notifications from fan-out to resilient multi-channel
Themes: observer, queues, bulkheads

Progressively build the notification platform from synchronous fan-out to prioritized asynchronous delivery and finally to a resilient multi-provider gateway.

Notification Gateway
 ├─ Level 1: Publisher → Channel Observers
 ├─ Level 2: Queue → Worker Pool → Dead Letters
 └─ Level 3: Provider Bulkheads → Circuit Breakers

Level 1 — Event Fan-out

The foundational notification service demonstrates how to keep a publisher ignorant of actual delivery rules by leaning on the observer pattern. We register strategies for email, SMS, or push that translate a common message contract into channel-specific payloads; tracking delivery state is sketched as an extension after the code. By running through happy-path flows developers learn to reason about fan-out without premature concerns about throughput.

from typing import Protocol, List

class ChannelStrategy(Protocol):
    def send(self, recipient: str, message: str) -> None:
        ...

class EmailStrategy:
    def send(self, recipient: str, message: str) -> None:
        print(f"EMAIL->{recipient}: {message}")

class SmsStrategy:
    def send(self, recipient: str, message: str) -> None:
        print(f"SMS->{recipient}: {message}")

class NotificationService:
    def __init__(self, strategies: List[ChannelStrategy]):
        self.strategies = strategies

    def notify(self, recipient: str, message: str) -> None:
        for strategy in self.strategies:
            strategy.send(recipient, message)

if __name__ == "__main__":
    service = NotificationService([EmailStrategy(), SmsStrategy()])
    service.notify('user@example.com', 'Welcome to the platform!')
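
The Level 1 code does not yet track delivery state, so here is a hedged sketch of one way to add it without touching the publisher; the TrackedChannel wrapper and its status labels are assumptions, not part of the original design:

from typing import List, Tuple

class TrackedChannel:
    """Decorates a ChannelStrategy and records per-send outcomes."""

    def __init__(self, name: str, inner: ChannelStrategy) -> None:
        self.name = name
        self.inner = inner
        self.log: List[Tuple[str, str]] = []  # (recipient, status)

    def send(self, recipient: str, message: str) -> None:
        try:
            self.inner.send(recipient, message)
            self.log.append((recipient, "DELIVERED"))
        except Exception:
            self.log.append((recipient, "FAILED"))
            raise

Wrapping each strategy, for example NotificationService([TrackedChannel("email", EmailStrategy())]), keeps the fan-out loop unchanged while every channel accumulates its own delivery log.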

Level 2 — Asynchronous Delivery Workers

At Level 2 we surface the realities of bursty traffic, so a queue absorbs inbound notifications and a worker pool pulls tasks to deliver them asynchronously. Retries with exponential backoff and poison message handling illustrate how to preserve ordering and durability when downstream channels misbehave. We keep instrumentation close to enqueue/dequeue points which makes debugging and capacity planning part of the exercise.

from __future__ import annotations

import threading
import time
from dataclasses import dataclass
from queue import Empty, Queue
from typing import List, Protocol

class ChannelStrategy(Protocol):
    def send(self, recipient: str, message: str) -> None:
        ...

class EmailStrategy:
    def send(self, recipient: str, message: str) -> None:
        print(f"EMAIL->{recipient}: {message}")

class SmsStrategy:
    def send(self, recipient: str, message: str) -> None:
        print(f"SMS->{recipient}: {message}")

class NotificationService:
    def __init__(self, strategies: List[ChannelStrategy]):
        self.strategies = strategies

    def notify(self, recipient: str, message: str) -> None:
        for strategy in self.strategies:
            strategy.send(recipient, message)

@dataclass
class NotificationJob:
    recipient: str
    message: str
    attempt: int = 0

class AsyncNotificationGateway:
    def __init__(self, service: NotificationService, max_workers: int = 3, max_retries: int = 2):
        self.service = service
        self.max_retries = max_retries
        self.queue: Queue[NotificationJob] = Queue()
        self.dead_letters: List[NotificationJob] = []
        self.running = True
        self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(max_workers)]
        for worker in self.workers:
            worker.start()

    def enqueue(self, recipient: str, message: str) -> None:
        self.queue.put(NotificationJob(recipient, message))

    def _dispatch(self, job: NotificationJob) -> None:
        if "FAIL" in job.message:
            raise RuntimeError("channel failure")
        self.service.notify(job.recipient, job.message)

    def _worker(self) -> None:
        while self.running:
            try:
                job = self.queue.get(timeout=0.5)
            except Empty:
                continue
            try:
                self._dispatch(job)
            except Exception as exc:
                job.attempt += 1
                if job.attempt <= self.max_retries:
                    print(f"Retry {job.attempt} for {job.recipient}: {exc}")
                    time.sleep(0.1 * (2 ** (job.attempt - 1)))  # exponential backoff, as described above
                    self.queue.put(job)
                else:
                    print(f"Dead-lettering {job.recipient}: {job.message}")
                    self.dead_letters.append(job)
            finally:
                self.queue.task_done()

    def shutdown(self) -> None:
        self.running = False
        for worker in self.workers:
            worker.join(timeout=0.2)

def main() -> None:
    base_service = NotificationService([EmailStrategy(), SmsStrategy()])
    gateway = AsyncNotificationGateway(base_service, max_workers=2, max_retries=2)
    gateway.enqueue("user1", "Welcome!")
    gateway.enqueue("user2", "FAIL-SEND")
    gateway.enqueue("user3", "Daily digest")
    time.sleep(1.5)
    gateway.shutdown()
    print("Dead letters:", [(job.recipient, job.message) for job in gateway.dead_letters])

if __name__ == "__main__":
    main()
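
Instead of sleeping for a fixed interval before shutdown, the demo could drain deterministically. The variant below is a sketch that relies on queue.join(), which pairs with the task_done() call in the worker loop; retried jobs are re-enqueued as fresh items, so join() only returns once every job is delivered or dead-lettered:

def drain_and_shutdown(gateway: AsyncNotificationGateway) -> None:
    gateway.queue.join()  # blocks until all work is delivered or dead-lettered
    gateway.shutdown()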

Level 3 — Multi-Provider Resiliency

Level 3 assumes providers fail independently, so we introduce provider pools isolated by bulkhead limits and guarded by circuit breakers. A fallback routing policy decides when to drain traffic to the secondary vendor; in a production build, metrics from these paths would feed dashboards that surface saturation early. The walkthrough shows how to combine structural patterns with runtime safeguards instead of bolting resilience on afterwards.

import threading
import time
from typing import Callable, Protocol

class Provider(Protocol):
    def send(self, recipient: str, message: str) -> None:
        ...

class PrimaryProvider:
    def __init__(self):
        self.counter = 0

    def send(self, recipient: str, message: str) -> None:
        self.counter += 1
        if self.counter % 2 == 0:
            raise RuntimeError('Primary provider outage')
        print(f"PRIMARY->{recipient}: {message}")

class BackupProvider:
    def send(self, recipient: str, message: str) -> None:
        print(f"BACKUP->{recipient}: {message}")

class Bulkhead:
    def __init__(self, provider: Provider, capacity: int):
        self.provider = provider
        self.semaphore = threading.Semaphore(capacity)

    def execute(self, recipient: str, message: str) -> None:
        with self.semaphore:
            self.provider.send(recipient, message)

class Breaker:
    def __init__(self, failure_threshold: int, recovery_time: float):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.last_failure = 0.0
        self.state = 'CLOSED'

    def invoke(self, func: Callable[[], None]) -> None:
        now = time.time()
        if self.state == 'OPEN' and now - self.last_failure < self.recovery_time:
            raise RuntimeError('Breaker open')
        try:
            func()
            self.failures = 0
            self.state = 'CLOSED'
        except Exception as exc:
            self.failures += 1
            self.last_failure = now
            if self.failures >= self.failure_threshold:
                self.state = 'OPEN'
            raise exc

class ResilientGateway:
    def __init__(self):
        self.primary = Bulkhead(PrimaryProvider(), capacity=2)
        self.backup = Bulkhead(BackupProvider(), capacity=4)
        self.breaker = Breaker(2, 2.0)

    def send(self, recipient: str, message: str) -> None:
        try:
            self.breaker.invoke(lambda: self.primary.execute(recipient, message))
        except Exception:
            print('Primary failed, using backup…')
            self.backup.execute(recipient, message)

if __name__ == "__main__":
    gateway = ResilientGateway()
    for idx in range(1, 7):
        gateway.send(f'user{idx}', 'Security alert')
        time.sleep(0.4)
Machine Coding - Rate Limiter (Levels 1-3)
rate limiter - distributed - resiliency
Scope: evolve single-node throttling into a distributed, resilient gateway
Themes: token bucket, sliding window, graceful degradation

Progressively enhance the rate limiting system from an in-process token bucket to a distributed sliding window and finally a resilient API gateway that degrades gracefully under failure.

Rate Limiter Stack
 ├─ Level 1: Token Bucket Evaluator
 ├─ Level 2: Distributed Sliding Window + Coordination
 └─ Level 3: Gateway Orchestrator with Circuit Breaker & Fallback

Level 1 — Core Token Bucket

We start the rate limiter series by implementing the classic token bucket and instrumenting it so bursts can be visualised. Separating the policy object from the bucket state makes behaviour easier to validate; injecting the clock as well (sketched after the code) makes tests fully deterministic. By replaying the provided sample workload learners can watch how refills, consumptions, and denials interplay over time.

import threading
import time

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def _refill(self) -> None:
        now = time.time()
        elapsed = now - self.last_refill
        tokens_to_add = int(elapsed * self.refill_rate)
        if tokens_to_add > 0:
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now

    def try_consume(self) -> bool:
        with self.lock:
            self._refill()
            if self.tokens > 0:
                self.tokens -= 1
                return True
            return False

class RateLimiter:
    def __init__(self, bucket: TokenBucket):
        self.bucket = bucket

    def allow(self) -> bool:
        return self.bucket.try_consume()

if __name__ == "__main__":
    limiter = RateLimiter(TokenBucket(capacity=5, refill_rate=2))
    for _ in range(10):
        print("Allowed" if limiter.allow() else "Throttled")
        time.sleep(0.3)
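
The bucket above reads time.time() directly, which makes timing-sensitive tests flaky. The sketch below separates the clock so tests become deterministic; the InjectableClockBucket name and the clock parameter are additions for illustration, not part of the original class:

import threading
from typing import Callable

class InjectableClockBucket:
    """Same refill arithmetic as TokenBucket, but the clock is injected."""

    def __init__(self, capacity: int, refill_rate: float, clock: Callable[[], float]):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.clock = clock
        self.last_refill = clock()
        self.lock = threading.Lock()

    def try_consume(self) -> bool:
        with self.lock:
            now = self.clock()
            added = int((now - self.last_refill) * self.refill_rate)
            if added > 0:
                self.tokens = min(self.capacity, self.tokens + added)
                self.last_refill = now
            if self.tokens > 0:
                self.tokens -= 1
                return True
            return False

# Deterministic check with a hand-advanced fake clock.
fake_now = [0.0]
bucket = InjectableClockBucket(capacity=2, refill_rate=1.0, clock=lambda: fake_now[0])
assert bucket.try_consume() and bucket.try_consume()  # drain the initial burst
assert not bucket.try_consume()                       # empty, no time has passed
fake_now[0] = 1.0                                     # advance virtual time by 1s
assert bucket.try_consume()                           # exactly one token refilled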

Level 2 — Distributed Sliding Window

Level 2 moves from single-threaded arithmetic to shared state by spreading counters across threads and synchronising with locks. We model the rolling window as timestamped buckets so we can evict old events quickly and avoid unbounded memory growth. The sample harness demonstrates how to reason about fairness when multiple application servers rely on the same limiter view.

from __future__ import annotations

import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def _refill(self) -> None:
        now = time.time()
        elapsed = now - self.last_refill
        tokens_to_add = int(elapsed * self.refill_rate)
        if tokens_to_add > 0:
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now

    def try_consume(self) -> bool:
        with self.lock:
            self._refill()
            if self.tokens > 0:
                self.tokens -= 1
                return True
            return False

class RateLimiter:
    def __init__(self, name: str, bucket: TokenBucket):
        self.name = name
        self.bucket = bucket

    def allow(self) -> bool:
        return self.bucket.try_consume()

@dataclass
class WindowState:
    events: Deque[float]

class SlidingWindowCoordinator:
    def __init__(self, window_seconds: float, max_requests: int, replicas: Dict[str, RateLimiter]):
        self.window_seconds = window_seconds
        self.max_requests = max_requests
        self.replicas = replicas
        self.window: Dict[str, WindowState] = defaultdict(lambda: WindowState(deque()))
        self.window_lock = threading.Lock()

    def allow(self, tenant: str) -> bool:
        now = time.time()
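        # Note: the window check below and the append near the end run under
        # separate lock acquisitions, so concurrent callers can overshoot
        # max_requests slightly; the per-tenant token bucket bounds the overshoot.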

        with self.window_lock:
            state = self.window[tenant]
            while state.events and now - state.events[0] > self.window_seconds:
                state.events.popleft()
            if len(state.events) >= self.max_requests:
                return False

        limiter = self.replicas[tenant]
        if not limiter.allow():
            return False

        with self.window_lock:
            state = self.window[tenant]
            state.events.append(now)
        return True

    def release(self, tenant: str) -> None:
        limiter = self.replicas[tenant]
        with self.window_lock:
            state = self.window[tenant]
            if state.events:
                state.events.pop()
        with limiter.bucket.lock:
            limiter.bucket.tokens = min(limiter.bucket.capacity, limiter.bucket.tokens + 1)

def main() -> None:
    replicas = {
        "tenant-a": RateLimiter("tenant-a", TokenBucket(capacity=5, refill_rate=3)),
        "tenant-b": RateLimiter("tenant-b", TokenBucket(capacity=3, refill_rate=2)),
    }
    coordinator = SlidingWindowCoordinator(window_seconds=1.0, max_requests=4, replicas=replicas)
    decisions: list[tuple[str, bool]] = []

    def fire(tenant: str, idx: int) -> None:
        allowed = coordinator.allow(tenant)
        decisions.append((f"{tenant}-{idx}", allowed))

    threads = [
        threading.Thread(target=fire, args=("tenant-a", i))
        for i in range(8)
    ] + [
        threading.Thread(target=fire, args=("tenant-b", i))
        for i in range(6)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    for decision in sorted(decisions):
        print(decision)

if __name__ == "__main__":
    main()

Level 3 — Resilient Gateway

The advanced rate limiter treats downstream failures as a signal, blending fixed quotas with adaptive throttling that reacts to error rates. We integrate a circuit breaker that can trip when a dependency degrades; surfacing breaker state changes for observability is sketched after the code. By stepping through the logs you can see how protecting a partner involves both controlling volume and surfacing intent to operators.

from __future__ import annotations

import random
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, Dict

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def _refill(self) -> None:
        now = time.time()
        elapsed = now - self.last_refill
        tokens_to_add = int(elapsed * self.refill_rate)
        if tokens_to_add > 0:
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now

    def try_consume(self) -> bool:
        with self.lock:
            self._refill()
            if self.tokens > 0:
                self.tokens -= 1
                return True
            return False

class RateLimiter:
    def __init__(self, name: str, bucket: TokenBucket):
        self.name = name
        self.bucket = bucket

    def allow(self) -> bool:
        return self.bucket.try_consume()

@dataclass
class WindowState:
    events: Deque[float]

class SlidingWindowCoordinator:
    def __init__(self, window_seconds: float, max_requests: int, replicas: Dict[str, RateLimiter]):
        self.window_seconds = window_seconds
        self.max_requests = max_requests
        self.replicas = replicas
        self.window: Dict[str, WindowState] = defaultdict(lambda: WindowState(deque()))
        self.window_lock = threading.Lock()

    def allow(self, tenant: str) -> bool:
        now = time.time()
        with self.window_lock:
            state = self.window[tenant]
            while state.events and now - state.events[0] > self.window_seconds:
                state.events.popleft()
            if len(state.events) >= self.max_requests:
                return False

        limiter = self.replicas[tenant]
        if not limiter.allow():
            return False

        with self.window_lock:
            self.window[tenant].events.append(now)
        return True

    def release(self, tenant: str) -> None:
        limiter = self.replicas[tenant]
        with self.window_lock:
            state = self.window[tenant]
            if state.events:
                state.events.pop()
        with limiter.bucket.lock:
            limiter.bucket.tokens = min(limiter.bucket.capacity, limiter.bucket.tokens + 1)

class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def call(self, func: Callable[[], str]) -> str:
        with self.lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure_time >= self.recovery_timeout:
                    self.state = "HALF_OPEN"
                else:
                    raise RuntimeError("Downstream circuit open")
        try:
            result = func()
        except Exception:
            with self.lock:
                self.failure_count += 1
                self.last_failure_time = time.time()
                if self.failure_count >= self.failure_threshold:
                    self.state = "OPEN"
                    self.failure_count = 0
            raise
        else:
            with self.lock:
                self.failure_count = 0
                self.state = "CLOSED"
            return result

class DownstreamService:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def call(self, payload: str) -> str:
        if random.random() < self.failure_rate:
            raise RuntimeError("backend timeout")
        return f"200 OK for {payload}"

class ProtectiveGateway:
    def __init__(self, coordinator: SlidingWindowCoordinator, breaker: CircuitBreaker, service: DownstreamService):
        self.coordinator = coordinator
        self.breaker = breaker
        self.service = service
        self.metrics_lock = threading.Lock()
        self.metrics = {"throttled": 0, "success": 0, "fallback": 0}

    def handle_request(self, tenant: str, payload: str) -> str:
        if not self.coordinator.allow(tenant):
            with self.metrics_lock:
                self.metrics["throttled"] += 1
            return f"{tenant}/{payload}: THROTTLED"
        try:
            response = self.breaker.call(lambda: self.service.call(payload))
            with self.metrics_lock:
                self.metrics["success"] += 1
            return f"{tenant}/{payload}: {response}"
        except Exception as exc:
            self.coordinator.release(tenant)
            with self.metrics_lock:
                self.metrics["fallback"] += 1
            return f"{tenant}/{payload}: FALLBACK ({exc})"

    def snapshot(self) -> Dict[str, int]:
        with self.metrics_lock:
            return dict(self.metrics)

def main() -> None:
    random.seed(17)
    replicas = {
        "tenant-a": RateLimiter("tenant-a", TokenBucket(capacity=6, refill_rate=4)),
        "tenant-b": RateLimiter("tenant-b", TokenBucket(capacity=4, refill_rate=3)),
    }
    coordinator = SlidingWindowCoordinator(window_seconds=1.5, max_requests=5, replicas=replicas)
    gateway = ProtectiveGateway(coordinator, CircuitBreaker(failure_threshold=3, recovery_timeout=0.8), DownstreamService(failure_rate=0.45))

    for idx in range(12):
        tenant = "tenant-a" if idx % 3 else "tenant-b"
        result = gateway.handle_request(tenant, f"req-{idx}")
        print(result)
        time.sleep(0.2)

    print("Gateway metrics:", gateway.snapshot())

if __name__ == "__main__":
    main()
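
The write-up above mentions publishing breaker state changes; the gateway code does not do that yet, so here is a hedged polling stand-in (watch_breaker is an invented helper, and real telemetry would hook the transitions directly):

import threading
import time

def watch_breaker(breaker: CircuitBreaker, interval: float = 0.1) -> threading.Thread:
    """Polls breaker.state and prints transitions; a stand-in for real telemetry."""
    def loop() -> None:
        last = breaker.state
        while True:
            time.sleep(interval)
            current = breaker.state  # unlocked read is acceptable for a demo probe
            if current != last:
                print(f"breaker: {last} -> {current}")
                last = current
    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread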
Machine Coding - URL Shortener (Levels 1-3)
url shortener - concurrency - multi-region
Scope: evolve URL shortening from core CRUD to concurrent, resilient replication
Themes: encoding strategies, caching, replication

Incrementally build the URL shortener from a single-node service to a concurrent cache-backed implementation and finally a resilient multi-region deployment.

URL Shortener Stack
 ├─ Level 1: Encoder Strategy → Repository
 ├─ Level 2: Thread-Safe Cache + Persistence
 └─ Level 3: Multi-Region Replication & Resiliency

Level 1 — Core Service

The baseline shortener focuses on clean domain boundaries: a code generator, a repository, and a service that enforces idempotent lookups. We track hit counts and present a tiny demo so you can exercise the flow end to end and explore edge cases such as duplicate URLs; a collision-handling encoder is sketched after the code. Attention is paid to deterministic code generation so later replication scenarios have something solid to build on.

from __future__ import annotations

import hashlib
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Optional
import threading

@dataclass
class UrlRecord:
    short_code: str
    long_url: str
    created_at: float
    hits: int = 0

class CodeEncoder(ABC):
    @abstractmethod
    def encode(self, long_url: str) -> str:
        ...

class Md5Encoder(CodeEncoder):
    def encode(self, long_url: str) -> str:
        digest = hashlib.md5(long_url.encode("utf-8")).hexdigest()
        return digest[:7]

class UrlRepository:
    def __init__(self) -> None:
        self._records: Dict[str, UrlRecord] = {}
        self._lock = threading.RLock()

    def save(self, record: UrlRecord) -> None:
        with self._lock:
            self._records[record.short_code] = record

    def get(self, short_code: str) -> Optional[UrlRecord]:
        with self._lock:
            return self._records.get(short_code)

class UrlShortener:
    def __init__(self, encoder: CodeEncoder, repository: UrlRepository) -> None:
        self.encoder = encoder
        self.repository = repository

    def shorten(self, long_url: str) -> str:
        code = self.encoder.encode(long_url)
        if not self.repository.get(code):
            self.repository.save(UrlRecord(code, long_url, time.time()))
        return code

    def resolve(self, code: str) -> str:
        record = self.repository.get(code)
        if not record:
            raise KeyError("Unknown short code")
        record.hits += 1
        return record.long_url

    def stats(self, code: str) -> UrlRecord:
        record = self.repository.get(code)
        if not record:
            raise KeyError("Unknown short code")
        return record

def main() -> None:
    repo = UrlRepository()
    service = UrlShortener(Md5Encoder(), repo)
    code = service.shorten("https://example.com/docs")
    print("Short code:", code)
    print("Redirect to:", service.resolve(code))
    print("Stats:", service.stats(code))

if __name__ == "__main__":
    main()
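
Truncating the MD5 digest to seven characters leaves a small chance that two different URLs collide. The sketch below is one hedged way to handle that; the SaltingEncoder name and the salt scheme are assumptions layered on the Level 1 classes:

class SaltingEncoder(Md5Encoder):
    """Resolves truncation collisions by re-hashing with an incrementing salt."""

    def __init__(self, repository: UrlRepository) -> None:
        self.repository = repository

    def encode(self, long_url: str) -> str:
        candidate = super().encode(long_url)
        salt = 0
        while True:
            existing = self.repository.get(candidate)
            if existing is None or existing.long_url == long_url:
                return candidate  # free slot, or an idempotent repeat
            salt += 1
            candidate = super().encode(f"{long_url}#{salt}")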

Level 2 — Concurrent Cache

Level 2 introduces shared mutable state by letting several worker threads shorten URLs simultaneously while relying on a write-through cache. We guard the repository with locks, serialise cache fills with per-code locks so concurrent resolves do not stampede the store, and expose a snapshot that proves the records remain consistent. Running the demo shows how caching can boost reads yet still tolerate bursts of writes when structured carefully.

from __future__ import annotations

import threading
from collections import OrderedDict
from typing import Dict, Optional

import hashlib
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass

@dataclass
class UrlRecord:
    short_code: str
    long_url: str
    created_at: float
    hits: int = 0

class CodeEncoder(ABC):
    @abstractmethod
    def encode(self, long_url: str) -> str:
        ...

class Md5Encoder(CodeEncoder):
    def encode(self, long_url: str) -> str:
        digest = hashlib.md5(long_url.encode("utf-8")).hexdigest()
        return digest[:7]

class UrlRepository:
    def __init__(self) -> None:
        self._records: Dict[str, UrlRecord] = {}
        self._lock = threading.RLock()

    def save(self, record: UrlRecord) -> None:
        with self._lock:
            self._records[record.short_code] = record

    def get(self, short_code: str) -> Optional[UrlRecord]:
        with self._lock:
            return self._records.get(short_code)

    def snapshot(self) -> Dict[str, UrlRecord]:
        with self._lock:
            return {code: record for code, record in self._records.items()}

class UrlShortener:
    def __init__(self, encoder: CodeEncoder, repository: UrlRepository) -> None:
        self.encoder = encoder
        self.repository = repository

    def shorten(self, long_url: str) -> str:
        code = self.encoder.encode(long_url)
        if not self.repository.get(code):
            self.repository.save(UrlRecord(code, long_url, time.time()))
        return code

    def resolve(self, code: str) -> str:
        record = self.repository.get(code)
        if not record:
            raise KeyError("Unknown short code")
        record.hits += 1
        return record.long_url

    def stats(self, code: str) -> UrlRecord:
        record = self.repository.get(code)
        if not record:
            raise KeyError("Unknown short code")
        return record

class ThreadSafeLRUCache:
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._data: OrderedDict[str, UrlRecord] = OrderedDict()
        self._lock = threading.RLock()

    def get(self, key: str) -> Optional[UrlRecord]:
        with self._lock:
            record = self._data.get(key)
            if record:
                self._data.move_to_end(key)
            return record

    def put(self, key: str, record: UrlRecord) -> None:
        with self._lock:
            self._data[key] = record
            self._data.move_to_end(key)
            if len(self._data) > self.capacity:
                self._data.popitem(last=False)

class CachedUrlShortener(UrlShortener):
    def __init__(self, encoder: CodeEncoder, repository: UrlRepository, cache_capacity: int = 256) -> None:
        super().__init__(encoder, repository)
        self.cache = ThreadSafeLRUCache(cache_capacity)
        self._locks: Dict[str, threading.Lock] = {}
        self._locks_guard = threading.Lock()

    def _lock_for(self, key: str) -> threading.Lock:
        with self._locks_guard:
            return self._locks.setdefault(key, threading.Lock())

    def shorten(self, long_url: str) -> str:
        code = super().shorten(long_url)
        record = self.repository.get(code)
        if record:
            self.cache.put(code, record)
        return code

    def resolve(self, code: str) -> str:
        lock = self._lock_for(code)
        with lock:
            cached = self.cache.get(code)
            if cached:
                cached.hits += 1
                return cached.long_url
            record = self.repository.get(code)
            if not record:
                raise KeyError("Unknown short code")
            record.hits += 1
            self.cache.put(code, record)
            return record.long_url

def worker(shortener: CachedUrlShortener, url: str) -> None:
    code = shortener.shorten(url)
    resolved = shortener.resolve(code)
    print(f"{url} -> {code} -> {resolved}")

def main() -> None:
    repo = UrlRepository()
    shortener = CachedUrlShortener(Md5Encoder(), repo, cache_capacity=4)
    urls = [f"https://service.local/resource/{i}" for i in range(6)]
    threads = [threading.Thread(target=worker, args=(shortener, url)) for url in urls]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("Repository snapshot:")
    for code, record in repo.snapshot().items():
        print(code, "->", record.long_url, "hits:", record.hits)

if __name__ == "__main__":
    main()

Level 3 — Resilient Multi-Region

The advanced level assumes the canonical store can fail, so the service now fronts two repositories and monitors the primary with circuit breakers. When the main store is unhealthy we serve reads from a warm replica and append intents to a replication log that can replay once the primary recovers. Learners get a feel for eventual consistency trade-offs by comparing timestamps of writes versus replay confirmations.

from __future__ import annotations

import random
import threading
import time
from queue import Queue
from typing import Optional

# Reuses UrlRecord, UrlRepository, Md5Encoder, and CachedUrlShortener from Levels 1-2.

class UnstableRepository(UrlRepository):
    def __init__(self, failure_rate: float = 0.35) -> None:
        super().__init__()
        self.failure_rate = failure_rate

    def _maybe_fail(self) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("primary-region-unavailable")

    def save(self, record: UrlRecord) -> None:
        self._maybe_fail()
        super().save(record)

    def get(self, short_code: str) -> Optional[UrlRecord]:
        self._maybe_fail()
        return super().get(short_code)

class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        with self.lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure >= self.recovery_timeout:
                    self.state = "HALF_OPEN"
                else:
                    raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self.lock:
                self.failures += 1
                self.last_failure = time.time()
                if self.failures >= self.failure_threshold:
                    self.state = "OPEN"
                    self.failures = 0
            raise
        else:
            with self.lock:
                self.failures = 0
                self.state = "CLOSED"
            return result

class MultiRegionShortener:
    def __init__(self, primary_repo: UnstableRepository, secondary_repo: UrlRepository) -> None:
        self.primary = CachedUrlShortener(Md5Encoder(), primary_repo, cache_capacity=256)
        self.secondary = CachedUrlShortener(Md5Encoder(), secondary_repo, cache_capacity=512)
        self.breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1.0)
        self.replay_queue: Queue[UrlRecord] = Queue()
        threading.Thread(target=self._replay_loop, daemon=True).start()

    def _replay_loop(self) -> None:
        while True:
            record = self.replay_queue.get()
            try:
                self.breaker.call(self.primary.repository.save, record)
                print("Replayed to primary:", record.short_code)
            except Exception:
                time.sleep(0.4)
                self.replay_queue.put(record)

    def shorten(self, url: str) -> str:
        code = self.secondary.shorten(url)
        record = self.secondary.repository.get(code)
        if not record:
            raise RuntimeError("unexpected-missing-record")
        try:
            self.breaker.call(self.primary.repository.save, record)
        except Exception:
            print("Primary write failed, queueing for replay:", code)
            self.replay_queue.put(record)
        return code

    def resolve(self, code: str) -> str:
        try:
            return self.breaker.call(self.primary.resolve, code)
        except Exception as exc:
            print("Primary resolve failed:", exc, "- using secondary")
            return self.secondary.resolve(code)

def main() -> None:
    random.seed(21)
    service = MultiRegionShortener(UnstableRepository(0.4), UrlRepository())
    urls = [f"https://docs.product/{i}" for i in range(6)]
    codes = [service.shorten(url) for url in urls]
    for code in codes:
        print("Resolve", code, "->", service.resolve(code))
        time.sleep(0.2)

if __name__ == "__main__":
    main()
Machine Coding - Ticketing (Levels 1-3)
ticketing - concurrency - resiliency
Scope: evolve ticket booking from seat selection to concurrent holds and saga resilience
Themes: progressive capabilities

Grow the ticketing service from basic seat allocation to concurrent hold handling and finally a resilient saga-based workflow.

Ticketing Stack
 ├─ Level 1: Seat Selector → Repository
 ├─ Level 2: Hold Manager + Concurrent Workers
 └─ Level 3: Saga Orchestrator with Payment Integration

Level 1 — Core Seat Booking

The introductory ticketing exercise models shows, seats, and reservations explicitly so learners can reason about invariants like availability. We keep allocation logic in a dedicated service that selects the lowest-numbered available seats and records reservations atomically; a contiguous-run selector is sketched after the code. Walking through the sample I/O highlights how to maintain audit trails for bookings even before concurrency is introduced.

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional
import threading

@dataclass
class Seat:
    seat_id: str
    category: str
    is_booked: bool = False

class SeatRepository:
    def __init__(self) -> None:
        self._shows: Dict[str, Dict[str, Seat]] = {}
        self._lock = threading.RLock()

    def add_show(self, show_id: str, seats: List[Seat]) -> None:
        with self._lock:
            self._shows[show_id] = {seat.seat_id: seat for seat in seats}

    def list_available(self, show_id: str, category: Optional[str] = None) -> List[Seat]:
        with self._lock:
            seats = self._shows.get(show_id, {})
            return [
                seat for seat in seats.values()
                if not seat.is_booked and (category is None or seat.category == category)
            ]

    def mark_booked(self, show_id: str, seat_ids: List[str]) -> None:
        with self._lock:
            seats = self._shows.get(show_id)
            if not seats:
                raise KeyError(f"Unknown show {show_id}")
            for seat_id in seat_ids:
                seat = seats.get(seat_id)
                if not seat:
                    raise KeyError(f"Unknown seat {seat_id}")
                if seat.is_booked:
                    raise RuntimeError(f"Seat {seat_id} already booked")
                seat.is_booked = True

class SeatSelector(ABC):
    @abstractmethod
    def select(self, seats: List[Seat], quantity: int) -> List[Seat]:
        ...

class BestAvailableSelector(SeatSelector):
    def select(self, seats: List[Seat], quantity: int) -> List[Seat]:
        seats_sorted = sorted(seats, key=lambda seat: seat.seat_id)
        return seats_sorted[:quantity]

class BookingService:
    def __init__(self, repository: SeatRepository, selector: SeatSelector) -> None:
        self.repository = repository
        self.selector = selector

    def book(self, show_id: str, category: str, quantity: int) -> List[str]:
        available = self.repository.list_available(show_id, category)
        chosen = self.selector.select(available, quantity)
        if len(chosen) < quantity:
            raise RuntimeError("Insufficient seats")
        seat_ids = [seat.seat_id for seat in chosen]
        self.repository.mark_booked(show_id, seat_ids)
        return seat_ids

def main() -> None:
    repo = SeatRepository()
    repo.add_show("S1", [
        Seat("P1", "PREMIUM"),
        Seat("P2", "PREMIUM"),
        Seat("P3", "PREMIUM"),
        Seat("G1", "GOLD"),
        Seat("S1", "SILVER"),
    ])
    service = BookingService(repo, BestAvailableSelector())
    booked = service.book("S1", "PREMIUM", 2)
    print("Booked seats:", booked)
    print("Remaining premium seats:", [seat.seat_id for seat in repo.list_available("S1", "PREMIUM")])

if __name__ == "__main__":
    main()
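
Where BestAvailableSelector simply takes the lowest-numbered seats, a contiguous block is often what groups actually want. The sketch below is a minimal variant under the assumption that seat ids end in a number, as in the sample data; ContiguousSelector is an illustrative name reusing Seat and SeatSelector from the listing above.

import re

class ContiguousSelector(SeatSelector):
    """Pick `quantity` seats whose numeric suffixes form one consecutive run."""

    def select(self, seats: List[Seat], quantity: int) -> List[Seat]:
        def number(seat: Seat) -> int:
            return int(re.sub(r"\D", "", seat.seat_id))

        ordered = sorted(seats, key=number)
        for start in range(len(ordered) - quantity + 1):
            window = ordered[start:start + quantity]
            # Strictly increasing numbers spanning quantity-1 means adjacency.
            if number(window[-1]) - number(window[0]) == quantity - 1:
                return window
        return []  # short list makes BookingService raise "Insufficient seats"

Swapping it in is a one-line change: BookingService(repo, ContiguousSelector()).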

Level 2 — Concurrent Seat Holds

Level 2 expects multiple agents to compete for seats, so we add seat holds with expiry timestamps and guard mutations with locks. A background sweeper thread purges lapsed holds, so all paths eventually progress without deadlocks; a condition-variable variant that wakes waiting threads the moment a hold frees is sketched after the listing. Instrumentation around holds, confirmations, and releases aids in debugging race conditions during practice.

from __future__ import annotations

import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class SeatHold:
    show_id: str
    seat_ids: List[str]
    customer_id: str
    expires_at: float

class SeatHoldManager:
    def __init__(self, repository: SeatRepository, hold_seconds: float = 3.0) -> None:
        self.repository = repository
        self.hold_seconds = hold_seconds
        self._holds: Dict[tuple[str, str], SeatHold] = {}
        self._lock = threading.RLock()
        self._sweeper = threading.Thread(target=self._sweep_loop, daemon=True)
        self._sweeper.start()

    def _sweep_loop(self) -> None:
        while True:
            time.sleep(self.hold_seconds / 2)
            self._purge_expired()

    def _purge_expired(self) -> None:
        now = time.time()
        with self._lock:
            expired = [key for key, hold in self._holds.items() if hold.expires_at <= now]
            for key in expired:
                del self._holds[key]

    def available(self, show_id: str, category: str) -> List[Seat]:
        self._purge_expired()
        with self._lock:
            held = {
                seat_id for (s_id, seat_id), hold in self._holds.items()
                if s_id == show_id and hold.expires_at > time.time()
            }
        return [
            seat for seat in self.repository.list_available(show_id, category)
            if seat.seat_id not in held
        ]

    def hold(self, show_id: str, seat_id: str, customer_id: str) -> bool:
        now = time.time()
        expires_at = now + self.hold_seconds
        key = (show_id, seat_id)
        with self._lock:
            self._purge_expired()
            existing = self._holds.get(key)
            if existing and existing.expires_at > now:
                return False
            self._holds[key] = SeatHold(show_id, [seat_id], customer_id, expires_at)
            return True

    def release(self, show_id: str, seat_ids: List[str]) -> None:
        with self._lock:
            for seat_id in seat_ids:
                self._holds.pop((show_id, seat_id), None)

class ConcurrentBookingService:
    def __init__(self, repository: SeatRepository, holds: SeatHoldManager, selector: SeatSelector) -> None:
        self.repository = repository
        self.holds = holds
        self.selector = selector

    def hold(self, show_id: str, customer_id: str, category: str, quantity: int) -> SeatHold:
        acquired: List[str] = []
        for seat in self.selector.select(self.holds.available(show_id, category), quantity):
            if self.holds.hold(show_id, seat.seat_id, customer_id):
                acquired.append(seat.seat_id)
            if len(acquired) == quantity:
                break
        if len(acquired) < quantity:
            if acquired:
                self.holds.release(show_id, acquired)
            raise RuntimeError("Insufficient seats")
        return SeatHold(show_id, acquired, customer_id, time.time() + self.holds.hold_seconds)

    def confirm(self, hold: SeatHold) -> List[str]:
        self.repository.mark_booked(hold.show_id, hold.seat_ids)
        self.holds.release(hold.show_id, hold.seat_ids)
        return hold.seat_ids

    def release(self, hold: SeatHold) -> None:
        self.holds.release(hold.show_id, hold.seat_ids)

    def book(self, show_id: str, customer_id: str, category: str, quantity: int) -> List[str]:
        hold = self.hold(show_id, customer_id, category, quantity)
        try:
            return self.confirm(hold)
        except Exception:
            self.release(hold)
            raise

def worker(service: ConcurrentBookingService, customer: str, qty: int) -> None:
    try:
        seats = service.book("S1", customer, "PREMIUM", qty)
        print(customer, "got", seats)
    except Exception as exc:
        print(customer, "failed:", exc)

def main() -> None:
    repo = SeatRepository()
    repo.add_show("S1", [Seat(f"P{i}", "PREMIUM") for i in range(1, 7)])
    holds = SeatHoldManager(repo, hold_seconds=1.5)
    service = ConcurrentBookingService(repo, holds, BestAvailableSelector())

    threads = [threading.Thread(target=worker, args=(service, f"C{i}", 2)) for i in range(1, 5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()
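
The manager above polls for expiry with a sweeper thread. A condition-variable variant lets a caller block until a specific seat frees instead of retrying. The sketch below is a standalone illustration; NotifyingHoldTable and hold_blocking are assumed names, and a real integration would merge this into SeatHoldManager.

import threading
import time
from typing import Dict

class NotifyingHoldTable:
    """Seat holds guarded by one condition variable so waiters wake promptly."""

    def __init__(self, hold_seconds: float = 0.5) -> None:
        self.hold_seconds = hold_seconds
        self._expiry: Dict[str, float] = {}  # seat_id -> expires_at
        self._cond = threading.Condition()

    def try_hold(self, seat_id: str) -> bool:
        with self._cond:
            now = time.time()
            if self._expiry.get(seat_id, 0.0) > now:
                return False
            self._expiry[seat_id] = now + self.hold_seconds
            return True

    def release(self, seat_id: str) -> None:
        with self._cond:
            self._expiry.pop(seat_id, None)
            self._cond.notify_all()  # wake every blocked waiter

    def hold_blocking(self, seat_id: str, timeout: float) -> bool:
        deadline = time.time() + timeout
        with self._cond:
            while True:
                now = time.time()
                if self._expiry.get(seat_id, 0.0) <= now:
                    self._expiry[seat_id] = now + self.hold_seconds
                    return True
                if now >= deadline:
                    return False
                # Sleep until the hold lapses, a release arrives, or we time out.
                self._cond.wait(timeout=min(self._expiry[seat_id], deadline) - now)

if __name__ == "__main__":
    table = NotifyingHoldTable(hold_seconds=0.5)
    print("first hold:", table.try_hold("P1"))                # True
    print("waiter:", table.hold_blocking("P1", timeout=2.0))  # True once it lapses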

Level 3 — Resilient Booking Saga

The advanced level layers a saga that coordinates seat holds, payment, and confirmation, with bounded retries behind a circuit breaker. Compensation logic releases seats and adds the order to a waitlist whenever downstream calls fail, demonstrating how to restore invariants without manual cleanup. The version below keeps no durable record; a persistent saga log that would make restarts safe is sketched after the listing.

from __future__ import annotations

import random
import time
import threading
from dataclasses import dataclass, field
from typing import List

class PaymentGateway:
    def __init__(self, failure_rate: float = 0.35) -> None:
        self.failure_rate = failure_rate

    def charge(self, order_id: str, amount: float) -> str:
        if random.random() < self.failure_rate:
            raise RuntimeError("payment-declined")
        return f"rcpt-{order_id}-{int(time.time() * 1000)}"

class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        with self.lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure >= self.recovery_timeout:
                    self.state = "HALF_OPEN"
                else:
                    raise RuntimeError("payment-circuit-open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self.lock:
                self.failures += 1
                self.last_failure = time.time()
                if self.failures >= self.failure_threshold:
                    self.state = "OPEN"
                    self.failures = 0
            raise
        else:
            with self.lock:
                self.failures = 0
                self.state = "CLOSED"
            return result

@dataclass
class OrderResult:
    order_id: str
    status: str
    seats: List[str] = field(default_factory=list)

class Waitlist:
    def __init__(self) -> None:
        self._orders: List[str] = []
        self._lock = threading.Lock()

    def add(self, order_id: str) -> None:
        with self._lock:
            self._orders.append(order_id)

    def snapshot(self) -> List[str]:
        with self._lock:
            return list(self._orders)

class BookingSaga:
    def __init__(self,
                 ticketing: ConcurrentBookingService,
                 payment: PaymentGateway,
                 breaker: CircuitBreaker,
                 waitlist: Waitlist,
                 max_payment_attempts: int = 3) -> None:
        self.ticketing = ticketing
        self.payment = payment
        self.breaker = breaker
        self.waitlist = waitlist
        self.max_payment_attempts = max_payment_attempts

    def execute(self, order_id: str, show_id: str, category: str, quantity: int, amount: float) -> OrderResult:
        try:
            hold = self.ticketing.hold(show_id, order_id, category, quantity)
        except RuntimeError as exc:
            self.waitlist.add(order_id)
            return OrderResult(order_id, f"WAITLISTED ({exc})")

        try:
            attempt = 0
            while attempt < self.max_payment_attempts:
                try:
                    receipt = self.breaker.call(self.payment.charge, order_id, amount)
                    seats = self.ticketing.confirm(hold)
                    return OrderResult(order_id, f"CONFIRMED receipt={receipt}", seats)
                except Exception as exc:
                    attempt += 1
                    if attempt >= self.max_payment_attempts:
                        raise
                    backoff = 0.2 * attempt
                    print(f"{order_id}: retrying payment in {backoff:.2f}s ({exc})")
                    time.sleep(backoff)
        except Exception as exc:
            self.ticketing.release(hold)
            self.waitlist.add(order_id)
            return OrderResult(order_id, f"FAILED payment ({exc})")

def main() -> None:
    random.seed(10)
    repo = SeatRepository()
    repo.add_show("S1", [Seat(f"P{i}", "PREMIUM") for i in range(1, 7)])
    holds = SeatHoldManager(repo, hold_seconds=2.0)
    ticketing = ConcurrentBookingService(repo, holds, BestAvailableSelector())
    saga = BookingSaga(
        ticketing=ticketing,
        payment=PaymentGateway(failure_rate=0.4),
        breaker=CircuitBreaker(failure_threshold=2, recovery_timeout=1.0),
        waitlist=Waitlist()
    )

    orders = [
        ("O1", "S1", "PREMIUM", 2, 120.0),
        ("O2", "S1", "PREMIUM", 2, 150.0),
        ("O3", "S1", "PREMIUM", 2, 110.0),
    ]

    for order_id, show_id, category, qty, amount in orders:
        result = saga.execute(order_id, show_id, category, qty, amount)
        print(result)

    print("Waitlist:", saga.waitlist.snapshot())

if __name__ == "__main__":
    main()
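
The saga above keeps all state in memory, so a crash mid-payment loses track of the order. A persistent saga log makes restarts safe; the sketch below assumes a JSON-lines file and is illustrative only, so SagaLog and its status strings are not part of the original code.

import json
from pathlib import Path
from typing import Dict, List

class SagaLog:
    """Append-only JSON-lines log: one record per saga step transition."""

    def __init__(self, path: str = "saga.log") -> None:
        self.path = Path(path)

    def append(self, order_id: str, step: str, status: str) -> None:
        record = {"order": order_id, "step": step, "status": status}
        with self.path.open("a") as fh:
            fh.write(json.dumps(record) + "\n")

    def incomplete_orders(self) -> List[str]:
        # An order whose latest record is STARTED was interrupted mid-step.
        if not self.path.exists():
            return []
        last: Dict[str, str] = {}
        for line in self.path.read_text().splitlines():
            record = json.loads(line)
            last[record["order"]] = record["status"]
        return [order for order, status in last.items() if status == "STARTED"]

BookingSaga.execute would then bracket each step with append(order_id, "payment", "STARTED") and append(order_id, "payment", "DONE"), and a startup routine would call incomplete_orders() to release the matching holds before serving traffic.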
Machine Coding - Library System (Levels 1-3)
library system - concurrency - resiliency
Scope: evolve library circulation into concurrent reservations and resilient catalog services
Themes: state management, locking, caching

Progressively enhance the library platform from core circulation to concurrent reservations and finally a resilient catalog tier with caching.

Library System Stack
 ├─ Level 1: Circulation Service → Repository
 ├─ Level 2: Reservation Manager with Locks
 └─ Level 3: Resilient Catalog & Cache

Level 1 — Core Circulation

We begin the library system by decomposing responsibilities into catalog search strategies, an inventory repository, and a circulation service. Multiple search adapters demonstrate how to swap implementations while keeping the checkout flow stable. By orchestrating due dates and availability updates in one transaction the learner sees how to keep domain rules enforceable.

from __future__ import annotations

import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional

@dataclass
class Book:
    isbn: str
    title: str
    author: str
    copies: int

class CatalogRepository:
    def __init__(self) -> None:
        self._books: Dict[str, Book] = {}
        self._lock = threading.RLock()

    def add(self, book: Book) -> None:
        with self._lock:
            self._books[book.isbn] = book

    def get(self, isbn: str) -> Optional[Book]:
        with self._lock:
            return self._books.get(isbn)

    def all_books(self) -> List[Book]:
        with self._lock:
            return list(self._books.values())

class SearchStrategy(ABC):
    @abstractmethod
    def matches(self, book: Book, query: str) -> bool:
        ...

class TitleSearch(SearchStrategy):
    def matches(self, book: Book, query: str) -> bool:
        return query.lower() in book.title.lower()

class AuthorSearch(SearchStrategy):
    def matches(self, book: Book, query: str) -> bool:
        return query.lower() in book.author.lower()

@dataclass
class Loan:
    isbn: str
    patron: str
    due_date: datetime

class LibraryService:
    def __init__(self, catalog: CatalogRepository) -> None:
        self.catalog = catalog
        self.loans: Dict[str, Loan] = {}

    def search(self, strategy: SearchStrategy, query: str) -> List[Book]:
        return [book for book in self.catalog.all_books() if strategy.matches(book, query)]

    def checkout(self, isbn: str, patron: str) -> Loan:
        book = self.catalog.get(isbn)
        if not book or book.copies == 0:
            raise RuntimeError("Unavailable")
        book.copies -= 1
        loan = Loan(isbn, patron, datetime.utcnow() + timedelta(days=14))
        self.loans[f"{isbn}:{patron}"] = loan
        return loan

    def checkin(self, isbn: str, patron: str) -> None:
        key = f"{isbn}:{patron}"
        loan = self.loans.pop(key, None)
        if loan:
            book = self.catalog.get(isbn)
            if book:
                book.copies += 1

def main() -> None:
    catalog = CatalogRepository()
    catalog.add(Book("9780132350884", "Clean Code", "Robert C. Martin", 2))
    catalog.add(Book("9781617296086", "System Design Interview", "Alex Xu", 1))
    service = LibraryService(catalog)
    results = service.search(TitleSearch(), "clean")
    print("Search results:", results)
    loan = service.checkout("9780132350884", "patronA")
    print("Loan:", loan)
    print("Remaining copies:", catalog.get("9780132350884"))

if __name__ == "__main__":
    main()

Level 2 — Concurrent Reservations

Level 2 adds contested reservations, guarding each book with its own lock so popular titles do not block the whole catalog. We introduce a waitlist queue plus notification callbacks, illustrating how to recover gracefully when demand outstrips supply. Threaded tests showcase how to observe race conditions and ensure only one borrower owns a copy at any instant.

from __future__ import annotations

import threading
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Optional

class ConcurrentReservationService:
    def __init__(self, library: LibraryService, notifier: Callable[[str, str], None]) -> None:
        self.library = library
        self.notifier = notifier
        self._locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)
        self._waitlists: Dict[str, Deque[str]] = defaultdict(deque)

    def reserve(self, isbn: str, patron: str) -> Optional[Loan]:
        lock = self._locks[isbn]
        with lock:
            available = self.library.catalog.get(isbn)
            if available and available.copies > 0:
                loan = self.library.checkout(isbn, patron)
                print(f"{patron} checked out {isbn}")
                return loan
            self._waitlists[isbn].append(patron)
            print(f"{patron} waitlisted for {isbn}")
            return None

    def return_copy(self, loan: Loan) -> None:
        lock = self._locks[loan.isbn]
        next_patron: Optional[str] = None
        with lock:
            self.library.checkin(loan.isbn, loan.patron)
            if self._waitlists[loan.isbn]:
                next_patron = self._waitlists[loan.isbn].popleft()
        if next_patron:
            self.notifier(loan.isbn, next_patron)

def notifier(isbn: str, patron: str) -> None:
    print(f"Notify {patron}: {isbn} is available")

def worker(service: ConcurrentReservationService, isbn: str, patron: str) -> None:
    loan = service.reserve(isbn, patron)
    if loan:
        time.sleep(0.2)
        service.return_copy(loan)

def main() -> None:
    catalog = CatalogRepository()
    catalog.add(Book("9780132350884", "Clean Code", "Robert C. Martin", 1))
    catalog.add(Book("9781491950357", "Site Reliability Engineering", "Betsy Beyer", 2))
    library = LibraryService(catalog)
    service = ConcurrentReservationService(library, notifier)

    threads = [
        threading.Thread(target=worker, args=(service, "9780132350884", f"user{i}"))
        for i in range(3)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()

Level 3 — Resilient Catalog Service

The final level wraps catalog lookups with an in-memory cache, a circuit breaker, and fallback to stale-but-safe responses when the primary store is down. We log cache hits and failures so operators can see when the system has entered degraded mode; a retry-budget extension that caps how much time a flapping primary can consume is sketched after the listing. Replaying the sample scenario helps learners appreciate how read-only use cases can remain useful even during partial outages.

from __future__ import annotations

import random
import threading
import time
from typing import Callable, Dict, List, Optional, Tuple

class FlakyCatalogRepository(CatalogRepository):
    def __init__(self, failure_rate: float = 0.3) -> None:
        super().__init__()
        self.failure_rate = failure_rate

    def _maybe_fail(self) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("primary-unavailable")

    def add(self, book: Book) -> None:
        self._maybe_fail()
        super().add(book)

    def get(self, isbn: str) -> Optional[Book]:
        self._maybe_fail()
        return super().get(isbn)

    def all_books(self) -> List[Book]:
        self._maybe_fail()
        return super().all_books()

class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        with self.lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure >= self.recovery_timeout:
                    self.state = "HALF_OPEN"
                else:
                    raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self.lock:
                self.failures += 1
                self.last_failure = time.time()
                if self.failures >= self.failure_threshold:
                    self.state = "OPEN"
                    self.failures = 0
            raise
        else:
            with self.lock:
                self.failures = 0
                self.state = "CLOSED"
            return result

class ResilientLibraryGateway:
    def __init__(self, primary_repo: FlakyCatalogRepository, secondary_repo: CatalogRepository, notifier: Callable[[str, str], None]) -> None:
        self.primary_repo = primary_repo
        self.secondary_repo = secondary_repo
        self.primary_service = LibraryService(self.primary_repo)
        self.secondary_service = LibraryService(self.secondary_repo)
        self.primary_reservations = ConcurrentReservationService(self.primary_service, notifier)
        self.secondary_reservations = ConcurrentReservationService(self.secondary_service, notifier)
        self.breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1.0)
        self.cache: Dict[Tuple[str, str], List[Book]] = {}

    def add_book(self, book: Book) -> None:
        primary_copy = Book(book.isbn, book.title, book.author, book.copies)
        secondary_copy = Book(book.isbn, book.title, book.author, book.copies)
        try:
            self.breaker.call(self.primary_repo.add, primary_copy)
        except Exception as exc:
            print("Primary add failed:", exc)
        self.secondary_repo.add(secondary_copy)

    def search(self, strategy: SearchStrategy, query: str) -> List[Book]:
        key = (strategy.__class__.__name__, query.lower())
        try:
            results = self.breaker.call(self.primary_service.search, strategy, query)
            self.cache[key] = results
            return results
        except Exception as exc:
            print("Primary search failed:", exc)
            if key in self.cache:
                print("Serving cached results for", query)
                return self.cache[key]
            return self.secondary_service.search(strategy, query)

    def checkout(self, isbn: str, patron: str) -> Optional[Loan]:
        try:
            loan = self.primary_reservations.reserve(isbn, patron)
            if loan:
                return loan
        except Exception as exc:
            print("Primary checkout error:", exc)
        return self.secondary_reservations.reserve(isbn, patron)

    def checkin(self, loan: Loan) -> None:
        try:
            self.primary_reservations.return_copy(loan)
        except Exception as exc:
            print("Primary checkin error:", exc)
            self.secondary_reservations.return_copy(loan)

def main() -> None:
    random.seed(5)
    primary_repo = FlakyCatalogRepository(0.4)
    secondary_repo = CatalogRepository()
    gateway = ResilientLibraryGateway(primary_repo, secondary_repo, notifier)

    gateway.add_book(Book("9780132350884", "Clean Code", "Robert C. Martin", 1))
    gateway.add_book(Book("9781491950357", "Site Reliability Engineering", "Betsy Beyer", 2))
    gateway.add_book(Book("9780134494166", "Clean Architecture", "Robert C. Martin", 1))

    for term in ["clean", "reliability", "clean"]:
        results = gateway.search(TitleSearch(), term)
        print(term, "->", [book.title for book in results])

    loans: List[Loan] = []
    for patron in ["patronA", "patronB", "patronC"]:
        loan = gateway.checkout("9780132350884", patron)
        if loan:
            loans.append(loan)
    for loan in loans:
        gateway.checkin(loan)

if __name__ == "__main__":
    main()
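
The gateway already degrades via breaker, cache, and secondary; a retry budget adds a global cap on how much retrying the process may do per window. The sketch below is a minimal single-threaded illustration; RetryBudget and call_with_budget are assumed names, not part of the original code.

import time
from typing import List

class RetryBudget:
    """Allow at most max_retries retries per sliding time window."""

    def __init__(self, max_retries: int, window_seconds: float) -> None:
        self.max_retries = max_retries
        self.window_seconds = window_seconds
        self._stamps: List[float] = []

    def allow(self) -> bool:
        now = time.time()
        self._stamps = [t for t in self._stamps if now - t < self.window_seconds]
        if len(self._stamps) >= self.max_retries:
            return False
        self._stamps.append(now)
        return True

def call_with_budget(budget: RetryBudget, func, *args, **kwargs):
    # The first attempt is free; every retry draws from the shared budget.
    while True:
        try:
            return func(*args, **kwargs)
        except Exception:
            if not budget.allow():
                raise

ResilientLibraryGateway.search could route its primary lookup through call_with_budget before falling back to the cache, so a flapping primary exhausts the budget quickly and the system settles into serving cached results.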
Machine Coding - Splitwise (Levels 1-3)
splitwise - concurrency - resiliency
Scope: evolve expense sharing from core ledger to concurrent settlements and resilient coordination
Themes: escalating capabilities

Build the Splitwise-like system progressively from a basic ledger to concurrent settlement handling and resilient reconciliation.

Splitwise Platform
 ├─ Level 1: Ledger + Expense Aggregation
 ├─ Level 2: Concurrent Settlement Engine
 └─ Level 3: Resilient Settlement Coordinator

Level 1 — Core Implementation

We start by modelling groups, participants, and ledger entries so expense ingestion is explicit and auditable. Settlement suggestions are derived from the ledger and published through notifiers, demonstrating the separation between computation and delivery. This level encourages practising idempotent writes and clear data structures before concurrency is involved.

from __future__ import annotations

import threading
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

@dataclass
class Expense:
    title: str
    amount: float
    participants: List[str]

class BalanceLedger:
    def __init__(self) -> None:
        self.expenses: List[Expense] = []
        self.balances: Dict[str, float] = defaultdict(float)
        self._lock = threading.RLock()

    def record(self, expense: Expense) -> None:
        with self._lock:
            share = expense.amount / len(expense.participants)
            payer = expense.participants[0]
            for user in expense.participants:
                if user == payer:
                    self.balances[user] += expense.amount - share
                else:
                    self.balances[user] -= share
            self.expenses.append(expense)

    def summary(self) -> Dict[str, float]:
        with self._lock:
            return dict(self.balances)

    def simplify(self) -> List[Tuple[str, str, float]]:
        with self._lock:
            debtors = [(user, -amount) for user, amount in self.balances.items() if amount < 0]
            creditors = [(user, amount) for user, amount in self.balances.items() if amount > 0]
        debtors.sort(key=lambda x: x[1], reverse=True)
        creditors.sort(key=lambda x: x[1], reverse=True)
        settlements: List[Tuple[str, str, float]] = []
        i = j = 0
        while i < len(debtors) and j < len(creditors):
            debtor, debt = debtors[i]
            creditor, credit = creditors[j]
            pay = min(debt, credit)
            settlements.append((debtor, creditor, pay))
            debt -= pay
            credit -= pay
            if debt == 0:
                i += 1
            else:
                debtors[i] = (debtor, debt)
            if credit == 0:
                j += 1
            else:
                creditors[j] = (creditor, credit)
        return settlements

class Notifier:
    def notify(self, message: str) -> None:
        print("Notify:", message)

class SplitwiseService:
    def __init__(self, ledger: BalanceLedger, notifier: Notifier) -> None:
        self.ledger = ledger
        self.notifier = notifier

    def add_expense(self, title: str, amount: float, participants: List[str]) -> None:
        expense = Expense(title, amount, participants)
        self.ledger.record(expense)
        self.notifier.notify(f"Expense {title} recorded for {participants}")

    def balances(self) -> Dict[str, float]:
        return self.ledger.summary()

def main() -> None:
    service = SplitwiseService(BalanceLedger(), Notifier())
    service.add_expense("Dinner", 120.0, ["A", "B", "C"])
    service.add_expense("Cab", 60.0, ["B", "A"])
    print("Balances:", service.balances())

if __name__ == "__main__":
    main()
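
Because add_expense mutates the ledger unconditionally, a retried delivery would double-count. A minimal idempotency sketch follows, assuming callers attach a client-generated expense id; IdempotentSplitwiseService is an illustrative name layered over the service above.

from typing import List, Set

class IdempotentSplitwiseService(SplitwiseService):
    """Sketch: drop duplicate expenses keyed by a caller-supplied id."""

    def __init__(self, ledger: BalanceLedger, notifier: Notifier) -> None:
        super().__init__(ledger, notifier)
        self._seen: Set[str] = set()

    def add_expense_once(self, expense_id: str, title: str,
                         amount: float, participants: List[str]) -> bool:
        if expense_id in self._seen:
            return False  # duplicate delivery: safe no-op
        self._seen.add(expense_id)
        self.add_expense(title, amount, participants)
        return True

Calling add_expense_once("e-1", "Dinner", 120.0, ["A", "B", "C"]) twice records the expense exactly once, which is the property retried writes need.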

Level 2 — Concurrent Enhancements

Level 2 invites multiple users to post at once, so we layer per-group locks around ledger updates and settlement runs. We validate that aggregate balances remain consistent even when expenses and repayments interleave. Thread-aware logging makes it easy to trace interleavings and confirm the invariants hold.

from __future__ import annotations

import threading
from typing import Dict, List, Optional, Tuple

class AccountLockManager:
    def __init__(self) -> None:
        self._locks: Dict[str, threading.Lock] = {}
        self._guard = threading.Lock()

    def acquire(self, users: List[str]) -> List[threading.Lock]:
        ordered = sorted(set(users))
        locks: List[threading.Lock] = []
        with self._guard:
            for user in ordered:
                locks.append(self._locks.setdefault(user, threading.Lock()))
        for lock in locks:
            lock.acquire()
        return locks

    @staticmethod
    def release(locks: List[threading.Lock]) -> None:
        for lock in reversed(locks):
            lock.release()

class ConcurrentSplitwiseService:
    def __init__(self, service: SplitwiseService) -> None:
        self.service = service
        self.lock_manager = AccountLockManager()

    def add_expense(self, title: str, amount: float, participants: List[str]) -> None:
        locks = self.lock_manager.acquire(participants)
        try:
            self.service.add_expense(title, amount, participants)
        finally:
            AccountLockManager.release(locks)

    def settlements(self) -> List[Tuple[str, str, float]]:
        return self.service.ledger.simplify()

def worker(service: ConcurrentSplitwiseService, idx: int) -> None:
    participants = ["A", "B", "C"]
    service.add_expense(f"expense-{idx}", 40.0 + idx * 5, participants)

def main() -> None:
    base_service = SplitwiseService(BalanceLedger(), Notifier())
    concurrent_service = ConcurrentSplitwiseService(base_service)
    threads = [threading.Thread(target=worker, args=(concurrent_service, i)) for i in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("Balances:", base_service.balances())
    print("Settlements:", concurrent_service.settlements())

if __name__ == "__main__":
    main()
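
AccountLockManager sorts user ids before acquiring, the classic total-order trick for deadlock avoidance. The toy below shows why it works: two threads that name A and B in opposite order still acquire in the same sorted order, so neither can hold one lock while waiting on the other.

import threading

locks = {"A": threading.Lock(), "B": threading.Lock()}

def transfer(first: str, second: str) -> None:
    # Without the sort, transfer("A", "B") racing transfer("B", "A") can deadlock.
    ordered = sorted((first, second))
    for user in ordered:
        locks[user].acquire()
    try:
        pass  # mutate both balances here
    finally:
        for user in reversed(ordered):
            locks[user].release()

t1 = threading.Thread(target=transfer, args=("A", "B"))
t2 = threading.Thread(target=transfer, args=("B", "A"))
t1.start(); t2.start(); t1.join(); t2.join()
print("both transfers completed without deadlock")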

Level 3 — Resilient Settlement Coordinator

The advanced Splitwise level assumes cash-outs talk to unreliable processors, so we buffer transfer commands in an outbox before dispatch. Circuit breakers prevent hammering a failing provider, while bounded retries with backoff replay buffered commands. The outbox below is in-memory; a durable, file-backed variant that actually survives restarts is sketched after the listing.

from __future__ import annotations

import random
import threading
import time
from dataclasses import dataclass
from queue import Queue
from typing import List, Tuple

@dataclass
class SettlementCommand:
    debtor: str
    creditor: str
    amount: float

class PaymentGateway:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def transfer(self, command: SettlementCommand) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("processor-down")
        print(f"Transferred {command.amount:.2f} from {command.debtor} to {command.creditor}")

class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure = 0.0
        self.state = "CLOSED"
        self._lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        with self._lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure >= self.recovery_timeout:
                    self.state = "HALF_OPEN"
                else:
                    raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self._lock:
                self.failures += 1
                self.last_failure = time.time()
                if self.failures >= self.failure_threshold:
                    self.state = "OPEN"
                    self.failures = 0
            raise
        else:
            with self._lock:
                self.failures = 0
                self.state = "CLOSED"
            return result

class ResilientSettlementCoordinator:
    def __init__(self, service: ConcurrentSplitwiseService, gateway: PaymentGateway, breaker: CircuitBreaker) -> None:
        self.service = service
        self.gateway = gateway
        self.breaker = breaker
        self.outbox: "Queue[SettlementCommand]" = Queue()

    def enqueue_settlements(self) -> None:
        for debtor, creditor, amount in self.service.settlements():
            self.outbox.put(SettlementCommand(debtor, creditor, amount))

    def process(self) -> None:
        while not self.outbox.empty():
            command = self.outbox.get()
            attempt = 0
            while attempt < 3:
                try:
                    self.breaker.call(self.gateway.transfer, command)
                    break
                except Exception as exc:
                    attempt += 1
                    backoff = 0.1 * (2 ** attempt)
                    print(f"Retrying {command} after {backoff:.2f}s ({exc})")
                    time.sleep(backoff)
            else:
                print("Requeue failed command:", command)
                self.outbox.put(command)
                break

def main() -> None:
    random.seed(7)
    base_service = SplitwiseService(BalanceLedger(), Notifier())
    concurrent_service = ConcurrentSplitwiseService(base_service)
    for idx in range(3):
        concurrent_service.add_expense(f"group-{idx}", 100 + idx * 20, ["A", "B", "C"])
    coordinator = ResilientSettlementCoordinator(
        concurrent_service,
        PaymentGateway(0.5),
        CircuitBreaker(failure_threshold=2, recovery_timeout=0.5),
    )
    coordinator.enqueue_settlements()
    coordinator.process()

if __name__ == "__main__":
    main()
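
The Queue above lives in process memory, so a crash drops pending transfers. A durable variant is sketched below under the assumption of a JSON-lines file; DurableOutbox is an illustrative name and the replay wiring is left to the coordinator.

import json
from pathlib import Path
from typing import List

class DurableOutbox:
    """Sketch: file-backed outbox so queued settlements survive restarts."""

    def __init__(self, path: str = "outbox.jsonl") -> None:
        self.path = Path(path)

    def put(self, command: SettlementCommand) -> None:
        with self.path.open("a") as fh:
            fh.write(json.dumps(command.__dict__) + "\n")

    def load(self) -> List[SettlementCommand]:
        if not self.path.exists():
            return []
        return [
            SettlementCommand(**json.loads(line))
            for line in self.path.read_text().splitlines()
        ]

    def clear(self) -> None:
        self.path.unlink(missing_ok=True)  # call only after every command succeeds

On startup the coordinator would load() and re-enqueue anything left over, giving at-least-once delivery; the idempotency technique sketched after Level 1 keeps that safe.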
Machine Coding - Hotel Booking (Levels 1-3)
hotel booking - concurrency - resiliency
Scope: evolve hotel booking from inventory management to concurrent allocation and resilient saga orchestration
Themes: escalating capabilities

Progressively enhance hotel booking from simple inventory control to concurrent allocation management and a resilient saga-based flow.

Hotel Booking Stack
 ├─ Level 1: Inventory Repository
 ├─ Level 2: Concurrent Allocation Locks
 └─ Level 3: Resilient Microservice Saga

Level 1 — Core Implementation

The opening hotel scenario models room types, inventory, and reservations so capacity can be computed deterministically. We carve out a service that checks availability, books stays, and tracks confirmations; a listener hook for emitting events to downstream systems is sketched after the listing. The baseline emphasises clear separation between querying availability and performing the actual mutation.

from __future__ import annotations

import threading
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Dict, List, Optional

@dataclass
class RoomType:
    code: str
    total_rooms: int

@dataclass
class Reservation:
    reservation_id: str
    room_type: str
    check_in: date
    check_out: date

class RoomInventory:
    def __init__(self) -> None:
        self.availability: Dict[str, int] = {}
        self._lock = threading.RLock()

    def add_room_type(self, room_type: RoomType) -> None:
        with self._lock:
            self.availability[room_type.code] = room_type.total_rooms

    def reserve(self, room_type: str, rooms: int) -> None:
        with self._lock:
            available = self.availability.get(room_type, 0)
            if available < rooms:
                raise RuntimeError("insufficient-inventory")
            self.availability[room_type] = available - rooms

    def release(self, room_type: str, rooms: int) -> None:
        with self._lock:
            self.availability[room_type] = self.availability.get(room_type, 0) + rooms

    def snapshot(self) -> Dict[str, int]:
        with self._lock:
            return dict(self.availability)

class ReservationIdFactory:
    def __init__(self) -> None:
        self.counter = 0
        self._lock = threading.Lock()  # next_id is called from worker threads in Level 2

    def next_id(self) -> str:
        with self._lock:
            self.counter += 1
            return f"R{self.counter:04d}"

class HotelBookingService:
    def __init__(self, inventory: RoomInventory, id_factory: ReservationIdFactory) -> None:
        self.inventory = inventory
        self.id_factory = id_factory
        self.reservations: Dict[str, Reservation] = {}

    def reserve(self, room_type: str, nights: int, start: date) -> Reservation:
        self.inventory.reserve(room_type, 1)
        reservation_id = self.id_factory.next_id()
        reservation = Reservation(
            reservation_id,
            room_type,
            start,
            start + timedelta(days=nights),
        )
        self.reservations[reservation_id] = reservation
        return reservation

def main() -> None:
    inventory = RoomInventory()
    inventory.add_room_type(RoomType("DLX", 5))
    service = HotelBookingService(inventory, ReservationIdFactory())
    reservation = service.reserve("DLX", 2, date.today())
    print("Reservation:", reservation)
    print("Remaining DLX:", inventory.availability["DLX"])

if __name__ == "__main__":
    main()
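
The baseline mutates inventory silently; a small listener hook shows how reservation events could reach downstream systems. The sketch below is illustrative only: ObservableBookingService and on_reserved are assumed names, not part of the original service.

from typing import Callable, List

class ObservableBookingService(HotelBookingService):
    """Sketch: notify registered listeners after each successful reservation."""

    def __init__(self, inventory: RoomInventory, id_factory: ReservationIdFactory) -> None:
        super().__init__(inventory, id_factory)
        self._listeners: List[Callable[[Reservation], None]] = []

    def on_reserved(self, listener: Callable[[Reservation], None]) -> None:
        self._listeners.append(listener)

    def reserve(self, room_type: str, nights: int, start: date) -> Reservation:
        reservation = super().reserve(room_type, nights, start)
        for listener in self._listeners:
            listener(reservation)  # e.g. publish to a message bus or audit log
        return reservation

Wiring is one line: service.on_reserved(lambda r: print("event:", r.reservation_id)).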

Level 2 — Concurrent Enhancements

Level 2 assumes guests book in parallel, so we partition locks by room type and date and acquire them in a fixed order before committing. Failed attempts are declined immediately without mutating state, so capacity is never oversold, and the logs included in the exercise show exactly when and why a booking is declined, reinforcing correctness reasoning. An optimistic-validation variant that avoids holding per-date locks during the availability check is sketched after the listing.

from __future__ import annotations

import threading
from collections import defaultdict
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Dict, List, Optional

class AvailabilityCalendar:
    def __init__(self, room_type: str, total_rooms: int) -> None:
        self.room_type = room_type
        self.total_rooms = total_rooms
        self._locks: Dict[date, threading.Lock] = defaultdict(threading.Lock)
        self._availability: Dict[date, int] = defaultdict(lambda: total_rooms)

    def try_reserve(self, start: date, nights: int) -> bool:
        dates = [start + timedelta(days=i) for i in range(nights)]
        locks = [self._locks[d] for d in dates]
        for lock in locks:
            lock.acquire()
        try:
            if any(self._availability[d] <= 0 for d in dates):
                return False
            for d in dates:
                self._availability[d] -= 1
            return True
        finally:
            for lock in reversed(locks):
                lock.release()

    def release(self, start: date, nights: int) -> None:
        dates = [start + timedelta(days=i) for i in range(nights)]
        locks = [self._locks[d] for d in dates]
        for lock in locks:
            lock.acquire()
        try:
            for d in dates:
                self._availability[d] += 1
        finally:
            for lock in reversed(locks):
                lock.release()

class ConcurrentHotelBookingService:
    def __init__(self, service: HotelBookingService, calendar: AvailabilityCalendar) -> None:
        self.service = service
        self.calendar = calendar

    def reserve(self, guest: str, room_type: str, nights: int, start: date) -> Optional[Reservation]:
        if not self.calendar.try_reserve(start, nights):
            print(f"{guest} reservation failed")
            return None
        reservation = self.service.reserve(room_type, nights, start)
        print(f"{guest} reservation succeeded: {reservation.reservation_id}")
        return reservation

    def release(self, reservation: Reservation) -> None:
        nights = (reservation.check_out - reservation.check_in).days
        self.calendar.release(reservation.check_in, nights)
        self.service.inventory.release(reservation.room_type, 1)
        self.service.reservations.pop(reservation.reservation_id, None)

def worker(service: ConcurrentHotelBookingService, guest: str) -> None:
    service.reserve(guest, "DLX", 2, date.today())

def main() -> None:
    inventory = RoomInventory()
    inventory.add_room_type(RoomType("DLX", 3))
    booking_service = HotelBookingService(inventory, ReservationIdFactory())
    calendar = AvailabilityCalendar("DLX", total_rooms=3)
    concurrent_service = ConcurrentHotelBookingService(booking_service, calendar)
    threads = [threading.Thread(target=worker, args=(concurrent_service, f"G{i}")) for i in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()
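
As noted above, an optimistic variant validates instead of locking every date for the whole check. The sketch below uses a single version counter and a short commit section; OptimisticCalendar is an assumed name and a deliberate simplification of per-date versioning.

import threading
from datetime import date, timedelta
from typing import Dict

class OptimisticCalendar:
    """Sketch: lock-free reads, commit only if no one wrote since we looked."""

    def __init__(self, total_rooms: int) -> None:
        self.total_rooms = total_rooms
        self._availability: Dict[date, int] = {}
        self._version = 0
        self._commit_lock = threading.Lock()

    def _rooms_left(self, d: date) -> int:
        return self._availability.get(d, self.total_rooms)

    def try_reserve(self, start: date, nights: int, max_retries: int = 3) -> bool:
        dates = [start + timedelta(days=i) for i in range(nights)]
        for _ in range(max_retries):
            snapshot = self._version  # optimistic read of the write counter
            if any(self._rooms_left(d) <= 0 for d in dates):
                return False
            with self._commit_lock:  # short critical section
                if self._version != snapshot:
                    continue  # a concurrent commit invalidated our read; retry
                for d in dates:
                    self._availability[d] = self._rooms_left(d) - 1
                self._version += 1
                return True
        return False

The trade-off versus the per-date locks above: readers never block, but hot dates cause retries, so contention shifts from waiting to redoing work.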

Level 3 — Resilient Architecture

The advanced level coordinates reservation, pricing, and payment through a saga so each side effect can be compensated if needed. A circuit breaker and a fallback price calculator help the service degrade gracefully instead of failing mid-booking. Learners follow the saga output to understand how each step either commits or applies compensation to restore inventory.

from __future__ import annotations

import random
import time
from dataclasses import dataclass
from datetime import date
from typing import List, Optional

@dataclass
class BookingCommand:
    guest: str
    room_type: str
    nights: int

class PricingService:
    def __init__(self, failure_rate: float = 0.3) -> None:
        self.failure_rate = failure_rate

    def quote(self, room_type: str, nights: int) -> float:
        if random.random() < self.failure_rate:
            raise RuntimeError("pricing-down")
        base = 120 if room_type == "DLX" else 90
        return base * nights

class PaymentService:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def charge(self, guest: str, amount: float) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("payment-failed")
        print(f"Charged {guest} amount {amount}")

class SimpleCircuitBreaker:
    def __init__(self, threshold: int, cool_down: float) -> None:
        self.threshold = threshold
        self.cool_down = cool_down
        self.failures = 0
        self.open_until = 0.0

    def call(self, func, *args, **kwargs):
        now = time.time()
        if now < self.open_until:
            raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
            self.failures = 0
            return result
        except Exception as exc:
            self.failures += 1
            if self.failures >= self.threshold:
                self.open_until = now + self.cool_down
                self.failures = 0
            raise exc

class BookingSagaCoordinator:
    def __init__(self, booking: ConcurrentHotelBookingService, pricing: PricingService, payment: PaymentService) -> None:
        self.booking = booking
        self.pricing = pricing
        self.payment = payment
        self.breaker = SimpleCircuitBreaker(threshold=2, cool_down=1.0)

    def execute(self, command: BookingCommand) -> None:
        try:
            price = self.breaker.call(self.pricing.quote, command.room_type, command.nights)
        except Exception:
            price = 100.0 * command.nights
            print("Pricing fallback for", command.guest)
        reservation = self.booking.reserve(command.guest, command.room_type, command.nights, date.today())
        if not reservation:
            print("Reservation failed for", command.guest)
            return
        try:
            self.payment.charge(command.guest, price)
            print(f"Reservation {reservation.reservation_id} confirmed for {command.guest}")
        except Exception as exc:
            print(f"Payment failed for {command.guest}: {exc}")
            self.booking.release(reservation)

def main() -> None:
    random.seed(9)
    inventory = RoomInventory()
    inventory.add_room_type(RoomType("DLX", 3))
    service = HotelBookingService(inventory, ReservationIdFactory())
    calendar = AvailabilityCalendar("DLX", total_rooms=3)
    concurrent_service = ConcurrentHotelBookingService(service, calendar)
    saga = BookingSagaCoordinator(concurrent_service, PricingService(0.4), PaymentService(0.5))
    commands = [
        BookingCommand("Alice", "DLX", 2),
        BookingCommand("Bob", "DLX", 3),
        BookingCommand("Carol", "DLX", 1),
        BookingCommand("Dave", "DLX", 2),
    ]
    for command in commands:
        saga.execute(command)

if __name__ == "__main__":
    main()
Machine Coding - Movie Ticketing (Levels 1-3)
movie ticketing - concurrency - resiliency
Scope: evolve movie ticketing from layout and seat management to concurrent holds and resilient payment orchestration
Themes: escalating capabilities

Grow the movie ticketing service from layout modeling to seat hold concurrency and resilient payment orchestration.

Movie Ticketing Stack
 ├─ Level 1: Theatre Layout Builder
 ├─ Level 2: Concurrent Hold/Release
 └─ Level 3: Resilient Payment Orchestration

Level 1 — Core Implementation

This theatre build begins by defining seat maps, categories, and hold records so pre-payment reservations are first-class citizens. We expose APIs to place holds, confirm purchases, and release inventory, showing how to balance expressiveness with simple state machines. Running the sample makes the lifecycle tangible before concurrency is introduced.

from __future__ import annotations

from dataclasses import dataclass
import threading
from typing import Dict, List, Optional, Tuple

@dataclass
class SeatCell:
    row: str
    number: int
    category: str
    status: str = "AVAILABLE"

class TheatreLayout:
    def __init__(self) -> None:
        self.grid: Dict[str, Dict[int, SeatCell]] = {}
        self._lock = threading.RLock()

    def add_row(self, row: str, count: int, category: str) -> None:
        with self._lock:
            self.grid[row] = {col: SeatCell(row, col, category) for col in range(1, count + 1)}

    def available(self, category: str) -> List[SeatCell]:
        with self._lock:
            return [
                seat for row in self.grid.values() for seat in row.values()
                if seat.category == category and seat.status == "AVAILABLE"
            ]

    def mark(self, seats: List[Tuple[str, int]], status: str) -> None:
        with self._lock:
            for row, number in seats:
                self.grid[row][number].status = status

class SeatHoldService:
    def __init__(self, layout: TheatreLayout) -> None:
        self.layout = layout

    def hold(self, category: str, quantity: int) -> List[Tuple[str, int]]:
        seats = self.layout.available(category)[:quantity]
        if len(seats) < quantity:
            raise RuntimeError("not enough seats")
        chosen = [(seat.row, seat.number) for seat in seats]
        self.layout.mark(chosen, "HOLD")
        return chosen

    def release(self, seats: List[Tuple[str, int]]) -> None:
        self.layout.mark(seats, "AVAILABLE")

    def confirm(self, seats: List[Tuple[str, int]]) -> None:
        self.layout.mark(seats, "BOOKED")

def main() -> None:
    layout = TheatreLayout()
    layout.add_row("A", 10, "PREMIUM")
    layout.add_row("B", 10, "PREMIUM")
    layout.add_row("C", 10, "REGULAR")
    hold_service = SeatHoldService(layout)
    held = hold_service.hold("PREMIUM", 3)
    print("Held seats:", held)
    print("Remaining premium:", len(layout.available("PREMIUM")))

if __name__ == "__main__":
    main()

Level 2 — Concurrent Enhancements

Level 2 simulates the rush for popular shows, coordinating holds through a shared lock and timed expiries so no two patrons grab the same seat. A background reaper thread releases expired holds, ensuring inventory is freed without callers busy-waiting. The trace output encourages learners to inspect interleavings and confirm that expired holds truly free inventory.

from __future__ import annotations

import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

@dataclass
class SeatHoldTicket:
    hold_id: str
    seats: List[Tuple[str, int]]
    customer: str
    expires_at: float

class ConcurrentSeatHoldService:
    def __init__(self, hold_service: SeatHoldService, ttl: float = 1.5) -> None:
        self.hold_service = hold_service
        self.ttl = ttl
        self._holds: Dict[str, SeatHoldTicket] = {}
        self._lock = threading.RLock()
        self._counter = 0
        threading.Thread(target=self._reaper, daemon=True).start()

    def _next_id(self) -> str:
        with self._lock:
            self._counter += 1
            return f"H{self._counter:04d}"

    def _purge_expired(self) -> None:
        now = time.time()
        expired = [hold_id for hold_id, ticket in self._holds.items() if ticket.expires_at <= now]
        for hold_id in expired:
            ticket = self._holds.pop(hold_id)
            self.hold_service.release(ticket.seats)
            print("Expired hold released:", hold_id)

    def _reaper(self) -> None:
        while True:
            time.sleep(self.ttl / 2)
            with self._lock:
                self._purge_expired()

    def hold(self, category: str, quantity: int, customer: str) -> Optional[SeatHoldTicket]:
        with self._lock:
            self._purge_expired()
            try:
                seats = self.hold_service.hold(category, quantity)
            except RuntimeError:
                return None
            ticket = SeatHoldTicket(self._next_id(), seats, customer, time.time() + self.ttl)
            self._holds[ticket.hold_id] = ticket
            return ticket

    def release(self, hold_id: str) -> None:
        with self._lock:
            ticket = self._holds.pop(hold_id, None)
        if ticket:
            self.hold_service.release(ticket.seats)

    def confirm(self, hold_id: str) -> Optional[List[Tuple[str, int]]]:
        with self._lock:
            ticket = self._holds.pop(hold_id, None)
        if not ticket:
            return None
        self.hold_service.confirm(ticket.seats)
        return ticket.seats

def worker(service: ConcurrentSeatHoldService, customer: str) -> None:
    ticket = service.hold("PREMIUM", 1, customer)
    print(customer, "hold ->", bool(ticket))
    time.sleep(0.5)
    if ticket:
        service.release(ticket.hold_id)

def main() -> None:
    layout = TheatreLayout()
    layout.add_row("A", 3, "PREMIUM")
    hold_service = SeatHoldService(layout)
    concurrent_service = ConcurrentSeatHoldService(hold_service, ttl=0.8)
    threads = [threading.Thread(target=worker, args=(concurrent_service, f"C{i}")) for i in range(4)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()

Level 3 — Resilient Architecture

The advanced level assumes payments can fail, so seat confirmation runs inside a saga that compensates by releasing the hold. We wrap the pricing service with a circuit breaker and fall back to a flat rate when quotes fail, so reclaimed seats return to inventory without manual intervention. A waitlist extension that re-offers reclaimed seats to queued orders is sketched after the listing.

from __future__ import annotations

import random
import threading
import time
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class TicketOrder:
    customer: str
    category: str
    quantity: int

class TicketPricingService:
    def __init__(self, failure_rate: float = 0.3) -> None:
        self.failure_rate = failure_rate

    def quote(self, category: str, quantity: int) -> float:
        if random.random() < self.failure_rate:
            raise RuntimeError("pricing-down")
        base = 150 if category == "PREMIUM" else 90
        return base * quantity

class TicketPaymentService:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def charge(self, customer: str, amount: float) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("payment-failed")
        print(f"Charged {customer} amount {amount}")

class TicketCircuitBreaker:
    def __init__(self, threshold: int, cool_down: float) -> None:
        self.threshold = threshold
        self.cool_down = cool_down
        self.failures = 0
        self.open_until = 0.0
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        with self.lock:
            now = time.time()
            if now < self.open_until:
                raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self.lock:
                self.failures += 1
                if self.failures >= self.threshold:
                    self.open_until = time.time() + self.cool_down
                    self.failures = 0
            raise
        else:
            with self.lock:
                self.failures = 0
                self.open_until = 0.0
            return result

class TicketingSaga:
    def __init__(self,
                 hold_service: ConcurrentSeatHoldService,
                 pricing: TicketPricingService,
                 payment: TicketPaymentService,
                 breaker: TicketCircuitBreaker) -> None:
        self.hold_service = hold_service
        self.pricing = pricing
        self.payment = payment
        self.breaker = breaker

    def execute(self, order: TicketOrder) -> None:
        ticket = self.hold_service.hold(order.category, order.quantity, order.customer)
        if not ticket:
            print(order.customer, "could not obtain hold")
            return
        try:
            amount = self.breaker.call(self.pricing.quote, order.category, order.quantity)
        except Exception:
            amount = 100.0 * order.quantity
            print("Pricing fallback used for", order.customer)
        try:
            self.payment.charge(order.customer, amount)
            seats = self.hold_service.confirm(ticket.hold_id)
            print(f"Order confirmed for {order.customer}: {seats}")
        except Exception as exc:
            print(f"Payment failed for {order.customer}: {exc}")
            self.hold_service.release(ticket.hold_id)

def main() -> None:
    random.seed(11)
    layout = TheatreLayout()
    layout.add_row("A", 4, "PREMIUM")
    layout.add_row("B", 4, "REGULAR")
    base_hold_service = SeatHoldService(layout)
    concurrent_service = ConcurrentSeatHoldService(base_hold_service, ttl=1.0)
    saga = TicketingSaga(
        hold_service=concurrent_service,
        pricing=TicketPricingService(0.4),
        payment=TicketPaymentService(0.5),
        breaker=TicketCircuitBreaker(2, 0.8),
    )
    orders = [
        TicketOrder("Alice", "PREMIUM", 2),
        TicketOrder("Bob", "PREMIUM", 2),
        TicketOrder("Carol", "REGULAR", 2),
        TicketOrder("Dave", "PREMIUM", 1),
    ]
    for order in orders:
        saga.execute(order)

if __name__ == "__main__":
    main()
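
The saga above releases seats on payment failure but then drops the order. A waitlist extension that re-offers reclaimed seats is sketched below; TicketWaitlist and its drain loop are assumed names, and the failure branches of TicketingSaga would each need one extra add() call.

from collections import deque
from typing import Deque

class TicketWaitlist:
    """Sketch: FIFO of failed orders to re-run once seats are reclaimed."""

    def __init__(self, saga: TicketingSaga) -> None:
        self.saga = saga
        self._pending: Deque[TicketOrder] = deque()

    def add(self, order: TicketOrder) -> None:
        self._pending.append(order)

    def drain(self) -> None:
        # Re-offer seats in arrival order; each execute() re-runs the full saga.
        while self._pending:
            self.saga.execute(self._pending.popleft())

A periodic task would call drain() after expirations or releases, turning "payment failed" into "queued for the next free seat" instead of a dead end.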
Machine Coding - Cache (Levels 1-3)
cache - concurrency - resiliency
Scope: evolve caching from single-threaded LRU to concurrent and resilient multi-tier cache
Themes: escalating capabilities

Enhance the caching service from a basic LRU map to a concurrent implementation and finally a resilient multi-tier architecture.

Cache Hierarchy
 ├─ Level 1: LRU Map
 ├─ Level 2: Concurrent LRU
 └─ Level 3: Resilient Multi-Tier Cache

Level 1 — Core Implementation

The cache series opens by explaining how to combine a hashmap with a doubly linked list to achieve O(1) operations, walking through inserts, promotions, and evictions so the eviction pointer always references the least recently used node. The reference implementation below leans on OrderedDict, which provides the same move-to-front behaviour; a hand-rolled doubly-linked-list variant is sketched after the listing. The sample script prints internal state so you can verify the structure matches the mental model.

from collections import OrderedDict
import threading
from typing import Optional

class LRUCache:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.store: OrderedDict[int, str] = OrderedDict()
        self.lock = threading.RLock()

    def get(self, key: int) -> Optional[str]:
        with self.lock:
            if key not in self.store:
                return None
            value = self.store.pop(key)
            self.store[key] = value
            return value

    def put(self, key: int, value: str) -> None:
        with self.lock:
            if key in self.store:
                self.store.pop(key)
            self.store[key] = value
            if len(self.store) > self.capacity:
                self.store.popitem(last=False)

    def items(self):
        with self.lock:
            return list(self.store.items())

def main() -> None:
    cache = LRUCache(2)
    cache.put(1, "A")
    cache.put(2, "B")
    print("Get 1:", cache.get(1))
    cache.put(3, "C")
    print("Cache items:", cache.items())

if __name__ == "__main__":
    main()
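
OrderedDict hides the doubly linked list the paragraph describes. A hand-rolled sketch makes the O(1) pointer surgery explicit; sentinel head and tail nodes remove the empty-list edge cases, and LinkedLRUCache is an illustrative name alongside the class above.

from typing import Dict, Optional

class Node:
    __slots__ = ("key", "value", "prev", "next")

    def __init__(self, key: int = 0, value: str = "") -> None:
        self.key, self.value = key, value
        self.prev: Optional["Node"] = None
        self.next: Optional["Node"] = None

class LinkedLRUCache:
    """Hashmap + doubly linked list; most recent at head, LRU at tail."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.map: Dict[int, Node] = {}
        self.head, self.tail = Node(), Node()  # sentinels, never evicted
        self.head.next, self.tail.prev = self.tail, self.head

    def _unlink(self, node: Node) -> None:
        node.prev.next, node.next.prev = node.next, node.prev

    def _push_front(self, node: Node) -> None:
        node.next, node.prev = self.head.next, self.head
        self.head.next.prev = node
        self.head.next = node

    def get(self, key: int) -> Optional[str]:
        node = self.map.get(key)
        if node is None:
            return None
        self._unlink(node)
        self._push_front(node)  # promotion on access keeps recency order exact
        return node.value

    def put(self, key: int, value: str) -> None:
        if key in self.map:
            self._unlink(self.map.pop(key))
        node = Node(key, value)
        self.map[key] = node
        self._push_front(node)
        if len(self.map) > self.capacity:
            lru = self.tail.prev  # the eviction pointer: least recently used
            self._unlink(lru)
            del self.map[lru.key]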

Level 2 — Concurrent Enhancements

Level 2 considers contention, dividing the keyspace into segments protected by fine-grained locks to improve throughput. Promotion and eviction logic remain O(1), but now we reason about lock acquisition order to avoid deadlocks. Running the threaded workload shows how to interpret interleaved log output to confirm correctness.

from __future__ import annotations

import threading
from typing import Dict

class SegmentedLRUCache:
    def __init__(self, shard_count: int, capacity_per_shard: int) -> None:
        self.shards: Dict[int, LRUCache] = {
            shard: LRUCache(capacity_per_shard)
            for shard in range(shard_count)
        }

    def _shard(self, key: int) -> LRUCache:
        return self.shards[key % len(self.shards)]

    def get(self, key: int):
        return self._shard(key).get(key)

    def put(self, key: int, value: str) -> None:
        self._shard(key).put(key, value)

    def snapshot(self):
        return {idx: cache.items() for idx, cache in self.shards.items()}

def worker(cache: SegmentedLRUCache, idx: int) -> None:
    cache.put(idx, f"V{idx}")
    cache.get(idx)

def main() -> None:
    cache = SegmentedLRUCache(2, 2)
    threads = [threading.Thread(target=worker, args=(cache, i)) for i in range(6)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("Snapshot:", cache.snapshot())

if __name__ == "__main__":
    main()
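
The lock-ordering point above deserves a concrete shape. Here is a hedged sketch of a cross-shard read (MultiShardReader and multi_get are illustrative names, not part of the drill): acquiring shard locks in ascending shard order means no two threads can ever wait on each other in a cycle.

from typing import Dict, List, Optional

class MultiShardReader(SegmentedLRUCache):
    """Reads several keys consistently across shards without deadlocking."""

    def multi_get(self, keys: List[int]) -> Dict[int, Optional[str]]:
        # Always acquire shard locks in ascending shard order; two threads
        # following the same order can never form a wait cycle.
        shard_ids = sorted({key % len(self.shards) for key in keys})
        locks = [self.shards[shard_id].lock for shard_id in shard_ids]
        for lock in locks:
            lock.acquire()
        try:
            # Plain dict reads: this snapshot intentionally skips LRU
            # promotion to keep the critical section short.
            return {key: self.shards[key % len(self.shards)].store.get(key) for key in keys}
        finally:
            for lock in reversed(locks):
                lock.release()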

Level 3 — Resilient Architecture

The advanced cache design introduces a backing store that can fail, so reads consult both tiers and fall back to stale data, while failed writes are deferred; a replay queue that drains them on recovery is sketched after the listing. Log lines around hits, stale reads, and deferred writes make degradation visible. Learners experience how layering caches in front of unreliable stores requires careful invalidation policies.

from __future__ import annotations

import random
import time
from typing import Dict, Optional

# Reuses the thread-safe LRUCache from Level 1, assumed to be in scope.

class FlakyDatastore:
    def __init__(self, failure_rate: float = 0.3) -> None:
        self.failure_rate = failure_rate
        self.store: Dict[int, str] = {}

    def get(self, key: int) -> Optional[str]:
        if random.random() < self.failure_rate:
            raise RuntimeError("datastore-down")
        return self.store.get(key)

    def put(self, key: int, value: str) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("datastore-down")
        self.store[key] = value

class CircuitBreaker:
    def __init__(self, threshold: int, cool_down: float) -> None:
        self.threshold = threshold
        self.cool_down = cool_down
        self.failures = 0
        self.open_until = 0.0

    def call(self, func, *args, **kwargs):
        now = time.time()
        if now < self.open_until:
            raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
            self.failures = 0
            return result
        except Exception as exc:
            self.failures += 1
            if self.failures >= self.threshold:
                self.open_until = now + self.cool_down
                self.failures = 0
            raise exc

class TieredCache:
    def __init__(self, datastore: FlakyDatastore, l1_capacity: int = 2, l2_capacity: int = 4) -> None:
        self.l1 = LRUCache(l1_capacity)
        self.l2 = LRUCache(l2_capacity)
        self.datastore = datastore
        self.breaker = CircuitBreaker(2, 1.0)

    def get(self, key: int) -> Optional[str]:
        value = self.l1.get(key)
        if value is not None:
            return value
        value = self.l2.get(key)
        if value is not None:
            self.l1.put(key, value)
            return value
        try:
            value = self.breaker.call(self.datastore.get, key)
            if value is not None:
                self.l2.put(key, value)
                self.l1.put(key, value)
            return value
        except Exception as exc:
            print("Serving stale due to", exc)
            # Best-effort stale read: this returns None unless another
            # caller repopulated L2 after the miss above.
            return self.l2.get(key)

    def put(self, key: int, value: str) -> None:
        self.l1.put(key, value)
        self.l2.put(key, value)
        try:
            self.breaker.call(self.datastore.put, key, value)
        except Exception as exc:
            print("Write deferred due to", exc)

def main() -> None:
    random.seed(8)
    cache = TieredCache(FlakyDatastore(0.5))
    cache.put(1, "A")
    cache.put(2, "B")
    print("Get 1:", cache.get(1))
    print("Get 2:", cache.get(2))
    print("Get 3:", cache.get(3))

if __name__ == "__main__":
    main()
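
The narrative mentions queuing writes for retry, while the listing only logs the deferral. A hedged sketch of that extension (WriteBehindCache and replay_deferred are illustrative names): failed writes are parked in order and drained once the datastore answers again.

from collections import deque
from typing import Deque, Tuple

class WriteBehindCache(TieredCache):
    """Parks failed writes and replays them once the datastore recovers."""

    def __init__(self, datastore: FlakyDatastore) -> None:
        super().__init__(datastore)
        self.deferred: Deque[Tuple[int, str]] = deque()

    def put(self, key: int, value: str) -> None:
        self.l1.put(key, value)
        self.l2.put(key, value)
        try:
            self.breaker.call(self.datastore.put, key, value)
        except Exception:
            self.deferred.append((key, value))  # park for later replay

    def replay_deferred(self) -> None:
        while self.deferred:
            key, value = self.deferred[0]
            try:
                self.breaker.call(self.datastore.put, key, value)
                self.deferred.popleft()  # committed; drop from the queue
            except Exception:
                return  # still down: keep order and retry on the next call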
Machine Coding - Elevator System (Levels 1-3)
elevator system - concurrency - resiliency
Scope: evolve elevator control from basic logic to coordinated multi-car dispatch with resiliency. Themes: escalating capabilities

Progressively enhance the elevator system from basic control to coordinated multi-elevator scheduling and resilient health-aware dispatch.

Elevator Control Stack
 ├─ Level 1: Basic Controller
 ├─ Level 2: Multi-Elevator Dispatch
 └─ Level 3: Resilient Dispatch with Health Checks

Level 1 — Eventful Single Cab

We start the elevator exercises by modelling the cab's lifecycle as a finite state machine that reacts to call requests and direction changes. Strategy interfaces decide which request to service next (an alternative nearest-first policy is sketched after the listing), while an EventBus broadcasts door and movement events for observability. This foundational level trains you to separate decision making from state transitions before scaling to fleets.

from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class CallRequest:
    floor: int
    direction: str = "IDLE"


class EventBus:
    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = {}

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        self._subscribers.setdefault(event, []).append(handler)

    def publish(self, event: str, payload: dict) -> None:
        for handler in self._subscribers.get(event, []):
            handler(payload)


class StateMachine:
    def __init__(self, initial_state: str):
        self.state = initial_state
        self._transitions: Dict[str, set[str]] = {}

    def add_transition(self, origin: str, target: str) -> None:
        self._transitions.setdefault(origin, set()).add(target)

    def transition(self, target: str) -> None:
        allowed = self._transitions.get(self.state, set())
        if target != self.state and target not in allowed:
            raise RuntimeError(f"Invalid transition {self.state} → {target}")
        self.state = target


class ElevatorCab(StateMachine):
    def __init__(self, cab_id: str, max_floor: int, bus: EventBus):
        super().__init__("IDLE")
        self.cab_id = cab_id
        self.max_floor = max_floor
        self.current_floor = 1
        self.queue: List[int] = []
        self.bus = bus
        self.add_transition("IDLE", "MOVING")
        self.add_transition("MOVING", "DOORS_OPEN")
        self.add_transition("DOORS_OPEN", "IDLE")

    def enqueue(self, floor: int) -> None:
        if not 1 <= floor <= self.max_floor:
            raise ValueError(f"Floor {floor} outside bounds")
        if floor not in self.queue:
            self.queue.append(floor)
        self.bus.publish("cab.queue", {"cab": self.cab_id, "queue": list(self.queue)})

    def step(self) -> None:
        if not self.queue:
            self.transition("IDLE")
            self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
            return

        target = self.queue[0]
        if self.current_floor < target:
            self.current_floor += 1
            self.transition("MOVING")
        elif self.current_floor > target:
            self.current_floor -= 1
            self.transition("MOVING")
        else:
            self.queue.pop(0)
            self.transition("DOORS_OPEN")
            self.bus.publish("cab.arrived", {"cab": self.cab_id, "floor": self.current_floor})
            self.transition("IDLE")

        self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})


class DispatchStrategy:
    def choose(self, cab: ElevatorCab, request: CallRequest) -> ElevatorCab:
        return cab


class ElevatorController:
    def __init__(self, cab: ElevatorCab, strategy: DispatchStrategy, bus: EventBus):
        self.cab = cab
        self.strategy = strategy
        self.bus = bus
        self.bus.subscribe("call.requested", self._handle_request)

    def request(self, floor: int, direction: str = "IDLE") -> None:
        self.bus.publish("call.requested", {"request": CallRequest(floor, direction)})

    def _handle_request(self, payload: dict) -> None:
        request: CallRequest = payload["request"]
        cab = self.strategy.choose(self.cab, request)
        cab.enqueue(request.floor)
        self.bus.publish("call.assigned", {"cab": cab.cab_id, "floor": request.floor})

    def tick(self) -> None:
        self.cab.step()


def main() -> None:
    bus = EventBus()
    bus.subscribe("cab.status", lambda event: print("[status]", event))
    bus.subscribe("cab.arrived", lambda event: print("[arrived]", event))
    cab = ElevatorCab("CAR-1", max_floor=12, bus=bus)
    controller = ElevatorController(cab, DispatchStrategy(), bus)

    controller.request(7, "UP")
    controller.request(3, "DOWN")
    for _ in range(8):
        controller.tick()


if __name__ == "__main__":
    main()
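
The Level 1 strategy is intentionally trivial because there is only one cab. As a hedged sketch of the kind of policy the strategy seam enables, the subclass below (NearestFirstCab is an illustrative name, not part of the drill) reorders pending stops so the closest floor is served first.

class NearestFirstCab(ElevatorCab):
    """Serves the nearest pending floor first instead of FIFO order."""

    def enqueue(self, floor: int) -> None:
        super().enqueue(floor)
        # Re-rank pending stops by distance from the current position;
        # step() always serves queue[0], so the closest stop wins.
        self.queue.sort(key=lambda pending: abs(pending - self.current_floor))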

Level 2 — Fleet Dispatch Bus

Level 2 expands to multiple cars that subscribe to a shared EventBus, receiving commands from a central dispatcher. The dispatcher uses strategies to balance load (the ranking tuple is exercised in a short sketch after the listing) and produces projections so you can inspect assignments historically. Even without threads the design reads like production code, making the eventual jump to concurrency straightforward.

from __future__ import annotations

from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Protocol, Tuple


@dataclass
class CallRequest:
    floor: int
    direction: str


class EventBus:
    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self._queue: Deque[Tuple[str, dict]] = deque()

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[event].append(handler)

    def publish(self, event: str, payload: dict) -> None:
        self._queue.append((event, payload))

    def pump(self) -> None:
        while self._queue:
            event, payload = self._queue.popleft()
            for handler in list(self._subscribers.get(event, [])):
                handler(payload)


class StateMachine:
    def __init__(self, initial_state: str):
        self.state = initial_state
        self._transitions: Dict[str, set[str]] = {}

    def add_transition(self, origin: str, target: str) -> None:
        self._transitions.setdefault(origin, set()).add(target)

    def transition(self, target: str) -> None:
        allowed = self._transitions.get(self.state, set())
        if target != self.state and target not in allowed:
            raise RuntimeError(f"Invalid transition {self.state} → {target}")
        self.state = target


class ElevatorCab(StateMachine):
    def __init__(self, cab_id: str, max_floor: int, bus: EventBus):
        super().__init__("IDLE")
        self.cab_id = cab_id
        self.max_floor = max_floor
        self.current_floor = 1
        self.queue: List[int] = []
        self.bus = bus
        self.add_transition("IDLE", "MOVING")
        self.add_transition("MOVING", "DOORS_OPEN")
        self.add_transition("DOORS_OPEN", "IDLE")

    def enqueue(self, floor: int) -> None:
        if not 1 <= floor <= self.max_floor:
            raise ValueError(f"Floor {floor} outside bounds")
        if floor not in self.queue:
            self.queue.append(floor)
        self.bus.publish("cab.queue", {"cab": self.cab_id, "queue": list(self.queue)})

    def step(self) -> None:
        if not self.queue:
            self.transition("IDLE")
            self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
            return

        target = self.queue[0]
        if self.current_floor < target:
            self.current_floor += 1
            self.transition("MOVING")
        elif self.current_floor > target:
            self.current_floor -= 1
            self.transition("MOVING")
        else:
            self.queue.pop(0)
            self.transition("DOORS_OPEN")
            self.bus.publish("cab.arrived", {"cab": self.cab_id, "floor": self.current_floor})
            self.transition("IDLE")

        self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})


class DispatchStrategy:
    def choose(self, request: CallRequest, cabs: Iterable[ElevatorCab]) -> ElevatorCab:
        def score(cab: ElevatorCab) -> Tuple[int, int, int, str]:
            distance = abs(cab.current_floor - request.floor)
            load = len(cab.queue)
            direction_penalty = 0 if (request.direction == "UP" and cab.current_floor <= request.floor) or (
                request.direction == "DOWN" and cab.current_floor >= request.floor
            ) else 1
            return (distance, load, direction_penalty, cab.cab_id)

        return min(cabs, key=score)


class Command(Protocol):
    def execute(self) -> None:
        ...


class RequestElevatorCommand:
    def __init__(self, bus: EventBus, request: CallRequest):
        self.bus = bus
        self.request = request

    def execute(self) -> None:
        self.bus.publish("call.received", {"request": self.request})


class ElevatorFleetController:
    def __init__(self, cabs: List[ElevatorCab], strategy: DispatchStrategy, bus: EventBus):
        self.cabs = {cab.cab_id: cab for cab in cabs}
        self.strategy = strategy
        self.bus = bus
        self.bus.subscribe("call.received", self._assign_call)

    def request(self, floor: int, direction: str) -> Command:
        return RequestElevatorCommand(self.bus, CallRequest(floor, direction))

    def _assign_call(self, payload: dict) -> None:
        request: CallRequest = payload["request"]
        cab = self.strategy.choose(request, self.cabs.values())
        cab.enqueue(request.floor)
        self.bus.publish("call.assigned", {"cab": cab.cab_id, "floor": request.floor, "direction": request.direction})

    def step_all(self) -> None:
        for cab in self.cabs.values():
            cab.step()


class AssignmentBoard:
    def __init__(self, bus: EventBus) -> None:
        self.assignments: Dict[str, List[int]] = defaultdict(list)
        self.served: List[Tuple[str, int]] = []
        bus.subscribe("call.assigned", self._on_assigned)
        bus.subscribe("cab.arrived", self._on_arrived)

    def _on_assigned(self, payload: dict) -> None:
        cab = payload["cab"]
        floor = payload["floor"]
        if floor not in self.assignments[cab]:
            self.assignments[cab].append(floor)

    def _on_arrived(self, payload: dict) -> None:
        cab = payload["cab"]
        floor = payload["floor"]
        self.served.append((cab, floor))
        if floor in self.assignments.get(cab, []):
            self.assignments[cab].remove(floor)


def main() -> None:
    bus = EventBus()
    cabs = [
        ElevatorCab("CAR-A", max_floor=20, bus=bus),
        ElevatorCab("CAR-B", max_floor=20, bus=bus),
        ElevatorCab("CAR-C", max_floor=20, bus=bus),
    ]
    controller = ElevatorFleetController(cabs, DispatchStrategy(), bus)
    board = AssignmentBoard(bus)

    for floor, direction in [(7, "UP"), (3, "DOWN"), (14, "DOWN"), (9, "UP"), (2, "DOWN")]:
        controller.request(floor, direction).execute()
    bus.pump()

    for _ in range(12):
        controller.step_all()
        bus.pump()

    print("Outstanding assignments:", board.assignments)
    print("Served stops:", board.served)


if __name__ == "__main__":
    main()
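
To make the ranking tuple concrete, here is a minimal check (demo_scoring is an illustrative helper, assuming the Level 2 classes above are in scope) showing queue load breaking a distance tie.

def demo_scoring() -> None:
    bus = EventBus()
    idle = ElevatorCab("CAR-IDLE", max_floor=20, bus=bus)
    busy = ElevatorCab("CAR-BUSY", max_floor=20, bus=bus)
    idle.current_floor = busy.current_floor = 10
    busy.queue.extend([15, 18])  # same distance to the call, heavier load
    chosen = DispatchStrategy().choose(CallRequest(12, "UP"), [idle, busy])
    # Tuples compare element-wise: (2, 0, 0, "CAR-IDLE") < (2, 2, 0, "CAR-BUSY")
    print("Chosen cab:", chosen.cab_id)  # -> CAR-IDLE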

Level 3 — Resilient Fleet Orchestrator

The final elevator level introduces fault scenarios such as stuck cars, so health monitors publish alerts and a repository tracks durable assignments. A saga coordinates requeueing requests when a car fails mid-service, ensuring passengers are not stranded. You learn how to extend existing event-driven flows with resilience features while keeping the model transparent.

from __future__ import annotations

from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Protocol, Tuple


@dataclass
class CallRequest:
    floor: int
    direction: str = "IDLE"


class EventBus:
    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self._queue: Deque[Tuple[str, dict]] = deque()

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[event].append(handler)

    def publish(self, event: str, payload: dict) -> None:
        self._queue.append((event, payload))

    def pump(self) -> None:
        while self._queue:
            event, payload = self._queue.popleft()
            for handler in list(self._subscribers.get(event, [])):
                handler(payload)


class StateMachine:
    def __init__(self, initial_state: str):
        self.state = initial_state
        self._transitions: Dict[str, set[str]] = {}

    def add_transition(self, origin: str, target: str) -> None:
        self._transitions.setdefault(origin, set()).add(target)

    def transition(self, target: str) -> None:
        allowed = self._transitions.get(self.state, set())
        if target != self.state and target not in allowed:
            raise RuntimeError(f"Invalid transition {self.state} → {target}")
        self.state = target


class ElevatorCab(StateMachine):
    def __init__(self, cab_id: str, max_floor: int, bus: EventBus):
        super().__init__("IDLE")
        self.cab_id = cab_id
        self.max_floor = max_floor
        self.current_floor = 1
        self.queue: List[int] = []
        self.bus = bus
        self.add_transition("IDLE", "MOVING")
        self.add_transition("MOVING", "DOORS_OPEN")
        self.add_transition("DOORS_OPEN", "IDLE")

    def enqueue(self, floor: int) -> None:
        if not 1 <= floor <= self.max_floor:
            raise ValueError(f"Floor {floor} outside bounds")
        if floor not in self.queue:
            self.queue.append(floor)
        self.bus.publish("cab.queue", {"cab": self.cab_id, "queue": list(self.queue)})

    def step(self) -> None:
        if not self.queue:
            self.transition("IDLE")
            self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
            return

        target = self.queue[0]
        if self.current_floor < target:
            self.current_floor += 1
            self.transition("MOVING")
        elif self.current_floor > target:
            self.current_floor -= 1
            self.transition("MOVING")
        else:
            self.queue.pop(0)
            self.transition("DOORS_OPEN")
            self.bus.publish("cab.arrived", {"cab": self.cab_id, "floor": self.current_floor})
            self.transition("IDLE")

        self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})


class DispatchStrategy:
    def choose(self, request: CallRequest, cabs: Iterable[ElevatorCab]) -> ElevatorCab:
        def score(cab: ElevatorCab) -> Tuple[int, int, int, str]:
            distance = abs(cab.current_floor - request.floor)
            load = len(cab.queue)
            direction_penalty = 0 if (request.direction == "UP" and cab.current_floor <= request.floor) or (
                request.direction == "DOWN" and cab.current_floor >= request.floor
            ) else 1
            return (distance, load, direction_penalty, cab.cab_id)

        return min(cabs, key=score)


class Command(Protocol):
    def execute(self) -> None:
        ...


class RequestElevatorCommand:
    def __init__(self, bus: EventBus, request: CallRequest):
        self.bus = bus
        self.request = request

    def execute(self) -> None:
        self.bus.publish("call.requested", {"request": self.request})


@dataclass
class CabRecord:
    cab: ElevatorCab
    online: bool = True


class CabRepository:
    def __init__(self, records: Dict[str, CabRecord]):
        self._records = records

    def set_online(self, cab_id: str) -> None:
        self._records[cab_id].online = True

    def set_offline(self, cab_id: str) -> None:
        self._records[cab_id].online = False

    def online_cabs(self) -> List[ElevatorCab]:
        return [record.cab for record in self._records.values() if record.online]

    def snapshot(self) -> Dict[str, bool]:
        return {cab_id: record.online for cab_id, record in self._records.items()}


class AssignmentJournal:
    def __init__(self) -> None:
        self._assignments: Dict[str, List[CallRequest]] = defaultdict(list)

    def assign(self, cab_id: str, request: CallRequest) -> None:
        if all(req.floor != request.floor or req.direction != request.direction for req in self._assignments[cab_id]):
            self._assignments[cab_id].append(request)

    def complete(self, cab_id: str, floor: int) -> None:
        self._assignments[cab_id] = [req for req in self._assignments[cab_id] if req.floor != floor]

    def failover(self, cab_id: str) -> List[CallRequest]:
        return self._assignments.pop(cab_id, [])

    def snapshot(self) -> Dict[str, List[int]]:
        return {cab: [req.floor for req in requests] for cab, requests in self._assignments.items()}


class ElevatorOrchestrator:
    def __init__(self, repository: CabRepository, journal: AssignmentJournal, strategy: DispatchStrategy, bus: EventBus):
        self.repository = repository
        self.journal = journal
        self.strategy = strategy
        self.bus = bus
        self.pending: Deque[CallRequest] = deque()
        bus.subscribe("call.requested", self._handle_request)
        bus.subscribe("cab.arrived", self._handle_arrival)
        bus.subscribe("cab.online", self._handle_online)
        bus.subscribe("cab.offline", self._handle_offline)

    def request(self, floor: int, direction: str) -> Command:
        return RequestElevatorCommand(self.bus, CallRequest(floor, direction))

    def _handle_request(self, payload: dict) -> None:
        self.pending.append(payload["request"])
        self._assign_pending()

    def _handle_arrival(self, payload: dict) -> None:
        cab_id = payload["cab"]
        floor = payload["floor"]
        self.journal.complete(cab_id, floor)
        self.bus.publish("call.completed", {"cab": cab_id, "floor": floor})
        self._assign_pending()

    def _handle_online(self, _: dict) -> None:
        self._assign_pending()

    def _handle_offline(self, _: dict) -> None:
        self._assign_pending()

    def _assign_pending(self) -> None:
        while self.pending:
            candidates = self.repository.online_cabs()
            if not candidates:
                return
            request = self.pending[0]
            cab = self.strategy.choose(request, candidates)
            if request.floor not in cab.queue:
                cab.enqueue(request.floor)
            self.journal.assign(cab.cab_id, request)
            self.bus.publish("call.assigned", {"cab": cab.cab_id, "floor": request.floor, "direction": request.direction})
            self.pending.popleft()


class HealthMonitor:
    def __init__(self, repository: CabRepository, bus: EventBus):
        self.repository = repository
        self.bus = bus
        self.last_floor: Dict[str, int] = {}
        bus.subscribe("cab.status", self._capture_status)
        bus.subscribe("cab.health", self._handle_health)

    def _capture_status(self, payload: dict) -> None:
        self.last_floor[payload["cab"]] = payload["floor"]

    def _handle_health(self, payload: dict) -> None:
        cab_id = payload["cab"]
        online = payload["online"]
        floor = self.last_floor.get(cab_id, 1)
        if online:
            self.repository.set_online(cab_id)
            self.bus.publish("cab.online", {"cab": cab_id, "floor": floor})
        else:
            self.repository.set_offline(cab_id)
            self.bus.publish("cab.offline", {"cab": cab_id, "floor": floor})


class FailoverSaga:
    def __init__(self, bus: EventBus, journal: AssignmentJournal):
        self.bus = bus
        self.journal = journal
        bus.subscribe("cab.offline", self._requeue)

    def _requeue(self, payload: dict) -> None:
        cab_id = payload["cab"]
        floor = payload["floor"]
        for request in self.journal.failover(cab_id):
            self.bus.publish("call.requeued", {"from": cab_id, "floor": request.floor})
            direction = request.direction
            if direction == "IDLE":
                direction = "UP" if request.floor >= floor else "DOWN"
            self.bus.publish("call.requested", {"request": CallRequest(request.floor, direction)})


class MetricsProjection:
    def __init__(self, bus: EventBus) -> None:
        self.counters: Dict[str, int] = {"completed": 0, "requeued": 0, "degraded": 0}
        self.degraded: set[str] = set()
        bus.subscribe("call.completed", self._on_completed)
        bus.subscribe("call.requeued", self._on_requeued)
        bus.subscribe("cab.offline", self._on_offline)

    def _on_completed(self, _: dict) -> None:
        self.counters["completed"] += 1

    def _on_requeued(self, _: dict) -> None:
        self.counters["requeued"] += 1

    def _on_offline(self, payload: dict) -> None:
        cab = payload["cab"]
        if cab not in self.degraded:
            self.degraded.add(cab)
            self.counters["degraded"] += 1


class FleetSupervisor:
    def __init__(self, cabs: Iterable[ElevatorCab], bus: EventBus):
        self.cabs = list(cabs)
        self.bus = bus

    def tick(self) -> None:
        for cab in self.cabs:
            cab.step()
        self.bus.pump()


def main() -> None:
    bus = EventBus()
    cabs = [
        ElevatorCab("CAR-A", max_floor=20, bus=bus),
        ElevatorCab("CAR-B", max_floor=20, bus=bus),
        ElevatorCab("CAR-C", max_floor=20, bus=bus),
    ]
    repository = CabRepository({cab.cab_id: CabRecord(cab) for cab in cabs})
    journal = AssignmentJournal()
    orchestrator = ElevatorOrchestrator(repository, journal, DispatchStrategy(), bus)
    HealthMonitor(repository, bus)
    FailoverSaga(bus, journal)
    metrics = MetricsProjection(bus)
    supervisor = FleetSupervisor(cabs, bus)

    bus.subscribe("call.assigned", lambda event: print("[assigned]", event))
    bus.subscribe("call.completed", lambda event: print("[completed]", event))
    bus.subscribe("call.requeued", lambda event: print("[requeued]", event))

    for floor, direction in [(6, "UP"), (14, "DOWN"), (3, "UP"), (18, "DOWN")]:
        orchestrator.request(floor, direction).execute()
    bus.pump()

    for _ in range(6):
        supervisor.tick()

    bus.publish("cab.health", {"cab": "CAR-B", "online": False})
    bus.pump()

    for _ in range(4):
        supervisor.tick()

    bus.publish("cab.health", {"cab": "CAR-B", "online": True})
    bus.pump()

    for _ in range(4):
        supervisor.tick()

    print("Assignments snapshot:", journal.snapshot())
    print("Cab availability:", repository.snapshot())
    print("Metrics:", metrics.counters)


if __name__ == "__main__":
    main()
Machine Coding - Producer Consumer (Levels 1-3)
producer-consumer - concurrency - resiliency
Scope: evolve producer-consumer from bounded queue to worker pools and resilient stream processing. Themes: escalating capabilities

Progressively build the producer-consumer pipeline from basic bounded queues to worker pools and resilient retry/dead-letter handling.

Stream Processing Stack
 ├─ Level 1: Bounded Queue
 ├─ Level 2: Worker Pool
 └─ Level 3: Resilient Stream Processor

Level 1 — Core Implementation

We start the concurrency primitives by building a bounded queue that coordinates producers and consumers with condition variables. Logging around waits and notifications clarifies how threads coordinate access without busy looping; an instrumented variant that prints when a thread actually blocks is sketched after the listing. Running the sample shows exactly when producers block and consumers resume, anchoring the mental model.

import threading
from collections import deque
from typing import Deque

class BoundedQueue:
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.items: Deque[int] = deque()
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def put(self, item: int) -> None:
        with self.not_full:
            while len(self.items) >= self.capacity:
                self.not_full.wait()
            self.items.append(item)
            self.not_empty.notify()

    def get(self) -> int:
        with self.not_empty:
            while not self.items:
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()
            return item

def producer(queue: BoundedQueue, name: str, numbers: range) -> None:
    for number in numbers:
        queue.put(number)
        print(f"{name} produced {number}")

def consumer(queue: BoundedQueue, name: str, consume: int) -> None:
    for _ in range(consume):
        item = queue.get()
        print(f"{name} consumed {item}")

def main() -> None:
    queue = BoundedQueue(2)
    prod = threading.Thread(target=producer, args=(queue, "P1", range(1, 6)))
    cons = threading.Thread(target=consumer, args=(queue, "C1", 5))
    prod.start()
    cons.start()
    prod.join()
    cons.join()

if __name__ == "__main__":
    main()
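
Because the listing itself stays quiet about blocking, here is a hedged sketch of the instrumented variant the walkthrough alludes to (VerboseQueue is an illustrative name): it prints only when a thread genuinely blocks, so the log maps directly onto the wait/notify handoff.

class VerboseQueue(BoundedQueue):
    """Same semantics as BoundedQueue, but logs every genuine block."""

    def put(self, item: int) -> None:
        with self.not_full:
            while len(self.items) >= self.capacity:
                print(f"producer blocked: queue full at {len(self.items)}")
                self.not_full.wait()
            self.items.append(item)
            self.not_empty.notify()

    def get(self) -> int:
        with self.not_empty:
            while not self.items:
                print("consumer blocked: queue empty")
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()
            return item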

Level 2 — Concurrent Enhancements

Level 2 layers a worker pool atop a task queue, highlighting how to reuse primitives instead of rewriting them. We implement graceful shutdown so queued tasks finish and late submissions are rejected cleanly, covering the full lifecycle. The queue here is unbounded; the sketch after the listing shows how a maxsize turns it into a backpressure mechanism when tasks arrive faster than they can be processed.

import threading
import time
from queue import Queue, Empty
from typing import Callable

class WorkerPool:
    def __init__(self, workers: int) -> None:
        self.tasks: "Queue[Callable[[], None]]" = Queue()
        self.threads = [threading.Thread(target=self._worker, daemon=True) for _ in range(workers)]
        self.stop_flag = threading.Event()
        for thread in self.threads:
            thread.start()

    def submit(self, task: Callable[[], None]) -> None:
        if self.stop_flag.is_set():
            raise RuntimeError("pool is shut down")  # reject late submissions
        self.tasks.put(task)

    def _worker(self) -> None:
        while not self.stop_flag.is_set():
            try:
                task = self.tasks.get(timeout=0.2)
            except Empty:
                continue
            try:
                task()
            finally:
                self.tasks.task_done()

    def shutdown(self) -> None:
        self.tasks.join()
        self.stop_flag.set()
        for thread in self.threads:
            thread.join(timeout=0.1)

def main() -> None:
    pool = WorkerPool(3)
    for idx in range(6):
        pool.submit(lambda idx=idx: print(f"Worker executed job {idx}") or time.sleep(0.1))
    pool.shutdown()

if __name__ == "__main__":
    main()
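
A hedged sketch of the backpressure variant referenced above (BoundedWorkerPool and try_submit are illustrative names): giving the task queue a maxsize makes producers feel it when workers fall behind, and try_submit lets callers shed load instead of blocking forever.

from queue import Full, Queue
from typing import Callable

class BoundedWorkerPool(WorkerPool):
    """Adds backpressure: submissions fail fast once the queue is full."""

    def __init__(self, workers: int, max_pending: int) -> None:
        super().__init__(workers)
        # Workers re-read self.tasks on every loop iteration, so swapping
        # in a bounded queue here takes effect immediately.
        self.tasks = Queue(maxsize=max_pending)

    def try_submit(self, task: Callable[[], None]) -> bool:
        try:
            self.tasks.put(task, timeout=0.5)  # brief wait under load
            return True
        except Full:
            return False  # caller observes backpressure and can shed load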

Level 3 — Resilient Architecture

The advanced primitive stages messages through retries, exponential backoff, and a dead-letter queue to reflect real stream processing concerns. Separating the retry policy from the handler and storage lets each concern be tested independently; a sketch of that extraction follows the listing. Operators can follow the logs to see exactly why a message was retried or quarantined, reinforcing production-ready thinking.

import random
import time
from dataclasses import dataclass
from queue import Queue, Empty
from typing import Callable

@dataclass
class StreamMessage:
    payload: str
    attempts: int = 0

class DeadLetterQueue:
    def __init__(self) -> None:
        self.messages = []

    def record(self, message: StreamMessage, reason: str) -> None:
        print("Dead letter:", message.payload, reason)
        self.messages.append((message, reason))

class ResilientStreamProcessor:
    def __init__(self, handler: Callable[[StreamMessage], None], max_attempts: int = 3) -> None:
        self.queue: "Queue[StreamMessage]" = Queue()
        self.handler = handler
        self.max_attempts = max_attempts
        self.dead_letters = DeadLetterQueue()

    def publish(self, payload: str) -> None:
        self.queue.put(StreamMessage(payload))

    def start(self) -> None:
        while True:
            try:
                message = self.queue.get(timeout=0.2)
            except Empty:
                break
            try:
                self.handler(message)
                print("Processed:", message.payload)
            except Exception as exc:
                message.attempts += 1
                if message.attempts >= self.max_attempts:
                    self.dead_letters.record(message, str(exc))
                else:
                    backoff = 0.1 * (2 ** message.attempts)
                    print(f"Retrying {message.payload} in {backoff:.2f}s")
                    time.sleep(backoff)
                    self.queue.put(message)
            finally:
                self.queue.task_done()

def flaky_handler(message: StreamMessage) -> None:
    if random.random() < 0.4:
        raise RuntimeError("transient failure")

def main() -> None:
    random.seed(6)
    processor = ResilientStreamProcessor(flaky_handler)
    for payload in ["A", "B", "C", "D"]:
        processor.publish(payload)
    processor.start()
    print("Dead letters:", processor.dead_letters.messages)

if __name__ == "__main__":
    main()
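
A hedged sketch of the policy extraction described above (RetryPolicy is an illustrative name): once delay and exhaustion decisions live in a small value object, they can be unit-tested without threads or sleeps.

from dataclasses import dataclass

@dataclass
class RetryPolicy:
    max_attempts: int = 3
    base_delay: float = 0.1
    factor: float = 2.0

    def exhausted(self, attempts: int) -> bool:
        return attempts >= self.max_attempts

    def delay(self, attempts: int) -> float:
        return self.base_delay * (self.factor ** attempts)

# Inside the processor, policy.exhausted(message.attempts) would decide
# dead-lettering and policy.delay(message.attempts) would replace the
# inline backoff arithmetic.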
Machine Coding - Task Scheduler (Levels 1-3)
task scheduler - concurrency - resiliency
Scope: evolve task scheduling from delayed jobs to recurring workflows with resiliency. Themes: escalating capabilities

Progressively enhance the task scheduler from delayed-job execution to recurring scheduling and a resilient cron engine with persistence.

Task Scheduler Stack
 ├─ Level 1: Delayed Job Scheduler
 ├─ Level 2: Recurring Jobs Coordinator
 └─ Level 3: Resilient Cron Engine

Level 1 — Core Implementation

The scheduling series begins with a min-heap of due times that wakes a worker thread only when the next job is ready. Abstracting the clock (sketched after the listing) keeps tests deterministic, while coalescing wake-ups through the condition variable avoids spurious work. The initial workload illustrates how precision and ordering play together when tasks share timestamps.

import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, List

@dataclass(order=True)
class ScheduledTask:
    run_at: float
    action: Callable[[], None] = field(compare=False)

class TaskScheduler:
    def __init__(self) -> None:
        self.tasks: List[ScheduledTask] = []
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def schedule(self, delay: float, action: Callable[[], None]) -> None:
        with self.condition:
            heapq.heappush(self.tasks, ScheduledTask(time.time() + delay, action))
            self.condition.notify()

    def _run(self) -> None:
        while True:
            with self.condition:
                while not self.tasks:
                    self.condition.wait()
                task = self.tasks[0]
                now = time.time()
                if task.run_at > now:
                    self.condition.wait(task.run_at - now)
                    continue
                heapq.heappop(self.tasks)
            task.action()

def main() -> None:
    scheduler = TaskScheduler()
    scheduler.schedule(0.2, lambda: print("Task A at", time.time()))
    scheduler.schedule(0.4, lambda: print("Task B at", time.time()))
    time.sleep(1)

if __name__ == "__main__":
    main()
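
The deterministic-testing point rests on a clock seam the listing does not show. A hedged sketch of that seam (Clock and FakeClock are illustrative names): a scheduler that calls clock.now() instead of time.time() can be driven by FakeClock.advance() in tests, so due-time assertions become exact instead of sleep-based.

from typing import Protocol

class Clock(Protocol):
    def now(self) -> float:
        ...

class FakeClock:
    """Manually advanced clock for deterministic scheduler tests."""

    def __init__(self, start: float = 0.0) -> None:
        self._now = start

    def now(self) -> float:
        return self._now

    def advance(self, seconds: float) -> None:
        self._now += seconds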

Level 2 — Concurrent Enhancements

Level 2 adds recurring jobs that reschedule themselves after each run, which avoids double-booking but lets intervals drift by the execution time; the fixed-rate alternative is sketched after the listing. A worker pool executes tasks in parallel while the scheduler remains responsible for sequencing, separating concerns cleanly. Metrics reveal when jobs start falling behind, an important signal for capacity planning.

import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, List

@dataclass(order=True)
class RecurringTask:
    next_run: float
    interval: float = field(compare=False)
    action: Callable[[], None] = field(compare=False)

class RecurringScheduler:
    def __init__(self, workers: int = 2) -> None:
        self.queue: List[RecurringTask] = []
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
        self.stop = False
        self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(workers)]
        for worker in self.workers:
            worker.start()

    def schedule(self, interval: float, action: Callable[[], None]) -> None:
        with self.condition:
            heapq.heappush(self.queue, RecurringTask(time.time() + interval, interval, action))
            self.condition.notify()

    def _worker(self) -> None:
        while True:
            with self.condition:
                while not self.queue and not self.stop:
                    self.condition.wait()
                if self.stop:
                    # Recurring tasks re-queue themselves in the finally
                    # block, so waiting for an empty queue would never end;
                    # exit as soon as shutdown is requested.
                    return
                task = self.queue[0]
                now = time.time()
                if task.next_run > now:
                    self.condition.wait(task.next_run - now)
                    continue
                heapq.heappop(self.queue)
            try:
                task.action()
            finally:
                with self.condition:
                    heapq.heappush(self.queue, RecurringTask(time.time() + task.interval, task.interval, task.action))
                    self.condition.notify()

    def shutdown(self) -> None:
        with self.condition:
            self.stop = True
            self.condition.notify_all()
        for worker in self.workers:
            worker.join()

def main() -> None:
    scheduler = RecurringScheduler(workers=2)
    scheduler.schedule(0.3, lambda: print("Heartbeat at", time.time()))
    scheduler.schedule(0.5, lambda: print("Cleanup at", time.time()))
    time.sleep(1.5)
    scheduler.shutdown()

if __name__ == "__main__":
    main()
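
The drift trade-off above has a compact fixed-rate counterpart. A hedged sketch (next_run_fixed_rate is an illustrative helper): anchoring the next deadline to the previous deadline rather than to completion time removes drift, and skipping missed slots avoids a catch-up burst after a long stall.

def next_run_fixed_rate(previous_deadline: float, interval: float, now: float) -> float:
    next_deadline = previous_deadline + interval
    if next_deadline <= now:
        # Skip slots that were missed while a run overran, instead of
        # firing back-to-back to catch up.
        missed = int((now - previous_deadline) // interval)
        next_deadline = previous_deadline + (missed + 1) * interval
    return next_deadline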

Level 3 — Resilient Architecture

The final scheduler level serialises the job queue to durable storage and rehydrates it on restart, treating persistence as a first-class feature. Failed executions are retried with jittered delays so the fleet does not stampede upon recovery. Learners witness how crash recovery and retry semantics intertwine when uptime is non-negotiable.

import json
import os
import random
import threading
import time
from dataclasses import dataclass, asdict
from typing import Callable, Dict, List

@dataclass
class PersistentJob:
    job_id: str
    interval: float
    next_run: float

class JsonJobStore:
    def __init__(self, path: str) -> None:
        self.path = path
        if not os.path.exists(path):
            with open(path, "w", encoding="utf-8") as handle:
                json.dump([], handle)

    def load(self) -> List[PersistentJob]:
        with open(self.path, "r", encoding="utf-8") as handle:
            data = json.load(handle)
        return [PersistentJob(**entry) for entry in data]

    def save(self, jobs: List[PersistentJob]) -> None:
        with open(self.path, "w", encoding="utf-8") as handle:
            json.dump([asdict(job) for job in jobs], handle)

class ResilientCronEngine:
    def __init__(self, store: JsonJobStore, handlers: Dict[str, Callable[[], None]]) -> None:
        self.store = store
        self.handlers = handlers
        self.jobs = {job.job_id: job for job in store.load()}
        self.lock = threading.Lock()
        self.stop = False
        threading.Thread(target=self._loop, daemon=True).start()

    def register(self, job_id: str, interval: float) -> None:
        with self.lock:
            job = PersistentJob(job_id, interval, time.time() + interval)
            self.jobs[job_id] = job
            self.store.save(list(self.jobs.values()))

    def _loop(self) -> None:
        while not self.stop:
            now = time.time()
            due = []
            with self.lock:
                for job in self.jobs.values():
                    if job.next_run <= now:
                        due.append(job)
            for job in due:
                self._execute(job)
            time.sleep(0.1)

    def _execute(self, job: PersistentJob) -> None:
        handler = self.handlers[job.job_id]
        attempt = 0
        while True:
            try:
                handler()
                break
            except Exception as exc:
                attempt += 1
                if attempt >= 3:
                    print(f"Job {job.job_id} failed permanently:", exc)
                    break
                backoff = 0.2 * (2 ** attempt) + random.random() * 0.1
                print(f"Retrying job {job.job_id} in {backoff:.2f}s due to {exc}")
                time.sleep(backoff)
        with self.lock:
            job.next_run = time.time() + job.interval
            self.store.save(list(self.jobs.values()))

    def shutdown(self) -> None:
        self.stop = True

def flaky_job() -> None:
    if random.random() < 0.3:
        raise RuntimeError("intermittent failure")
    print("Flaky job executed at", time.time())

def heartbeat_job() -> None:
    print("Heartbeat job at", time.time())

def main() -> None:
    random.seed(1)
    store = JsonJobStore("/tmp/cron-jobs.json")
    engine = ResilientCronEngine(store, {"flaky": flaky_job, "heartbeat": heartbeat_job})
    if "flaky" not in engine.jobs:
        engine.register("flaky", 0.4)
        engine.register("heartbeat", 0.6)
    time.sleep(2)
    engine.shutdown()

if __name__ == "__main__":
    main()
Machine Coding - Order Management (Levels 1-3)
order management - concurrency - resiliency
Scope: evolve order lifecycle management from basic state machine to concurrent orchestrator and resilient saga. Themes: escalating capabilities

Grow the order management platform from basic state handling to concurrent orchestration and a resilient saga-based workflow.

Order Management Stack
 ├─ Level 1: State Machine
 ├─ Level 2: Concurrent Orchestrator
 └─ Level 3: Resilient Event-Sourced Saga

Level 1 — Core Implementation

This commerce workflow begins by expressing the order lifecycle as an explicit state machine so invalid transitions are impossible. Emitting an event per transition is deferred to Level 2, where integration with analytics and downstream systems begins. Practitioners learn to separate entities from services, making the model testable and extensible.

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum, auto
from typing import Dict, Set

class OrderState(Enum):
    CREATED = auto()
    PACKED = auto()
    SHIPPED = auto()
    DELIVERED = auto()
    CANCELLED = auto()

ALLOWED: Dict[OrderState, Set[OrderState]] = {
    OrderState.CREATED: {OrderState.PACKED, OrderState.CANCELLED},
    OrderState.PACKED: {OrderState.SHIPPED},
    OrderState.SHIPPED: {OrderState.DELIVERED},
}

@dataclass
class Order:
    order_id: str
    state: OrderState = OrderState.CREATED

    def advance(self, next_state: OrderState) -> None:
        allowed = ALLOWED.get(self.state, set())
        if next_state not in allowed:
            raise RuntimeError(f"Invalid transition {self.state} -> {next_state}")
        self.state = next_state

    def cancel(self) -> None:
        if self.state in (OrderState.DELIVERED, OrderState.CANCELLED):
            raise RuntimeError("Cannot cancel final order")
        self.state = OrderState.CANCELLED

def main() -> None:
    order = Order("O1")
    print(order)
    order.advance(OrderState.PACKED)
    order.advance(OrderState.SHIPPED)
    print("Current state:", order.state)

if __name__ == "__main__":
    main()

Level 2 — Concurrent Enhancements

Level 2 introduces concurrent updates, protecting each order with a per-order lock so conflicting writes are rejected instead of applied twice; an optimistic version-check alternative is sketched after the listing. Events publish inside the critical section, so each successful transition produces exactly one notification. The threaded scenario deliberately provokes a conflict so you can trace the losing worker's rejection systematically.

from __future__ import annotations

import threading
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum, auto
from typing import Callable, DefaultDict, Dict, List

class OrderState(Enum):
    CREATED = auto()
    PACKED = auto()
    SHIPPED = auto()
    DELIVERED = auto()

TRANSITIONS = {
    OrderState.CREATED: {OrderState.PACKED},
    OrderState.PACKED: {OrderState.SHIPPED},
    OrderState.SHIPPED: {OrderState.DELIVERED},
}

@dataclass
class Order:
    order_id: str
    state: OrderState = OrderState.CREATED

class EventBus:
    def __init__(self) -> None:
        self.subscribers: DefaultDict[str, List[Callable[[Order], None]]] = defaultdict(list)

    def subscribe(self, event: str, handler: Callable[[Order], None]) -> None:
        self.subscribers[event].append(handler)

    def publish(self, event: str, order: Order) -> None:
        for handler in self.subscribers[event]:
            handler(order)

class OrderService:
    def __init__(self, bus: EventBus) -> None:
        self.bus = bus
        self.orders: Dict[str, Order] = {}
        self.locks: DefaultDict[str, threading.Lock] = defaultdict(threading.Lock)

    def create(self, order_id: str) -> Order:
        order = Order(order_id)
        self.orders[order_id] = order
        self.bus.publish("created", order)
        return order

    def transition(self, order_id: str, next_state: OrderState) -> None:
        lock = self.locks[order_id]
        with lock:
            order = self.orders[order_id]
            allowed = TRANSITIONS.get(order.state, set())
            if next_state not in allowed:
                raise RuntimeError("Invalid transition")
            order.state = next_state
            self.bus.publish(next_state.name.lower(), order)

def main() -> None:
    bus = EventBus()
    bus.subscribe("shipped", lambda order: print("Order shipped:", order.order_id))
    service = OrderService(bus)
    service.create("O1")

    def worker():
        try:
            service.transition("O1", OrderState.PACKED)
            service.transition("O1", OrderState.SHIPPED)
        except RuntimeError as exc:
            # The slower thread loses the race; surface the rejection.
            print("Conflict detected:", exc)

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("Final state:", service.orders["O1"].state)

if __name__ == "__main__":
    main()
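
A hedged sketch of the optimistic version check mentioned above (VersionedOrder and transition_if are illustrative names, building on the OrderState and TRANSITIONS definitions in the listing): the writer only applies a transition if the version it read is still current.

from dataclasses import dataclass

@dataclass
class VersionedOrder:
    order_id: str
    state: OrderState = OrderState.CREATED
    version: int = 0

def transition_if(order: VersionedOrder, expected_version: int, next_state: OrderState) -> bool:
    # In a real store this compare-and-set would run atomically, e.g.
    # UPDATE orders SET ... WHERE id = ? AND version = ?.
    if order.version != expected_version:
        return False  # a concurrent writer won; re-read and retry
    if next_state not in TRANSITIONS.get(order.state, set()):
        raise RuntimeError("Invalid transition")
    order.state = next_state
    order.version += 1
    return True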

Level 3 — Resilient Architecture

The advanced level links order, payment, and inventory through a saga orchestrator that records intent in an outbox. We implement a compensation step that releases reserved inventory when payment ultimately fails, keeping the two services consistent. Running the sample demonstrates how durable messaging plus clear state logs keep the business process auditable.

from __future__ import annotations

import random
import time
from dataclasses import dataclass
from enum import Enum, auto
from typing import List

class OrderState(Enum):
    CREATED = auto()
    CONFIRMED = auto()
    FAILED = auto()

@dataclass
class Order:
    order_id: str
    state: OrderState = OrderState.CREATED

class PaymentService:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def charge(self, order_id: str) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("payment failure")

class InventoryService:
    def reserve(self, order_id: str) -> None:
        print("Inventory reserved for", order_id)

    def release(self, order_id: str) -> None:
        print("Inventory released for", order_id)

class Outbox:
    def __init__(self) -> None:
        self.events: List[str] = []

    def record(self, event: str) -> None:
        self.events.append(event)
        print("Recorded event:", event)

class OrderSaga:
    def __init__(self, payments: PaymentService, inventory: InventoryService, outbox: Outbox) -> None:
        self.payments = payments
        self.inventory = inventory
        self.outbox = outbox

    def execute(self, order: Order) -> None:
        try:
            self.inventory.reserve(order.order_id)
            attempts = 0
            while attempts < 3:
                try:
                    self.payments.charge(order.order_id)
                    break
                except Exception as exc:
                    attempts += 1
                    if attempts >= 3:
                        raise
                    backoff = 0.2 * attempts
                    print("Retry payment in", backoff)
                    time.sleep(backoff)
            order.state = OrderState.CONFIRMED
            self.outbox.record(f"order.confirmed:{order.order_id}")
        except Exception as exc:
            order.state = OrderState.FAILED
            self.inventory.release(order.order_id)
            self.outbox.record(f"order.failed:{order.order_id}:{exc}")

def main() -> None:
    random.seed(12)
    saga = OrderSaga(PaymentService(0.5), InventoryService(), Outbox())
    for idx in range(3):
        order = Order(f"O{idx}")
        saga.execute(order)
        print(order)

if __name__ == "__main__":
    main()
Machine Coding - Auction System (Levels 1-3)
auction - concurrency - resiliency
Scope: evolve auctions from single-threaded bidding to concurrent bid engine and resilient orchestrator. Themes: escalating capabilities

Progressively enhance the auction system from simple bid tracking to concurrent bidding with fallback flows.

Auction Platform
 ├─ Level 1: Highest Bid Wins
 ├─ Level 2: Concurrent Bid Engine
 └─ Level 3: Resilient Auction Orchestrator

Level 1 — Core Implementation

We kick off the auction series with a straightforward domain model: auctions, bids, and closing rules expressed through clear classes. The engine records each bid with its bidder and timestamp and emits a notification when the auction closes; validating bid increments is left as an extension, sketched after the listing. By stepping through the sample timeline you practise reasoning about ordering without concurrency yet.

from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, List, Optional

@dataclass
class Bid:
    bidder: str
    amount: float
    timestamp: float

class HighestBidStrategy:
    def select(self, bids: List[Bid]) -> Optional[Bid]:
        if not bids:
            return None
        return max(bids, key=lambda bid: (bid.amount, -bid.timestamp))

class Auction:
    def __init__(self, strategy: HighestBidStrategy, notifier: Callable[[str], None]) -> None:
        self.bids: List[Bid] = []
        self.strategy = strategy
        self.notifier = notifier

    def place_bid(self, bid: Bid) -> None:
        self.bids.append(bid)

    def close(self) -> Optional[Bid]:
        winner = self.strategy.select(self.bids)
        if winner:
            self.notifier(f"Auction won by {winner.bidder} for {winner.amount}")
        return winner

def main() -> None:
    auction = Auction(HighestBidStrategy(), print)
    auction.place_bid(Bid("Alice", 100.0, 1.0))
    auction.place_bid(Bid("Bob", 120.0, 2.0))
    auction.place_bid(Bid("Carol", 110.0, 3.0))
    winner = auction.close()
    print("Winner:", winner)

if __name__ == "__main__":
    main()
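
A hedged sketch of the increment validation left as an extension (MinIncrementRule is an illustrative name): a rule object keeps the policy separate from Auction, so place_bid can stay a pure append.

from typing import List

class MinIncrementRule:
    def __init__(self, min_increment: float) -> None:
        self.min_increment = min_increment

    def validate(self, bids: List[Bid], incoming: Bid) -> None:
        if not bids:
            return  # opening bid is always acceptable
        best = max(bid.amount for bid in bids)
        if incoming.amount < best + self.min_increment:
            raise ValueError(
                f"bid {incoming.amount} must exceed {best} by at least {self.min_increment}"
            )

# Usage: rule.validate(auction.bids, bid) before auction.place_bid(bid).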

Level 2 — Concurrent Enhancements

Level 2 invites multiple bidders to compete at once, so we guard state with locks and use timed events to close auctions automatically. We capture concurrent placements and ensure only bids submitted before the deadline are accepted. Logging of timestamps and winning bids helps diagnose edge cases such as simultaneous offers.

from __future__ import annotations

import threading
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class Bid:
    bidder: str
    amount: float

class TimedAuction:
    def __init__(self, duration: float) -> None:
        self.duration = duration
        self._lock = threading.Lock()
        self._winner: Optional[Bid] = None
        self._closed = False
        threading.Thread(target=self._close_after_timeout, daemon=True).start()

    def _close_after_timeout(self) -> None:
        time.sleep(self.duration)
        with self._lock:
            self._closed = True

    def place_bid(self, bid: Bid) -> None:
        with self._lock:
            if self._closed:
                raise RuntimeError("Auction closed")
            if not self._winner or bid.amount > self._winner.amount:
                self._winner = bid

    def winner(self) -> Optional[Bid]:
        with self._lock:
            return self._winner

def bidder_thread(auction: TimedAuction, name: str, amount: float) -> None:
    try:
        auction.place_bid(Bid(name, amount))
    except RuntimeError as exc:
        print(name, "failed:", exc)

def main() -> None:
    auction = TimedAuction(0.5)
    threads = [threading.Thread(target=bidder_thread, args=(auction, f"B{i}", 100 + i * 10)) for i in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    time.sleep(0.6)
    print("Auction winner:", auction.winner())

if __name__ == "__main__":
    main()

Level 3 — Resilient Architecture

The advanced auction layer distributes the engine across replicas that agree via quorum writes and keep write-ahead logs. We simulate node failures and recovery to illustrate how replication maintains consistency and availability. Learners connect how consensus protocols and durable logs keep financial workflows reliable.

from __future__ import annotations

import random
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Bid:
    bidder: str
    amount: float

class Replica:
    def __init__(self, name: str, failure_rate: float = 0.2) -> None:
        self.name = name
        self.failure_rate = failure_rate
        self.log: List[Bid] = []
        self.available = True

    def append(self, bid: Bid) -> None:
        if not self.available or random.random() < self.failure_rate:
            self.available = False
            raise RuntimeError(f"{self.name} unavailable")
        self.log.append(bid)

    def recover(self) -> None:
        self.available = True

class AuctionCoordinator:
    def __init__(self, replicas: List[Replica], quorum: int) -> None:
        self.replicas = replicas
        self.quorum = quorum

    def submit_bid(self, bid: Bid) -> None:
        successes = 0
        failures = []
        for replica in self.replicas:
            try:
                replica.append(bid)
                successes += 1
            except Exception as exc:
                failures.append((replica, exc))
        if successes < self.quorum:
            raise RuntimeError("Not enough replicas")
        for replica, _ in failures:
            # Simulated repair: bring failed replicas back once the
            # quorum write has committed.
            replica.recover()

    def winner(self) -> Optional[Bid]:
        merged: List[Bid] = []
        for replica in self.replicas:
            merged.extend(replica.log)
        if not merged:
            return None
        return max(merged, key=lambda bid: bid.amount)

def main() -> None:
    random.seed(3)
    replicas = [Replica("R1", 0.3), Replica("R2", 0.1), Replica("R3", 0.2)]
    coordinator = AuctionCoordinator(replicas, quorum=2)
    for amount in [120.0, 150.0, 140.0]:
        try:
            coordinator.submit_bid(Bid(f"Bidder{amount}", amount))
        except Exception as exc:
            print("Submit failed:", exc)
    print("Winning bid:", coordinator.winner())

if __name__ == "__main__":
    main()
Machine Coding - Wallet Service (Levels 1-3)
wallet service - concurrency - resiliency
Scope: evolve wallet services from core ledger to concurrent transfers and resilient settlement. Themes: escalating capabilities

Build the wallet service from ledger recording to concurrent transactions and resilient transfer orchestration.

Wallet Service Stack
 ├─ Level 1: Core Ledger
 ├─ Level 2: Concurrent Transactions
 └─ Level 3: Resilient Transfers

Level 1 — Core Implementation

This wallet implementation begins with a ledger and transaction records that enforce idempotency, preventing duplicates from skewing balances. Balance queries stay a cheap dictionary lookup while every write appends an auditable ledger entry. Exercises show how replaying the same transaction ID leaves the ledger unchanged, cementing the concept.

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List, Set

@dataclass
class LedgerEntry:
    user_id: str
    amount: float
    txn_id: str

class WalletService:
    def __init__(self) -> None:
        self.balances: Dict[str, float] = {}
        self.ledger: List[LedgerEntry] = []
        self.seen_txn: Set[str] = set()

    def credit(self, user_id: str, amount: float, txn_id: str) -> None:
        if txn_id in self.seen_txn:
            return  # idempotent replay: already applied
        self.balances[user_id] = self.balances.get(user_id, 0.0) + amount
        self.ledger.append(LedgerEntry(user_id, amount, txn_id))
        self.seen_txn.add(txn_id)

    def debit(self, user_id: str, amount: float, txn_id: str) -> None:
        if txn_id in self.seen_txn:
            return  # idempotent replay: already applied
        balance = self.balances.get(user_id, 0.0)
        if balance < amount:
            raise RuntimeError("insufficient funds")
        self.balances[user_id] = balance - amount
        self.ledger.append(LedgerEntry(user_id, -amount, txn_id))
        self.seen_txn.add(txn_id)

    def balance(self, user_id: str) -> float:
        return self.balances.get(user_id, 0.0)

def main() -> None:
    wallet = WalletService()
    wallet.credit("U1", 100.0, "txn1")
    wallet.debit("U1", 40.0, "txn2")
    wallet.credit("U1", 100.0, "txn1")
    print("Balance:", wallet.balance("U1"))

if __name__ == "__main__":
    main()

Level 2 — Concurrent Enhancements

Level 2 considers simultaneous deposits and withdrawals, so each account receives its own lock and balances are checked under that lock. We discuss starvation risks and log contention windows to help reason about throughput. The provided workload highlights how to verify invariants even when operations interleave aggressively, and a transfer sketch after the code shows how sorted lock acquisition avoids deadlock.

from __future__ import annotations

import threading
from collections import defaultdict
from typing import Dict, Set

class AccountLocks:
    def __init__(self) -> None:
        self._locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)

    def acquire(self, user_id: str) -> threading.Lock:
        lock = self._locks[user_id]
        lock.acquire()
        return lock

class ConcurrentWallet:
    def __init__(self) -> None:
        self.balances: Dict[str, float] = defaultdict(float)
        self.seen_txn: Set[str] = set()
        self.account_locks = AccountLocks()
        self.txn_lock = threading.Lock()

    def credit(self, user_id: str, amount: float, txn_id: str) -> None:
        with self.txn_lock:
            if txn_id in self.seen_txn:
                return
            self.seen_txn.add(txn_id)
        lock = self.account_locks.acquire(user_id)
        try:
            self.balances[user_id] += amount
        finally:
            lock.release()

    def debit(self, user_id: str, amount: float, txn_id: str) -> None:
        with self.txn_lock:
            if txn_id in self.seen_txn:
                return
            self.seen_txn.add(txn_id)
        lock = self.account_locks.acquire(user_id)
        try:
            if self.balances[user_id] < amount:
                raise RuntimeError("insufficient funds")
            self.balances[user_id] -= amount
        finally:
            lock.release()

def worker(wallet: ConcurrentWallet, user_id: str, amount: float, txn_prefix: str) -> None:
    wallet.credit(user_id, amount, f"{txn_prefix}-credit")
    try:
        wallet.debit(user_id, amount / 2, f"{txn_prefix}-debit")
    except Exception as exc:
        print("Debit failed:", exc)

def main() -> None:
    wallet = ConcurrentWallet()
    threads = [threading.Thread(target=worker, args=(wallet, "U1", 50.0, f"T{i}")) for i in range(4)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("Final balance:", wallet.balances["U1"])

if __name__ == "__main__":
    main()
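
A natural next exercise is a transfer between two accounts, which needs both account locks at once and therefore risks deadlock if two opposing transfers grab them in opposite orders. A minimal sketch, assuming the ConcurrentWallet above (TransferWallet and transfer are hypothetical names), always acquires the locks in sorted key order:

class TransferWallet(ConcurrentWallet):
    """Hypothetical extension: move funds between two accounts."""

    def transfer(self, src: str, dst: str, amount: float, txn_id: str) -> None:
        if src == dst or amount <= 0:
            return
        with self.txn_lock:
            if txn_id in self.seen_txn:
                return
            self.seen_txn.add(txn_id)
        # Lock accounts in sorted order so two opposing transfers
        # can never wait on each other's locks.
        first, second = sorted([src, dst])
        lock_a = self.account_locks.acquire(first)
        try:
            lock_b = self.account_locks.acquire(second)
            try:
                if self.balances[src] < amount:
                    raise RuntimeError("insufficient funds")
                self.balances[src] -= amount
                self.balances[dst] += amount
            finally:
                lock_b.release()
        finally:
            lock_a.release()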

Level 3 — Resilient Architecture

The advanced wallet connects to an unreliable payment network, so debits are recorded in an outbox before we attempt the external call. Circuit breakers stop repeated failures, while retry workers (one is sketched after the code) drain the outbox when the dependency recovers. Learners can inspect the logs to see exactly when a payout succeeded, retried, or was parked for later.

from __future__ import annotations

import random
import time
from dataclasses import dataclass
from queue import Queue, Empty
from typing import Dict

@dataclass
class Payout:
    user_id: str
    amount: float
    txn_id: str

class ExternalNetwork:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def payout(self, user_id: str, amount: float) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("network failure")
        print(f"Payout to {user_id} for {amount}")

class CircuitBreaker:
    def __init__(self, threshold: int, cool_down: float) -> None:
        self.threshold = threshold
        self.cool_down = cool_down
        self.failures = 0
        self.open_until = 0.0

    def call(self, func, *args, **kwargs):
        now = time.time()
        if now < self.open_until:
            raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
            self.failures = 0
            return result
        except Exception as exc:
            self.failures += 1
            if self.failures >= self.threshold:
                self.open_until = now + self.cool_down
                self.failures = 0
            raise exc

class ResilientWallet:
    def __init__(self, network: ExternalNetwork) -> None:
        self.balances: Dict[str, float] = {"U1": 200.0}
        self.breaker = CircuitBreaker(2, 1.0)
        self.outbox: "Queue[Payout]" = Queue()
        self.network = network

    def payout(self, user_id: str, amount: float, txn_id: str) -> None:
        if self.balances.get(user_id, 0.0) < amount:
            raise RuntimeError("insufficient funds")
        self.balances[user_id] -= amount
        self.outbox.put(Payout(user_id, amount, txn_id))

    def process_outbox(self) -> None:
        pending = []
        while True:
            try:
                payout = self.outbox.get_nowait()
            except Empty:
                break
            try:
                self.breaker.call(self.network.payout, payout.user_id, payout.amount)
                print("Payout success:", payout)
            except Exception as exc:
                print("Payout failed:", exc, "requeueing")
                pending.append(payout)
            finally:
                self.outbox.task_done()
        for payout in pending:
            self.outbox.put(payout)

def main() -> None:
    random.seed(10)
    wallet = ResilientWallet(ExternalNetwork(0.5))
    wallet.payout("U1", 50.0, "tx1")
    wallet.payout("U1", 30.0, "tx2")
    wallet.payout("U1", 40.0, "tx3")
    for _ in range(5):
        wallet.process_outbox()
        time.sleep(0.2)
    print("Remaining balance:", wallet.balances["U1"])

if __name__ == "__main__":
    main()
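
The demo drains the outbox from main, but dedicated retry workers make the recovery loop continuous. A minimal sketch, assuming the ResilientWallet above and the time import already in scope (OutboxWorker is a hypothetical name):

import threading

class OutboxWorker:
    """Hypothetical background drainer: retries parked payouts on a schedule."""

    def __init__(self, wallet: ResilientWallet, interval: float = 0.2) -> None:
        self.wallet = wallet
        self.interval = interval
        self.stop = threading.Event()
        threading.Thread(target=self._loop, daemon=True).start()

    def _loop(self) -> None:
        while not self.stop.is_set():
            self.wallet.process_outbox()
            time.sleep(self.interval)

# usage: worker = OutboxWorker(wallet); ...; worker.stop.set()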
Machine Coding - Tic-Tac-Toe (Levels 1-3)
tic tac toe - concurrency - resiliency
Scope: evolve tic-tac-toe from game engine to concurrent match hosting and durable, replayable match history
Themes: escalating capabilities

Grow the tic-tac-toe system from a game engine to a concurrent match server with durable, event-sourced match history.

Tic-Tac-Toe Platform
 ├─ Level 1: Core Game Engine
 ├─ Level 2: Concurrent Match Server
 └─ Level 3: Event-Sourced Match History

Level 1 — Core Implementation

This classic kata reinforces modelling domain rules explicitly: boards, moves, and win detection algorithms live in focused classes. We validate coordinates, enforce turn order, and compute winners immediately after each move. Because the objects are pure and side-effect free, unit tests mirror the game logic one-to-one (a sketch follows the code).

from typing import List, Optional

class TicTacToe:
    def __init__(self) -> None:
        self.board: List[List[str]] = [["" for _ in range(3)] for _ in range(3)]
        self.current = "X"

    def move(self, row: int, col: int) -> Optional[str]:
        if not (0 <= row < 3 and 0 <= col < 3):
            raise ValueError("Coordinates out of range")
        if self.board[row][col]:
            raise RuntimeError("Cell occupied")
        self.board[row][col] = self.current
        winner = self._winner()
        if winner or self._is_draw():
            return winner or "DRAW"
        self.current = "O" if self.current == "X" else "X"
        return None

    def _winner(self) -> Optional[str]:
        lines = []
        lines.extend(self.board)
        lines.extend([[self.board[r][c] for r in range(3)] for c in range(3)])
        lines.append([self.board[i][i] for i in range(3)])
        lines.append([self.board[i][2 - i] for i in range(3)])
        for line in lines:
            if line[0] and line.count(line[0]) == 3:
                return line[0]
        return None

    def _is_draw(self) -> bool:
        return all(cell for row in self.board for cell in row)

def main() -> None:
    game = TicTacToe()
    moves = [(0, 0), (1, 0), (1, 1), (2, 0), (2, 2)]
    for move in moves:
        result = game.move(*move)
        print("Move", move, "result:", result)
        if result:
            break

if __name__ == "__main__":
    main()
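
Because the engine is pure and side-effect free, tests really can mirror the rules one-to-one. A minimal unittest sketch against the TicTacToe class above (test names are illustrative):

import unittest

class TicTacToeTest(unittest.TestCase):
    def test_top_row_win(self) -> None:
        game = TicTacToe()
        for move in [(0, 0), (1, 0), (0, 1), (1, 1)]:
            self.assertIsNone(game.move(*move))
        self.assertEqual(game.move(0, 2), "X")  # X completes the top row

    def test_occupied_cell_rejected(self) -> None:
        game = TicTacToe()
        game.move(0, 0)
        with self.assertRaises(RuntimeError):
            game.move(0, 0)

# discoverable via: python -m unittest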

Level 2 — Concurrent Enhancements

Level 2 spins up several matches simultaneously, teaching how to isolate game state per session while sharing the underlying engine. We synchronise access to shared registries while leaving per-game logic lock-free. Threaded simulations demonstrate how isolation keeps results deterministic even under concurrency.

from __future__ import annotations

import threading
from dataclasses import dataclass
from typing import Dict, List, Optional

class TicTacToeEngine:
    def __init__(self) -> None:
        self.board = [["" for _ in range(3)] for _ in range(3)]
        self.current = "X"

    def move(self, row: int, col: int) -> Optional[str]:
        if self.board[row][col]:
            raise RuntimeError("Invalid move")
        self.board[row][col] = self.current
        if self._winner():
            return self.current
        if all(cell for row in self.board for cell in row):
            return "DRAW"
        self.current = "O" if self.current == "X" else "X"
        return None

    def _winner(self) -> bool:
        lines = []
        lines.extend(self.board)
        lines.extend([[self.board[r][c] for r in range(3)] for c in range(3)])
        lines.append([self.board[i][i] for i in range(3)])
        lines.append([self.board[i][2 - i] for i in range(3)])
        return any(line[0] and line.count(line[0]) == 3 for line in lines)

@dataclass
class MatchSession:
    match_id: str
    engine: TicTacToeEngine
    lock: threading.Lock

class MatchServer:
    def __init__(self) -> None:
        self.sessions: Dict[str, MatchSession] = {}

    def create_match(self, match_id: str) -> None:
        self.sessions[match_id] = MatchSession(match_id, TicTacToeEngine(), threading.Lock())

    def play(self, match_id: str, row: int, col: int) -> Optional[str]:
        session = self.sessions[match_id]
        with session.lock:
            return session.engine.move(row, col)

def match_runner(server: MatchServer, match_id: str, moves: List[tuple[int, int]]) -> None:
    for move in moves:
        result = server.play(match_id, *move)
        if result:
            print(match_id, "ended with", result)
            break

def main() -> None:
    server = MatchServer()
    server.create_match("M1")
    server.create_match("M2")
    thread1 = threading.Thread(target=match_runner, args=(server, "M1", [(0, 0), (0, 1), (1, 1), (0, 2), (2, 2)]))
    thread2 = threading.Thread(target=match_runner, args=(server, "M2", [(1, 1), (0, 0), (2, 2), (0, 1), (0, 2), (2, 0), (1, 0)]))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

if __name__ == "__main__":
    main()

Level 3 — Resilient Architecture

The advanced level records every move as an event stream and replays it to rebuild boards, illustrating event sourcing concepts. We persist logs, handle idempotent replays (a cursor sketch follows the code), and verify the restored game state matches expectations. This teaches how lightweight persistence can give small games durable history without complex infrastructure.

import json
import os
from typing import List, Optional, Tuple

class EventStore:
    def __init__(self, path: str) -> None:
        self.path = path
        if not os.path.exists(path):
            with open(path, "w", encoding="utf-8") as handle:
                json.dump([], handle)

    def append(self, match_id: str, move: Tuple[int, int]) -> None:
        events = self.load_all()
        events.append({"match_id": match_id, "move": move})
        with open(self.path, "w", encoding="utf-8") as handle:
            json.dump(events, handle)

    def load_all(self) -> List[dict]:
        with open(self.path, "r", encoding="utf-8") as handle:
            return json.load(handle)

class ReplayableTicTacToe:
    def __init__(self) -> None:
        self.board = [["" for _ in range(3)] for _ in range(3)]
        self.current = "X"

    def apply(self, row: int, col: int) -> Optional[str]:
        self.board[row][col] = self.current
        winner = self._winner()
        self.current = "O" if self.current == "X" else "X"
        return winner

    def _winner(self) -> Optional[str]:
        lines = []
        lines.extend(self.board)
        lines.extend([[self.board[r][c] for r in range(3)] for c in range(3)])
        lines.append([self.board[i][i] for i in range(3)])
        lines.append([self.board[i][2 - i] for i in range(3)])
        for line in lines:
            if line[0] and line.count(line[0]) == 3:
                return line[0]
        return None

def main() -> None:
    path = "/tmp/tictactoe-events.json"
    if os.path.exists(path):
        os.remove(path)  # reset so a rerun does not replay stale events
    store = EventStore(path)
    game = ReplayableTicTacToe()
    moves = [(0, 0), (0, 1), (1, 1), (2, 0), (2, 2)]
    for move in moves:
        winner = game.apply(*move)
        store.append("match-1", move)
        if winner:
            print("Winner:", winner)
    replay = ReplayableTicTacToe()
    for event in store.load_all():
        replay.apply(*event["move"])
    print("Original board:", game.board)
    print("Replayed board:", replay.board)

if __name__ == "__main__":
    main()
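
Replaying from offset zero is only idempotent against a fresh board; a long-lived projection needs a cursor so re-running the replay applies nothing twice. A minimal sketch, assuming the EventStore and ReplayableTicTacToe above (CursorReplayer is a hypothetical name):

class CursorReplayer:
    """Hypothetical wrapper: applies only events not yet seen."""

    def __init__(self, store: EventStore) -> None:
        self.store = store
        self.game = ReplayableTicTacToe()
        self.applied = 0  # index of the next event to apply

    def catch_up(self) -> None:
        events = self.store.load_all()
        for event in events[self.applied:]:
            self.game.apply(*event["move"])
        self.applied = len(events)

Calling catch_up twice in a row applies nothing new, which is exactly the idempotence the replay needs.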
Machine Coding - Chat System (Levels 1-3)
chat system - concurrency - resiliency
Scope: evolve chat service from basic rooms to concurrent messaging and resilient delivery guarantees
Themes: escalating capabilities

Enhance the chat system from simple chat rooms to concurrent messaging infrastructure with delivery guarantees.

Chat Service Stack
 ├─ Level 1: Basic Chat Room
 ├─ Level 2: Concurrent Messaging
 └─ Level 3: Resilient Message Delivery

Level 1 — Core Implementation

The messaging series opens with user, conversation, and message entities along with a service that persists and emits notifications. We highlight idempotent message IDs (a dedupe sketch follows the code) and clear repository interfaces so later scaling remains possible. The sample interaction proves that storage and notification concerns are already decoupled.

from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List

@dataclass
class Message:
    sender: str
    recipient: str
    body: str
    message_id: str | None = None
    channel: str | None = None

class ConversationRepository:
    def __init__(self) -> None:
        self._direct: DefaultDict[tuple[str, str], List[Message]] = defaultdict(list)

    def add_direct(self, message: Message) -> None:
        key = tuple(sorted([message.sender, message.recipient]))
        self._direct[key].append(message)

    def history(self, user_a: str, user_b: str) -> List[Message]:
        key = tuple(sorted([user_a, user_b]))
        return list(self._direct[key])

class ChatService:
    def __init__(self, repo: ConversationRepository, notifier: Callable[[Message], None]) -> None:
        self.repo = repo
        self.notifier = notifier

    def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
        return Message(sender, recipient, body)

    def send(self, sender: str, recipient: str, body: str) -> Message:
        message = self._create_direct_message(sender, recipient, body)
        self.repo.add_direct(message)
        self.notifier(message)
        return message

def main() -> None:
    repo = ConversationRepository()
    service = ChatService(repo, lambda msg: print("Notify", msg.recipient, ":", msg.body))
    service.send("alice", "bob", "hello bob")
    service.send("bob", "alice", "hey alice")
    print("History:", [(item.sender, item.body) for item in repo.history("alice", "bob")])

if __name__ == "__main__":
    main()
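
Level 1 leaves message_id unset, so the promised idempotency has nowhere to live yet. One possible sketch, assuming the classes above (DedupingRepository is a hypothetical name), assigns a UUID on first write and silently drops replays:

import uuid

class DedupingRepository(ConversationRepository):
    """Hypothetical variant: a redelivered message ID changes nothing."""

    def __init__(self) -> None:
        super().__init__()
        self.seen_ids: set[str] = set()

    def add_direct(self, message: Message) -> None:
        if message.message_id is None:
            message.message_id = str(uuid.uuid4())
        if message.message_id in self.seen_ids:
            return  # idempotent replay
        self.seen_ids.add(message.message_id)
        super().add_direct(message)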

Level 2 — Concurrent Enhancements

Level 2 expands to group channels, introducing fan-out queues that deliver messages asynchronously to subscribers. Producers enqueue payloads without blocking while consumer tasks drain per-subscriber queues, showcasing asynchronous fan-out. Tracing deliveries helps learners understand exactly when a message leaves the durable log and reaches recipients.

from __future__ import annotations

import asyncio
import itertools
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, Dict, List

# --- Level 1 foundation ---

@dataclass
class Message:
    sender: str
    recipient: str | None
    body: str
    message_id: str | None = None
    channel: str | None = None

class ConversationRepository:
    def __init__(self) -> None:
        self._direct: DefaultDict[tuple[str, str], List[Message]] = defaultdict(list)

    def add_direct(self, message: Message) -> None:
        assert message.recipient is not None
        key = tuple(sorted([message.sender, message.recipient]))
        self._direct[key].append(message)

    def history(self, user_a: str, user_b: str) -> List[Message]:
        key = tuple(sorted([user_a, user_b]))
        return list(self._direct[key])

class ChatService:
    def __init__(self, repo: ConversationRepository, notifier: Callable[[Message], None]) -> None:
        self.repo = repo
        self.notifier = notifier

    def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
        return Message(sender, recipient, body)

    def send(self, sender: str, recipient: str, body: str) -> Message:
        message = self._create_direct_message(sender, recipient, body)
        self.repo.add_direct(message)
        self.notifier(message)
        return message

# --- Level 2 concurrency extensions ---

class ChannelRepository(ConversationRepository):
    def __init__(self) -> None:
        super().__init__()
        self._channels: DefaultDict[str, List[Message]] = defaultdict(list)

    def add_channel(self, channel: str, message: Message) -> None:
        self._channels[channel].append(message)

    def channel_history(self, channel: str) -> List[Message]:
        return list(self._channels[channel])

class ConcurrentChatService(ChatService):
    def __init__(self, repo: ChannelRepository, notifier: Callable[[Message], None]) -> None:
        super().__init__(repo, notifier)
        self.repo = repo
        self._ids = (str(value) for value in itertools.count(1))

    def _next_id(self) -> str:
        return next(self._ids)

    def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
        return Message(sender, recipient, body, message_id=self._next_id())

    def send_to_channel(self, sender: str, channel: str, body: str) -> Message:
        message = Message(sender, None, body, message_id=self._next_id(), channel=channel)
        self.repo.add_channel(channel, message)
        self.notifier(message)
        return message

class AsyncChannelDispatcher:
    def __init__(self, service: ConcurrentChatService) -> None:
        self.service = service
        self.subscribers: Dict[str, Dict[str, asyncio.Queue[Message]]] = defaultdict(dict)

    async def subscribe(self, user: str, channel: str) -> asyncio.Queue[Message]:
        queue: asyncio.Queue[Message] = asyncio.Queue()
        self.subscribers[channel][user] = queue
        return queue

    async def publish(self, sender: str, channel: str, body: str) -> Message:
        message = self.service.send_to_channel(sender, channel, body)
        for queue in self.subscribers[channel].values():
            await queue.put(message)
        return message

async def main_async() -> None:
    repo = ChannelRepository()
    dispatcher = AsyncChannelDispatcher(ConcurrentChatService(repo, lambda message: None))

    general_alice = await dispatcher.subscribe("alice", "#general")
    general_bob = await dispatcher.subscribe("bob", "#general")

    async def alice_consumer() -> None:
        msg = await general_alice.get()
        print("alice received", msg.body)

    async def bob_consumer() -> None:
        msg = await general_bob.get()
        print("bob received", msg.body)

    await dispatcher.publish("service-bot", "#general", "Welcome to the channel!")
    await asyncio.gather(alice_consumer(), bob_consumer())

if __name__ == "__main__":
    asyncio.run(main_async())

Level 3 — Resilient Architecture

The advanced chat layer introduces acknowledgement tracking per subscriber, ensuring at-least-once delivery even when clients misbehave. Retries, dead-letter queues, and monitoring metrics make delivery guarantees explicit rather than aspirational. Learners can unplug a subscriber mid-run and watch how the dispatcher compensates until the message is acknowledged or quarantined.

from __future__ import annotations

import asyncio
import itertools
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, Dict, List

# --- Level 1 foundation ---

@dataclass
class Message:
    sender: str
    recipient: str | None
    body: str
    message_id: str | None = None
    channel: str | None = None

class ConversationRepository:
    def __init__(self) -> None:
        self._direct: DefaultDict[tuple[str, str], List[Message]] = defaultdict(list)

    def add_direct(self, message: Message) -> None:
        assert message.recipient is not None
        key = tuple(sorted([message.sender, message.recipient]))
        self._direct[key].append(message)

    def history(self, user_a: str, user_b: str) -> List[Message]:
        key = tuple(sorted([user_a, user_b]))
        return list(self._direct[key])

class ChatService:
    def __init__(self, repo: ConversationRepository, notifier: Callable[[Message], None]) -> None:
        self.repo = repo
        self.notifier = notifier

    def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
        return Message(sender, recipient, body)

    def send(self, sender: str, recipient: str, body: str) -> Message:
        message = self._create_direct_message(sender, recipient, body)
        self.repo.add_direct(message)
        self.notifier(message)
        return message

# --- Level 2 concurrency extensions ---

class ChannelRepository(ConversationRepository):
    def __init__(self) -> None:
        super().__init__()
        self._channels: DefaultDict[str, List[Message]] = defaultdict(list)

    def add_channel(self, channel: str, message: Message) -> None:
        self._channels[channel].append(message)

    def channel_history(self, channel: str) -> List[Message]:
        return list(self._channels[channel])

class ConcurrentChatService(ChatService):
    def __init__(self, repo: ChannelRepository, notifier: Callable[[Message], None]) -> None:
        super().__init__(repo, notifier)
        self.repo = repo
        self._ids = (str(value) for value in itertools.count(1))

    def _next_id(self) -> str:
        return next(self._ids)

    def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
        return Message(sender, recipient, body, message_id=self._next_id())

    def send_to_channel(self, sender: str, channel: str, body: str) -> Message:
        message = Message(sender, None, body, message_id=self._next_id(), channel=channel)
        self.repo.add_channel(channel, message)
        self.notifier(message)
        return message

class AsyncChannelDispatcher:
    def __init__(self, service: ConcurrentChatService) -> None:
        self.service = service
        self.subscribers: Dict[str, Dict[str, asyncio.Queue[Message]]] = defaultdict(dict)

    async def subscribe(self, user: str, channel: str) -> asyncio.Queue[Message]:
        queue: asyncio.Queue[Message] = asyncio.Queue()
        self.subscribers[channel][user] = queue
        return queue

    async def publish(self, sender: str, channel: str, body: str) -> Message:
        message = self.service.send_to_channel(sender, channel, body)
        for queue in self.subscribers[channel].values():
            await queue.put(message)
        return message

# --- Level 3 resiliency additions ---

@dataclass
class PendingDelivery:
    message: Message
    user: str
    attempts: int = 0

class ReliableChannelDispatcher(AsyncChannelDispatcher):
    def __init__(self, service: ConcurrentChatService, max_attempts: int = 3) -> None:
        super().__init__(service)
        self.max_attempts = max_attempts
        self.pending: Dict[str, PendingDelivery] = {}
        self.dead_letters: List[PendingDelivery] = []

    def _key(self, message_id: str, user: str) -> str:
        return f"{message_id}:{user}"

    async def publish(self, sender: str, channel: str, body: str) -> Message:
        message = await super().publish(sender, channel, body)
        for user in self.subscribers[channel]:
            if message.message_id is None:
                continue
            self.pending[self._key(message.message_id, user)] = PendingDelivery(message, user)
        return message

    def ack(self, message_id: str, user: str) -> None:
        self.pending.pop(self._key(message_id, user), None)

    async def deliver_pending(self) -> None:
        for key, delivery in list(self.pending.items()):
            channel = delivery.message.channel
            assert channel is not None
            queue = self.subscribers[channel][delivery.user]
            await queue.put(delivery.message)
            delivery.attempts += 1
            if delivery.attempts >= self.max_attempts:
                self.dead_letters.append(delivery)
                del self.pending[key]

async def main_async() -> None:
    repo = ChannelRepository()
    dispatcher = ReliableChannelDispatcher(ConcurrentChatService(repo, lambda message: None))

    incidents_alice = await dispatcher.subscribe("alice", "#incidents")
    incidents_bob = await dispatcher.subscribe("bob", "#incidents")

    async def alice_consumer() -> None:
        msg = await incidents_alice.get()
        print("alice acked", msg.body)
        if msg.message_id is not None:
            dispatcher.ack(msg.message_id, "alice")

    async def bob_consumer() -> None:
        deliveries = 0
        while deliveries < 2:
            msg = await incidents_bob.get()
            deliveries += 1
            print("bob attempt", deliveries, "for", msg.body)
            if deliveries == 2 and msg.message_id is not None:
                dispatcher.ack(msg.message_id, "bob")

    await dispatcher.publish("service-bot", "#incidents", "incident #1234")
    await alice_consumer()
    # bob only acks his second delivery, so redeliver while he waits;
    # awaiting both consumers together would deadlock on his second get()
    bob = asyncio.create_task(bob_consumer())
    await asyncio.sleep(0)  # let bob consume the first delivery
    await dispatcher.deliver_pending()
    await bob

    if dispatcher.dead_letters:
        print("Dead letters:", [(d.user, d.message.body) for d in dispatcher.dead_letters])

if __name__ == "__main__":
    asyncio.run(main_async())
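
The run above acks within the attempt budget, so the dead-letter branch never fires. A minimal follow-on sketch, reusing the dispatcher classes above (dead_letter_demo is a hypothetical scenario), quarantines a subscriber that never acks:

async def dead_letter_demo() -> None:
    repo = ChannelRepository()
    dispatcher = ReliableChannelDispatcher(ConcurrentChatService(repo, lambda message: None))
    queue = await dispatcher.subscribe("carol", "#incidents")
    await dispatcher.publish("service-bot", "#incidents", "incident #999")
    # carol never acks, so every redelivery bumps the attempt counter
    for _ in range(dispatcher.max_attempts):
        await dispatcher.deliver_pending()
    while not queue.empty():
        queue.get_nowait()  # drain the unread copies
    print("Quarantined:", [(d.user, d.message.body) for d in dispatcher.dead_letters])

# asyncio.run(dead_letter_demo())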
Machine Coding - File Storage (Levels 1-3)
file storage - concurrency - resiliency
Scope: evolve file storage from basic persistence to concurrent uploads and resilient replication
Themes: escalating capabilities

Progressively build the file storage system from a core store to concurrent upload handling and resilient replication layers.

File Storage Stack
 ├─ Level 1: Core Store
 ├─ Level 2: Concurrent Uploads
 └─ Level 3: Resilient Replication

Level 1 — Core Implementation

We open the file-sync trilogy by modelling files, versions, and metadata repositories so uploads, reads, and history queries are explicit. Hashing content (sketched after the code) and storing version manifests teaches tamper detection and rollback. The interactive demo illustrates how users can retrieve any version without worrying about concurrency yet.

from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass
from typing import DefaultDict, List

@dataclass
class FileVersion:
    version: int
    content: str

class FileRepository:
    def __init__(self) -> None:
        self.store: DefaultDict[str, List[FileVersion]] = defaultdict(list)

    def add_version(self, path: str, content: str) -> FileVersion:
        versions = self.store[path]
        version = FileVersion(len(versions) + 1, content)
        versions.append(version)
        return version

    def latest(self, path: str) -> FileVersion:
        return self.store[path][-1]

    def history(self, path: str) -> List[FileVersion]:
        return self.store[path]

def main() -> None:
    repo = FileRepository()
    repo.add_version("notes.txt", "Version 1")
    repo.add_version("notes.txt", "Version 2")
    print("Latest:", repo.latest("notes.txt"))
    print("History:", repo.history("notes.txt"))

if __name__ == "__main__":
    main()
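
The tamper detection mentioned above bolts on cleanly: hash each version at write time and re-hash on read. A minimal sketch, assuming the FileRepository above (ChecksummedRepository and verified_latest are hypothetical names):

import hashlib

class ChecksummedRepository(FileRepository):
    """Hypothetical variant: detect corruption by re-hashing on read."""

    def __init__(self) -> None:
        super().__init__()
        self.checksums: dict[tuple[str, int], str] = {}

    def add_version(self, path: str, content: str) -> FileVersion:
        version = super().add_version(path, content)
        self.checksums[(path, version.version)] = hashlib.sha256(content.encode()).hexdigest()
        return version

    def verified_latest(self, path: str) -> FileVersion:
        blob = self.latest(path)
        if hashlib.sha256(blob.content.encode()).hexdigest() != self.checksums[(path, blob.version)]:
            raise RuntimeError(f"checksum mismatch for {path} v{blob.version}")
        return blob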

Level 2 — Concurrent Enhancements

Level 2 brings in multiple devices editing the same document, so optimistic version checks detect conflicts early. We surface merge contexts and demonstrate how to branch new versions while preserving history. Logging of version IDs lets learners follow how the server arbitrates between racing uploads.

from __future__ import annotations

import threading
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class VersionedDocument:
    version: int
    content: str

class VersionRepository:
    def __init__(self) -> None:
        self.versions: Dict[str, List[VersionedDocument]] = {"notes.txt": [VersionedDocument(1, "Base")]}
        self.lock = threading.Lock()

    def latest(self, path: str) -> VersionedDocument:
        return self.versions[path][-1]

    def append(self, path: str, content: str) -> VersionedDocument:
        with self.lock:
            latest = self.latest(path)
            new_version = VersionedDocument(latest.version + 1, content)
            self.versions[path].append(new_version)
            return new_version

class ConflictResolver:
    def merge(self, base: str, change_a: str, change_b: str) -> str:
        return base + "\n" + change_a + "\n" + change_b

def worker(repo: VersionRepository, resolver: ConflictResolver, change: str) -> None:
    latest = repo.latest("notes.txt")
    merged = resolver.merge(latest.content, change, "")
    repo.append("notes.txt", merged)

def main() -> None:
    repo = VersionRepository()
    resolver = ConflictResolver()
    threads = [
        threading.Thread(target=worker, args=(repo, resolver, "DeviceA change")),
        threading.Thread(target=worker, args=(repo, resolver, "DeviceB change")),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    for doc in repo.versions["notes.txt"]:
        print(doc)

if __name__ == "__main__":
    main()

Level 3 — Resilient Architecture

The advanced file-sync level distributes versions across regions, verifying each transfer with checksums and retrying failed replications. An intent log (sketched after the code) lets the system resume outstanding transfers after a crash. Observing the replication status output teaches how durability, validation, and backoff interact.

from __future__ import annotations

import hashlib
import random
import threading
import time
from dataclasses import dataclass
from queue import Queue
from typing import Dict

@dataclass
class FileBlob:
    version: int
    content: str
    checksum: str

class PrimaryStore:
    def __init__(self) -> None:
        self.files: Dict[str, FileBlob] = {}

    def save(self, path: str, content: str) -> FileBlob:
        checksum = hashlib.sha256(content.encode()).hexdigest()
        version = self.files.get(path, FileBlob(0, "", "")).version + 1
        blob = FileBlob(version, content, checksum)
        self.files[path] = blob
        return blob

class ReplicaStore:
    def __init__(self, name: str, failure_rate: float = 0.3) -> None:
        self.name = name
        self.failure_rate = failure_rate
        self.files: Dict[str, FileBlob] = {}

    def replicate(self, path: str, blob: FileBlob) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError(f"{self.name} unavailable")
        self.files[path] = blob

class Replicator:
    def __init__(self, replicas: Dict[str, ReplicaStore]) -> None:
        self.replicas = replicas
        self.queue: "Queue[tuple[str, FileBlob]]" = Queue()
        self._start()

    def _start(self) -> None:
        def loop() -> None:
            while True:
                path, blob = self.queue.get()
                for replica in self.replicas.values():
                    try:
                        replica.replicate(path, blob)
                        print(replica.name, "replicated", path, "v", blob.version)
                    except Exception as exc:
                        print("Replication failed:", exc)
                        time.sleep(0.2)
                        self.queue.put((path, blob))
                self.queue.task_done()

        threading.Thread(target=loop, daemon=True).start()

    def enqueue(self, path: str, blob: FileBlob) -> None:
        self.queue.put((path, blob))

def main() -> None:
    random.seed(14)
    primary = PrimaryStore()
    replicator = Replicator({
        "replica-a": ReplicaStore("ReplicaA", 0.5),
        "replica-b": ReplicaStore("ReplicaB", 0.1),
    })
    blob = primary.save("notes.txt", "critical data v1")
    replicator.enqueue("notes.txt", blob)
    time.sleep(1)

if __name__ == "__main__":
    main()
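
The intent log from the prose is not in the code above; the replication queue lives in memory and dies with the process. One possible shape, persisting pending replications as JSON (IntentLog is a hypothetical name):

import json
import os

class IntentLog:
    """Hypothetical durable record of replications not yet confirmed."""

    def __init__(self, log_path: str) -> None:
        self.log_path = log_path
        if not os.path.exists(log_path):
            self._write({})

    def record(self, file_path: str, version: int) -> None:
        intents = self.load()
        intents[f"{file_path}:{version}"] = "pending"
        self._write(intents)

    def resolve(self, file_path: str, version: int) -> None:
        intents = self.load()
        intents.pop(f"{file_path}:{version}", None)
        self._write(intents)

    def load(self) -> dict:
        with open(self.log_path, "r", encoding="utf-8") as handle:
            return json.load(handle)

    def _write(self, intents: dict) -> None:
        with open(self.log_path, "w", encoding="utf-8") as handle:
            json.dump(intents, handle)

# On restart, any key still "pending" would be re-enqueued via Replicator.enqueue.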
Machine Coding - Logging Framework (Levels 1-3)
logging framework - concurrency - resiliency
Scope: evolve logging from direct appenders to asynchronous and resilient pipelines
Themes: escalating capabilities

Progressively enhance the logging framework from basic appenders to async logging with resilient fallback pipelines.

Logging Framework Stack
 ├─ Level 1: Core Logger
 ├─ Level 2: Asynchronous Logger
 └─ Level 3: Resilient Logging Pipeline

Level 1 — Core Implementation

The logging journey starts with a minimal core that filters by severity and fans out to pluggable appenders. We highlight formatter abstractions so structured and plain-text logs can coexist. Sample usage shows how configuration drives behaviour rather than hard-coded branching.

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum, auto
from typing import List

class Level(Enum):
    DEBUG = auto()
    INFO = auto()
    WARN = auto()
    ERROR = auto()

@dataclass
class LogRecord:
    level: Level
    message: str

class Appender:
    def append(self, record: LogRecord) -> None:
        raise NotImplementedError

class ConsoleAppender(Appender):
    def append(self, record: LogRecord) -> None:
        print(f"[{record.level.name}] {record.message}")

class MemoryAppender(Appender):
    def __init__(self) -> None:
        self.records: List[LogRecord] = []

    def append(self, record: LogRecord) -> None:
        self.records.append(record)

class Logger:
    def __init__(self, level: Level, appenders: List[Appender]) -> None:
        self.level = level
        self.appenders = appenders

    def log(self, level: Level, message: str) -> None:
        if level.value < self.level.value:
            return
        record = LogRecord(level, message)
        for appender in self.appenders:
            appender.append(record)

    def info(self, message: str) -> None:
        self.log(Level.INFO, message)

def main() -> None:
    memory = MemoryAppender()
    logger = Logger(Level.DEBUG, [ConsoleAppender(), memory])
    logger.info("Hello logging")
    print("Memory records:", memory.records)

if __name__ == "__main__":
    main()

Level 2 — Concurrent Enhancements

Level 2 buffers log events in a queue and drains them with a worker thread to keep application threads responsive. We talk through shutdown hooks, flush semantics, and backpressure so logs are not lost on exit. The queue here is unbounded, so producers never block; the bounded variant sketched after the code is what makes a saturated dispatcher visible.

import threading
from enum import Enum, auto
from queue import Empty, Queue
from typing import Callable

class Level(Enum):
    DEBUG = auto()
    INFO = auto()
    WARN = auto()
    ERROR = auto()

class AsyncLogger:
    def __init__(self, sink: Callable[[Level, str], None]) -> None:
        self.queue: "Queue[tuple[Level, str]]" = Queue()
        self.sink = sink
        self.stop = threading.Event()
        threading.Thread(target=self._worker, daemon=True).start()

    def log(self, level: Level, message: str) -> None:
        self.queue.put((level, message))

    def _worker(self) -> None:
        while not self.stop.is_set():
            try:
                level, message = self.queue.get(timeout=0.2)
            except Empty:
                continue
            self.sink(level, message)
            self.queue.task_done()

    def shutdown(self) -> None:
        self.queue.join()
        self.stop.set()

def sink(level: Level, message: str) -> None:
    print(f"[{level.name}] {message}")

def worker(logger: AsyncLogger, index: int) -> None:
    for i in range(3):
        logger.log(Level.INFO, f"Thread {index} message {i}")

def main() -> None:
    logger = AsyncLogger(sink)
    threads = [threading.Thread(target=worker, args=(logger, i)) for i in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    logger.shutdown()

if __name__ == "__main__":
    main()
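
The queue above is unbounded, so producers never actually block. A minimal bounded sketch, reusing the imports and AsyncLogger from above (BoundedAsyncLogger is a hypothetical name): capping the queue makes a saturated dispatcher push back on producers.

class BoundedAsyncLogger(AsyncLogger):
    """Hypothetical variant: a full queue blocks log() callers."""

    def __init__(self, sink: Callable[[Level, str], None], maxsize: int = 100) -> None:
        # Re-initialise rather than call super(), so the worker thread
        # only ever sees the bounded queue.
        self.queue = Queue(maxsize=maxsize)  # put() blocks when full: backpressure
        self.sink = sink
        self.stop = threading.Event()
        threading.Thread(target=self._worker, daemon=True).start()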

Level 3 — Resilient Architecture

The advanced logging level assumes the main sink can fail, so we add retry logic, bounded buffers, and a disk-backed fallback. Backpressure signals bubble up when buffers fill, prompting producers to slow down. Watching the failover sequence teaches how reliability and observability work hand in hand.

import random
import time
from collections import deque
from enum import Enum, auto

class Level(Enum):
    INFO = auto()
    ERROR = auto()

class PrimarySink:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def write(self, level: Level, message: str) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("primary sink down")
        print("Primary:", level.name, message)

class FallbackSink:
    def __init__(self) -> None:
        self.storage = []

    def write(self, level: Level, message: str) -> None:
        print("Fallback:", level.name, message)
        self.storage.append((level, message))

class ResilientLogger:
    def __init__(self, primary: PrimarySink, fallback: FallbackSink) -> None:
        self.primary = primary
        self.fallback = fallback
        self.buffer: deque[tuple[Level, str]] = deque(maxlen=50)

    def log(self, level: Level, message: str) -> None:
        self.buffer.append((level, message))
        self._drain()

    def _drain(self) -> None:
        pending = len(self.buffer)
        for _ in range(pending):
            level, message = self.buffer.popleft()
            try:
                self.primary.write(level, message)
            except Exception as exc:
                print("Primary failed:", exc)
                self.fallback.write(level, message)

def main() -> None:
    random.seed(15)
    logger = ResilientLogger(PrimarySink(0.5), FallbackSink())
    for i in range(10):
        logger.log(Level.INFO, f"event {i}")
        time.sleep(0.05)

if __name__ == "__main__":
    main()
Machine Coding - Pub/Sub Event Bus (Levels 1-3)
pub/sub event bus - concurrency - resiliency
Scope: evolve event bus from in-memory pub/sub to concurrent processing and resilient delivery
Themes: escalating capabilities

Build out the event bus from simple pub/sub to concurrent handlers and resilient delivery with retries.

Event Bus Stack
 ├─ Level 1: Basic Publisher/Subscriber
 ├─ Level 2: Concurrent Event Processing
 └─ Level 3: Resilient Delivery Service

Level 1 — Core Implementation

The pub/sub sequence starts with a straightforward event bus that manages subscriptions and delivers synchronous callbacks. We explore how to prevent subscriber failures from crashing the bus (see the sketch after the code) and document contracts clearly. Sample interactions underline the loose-coupling benefits before durability enters the scene.

from collections import defaultdict
from typing import Callable, DefaultDict, List

class EventBus:
    def __init__(self) -> None:
        self.subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        self.subscribers[topic].append(handler)

    def publish(self, topic: str, payload: dict) -> None:
        for handler in self.subscribers[topic]:
            handler(payload)

def main() -> None:
    bus = EventBus()
    bus.subscribe("order.created", lambda payload: print("Analytics saw", payload))
    bus.subscribe("order.created", lambda payload: print("Billing saw", payload))
    bus.publish("order.created", {"order_id": "O1", "amount": 100})

if __name__ == "__main__":
    main()
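
Note that publish above lets one raising handler abort delivery to every subscriber after it. A minimal sketch, assuming the EventBus above (SafeEventBus is a hypothetical name), isolates subscriber failures:

class SafeEventBus(EventBus):
    """Hypothetical variant: one failing subscriber cannot break fan-out."""

    def publish(self, topic: str, payload: dict) -> None:
        for handler in self.subscribers[topic]:
            try:
                handler(payload)
            except Exception as exc:
                # Swallow and report; a real bus would log or dead-letter this.
                print("Subscriber failed on", topic, ":", exc)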

Level 2 — Concurrent Enhancements

Level 2 adds persistence by storing events per topic and requiring acknowledgements before deletion. We simulate slow or failing subscribers to show how unacked messages get redelivered. Instrumentation on ack latency (sketched after the code) familiarises learners with diagnosing consumer behaviour.

from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple

class DurableBus:
    def __init__(self) -> None:
        self.queues: Dict[str, Deque[Tuple[int, dict]]] = defaultdict(deque)
        self.handlers: Dict[str, Callable[[int, dict], bool]] = {}
        self.offsets: Dict[str, int] = defaultdict(int)

    def subscribe(self, topic: str, handler: Callable[[int, dict], bool]) -> None:
        self.handlers[topic] = handler

    def publish(self, topic: str, payload: dict) -> None:
        offset = self.offsets[topic]
        self.offsets[topic] += 1
        self.queues[topic].append((offset, payload))
        self._dispatch(topic)

    def _dispatch(self, topic: str) -> None:
        handler = self.handlers.get(topic)
        if not handler:
            return
        queue = self.queues[topic]
        pending = len(queue)
        for _ in range(pending):
            offset, payload = queue[0]
            ack = handler(offset, payload)
            if ack:
                queue.popleft()
            else:
                break

def main() -> None:
    bus = DurableBus()

    def handler(offset: int, payload: dict) -> bool:
        print("Processing", offset, payload)
        return offset % 2 == 0

    bus.subscribe("metrics", handler)
    for idx in range(5):
        bus.publish("metrics", {"value": idx})
    print("Remaining queue:", list(bus.queues["metrics"]))

if __name__ == "__main__":
    main()
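
The ack-latency instrumentation mentioned above is a small wrapper away. A minimal sketch, assuming the DurableBus above (InstrumentedBus and ack_latency are hypothetical names), times each handler call:

import time

class InstrumentedBus(DurableBus):
    """Hypothetical variant: record how long each ack decision takes."""

    def __init__(self) -> None:
        super().__init__()
        self.ack_latency: Dict[str, list] = defaultdict(list)

    def _dispatch(self, topic: str) -> None:
        handler = self.handlers.get(topic)
        if not handler:
            return
        queue = self.queues[topic]
        for _ in range(len(queue)):
            offset, payload = queue[0]
            started = time.perf_counter()
            ack = handler(offset, payload)
            self.ack_latency[topic].append(time.perf_counter() - started)
            if ack:
                queue.popleft()
            else:
                break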

Level 3 — Resilient Architecture

The advanced bus introduces retry policies and dead-letter topics for messages that repeatedly fail. We support exponential backoff so retries do not overwhelm handlers, and we surface metrics to alert on poison messages. By observing the event flow you see how to maintain both reliability and transparency.

import random
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple

class ResilientBroker:
    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[[dict], None]] = {}
        self.queues: Dict[str, Deque[Tuple[int, dict, int]]] = defaultdict(deque)
        self.dead_letters: Dict[str, list] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        self.handlers[topic] = handler

    def publish(self, topic: str, payload: dict) -> None:
        queue = self.queues[topic]
        queue.append((len(queue), payload, 0))
        self._drain(topic)

    def _drain(self, topic: str) -> None:
        handler = self.handlers.get(topic)
        if handler is None:
            return  # nothing subscribed yet; leave events queued
        queue = self.queues[topic]
        pending = len(queue)
        for _ in range(pending):
            offset, payload, attempt = queue.popleft()
            try:
                handler(payload)
            except Exception as exc:
                attempt += 1
                if attempt >= 3:
                    print("Dead-lettering", payload)
                    self.dead_letters[topic].append((payload, str(exc)))
                else:
                    backoff = 0.1 * (2 ** attempt)
                    print("Retrying", payload, "in", backoff)
                    time.sleep(backoff)
                    queue.append((offset, payload, attempt))

def main() -> None:
    random.seed(16)
    broker = ResilientBroker()

    def handler(payload: dict) -> None:
        if random.random() < 0.5:
            raise RuntimeError("handler failure")
        print("Handled", payload)

    broker.subscribe("notifications", handler)
    for idx in range(4):
        broker.publish("notifications", {"id": idx})
    print("Dead letters:", broker.dead_letters["notifications"])

if __name__ == "__main__":
    main()
Machine Coding - Workflow Orchestrator (Levels 1-3)
workflow orchestrator - concurrency - resiliency
Scope: evolve workflow orchestration from step sequencing to concurrent coordination with resiliency
Themes: escalating capabilities

Grow the workflow orchestrator from basic sequencing to concurrent coordination and resilient recovery.

Workflow Orchestrator Stack
 ├─ Level 1: Core Orchestrator
 ├─ Level 2: Concurrent Workflow Engine
 └─ Level 3: Resilient Workflow Service

Level 1 — Core Implementation

The workflow series begins with representing tasks and dependencies as a directed acyclic graph and performing a topological traversal. Lifecycle events for start and completion keep orchestration observable; a sketch after the code shows one way to wire them in. Stepping through the sample illustrates how prerequisite tracking keeps execution honest.

from collections import defaultdict, deque
from typing import Callable, Dict, List, Set

class Workflow:
    def __init__(self) -> None:
        self.graph: Dict[str, Set[str]] = defaultdict(set)
        self.reverse: Dict[str, Set[str]] = defaultdict(set)
        self.tasks: Dict[str, Callable[[], None]] = {}

    def add_task(self, name: str, fn: Callable[[], None]) -> None:
        self.tasks[name] = fn

    def add_edge(self, prereq: str, task: str) -> None:
        self.graph[prereq].add(task)
        self.reverse[task].add(prereq)

    def run(self) -> None:
        indegree: Dict[str, int] = {task: len(self.reverse[task]) for task in self.tasks}
        ready = deque([task for task, deg in indegree.items() if deg == 0])
        while ready:
            task = ready.popleft()
            self.tasks[task]()
            for neighbor in self.graph[task]:
                indegree[neighbor] -= 1
                if indegree[neighbor] == 0:
                    ready.append(neighbor)

def main() -> None:
    wf = Workflow()
    wf.add_task("A", lambda: print("A"))
    wf.add_task("B", lambda: print("B"))
    wf.add_task("C", lambda: print("C"))
    wf.add_task("D", lambda: print("D"))
    wf.add_edge("A", "B")
    wf.add_edge("A", "C")
    wf.add_edge("B", "D")
    wf.run()

if __name__ == "__main__":
    main()
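
A minimal sketch of the lifecycle events promised above, assuming the Workflow class as written (ObservableWorkflow and on_event are hypothetical names):

class ObservableWorkflow(Workflow):
    """Hypothetical variant: notify a listener around every task run."""

    def __init__(self, on_event: Callable[[str, str], None]) -> None:
        super().__init__()
        self.on_event = on_event

    def run(self) -> None:
        indegree = {task: len(self.reverse[task]) for task in self.tasks}
        ready = deque([task for task, deg in indegree.items() if deg == 0])
        while ready:
            task = ready.popleft()
            self.on_event("started", task)
            self.tasks[task]()
            self.on_event("completed", task)
            for neighbor in self.graph[task]:
                indegree[neighbor] -= 1
                if indegree[neighbor] == 0:
                    ready.append(neighbor)

# usage: ObservableWorkflow(lambda phase, task: print(phase, task))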

Level 2 — Concurrent Enhancements

Level 2 brings concurrency by dispatching ready tasks to a worker pool while still guarding dependency checks. We synchronise updates to shared dependency counters and capture traces to visualise parallel execution. The exercise highlights how concurrency can coexist with determinism when data structures are chosen carefully.

import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Set

class ConcurrentWorkflow:
    def __init__(self) -> None:
        self.dependencies: Dict[str, Set[str]] = defaultdict(set)
        self.dependents: Dict[str, Set[str]] = defaultdict(set)
        self.tasks: Dict[str, Callable[[], None]] = {}

    def add_task(self, name: str, fn: Callable[[], None]) -> None:
        self.tasks[name] = fn

    def add_edge(self, prereq: str, task: str) -> None:
        self.dependencies[task].add(prereq)
        self.dependents[prereq].add(task)

    def run(self) -> None:
        completed: Set[str] = set()
        scheduled: Set[str] = set()
        lock = threading.Lock()
        done = threading.Event()
        executor = ThreadPoolExecutor(max_workers=4)

        def execute_task(task: str) -> None:
            self.tasks[task]()
            with lock:
                # Guard shared bookkeeping so racing completions cannot
                # schedule the same dependent twice.
                completed.add(task)
                ready = [dep for dep in self.dependents[task]
                         if dep not in scheduled and self.dependencies[dep].issubset(completed)]
                scheduled.update(ready)
                if len(completed) == len(self.tasks):
                    done.set()
            for dependent in ready:
                executor.submit(execute_task, dependent)

        roots = [task for task in self.tasks if not self.dependencies[task]]
        scheduled.update(roots)
        for task in roots:
            executor.submit(execute_task, task)
        done.wait()  # shutting down earlier would reject submissions from workers
        executor.shutdown(wait=True)

def main() -> None:
    wf = ConcurrentWorkflow()
    wf.add_task("A", lambda: print("A"))
    wf.add_task("B", lambda: print("B"))
    wf.add_task("C", lambda: print("C"))
    wf.add_task("D", lambda: print("D"))
    wf.add_edge("A", "B")
    wf.add_edge("A", "C")
    wf.add_edge("B", "D")
    wf.add_edge("C", "D")
    wf.run()

if __name__ == "__main__":
    main()

Level 3 — Resilient Architecture

The advanced workflow layer persists execution state so runs can resume after a crash and keeps history for auditing. We add retry budgets and compensation handlers to roll back or remediate partial work. Learners see how durable state and careful bookkeeping turn an in-memory runner into an operations-friendly system.

import json
import os
import random
import time
from typing import Callable, Dict, List

class StateStore:
    def __init__(self, path: str) -> None:
        self.path = path
        if not os.path.exists(path):
            self.save({})

    def load(self) -> Dict[str, str]:
        with open(self.path, "r", encoding="utf-8") as handle:
            return json.load(handle)

    def save(self, state: Dict[str, str]) -> None:
        with open(self.path, "w", encoding="utf-8") as handle:
            json.dump(state, handle)

class SagaStep:
    def __init__(self, execute: Callable[[], None], compensate: Callable[[], None]) -> None:
        self.execute = execute
        self.compensate = compensate

class ResilientSaga:
    def __init__(self, steps: Dict[str, SagaStep], store: StateStore) -> None:
        self.steps = steps
        self.store = store
        self.state = store.load()

    def run(self) -> None:
        completed: List[str] = []
        for name, step in self.steps.items():
            if self.state.get(name) == "done":
                completed.append(name)
                continue
            attempt = 0
            while attempt < 3:
                try:
                    step.execute()
                    self.state[name] = "done"
                    self.store.save(self.state)
                    completed.append(name)
                    break
                except Exception:
                    attempt += 1
                    if attempt >= 3:
                        self.state[name] = "failed"
                        self.store.save(self.state)
                        # Saga semantics: undo earlier completed steps in reverse.
                        for done_name in reversed(completed):
                            self.steps[done_name].compensate()
                        raise
                    time.sleep(0.2 * attempt)

def main() -> None:
    random.seed(17)

    def flaky_step():
        if random.random() < 0.5:
            raise RuntimeError("flaky")
        print("Executed flaky step")

    steps = {
        "reserve": SagaStep(lambda: print("Reserved inventory"), lambda: print("Released inventory")),
        "pay": SagaStep(flaky_step, lambda: print("Refunded payment")),
        "notify": SagaStep(lambda: print("Notification sent"), lambda: print("Notification compensating")),
    }
    saga = ResilientSaga(steps, StateStore("/tmp/workflow-state.json"))
    try:
        saga.run()
    except Exception as exc:
        print("Saga ended with failure:", exc)

if __name__ == "__main__":
    main()
Machine Coding - Feed Aggregator (Levels 1-3)
feed aggregator - concurrency - resiliency
Scope: evolve feed aggregation from a single puller to concurrent pipelines and resilient aggregation
Themes: escalating capabilities

Progressively enhance the feed aggregation system from simple RSS pulling to concurrent pipelines and resilient aggregation with fallbacks.

Feed Aggregator Stack
 ├─ Level 1: RSS Puller
 ├─ Level 2: Concurrent Fetch Pipeline
 └─ Level 3: Resilient Aggregation Service

Level 1 — Core Implementation

The feed exercises start with normalising posts into append-only timelines and merging them efficiently for each user. We focus on clear storage abstractions and deterministic ordering so pagination becomes predictable. Sample data shows how following relationships translate into the final feed without caching yet.

import heapq
from typing import Dict, List, Tuple

class FeedService:
    def __init__(self, follows: Dict[str, List[str]], posts: Dict[str, List[Tuple[int, str]]]) -> None:
        self.follows = follows
        self.posts = posts

    def feed(self, user: str) -> List[Tuple[int, str]]:
        heap = []
        for followee in self.follows.get(user, []):
            for timestamp, content in self.posts.get(followee, []):
                heapq.heappush(heap, (-timestamp, content))
        return [(-ts, content) for ts, content in heapq.nsmallest(len(heap), heap)]

def main() -> None:
    follows = {"alice": ["bob", "carol"]}
    posts = {
        "bob": [(3, "bob post 1"), (1, "bob post 0")],
        "carol": [(2, "carol post")],
    }
    service = FeedService(follows, posts)
    print("Feed:", service.feed("alice"))

if __name__ == "__main__":
    main()

Level 2 — Concurrent Enhancements

Level 2 introduces fan-out-on-write: publishing a post updates follower caches immediately, while a background refresher keeps them warm. We discuss cache invalidation, TTLs, and fallbacks so stale entries are minimised; a TTL sketch follows the code. Observing cache hit logs highlights the trade-offs between write amplification and read latency.

import threading
import time
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple

class FanoutFeed:
    def __init__(self, follows: Dict[str, List[str]]) -> None:
        self.follows = follows
        self.cache: Dict[str, Deque[Tuple[int, str]]] = defaultdict(deque)
        self.lock = threading.Lock()

    def publish(self, author: str, content: str, timestamp: int) -> None:
        followers = [user for user, followees in self.follows.items() if author in followees]
        for follower in followers:
            with self.lock:
                self.cache[follower].appendleft((timestamp, content))
                while len(self.cache[follower]) > 10:
                    self.cache[follower].pop()

    def timeline(self, user: str) -> List[Tuple[int, str]]:
        with self.lock:
            return list(self.cache[user])

def main() -> None:
    follows = {"alice": ["bob"], "eve": ["bob", "alice"]}
    feed = FanoutFeed(follows)
    feed.publish("bob", "hello world", 3)
    feed.publish("alice", "hi there", 4)
    print("Alice timeline:", feed.timeline("alice"))
    print("Eve timeline:", feed.timeline("eve"))

if __name__ == "__main__":
    main()
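
The TTLs and background refresher from the prose are left to the reader by the code above. A minimal sketch, assuming the FanoutFeed above (TTLFanoutFeed and is_stale are hypothetical names), timestamps each refresh so a janitor thread could spot cold caches:

class TTLFanoutFeed(FanoutFeed):
    """Hypothetical variant: entries older than ttl_seconds count as stale."""

    def __init__(self, follows: Dict[str, List[str]], ttl_seconds: float = 30.0) -> None:
        super().__init__(follows)
        self.ttl = ttl_seconds
        self.refreshed_at: Dict[str, float] = defaultdict(float)

    def publish(self, author: str, content: str, timestamp: int) -> None:
        super().publish(author, content, timestamp)
        now = time.time()
        for follower, followees in self.follows.items():
            if author in followees:
                self.refreshed_at[follower] = now

    def is_stale(self, user: str) -> bool:
        return time.time() - self.refreshed_at[user] > self.ttl

# A background thread could poll is_stale() and rebuild cold caches from posts.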

Level 3 — Resilient Architecture

The advanced feed level assumes outages, so requests hit caches first, fall back to stale-but-safe copies, and enqueue refresh jobs once storage recovers. We surface degradation warnings to operators and include reconciliation steps to heal caches afterwards. Learners understand how maintaining a good user experience often means transparently serving older data with clear telemetry.

import random
import time
from typing import Dict, List, Tuple

class PrimaryFeedStore:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate
        self.feeds: Dict[str, List[Tuple[int, str]]] = {}

    def get(self, user: str) -> List[Tuple[int, str]]:
        if random.random() < self.failure_rate:
            raise RuntimeError("feed store down")
        return self.feeds.get(user, [])

class ResilientFeedService:
    def __init__(self, primary: PrimaryFeedStore) -> None:
        self.primary = primary
        self.cache: Dict[str, List[Tuple[int, str]]] = {}
        self.stale: Dict[str, List[Tuple[int, str]]] = {}

    def feed(self, user: str) -> List[Tuple[int, str]]:
        try:
            feed = self.primary.get(user)
            self.cache[user] = feed
            self.stale[user] = feed
            return feed
        except Exception as exc:
            print("Primary failed, serving stale feed:", exc)
            return self.cache.get(user) or self.stale.get(user, [])

def main() -> None:
    random.seed(19)
    store = PrimaryFeedStore(0.6)
    store.feeds["alice"] = [(5, "new post"), (3, "older post")]
    service = ResilientFeedService(store)
    for _ in range(4):
        print("Feed response:", service.feed("alice"))
        time.sleep(0.1)

if __name__ == "__main__":
    main()
Machine Coding - Metrics Collector (Levels 1-3)
metrics collector - concurrency - resiliency
Scope: evolve metrics collection from in-memory counters to concurrent windowed collectors and resilient pipelines
Themes: escalating capabilities

Improve the metrics collector from a simple in-memory store to concurrent sliding-window counters and a resilient pipeline with buffering and fallback.

Metrics Collector Stack
 ├─ Level 1: In-Memory Collector
 ├─ Level 2: Concurrent Sharded Collector
 └─ Level 3: Resilient Metrics Pipeline

Level 1 — Core Implementation

The metrics series starts with a minimal counter registry that tracks named metrics and exposes read-friendly snapshots. We keep the API small and safe for rapid increments, laying groundwork for richer aggregations. Sample interactions emphasise clarity of contract before concurrency or distribution is introduced.

from collections import defaultdict
from typing import Dict

class CounterService:
    def __init__(self) -> None:
        self.counters: Dict[str, int] = defaultdict(int)

    def increment(self, name: str, value: int = 1) -> None:
        self.counters[name] += value

    def get(self, name: str) -> int:
        # .get avoids materialising defaultdict entries for metrics never written.
        return self.counters.get(name, 0)

def main() -> None:
    counters = CounterService()
    counters.increment("requests")
    counters.increment("requests", 2)
    print("Requests:", counters.get("requests"))

if __name__ == "__main__":
    main()

Level 2 — Concurrent Enhancements

Level 2 tracks metrics over sliding windows. The demo keeps a deque of timestamped events and prunes stale entries under a lock to keep memory bounded; a bucket-per-second variant that copes with higher write rates is sketched after the demo. Threaded tests show how to trust rolling aggregations when events interleave across CPUs.

from __future__ import annotations

import threading
import time
from collections import deque
from typing import Deque, Tuple

class RollingCounter:
    def __init__(self, window_seconds: int) -> None:
        self.window = window_seconds
        self.events: Deque[Tuple[float, int]] = deque()
        self.lock = threading.Lock()

    def increment(self, value: int = 1) -> None:
        now = time.time()
        with self.lock:
            self.events.append((now, value))
            self._trim(now)

    def total(self) -> int:
        now = time.time()
        with self.lock:
            self._trim(now)
            return sum(value for _, value in self.events)

    def _trim(self, now: float) -> None:
        while self.events and now - self.events[0][0] > self.window:
            self.events.popleft()

def worker(counter: RollingCounter) -> None:
    for _ in range(5):
        counter.increment()
        time.sleep(0.05)

def main() -> None:
    counter = RollingCounter(1)
    threads = [threading.Thread(target=worker, args=(counter,)) for _ in range(3)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("Rolling total:", counter.total())

if __name__ == "__main__":
    main()
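
At very high write rates the per-event deque grows with traffic; a bucket-per-second map keeps memory proportional to the window instead. A minimal sketch of that alternative (single-threaded for brevity; the same lock discipline as above would apply):

import time
from typing import Dict

class BucketedCounter:
    # Sketch: counts grouped into whole-second buckets; memory is O(window).
    def __init__(self, window_seconds: int) -> None:
        self.window = window_seconds
        self.buckets: Dict[int, int] = {}

    def increment(self, value: int = 1) -> None:
        second = int(time.time())
        self.buckets[second] = self.buckets.get(second, 0) + value
        self._trim(second)

    def total(self) -> int:
        now = int(time.time())
        self._trim(now)
        return sum(self.buckets.values())

    def _trim(self, now: int) -> None:
        cutoff = now - self.window
        for second in [s for s in self.buckets if s <= cutoff]:
            del self.buckets[second]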

Level 3 — Resilient Architecture

The advanced level models many edge collectors pushing to central storage, so we batch payloads, retry with backoff (sketched after the demo), and spill to disk if the network is unreachable. Annotating batches with metadata would also let reconciliations deduplicate after recovery. Running the scenario demonstrates how resilient telemetry pipelines keep observability intact.

import json
import random
import time
from typing import Dict, List

class CentralTransport:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def ship(self, batch: List[Dict[str, int]]) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("transport failure")
        print("Shipped batch:", batch)

class FallbackDisk:
    def __init__(self, path: str) -> None:
        self.path = path

    def persist(self, batch: List[Dict[str, int]]) -> None:
        with open(self.path, "a", encoding="utf-8") as handle:
            handle.write(json.dumps(batch) + "\n")

class ResilientAggregator:
    def __init__(self, transport: CentralTransport, fallback: FallbackDisk, batch_size: int = 3) -> None:
        self.transport = transport
        self.fallback = fallback
        self.batch_size = batch_size
        self.buffer: List[Dict[str, int]] = []

    def record(self, metrics: Dict[str, int]) -> None:
        self.buffer.append(metrics)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        # Ship the oldest batch; on transport failure spill it to local disk.
        if not self.buffer:
            return
        batch = self.buffer[: self.batch_size]
        try:
            self.transport.ship(batch)
        except Exception as exc:
            print("Ship failed:", exc, "persisting locally")
            self.fallback.persist(batch)
        finally:
            self.buffer = self.buffer[self.batch_size :]

def main() -> None:
    random.seed(20)
    aggregator = ResilientAggregator(CentralTransport(0.5), FallbackDisk("/tmp/metrics.log"))
    for idx in range(7):
        aggregator.record({"requests": idx})
    aggregator.flush()  # ship the final partial batch

if __name__ == "__main__":
    main()
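
The demo falls straight through to the disk fallback; here is a hedged sketch of the retry-with-backoff layer mentioned above, reusing the CentralTransport and FallbackDisk classes from the demo (the attempt count and delays are illustrative):

import time
from typing import Dict, List

def ship_with_backoff(transport: "CentralTransport", fallback: "FallbackDisk",
                      batch: List[Dict[str, int]], attempts: int = 3) -> None:
    # Sketch: retry with exponential backoff, then spill to disk as a last resort.
    delay = 0.1
    for attempt in range(attempts):
        try:
            transport.ship(batch)
            return
        except Exception as exc:
            print(f"Attempt {attempt + 1} failed: {exc}; retrying in {delay:.1f}s")
            time.sleep(delay)
            delay *= 2
    fallback.persist(batch)
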
Machine Coding - Document Collaboration (Levels 1-3)
document collaboration - concurrency - resiliency
Scope: evolve document collaboration from core storage to concurrent editing and resilient synchronization. Themes: escalating capabilities

Build the document collaboration platform from basic storage to concurrent sessions and resilient synchronization.

Doc Collaboration Stack
 ├─ Level 1: Core Document Store
 ├─ Level 2: Concurrent Editing Sessions
 └─ Level 3: Resilient Collaboration Service

Level 1 — Core Implementation

The collaboration series begins with an operational log that applies inserts and deletes in order, emphasising deterministic replay. Representing text updates as operations rather than raw buffers makes concurrent editing tangible even before conflicts arise; a fuller version would also track cursor positions and authorship.

from dataclasses import dataclass
from typing import List

@dataclass
class Operation:
    kind: str
    index: int
    payload: str = ""

class Document:
    def __init__(self) -> None:
        self.text = ""
        self.log: List[Operation] = []

    def insert(self, index: int, payload: str) -> None:
        self.text = self.text[:index] + payload + self.text[index:]
        self.log.append(Operation("insert", index, payload))

    def delete(self, index: int) -> None:
        self.text = self.text[:index] + self.text[index + 1 :]
        self.log.append(Operation("delete", index))

    def replay(self) -> str:
        text = ""
        for op in self.log:
            if op.kind == "insert":
                text = text[:op.index] + op.payload + text[op.index:]
            elif op.kind == "delete":
                text = text[:op.index] + text[op.index + 1 :]
        return text

def main() -> None:
    doc = Document()
    doc.insert(0, "H")
    doc.insert(1, "i")
    doc.insert(2, "!")
    doc.delete(2)
    print("Text:", doc.text)
    print("Replay:", doc.replay())

if __name__ == "__main__":
    main()

Level 2 — Concurrent Enhancements

Level 2 introduces a simplified character-wise CRDT, a last-writer-wins map keyed by position, so edits from different clients merge without central coordination. Logical clocks break ties and tombstones record deletions, guaranteeing convergence at the cost of extra memory. A convergence check after the demo shows identical final documents regardless of edit ordering, grounding the theory in practice.

from __future__ import annotations

import threading
from dataclasses import dataclass
from typing import Dict, Tuple

@dataclass
class CharacterVersion:
    char: str
    clock: Tuple[str, int]
    tombstone: bool = False

class CRDTDocument:
    def __init__(self) -> None:
        self.characters: Dict[int, CharacterVersion] = {}
        self.lock = threading.Lock()

    def apply(self, position: int, char: str, clock: Tuple[str, int]) -> None:
        with self.lock:
            existing = self.characters.get(position)
            if not existing or clock > existing.clock:
                self.characters[position] = CharacterVersion(char, clock)

    def delete(self, position: int, clock: Tuple[str, int]) -> None:
        with self.lock:
            existing = self.characters.get(position)
            if not existing or clock >= existing.clock:
                self.characters[position] = CharacterVersion("", clock, tombstone=True)

    def materialize(self) -> str:
        with self.lock:
            return "".join(
                version.char for pos, version in sorted(self.characters.items()) if not version.tombstone
            )

def worker(doc: CRDTDocument, author: str, edits: Tuple[int, str]) -> None:
    pos, char = edits
    doc.apply(pos, char, (author, pos))

def main() -> None:
    doc = CRDTDocument()
    threads = [
        threading.Thread(target=worker, args=(doc, "siteA", (0, "H"))),
        threading.Thread(target=worker, args=(doc, "siteB", (1, "i"))),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("Merged doc:", doc.materialize())

if __name__ == "__main__":
    main()
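
To back the order-independence claim, a quick check (assuming the CRDTDocument class above) applies the same operation set in two different orders and compares the merged results; last-writer-wins resolution makes both runs agree.

def converges() -> bool:
    # Sketch: the same operations, applied forwards and backwards, must merge
    # to the same document under last-writer-wins resolution.
    ops = [(0, "H", ("siteA", 1)), (0, "Y", ("siteB", 2)), (1, "i", ("siteA", 3))]
    left, right = CRDTDocument(), CRDTDocument()
    for pos, char, clock in ops:
        left.apply(pos, char, clock)
    for pos, char, clock in reversed(ops):
        right.apply(pos, char, clock)
    return left.materialize() == right.materialize()

print("Converged:", converges())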

Level 3 — Resilient Architecture

The advanced level accepts network partitions, buffering operations locally and replaying them in queue order once connectivity returns. Persisting the outbound queue to disk, sketched after the demo, ensures power loss does not destroy unsynced work. Learners observe how acknowledgements and retries maintain a consistent shared document despite offline stretches.

import random
import time
from dataclasses import dataclass
from queue import Queue, Empty

@dataclass
class EditOp:
    op_id: str
    position: int
    payload: str

class RemoteStore:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate
        self.buffer = ""

    def apply(self, op: EditOp) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("network error")
        self.buffer = self.buffer[: op.position] + op.payload + self.buffer[op.position :]
        print("Remote applied", op.op_id, "->", self.buffer)

class OfflineClient:
    def __init__(self, store: RemoteStore) -> None:
        self.store = store
        self.queue: "Queue[EditOp]" = Queue()

    def edit(self, op: EditOp) -> None:
        print("Queued", op)
        self.queue.put(op)

    def sync(self) -> None:
        pending = []
        while True:
            try:
                op = self.queue.get_nowait()
            except Empty:
                break
            try:
                self.store.apply(op)
            except Exception as exc:
                print("Sync failed for", op.op_id, exc)
                pending.append(op)
            finally:
                self.queue.task_done()
        for op in pending:
            self.queue.put(op)

def main() -> None:
    random.seed(21)
    store = RemoteStore(0.5)
    client = OfflineClient(store)
    client.edit(EditOp("op1", 0, "Hi"))
    client.edit(EditOp("op2", 2, "!"))
    for _ in range(5):
        client.sync()
        time.sleep(0.1)

if __name__ == "__main__":
    main()
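
The demo keeps its outbound queue in memory; a minimal persistence sketch appends each operation to a JSON-lines journal so unsynced edits survive a restart. It reuses the EditOp dataclass from the demo; the DurableQueue name and file format are illustrative assumptions.

import json
from dataclasses import asdict
from typing import List

class DurableQueue:
    # Sketch: append-only JSON-lines journal for unsynced edit operations.
    def __init__(self, path: str) -> None:
        self.path = path

    def append(self, op: EditOp) -> None:
        with open(self.path, "a", encoding="utf-8") as handle:
            handle.write(json.dumps(asdict(op)) + "\n")

    def load(self) -> List[EditOp]:
        ops: List[EditOp] = []
        try:
            with open(self.path, "r", encoding="utf-8") as handle:
                for line in handle:
                    ops.append(EditOp(**json.loads(line)))
        except FileNotFoundError:
            pass  # first run: nothing journalled yet
        return ops
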
Machine Coding - Rule Engine (Levels 1-3)
rule engine - concurrency - resiliency
Scope: evolve rule evaluation from basic engine to concurrent processing and resilient service. Themes: escalating capabilities

Progressively enhance the rule engine from core evaluation to concurrent processing and a resilient rules service.

Rule Engine Stack
 ├─ Level 1: Core Rule Evaluator
 ├─ Level 2: Concurrent Rule Processing
 └─ Level 3: Resilient Rules Service

Level 1 — Core Implementation

We launch the rule-engine series by modelling conditions and actions as first-class objects and executing them against sample payloads. Evaluation contexts expose facts and collect side effects, making it straightforward to test rules in isolation. The walkthrough explains how to debug rule ordering; a stop-on-first-match variant illustrating short-circuit behaviour is sketched after the demo.

from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class Rule:
    name: str
    condition: Callable[[Dict], bool]
    action: Callable[[Dict], None]

class RuleEngine:
    def __init__(self) -> None:
        self.rules: List[Rule] = []

    def add_rule(self, rule: Rule) -> None:
        self.rules.append(rule)

    def evaluate(self, context: Dict) -> None:
        for rule in self.rules:
            if rule.condition(context):
                rule.action(context)

def main() -> None:
    engine = RuleEngine()
    engine.add_rule(Rule("high-value", lambda ctx: ctx["amount"] > 100, lambda ctx: print("Flag high value")))
    engine.add_rule(Rule("vip", lambda ctx: ctx["customer"] == "vip", lambda ctx: print("Apply vip perks")))
    engine.evaluate({"amount": 150, "customer": "vip"})

if __name__ == "__main__":
    main()
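
The engine above fires every matching rule; the short-circuit variant stops at the first hit. A small sketch, assuming the RuleEngine above (the function name is illustrative):

from typing import Dict

def evaluate_first_match(engine: "RuleEngine", context: Dict) -> None:
    # Sketch: stop after the first matching rule instead of firing all of them.
    for rule in engine.rules:
        if rule.condition(context):
            rule.action(context)
            return  # short-circuit: later rules never run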

Level 2 — Concurrent Enhancements

Level 2 enables hot reloading by guarding rule sets with a re-entrant lock (a stand-in for a full reader-writer lock) so evaluations keep using a consistent snapshot. Updates are swapped atomically, and version metadata, sketched after the demo, tells operators which rule set executed. The exercise showcases how to mix responsiveness with safety when configuration evolves live.

from __future__ import annotations

import threading
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class Rule:
    name: str
    condition: Callable[[Dict], bool]
    action: Callable[[Dict], None]

class HotReloadEngine:
    def __init__(self) -> None:
        self.rules: List[Rule] = []
        self.lock = threading.RLock()

    def evaluate(self, context: Dict) -> None:
        with self.lock:
            for rule in self.rules:
                if rule.condition(context):
                    rule.action(context)

    def reload(self, rules: List[Rule]) -> None:
        with self.lock:
            self.rules = rules

def evaluator(engine: HotReloadEngine, context: Dict) -> None:
    engine.evaluate(context)

def main() -> None:
    engine = HotReloadEngine()
    engine.reload([Rule("gt100", lambda ctx: ctx["amount"] > 100, lambda ctx: print("Large order"))])
    threads = [
        threading.Thread(target=evaluator, args=(engine, {"amount": 150})),
        threading.Thread(target=evaluator, args=(engine, {"amount": 80})),
    ]
    for thread in threads:
        thread.start()
    engine.reload([Rule("gt50", lambda ctx: ctx["amount"] > 50, lambda ctx: print("Order > 50"))])
    for thread in threads:
        thread.join()
    engine.evaluate({"amount": 60})

if __name__ == "__main__":
    main()
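
The version metadata mentioned above can be layered on directly; a hedged sketch, assuming the HotReloadEngine and Rule classes from the demo, stamps each reload and reports which snapshot handled an evaluation.

class VersionedEngine(HotReloadEngine):
    # Sketch: stamp each reload so operators can tell which rule set executed.
    def __init__(self) -> None:
        super().__init__()
        self.version = 0

    def reload(self, rules: List[Rule]) -> None:
        with self.lock:
            self.rules = rules
            self.version += 1

    def evaluate(self, context: Dict) -> int:
        with self.lock:
            for rule in self.rules:
                if rule.condition(context):
                    rule.action(context)
            return self.version  # version of the snapshot that ran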

Level 3 — Resilient Architecture

The advanced rule engine treats rule definitions as remote resources, layering local caching around fetches; a retry-with-backoff wrapper is sketched after the demo. We log degradation events and refresh successes to keep operations transparent. Learners see how to maintain deterministic behaviour even while rule sources flicker.

import random
import time
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class Rule:
    name: str
    condition: Callable[[Dict], bool]
    action: Callable[[Dict], None]

class RemoteRuleSource:
    def __init__(self, failure_rate: float = 0.5) -> None:
        self.failure_rate = failure_rate

    def fetch(self) -> List[Rule]:
        if random.random() < self.failure_rate:
            raise RuntimeError("rule source down")
        return [
            Rule("gt200", lambda ctx: ctx["amount"] > 200, lambda ctx: print("Audit order")),
        ]

class ResilientRuleEngine:
    def __init__(self, source: RemoteRuleSource) -> None:
        self.source = source
        self.cache: List[Rule] = []

    def refresh(self) -> None:
        try:
            rules = self.source.fetch()
            self.cache = rules
            print("Rule cache refreshed")
        except Exception as exc:
            print("Rule refresh failed:", exc)

    def evaluate(self, context: Dict) -> None:
        for rule in self.cache:
            if rule.condition(context):
                rule.action(context)

def main() -> None:
    random.seed(22)
    engine = ResilientRuleEngine(RemoteRuleSource(0.6))
    for _ in range(4):
        engine.refresh()
        engine.evaluate({"amount": 250})
        time.sleep(0.2)

if __name__ == "__main__":
    main()
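
The retry-with-backoff layer is not in the demo itself; a hedged sketch, assuming the ResilientRuleEngine above (attempt count and delays are illustrative):

import time

def refresh_with_backoff(engine: "ResilientRuleEngine", attempts: int = 3) -> bool:
    # Sketch: retry fetches with doubling delays; the cached rules keep serving
    # evaluations while the source is down.
    delay = 0.1
    for _ in range(attempts):
        try:
            engine.cache = engine.source.fetch()
            print("Rule cache refreshed")
            return True
        except Exception as exc:
            print("Fetch failed:", exc, f"- retrying in {delay:.1f}s")
            time.sleep(delay)
            delay *= 2
    return False  # give up for now; the stale cache stays authoritative
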
Machine Coding - Leaderboard (Levels 1-3)
leaderboard - concurrency - resiliency
Scope: evolve leaderboard from core ranking to concurrent updates and resilient aggregation. Themes: escalating capabilities

Grow the leaderboard functionality from single-threaded ranking to concurrent updates and resilient aggregation with caches.

Leaderboard Stack
 ├─ Level 1: Core Leaderboard
 ├─ Level 2: Concurrent Leaderboard Updates
 └─ Level 3: Resilient Leaderboard Service

Level 1 — Core Implementation

The leaderboard journey begins with a score map plus heap-based top-k selection, keeping ordering queries cheap while updates arrive. Scores stay separate from ranking logic so lookups remain simple and expressive. Sample commands demonstrate how updates shift positions instantly.

import heapq
from typing import Dict, List, Tuple

class Leaderboard:
    def __init__(self) -> None:
        self.scores: Dict[str, int] = {}

    def update(self, player: str, score: int) -> None:
        self.scores[player] = score

    def top(self, k: int) -> List[Tuple[int, str]]:
        return heapq.nlargest(k, [(score, player) for player, score in self.scores.items()])

def main() -> None:
    lb = Leaderboard()
    lb.update("alice", 100)
    lb.update("bob", 90)
    lb.update("carol", 120)
    print("Top2:", lb.top(2))

if __name__ == "__main__":
    main()

Level 2 — Concurrent Enhancements

Level 2 handles rapid writes by sharding state and protecting each shard with its own lock, improving throughput. A score decay job, sketched after the demo, simulates games where ranking should drift over time, highlighting the maintenance tasks such systems need. Threaded stress tests make it clear how to spot hotspots and tune shard counts.

from __future__ import annotations

import threading
from collections import defaultdict
from typing import Dict

class ShardedLeaderboard:
    def __init__(self, shards: int = 4) -> None:
        self.shards = shards
        self.segment_scores: Dict[int, Dict[str, int]] = defaultdict(dict)
        self.locks: Dict[int, threading.Lock] = defaultdict(threading.Lock)

    def _shard(self, player: str) -> int:
        return hash(player) % self.shards

    def update(self, player: str, delta: int) -> None:
        shard = self._shard(player)
        with self.locks[shard]:
            self.segment_scores[shard][player] = self.segment_scores[shard].get(player, 0) + delta

    def snapshot(self) -> Dict[str, int]:
        combined: Dict[str, int] = {}
        for shard, scores in self.segment_scores.items():
            with self.locks[shard]:
                for player, score in scores.items():
                    combined[player] = combined.get(player, 0) + score
        return combined

def worker(lb: ShardedLeaderboard, player: str) -> None:
    for _ in range(10):
        lb.update(player, 1)

def main() -> None:
    lb = ShardedLeaderboard()
    threads = [threading.Thread(target=worker, args=(lb, f"player{i}")) for i in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("Snapshot:", lb.snapshot())

if __name__ == "__main__":
    main()
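
The decay job is a maintenance pass over every shard; a minimal sketch, assuming the ShardedLeaderboard above (the factor is illustrative). In practice it would run periodically, e.g. from a threading.Timer.

def decay_scores(lb: "ShardedLeaderboard", factor: float = 0.9) -> None:
    # Sketch: periodic decay so inactive players drift down the ranking.
    for shard, scores in lb.segment_scores.items():
        with lb.locks[shard]:
            for player in scores:
                scores[player] = int(scores[player] * factor)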

Level 3 — Resilient Architecture

The advanced level layers a cache over the sharded store so reads stay fast while writes remain authoritative. Cache misses and outages fall back to the store, successful reads repopulate the cache, and a hit-rate meter, sketched after the demo, keeps the trade-off observable. Learners gain intuition for balancing latency and correctness in read-heavy workloads.

import heapq
import random
import threading
from collections import defaultdict
from typing import Dict, List, Tuple

class ShardedLeaderboard:
    def __init__(self, shards: int = 4) -> None:
        self.shards = shards
        self.segment_scores: Dict[int, Dict[str, int]] = defaultdict(dict)
        self.locks: Dict[int, threading.Lock] = defaultdict(threading.Lock)

    def _shard(self, player: str) -> int:
        return hash(player) % self.shards

    def update(self, player: str, delta: int) -> None:
        shard = self._shard(player)
        with self.locks[shard]:
            self.segment_scores[shard][player] = self.segment_scores[shard].get(player, 0) + delta

    def snapshot(self) -> Dict[str, int]:
        combined: Dict[str, int] = {}
        for shard, scores in self.segment_scores.items():
            with self.locks[shard]:
                for player, score in scores.items():
                    combined[player] = combined.get(player, 0) + score
        return combined

class CacheLayer:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate
        self.cache: Dict[str, List[Tuple[int, str]]] = {}

    def get(self, key: str) -> List[Tuple[int, str]]:
        if random.random() < self.failure_rate:
            raise RuntimeError("cache down")
        return self.cache.get(key, [])

    def set(self, key: str, value: List[Tuple[int, str]]) -> None:
        self.cache[key] = value

class ResilientLeaderboard:
    def __init__(self, store: ShardedLeaderboard, cache: CacheLayer) -> None:
        self.store = store
        self.cache = cache

    def update(self, player: str, score: int) -> None:
        self.store.update(player, score)

    def top(self, k: int) -> List[Tuple[int, str]]:
        key = f"top:{k}"
        try:
            result = self.cache.get(key)
            if result:
                return result
        except Exception as exc:
            print("Cache miss due to", exc)
        
        scores = self.store.snapshot()
        result = heapq.nlargest(k, [(score, player) for player, score in scores.items()])
        try:
            self.cache.set(key, result)
        except Exception as exc:
            print("Cache set failed:", exc)
        return result

def main() -> None:
    random.seed(23)
    store = ShardedLeaderboard()
    cache = CacheLayer(0.5)
    lb = ResilientLeaderboard(store, cache)
    lb.update("alice", 120)
    lb.update("bob", 110)
    lb.update("carol", 140)
    for _ in range(3):
        print("Top:", lb.top(2))

if __name__ == "__main__":
    main()
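
The hit-rate instrumentation can be a thin counter around the cache path; a hypothetical sketch (call record(True) when the cache returns a result and record(False) on a miss or failure):

class HitRateMeter:
    # Sketch: track cache hits against total reads and report the ratio.
    def __init__(self) -> None:
        self.hits = 0
        self.reads = 0

    def record(self, hit: bool) -> None:
        self.reads += 1
        if hit:
            self.hits += 1

    def ratio(self) -> float:
        return self.hits / self.reads if self.reads else 0.0
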
Machine Coding - Matchmaking (Levels 1-3)
matchmaking - concurrency - resiliency
Scope: evolve matchmaking from basic queue to concurrent matchmakers and resilient services. Themes: escalating capabilities

Progressively build the matchmaking system from simple queues to concurrent matchmakers and resilient session allocation.

Matchmaking Stack
 ├─ Level 1: Core Queue
 ├─ Level 2: Concurrent Matchmaker
 └─ Level 3: Resilient Match Service

Level 1 — Core Implementation

The matchmaking track starts by placing players into skill-bucketed queues and popping them in FIFO order to create fair matches. We define match objects and callbacks so downstream services can react to creation events. The sample session clarifies how queue discipline keeps latency and fairness predictable.

from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple

class Matchmaker:
    def __init__(self) -> None:
        self.buckets: Dict[int, Deque[str]] = defaultdict(deque)

    def enqueue(self, player: str, skill: int) -> None:
        bucket = skill // 100
        self.buckets[bucket].append(player)

    def match(self) -> List[Tuple[str, str]]:
        matches = []
        for queue in self.buckets.values():
            while len(queue) >= 2:
                matches.append((queue.popleft(), queue.popleft()))
        return matches

def main() -> None:
    mm = Matchmaker()
    mm.enqueue("alice", 120)
    mm.enqueue("bob", 130)
    mm.enqueue("carol", 205)
    mm.enqueue("dave", 215)
    print("Matches:", mm.match())

if __name__ == "__main__":
    main()

Level 2 — Concurrent Enhancements

Level 2 introduces a background worker that drains queues while multiple threads enqueue players, relying on per-bucket locks to stay consistent. A condition-variable variant that also reports queue length is sketched after the demo, showing how load affects wait times. Running the concurrent simulation demonstrates how to keep shared queues healthy under pressure.

from __future__ import annotations

import threading
import time
from collections import defaultdict, deque
from typing import Deque, Dict

class ConcurrentMatchmaker:
    def __init__(self) -> None:
        self.buckets: Dict[int, Deque[str]] = defaultdict(deque)
        self.locks: Dict[int, threading.Lock] = defaultdict(threading.Lock)
        self.stop = False
        threading.Thread(target=self._loop, daemon=True).start()

    def enqueue(self, player: str, skill: int) -> None:
        bucket = skill // 100
        lock = self.locks[bucket]
        with lock:
            self.buckets[bucket].append(player)

    def _loop(self) -> None:
        while not self.stop:
            for bucket, queue in list(self.buckets.items()):
                lock = self.locks[bucket]
                with lock:
                    while len(queue) >= 2:
                        p1 = queue.popleft()
                        p2 = queue.popleft()
                        print("Match:", p1, p2)
            time.sleep(0.1)

    def shutdown(self) -> None:
        self.stop = True

def worker(mm: ConcurrentMatchmaker, player: str, skill: int) -> None:
    mm.enqueue(player, skill)

def main() -> None:
    mm = ConcurrentMatchmaker()
    threads = [threading.Thread(target=worker, args=(mm, f"player{i}", 100 + i * 10)) for i in range(6)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    time.sleep(0.5)
    mm.shutdown()

if __name__ == "__main__":
    main()
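
The demo's worker polls on a timer; the condition-variable approach lets the matcher sleep until enough players arrive, and a queue-length gauge falls out naturally. A minimal single-bucket sketch (the SignallingQueue name is illustrative):

import threading
from collections import deque
from typing import Deque, Optional, Tuple

class SignallingQueue:
    # Sketch: enqueue notifies a waiting matcher instead of it polling a timer.
    def __init__(self) -> None:
        self.players: Deque[str] = deque()
        self.cond = threading.Condition()

    def enqueue(self, player: str) -> None:
        with self.cond:
            self.players.append(player)
            print("Queue length:", len(self.players))  # simple gauge metric
            if len(self.players) >= 2:
                self.cond.notify()

    def next_match(self, timeout: float = 1.0) -> Optional[Tuple[str, str]]:
        with self.cond:
            if len(self.players) < 2:
                self.cond.wait(timeout)
            if len(self.players) >= 2:
                return self.players.popleft(), self.players.popleft()
            return None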

Level 3 — Resilient Architecture

The advanced level guards the external session allocator with retries and a pending queue for matches awaiting resources. Persisting pending matches so they survive restarts is sketched after the demo, and alarms should fire when retries exhaust. Learners watch how graceful degradation keeps the player experience stable even while dependencies recover.

import random
import time
from collections import deque
from typing import Deque, Tuple

class SessionAllocator:
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def allocate(self, players: Tuple[str, str]) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError("allocation failed")
        print("Session allocated for", players)

class ResilientMatchmaker:
    def __init__(self, allocator: SessionAllocator) -> None:
        self.allocator = allocator
        self.pending: Deque[Tuple[str, str]] = deque()

    def submit_match(self, players: Tuple[str, str]) -> None:
        self.pending.append(players)
        self.process_pending()

    def process_pending(self) -> None:
        # Try every queued match once; failed allocations are requeued.
        retries = []
        while self.pending:
            players = self.pending.popleft()
            try:
                self.allocator.allocate(players)
            except Exception as exc:
                print("Allocation failed:", exc)
                retries.append(players)
        if retries:
            for players in retries:
                self.pending.append(players)
            time.sleep(0.1)  # brief pause before the next attempt

def main() -> None:
    random.seed(24)
    matchmaker = ResilientMatchmaker(SessionAllocator(0.5))
    matches = [("alice", "bob"), ("carol", "dave"), ("eve", "frank")]
    for match in matches:
        matchmaker.submit_match(match)
        matchmaker.process_pending()

if __name__ == "__main__":
    main()
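
The restart-survival step needs a journal; a minimal sketch persists pending matches as JSON lines and reloads them on startup. The MatchJournal name and file format are illustrative assumptions.

import json
from collections import deque
from typing import Deque, Tuple

class MatchJournal:
    # Sketch: pending matches outlive process restarts via an append-only file.
    def __init__(self, path: str) -> None:
        self.path = path

    def append(self, players: Tuple[str, str]) -> None:
        with open(self.path, "a", encoding="utf-8") as handle:
            handle.write(json.dumps(list(players)) + "\n")

    def load(self) -> Deque[Tuple[str, str]]:
        pending: Deque[Tuple[str, str]] = deque()
        try:
            with open(self.path, "r", encoding="utf-8") as handle:
                for line in handle:
                    pending.append(tuple(json.loads(line)))
        except FileNotFoundError:
            pass  # first run: nothing journalled yet
        return pending
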