Incrementally build a parking lot platform: begin with a clean single-floor design, extend to multi-entry concurrent operations, and finally wrap it with resiliency patterns that protect external integrations.
Parking Campus
├─ Level 1: Floor F0 → Slots S1..Sn
├─ Level 2: Entry Gates → Queued EventBus → Assignments
└─ Level 3: Entry Service → CircuitBreaker(Payment) → Ticketing Fallback
Level 1 — Core Design
Level 1 lays the foundations by modelling spots, allocation requests, and receipts as explicit dataclasses so the domain vocabulary stays clear. We compose a repository abstraction, a guard chain for validations, and a selector strategy before a single state mutation happens, which keeps business rules honest. Finishing with EventBus notifications proves the baseline service can already integrate outward and sets up later extensions without rewiring the core.
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List, Optional, Protocol
class SpotRepository(Protocol):
def available(self, *, level: Optional[int] = None) -> Iterable["Spot"]:
...
def get(self, spot_id: str) -> Optional["Spot"]:
...
def update(self, spot_id: str, spot: "Spot") -> None:
...
@dataclass
class Spot:
spot_id: str
level: int
size: str
state: str = "VACANT"
vehicle_id: Optional[str] = None
@dataclass
class AllocationRequest:
vehicle_id: str
size: str
duration_hours: int
preferred_level: Optional[int] = None
@dataclass
class AllocationReceipt:
spot_id: str
fee: float
class EventBus:
def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = {}
def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
self._subscribers.setdefault(event, []).append(handler)
def publish(self, event: str, payload: dict) -> None:
for handler in self._subscribers.get(event, []):
handler(payload)
class BaseGuard:
def __init__(self) -> None:
self._next: Optional["BaseGuard"] = None
def set_next(self, nxt: "BaseGuard") -> "BaseGuard":
self._next = nxt
return nxt
def check(self, context: dict) -> Optional[str]:
return None
def handle(self, context: dict) -> Optional[str]:
failure = self.check(context)
if failure or not self._next:
return failure
return self._next.handle(context)
class VehicleSizeGuard(BaseGuard):
ORDER = {"compact": 0, "regular": 1, "large": 2}
def check(self, context: dict) -> Optional[str]:
request: AllocationRequest = context["request"]
spot: Spot = context["spot"]
try:
if self.ORDER[spot.size] < self.ORDER[request.size]:
return "SIZE_MISMATCH"
except KeyError:
return "UNKNOWN_SIZE"
return None
class SpotAvailabilityGuard(BaseGuard):
def check(self, context: dict) -> Optional[str]:
spot: Spot = context["spot"]
if spot.state != "VACANT":
return "TAKEN"
return None
class SpotSelector(Protocol):
def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
...
class NearestLevelSelector:
def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
preferred_level = request.preferred_level
best: Optional[Spot] = None
best_distance = float("inf")
        for spot in candidates:
            # With no preferred level every candidate scores 0, so the first spot wins.
            target = preferred_level if preferred_level is not None else spot.level
            distance = abs(target - spot.level)
if distance < best_distance:
best = spot
best_distance = distance
return best
class PricingStrategy(Protocol):
def compute(self, request: AllocationRequest) -> float:
...
class FlatRatePricing(PricingStrategy):
RATES = {"compact": 5, "regular": 8, "large": 12}
def compute(self, request: AllocationRequest) -> float:
return self.RATES[request.size] * max(1, request.duration_hours)
class SpotLifecycle:
def __init__(self, spot: Spot) -> None:
self.spot = spot
def reserve(self) -> bool:
if self.spot.state != "VACANT":
return False
self.spot.state = "HELD"
return True
def occupy(self, vehicle_id: str) -> bool:
if self.spot.state != "HELD":
return False
self.spot.state = "OCCUPIED"
self.spot.vehicle_id = vehicle_id
return True
def vacate(self) -> bool:
if self.spot.state != "OCCUPIED":
return False
self.spot.state = "VACANT"
self.spot.vehicle_id = None
return True
class InMemorySpotRepository(SpotRepository):
def __init__(self, spots: Iterable[Spot]):
self._spots: Dict[str, Spot] = {spot.spot_id: spot for spot in spots}
def available(self, *, level: Optional[int] = None) -> Iterable[Spot]:
return [
spot
for spot in self._spots.values()
if spot.state == "VACANT" and (level is None or spot.level == level)
]
def get(self, spot_id: str) -> Optional[Spot]:
return self._spots.get(spot_id)
def update(self, spot_id: str, spot: Spot) -> None:
self._spots[spot_id] = spot
def all(self) -> Iterable[Spot]:
return self._spots.values()
class ParkingLotService:
def __init__(self, repo: SpotRepository, selector: SpotSelector, pricing: PricingStrategy, bus: EventBus) -> None:
self.repo = repo
self.selector = selector
self.pricing = pricing
self.bus = bus
self.guard = VehicleSizeGuard()
self.guard.set_next(SpotAvailabilityGuard())
    def park(self, request: AllocationRequest) -> Optional[AllocationReceipt]:
        candidates = list(self.repo.available(level=request.preferred_level))
        spot: Optional[Spot] = None
        while candidates:
            candidate = self.selector.choose(request, candidates)
            if candidate is None:
                break
            if self.guard.handle({"request": request, "spot": candidate}) is None:
                spot = candidate
                break
            # The guard rejected this spot (e.g. SIZE_MISMATCH), so drop it and
            # let the selector propose the next-best candidate.
            candidates.remove(candidate)
        if spot is None:
            return None
        lifecycle = SpotLifecycle(spot)
        if not lifecycle.reserve() or not lifecycle.occupy(request.vehicle_id):
            return None
self.repo.update(spot.spot_id, spot)
fee = self.pricing.compute(request)
receipt = AllocationReceipt(spot_id=spot.spot_id, fee=fee)
self.bus.publish("parking.allocated", {"vehicle": request.vehicle_id, "spot": spot.spot_id, "fee": fee})
return receipt
def release(self, spot_id: str) -> bool:
spot = self.repo.get(spot_id)
if not spot:
return False
lifecycle = SpotLifecycle(spot)
if not lifecycle.vacate():
return False
self.repo.update(spot.spot_id, spot)
self.bus.publish("parking.released", {"spot": spot.spot_id})
return True
class StdoutBus(EventBus):
def publish(self, event: str, payload: dict) -> None:
super().publish(event, payload)
print(f"[event] {event}: {payload}")
def main() -> None:
repo = InMemorySpotRepository(
[
Spot("C1", level=1, size="compact"),
Spot("R1", level=1, size="regular"),
Spot("L2", level=2, size="large"),
]
)
service = ParkingLotService(repo, NearestLevelSelector(), FlatRatePricing(), StdoutBus())
request = AllocationRequest(vehicle_id="CAR-42", size="regular", duration_hours=3, preferred_level=1)
receipt = service.park(request)
print("Receipt:", receipt)
if receipt:
service.release(receipt.spot_id)
print("Inventory:", [(spot.spot_id, spot.state, spot.vehicle_id) for spot in repo.all()])
if __name__ == "__main__":
main()
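Because the guards are plain objects, the chain is testable in isolation, with no repository or bus involved. A minimal sketch reusing the Level 1 classes above; demo_guard_chain is a hypothetical helper, not part of the listing:
def demo_guard_chain() -> None:
    guard = VehicleSizeGuard()
    guard.set_next(SpotAvailabilityGuard())
    request = AllocationRequest(vehicle_id="CAR-7", size="regular", duration_hours=1)
    # A compact spot cannot host a regular vehicle, so the first guard fires.
    assert guard.handle({"request": request, "spot": Spot("C9", level=1, size="compact")}) == "SIZE_MISMATCH"
    # The size fits here, but the second guard rejects an occupied spot.
    assert guard.handle({"request": request, "spot": Spot("R9", level=1, size="regular", state="OCCUPIED")}) == "TAKEN"
    # A vacant, correctly sized spot passes the whole chain.
    assert guard.handle({"request": request, "spot": Spot("R8", level=1, size="regular")}) is None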
Level 2 — Event-Driven Campus
Level 2 grows the design into a campus by inserting entry gates that publish intents onto a queued EventBus instead of mutating spots directly. We add command objects so each doorway can enqueue work without colliding with the others, while a round-robin selector keeps allocation fair across levels. Release commands and an event-driven projector keep assignments observable, so releases and rejections are recorded instead of silently stranding occupied spots.
from __future__ import annotations
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Optional, Protocol, Tuple
class EventBus:
def __init__(self) -> None:
self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
self._queue: Deque[Tuple[str, dict]] = deque()
def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
self._subscribers[event].append(handler)
def publish(self, event: str, payload: dict) -> None:
self._queue.append((event, payload))
def pump(self) -> None:
while self._queue:
event, payload = self._queue.popleft()
for handler in list(self._subscribers.get(event, [])):
handler(payload)
class Command(Protocol):
def execute(self) -> None:
...
class PublishCommand:
def __init__(self, bus: EventBus, event: str, payload: dict) -> None:
self.bus = bus
self.event = event
self.payload = payload
def execute(self) -> None:
self.bus.publish(self.event, self.payload)
class SpotRepository(Protocol):
def available(self, *, level: Optional[int] = None) -> Iterable["Spot"]:
...
def get(self, spot_id: str) -> Optional["Spot"]:
...
def update(self, spot_id: str, spot: "Spot") -> None:
...
def levels(self) -> Iterable[int]:
...
@dataclass
class Spot:
spot_id: str
level: int
size: str
state: str = "VACANT"
vehicle_id: Optional[str] = None
@dataclass
class AllocationRequest:
vehicle_id: str
size: str
duration_hours: int
preferred_level: Optional[int] = None
@dataclass
class AllocationReceipt:
spot_id: str
fee: float
class BaseGuard:
def __init__(self) -> None:
self._next: Optional["BaseGuard"] = None
def set_next(self, nxt: "BaseGuard") -> "BaseGuard":
self._next = nxt
return nxt
def check(self, context: dict) -> Optional[str]:
return None
def handle(self, context: dict) -> Optional[str]:
failure = self.check(context)
if failure or not self._next:
return failure
return self._next.handle(context)
class VehicleSizeGuard(BaseGuard):
ORDER = {"compact": 0, "regular": 1, "large": 2}
def check(self, context: dict) -> Optional[str]:
request: AllocationRequest = context["request"]
spot: Spot = context["spot"]
try:
if self.ORDER[spot.size] < self.ORDER[request.size]:
return "SIZE_MISMATCH"
except KeyError:
return "UNKNOWN_SIZE"
return None
class SpotAvailabilityGuard(BaseGuard):
def check(self, context: dict) -> Optional[str]:
spot: Spot = context["spot"]
if spot.state != "VACANT":
return "TAKEN"
return None
class SpotLifecycle:
def __init__(self, spot: Spot) -> None:
self.spot = spot
def reserve(self) -> bool:
if self.spot.state != "VACANT":
return False
self.spot.state = "HELD"
return True
def occupy(self, vehicle_id: str) -> bool:
if self.spot.state != "HELD":
return False
self.spot.state = "OCCUPIED"
self.spot.vehicle_id = vehicle_id
return True
def vacate(self) -> bool:
if self.spot.state != "OCCUPIED":
return False
self.spot.state = "VACANT"
self.spot.vehicle_id = None
return True
class InMemorySpotRepository(SpotRepository):
def __init__(self, spots: Iterable[Spot]):
self._spots: Dict[str, Spot] = {spot.spot_id: spot for spot in spots}
def available(self, *, level: Optional[int] = None) -> Iterable[Spot]:
return [
spot
for spot in self._spots.values()
if spot.state == "VACANT" and (level is None or spot.level == level)
]
def get(self, spot_id: str) -> Optional[Spot]:
return self._spots.get(spot_id)
def update(self, spot_id: str, spot: Spot) -> None:
self._spots[spot_id] = spot
def levels(self) -> Iterable[int]:
return sorted({spot.level for spot in self._spots.values()})
class SpotSelector(Protocol):
def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
...
class RoundRobinLevelSelector:
def __init__(self, levels: Iterable[int]):
self.order = list(levels)
self._cursor = 0
def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
pools: DefaultDict[int, List[Spot]] = defaultdict(list)
for spot in candidates:
pools[spot.level].append(spot)
if not pools:
return None
if not self.order:
self.order = sorted(pools.keys())
for _ in range(len(self.order)):
level = self.order[self._cursor % len(self.order)]
self._cursor += 1
bucket = pools.get(level)
if bucket:
return bucket[0]
fallback_origin = request.preferred_level if request.preferred_level is not None else self.order[0]
fallback = sorted(
(spot for level_spots in pools.values() for spot in level_spots),
key=lambda s: abs(fallback_origin - s.level),
)
return fallback[0] if fallback else None
class PricingStrategy(Protocol):
def compute(self, request: AllocationRequest) -> float:
...
class FlatRatePricing(PricingStrategy):
RATES = {"compact": 5, "regular": 8, "large": 12}
def compute(self, request: AllocationRequest) -> float:
return self.RATES[request.size] * max(1, request.duration_hours)
class ParkingCampusService:
def __init__(self, repo: SpotRepository, selector: SpotSelector, pricing: PricingStrategy, bus: EventBus):
self.repo = repo
self.selector = selector
self.pricing = pricing
self.bus = bus
self.guard = VehicleSizeGuard()
self.guard.set_next(SpotAvailabilityGuard())
bus.subscribe("parking.requested", self._handle_request)
bus.subscribe("parking.checkout", self._handle_checkout)
def _handle_request(self, payload: dict) -> None:
request: AllocationRequest = payload["request"]
candidates = list(self.repo.available(level=request.preferred_level))
spot = self.selector.choose(request, candidates)
if not spot:
self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": "NO_SPOT"})
return
failure = self.guard.handle({"request": request, "spot": spot})
if failure:
self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": failure})
return
lifecycle = SpotLifecycle(spot)
if not lifecycle.reserve() or not lifecycle.occupy(request.vehicle_id):
self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": "STATE"})
return
self.repo.update(spot.spot_id, spot)
fee = self.pricing.compute(request)
receipt = AllocationReceipt(spot_id=spot.spot_id, fee=fee)
self.bus.publish(
"parking.allocated",
{"vehicle": request.vehicle_id, "spot": spot.spot_id, "fee": fee, "gate": payload.get("gate"), "receipt": receipt},
)
def _handle_checkout(self, payload: dict) -> None:
spot = self.repo.get(payload["spot_id"])
if not spot:
return
lifecycle = SpotLifecycle(spot)
if lifecycle.vacate():
self.repo.update(spot.spot_id, spot)
self.bus.publish("parking.released", {"spot": spot.spot_id})
class EntryGate:
def __init__(self, gate_id: str, bus: EventBus) -> None:
self.gate_id = gate_id
self.bus = bus
def enqueue(self, request: AllocationRequest) -> Command:
payload = {"request": request, "gate": self.gate_id}
return PublishCommand(self.bus, "parking.requested", payload)
class ReleaseCommand(Command):
def __init__(self, bus: EventBus, spot_id: str):
self.bus = bus
self.spot_id = spot_id
def execute(self) -> None:
self.bus.publish("parking.checkout", {"spot_id": self.spot_id})
class AllocationProjector:
def __init__(self, bus: EventBus) -> None:
self.assignments: Dict[str, str] = {}
self.rejections: List[Tuple[str, str]] = []
bus.subscribe("parking.allocated", self._on_allocated)
bus.subscribe("parking.rejected", self._on_rejected)
bus.subscribe("parking.released", self._on_released)
def _on_allocated(self, payload: dict) -> None:
self.assignments[payload["vehicle"]] = payload["spot"]
def _on_rejected(self, payload: dict) -> None:
self.rejections.append((payload["vehicle"], payload["reason"]))
def _on_released(self, payload: dict) -> None:
released_spot = payload["spot"]
for vehicle, spot in list(self.assignments.items()):
if spot == released_spot:
del self.assignments[vehicle]
break
def main() -> None:
spots = [
Spot("C1", level=0, size="compact"),
Spot("C2", level=0, size="compact"),
Spot("R1", level=1, size="regular"),
Spot("R2", level=1, size="regular"),
Spot("L1", level=2, size="large"),
]
repo = InMemorySpotRepository(spots)
bus = EventBus()
selector = RoundRobinLevelSelector(repo.levels())
campus = ParkingCampusService(repo, selector, FlatRatePricing(), bus)
projector = AllocationProjector(bus)
gates = [EntryGate("G1", bus), EntryGate("G2", bus), EntryGate("G3", bus)]
requests = [
AllocationRequest(vehicle_id="CAR-1", size="regular", duration_hours=2, preferred_level=1),
AllocationRequest(vehicle_id="CAR-2", size="regular", duration_hours=1),
AllocationRequest(vehicle_id="BIKE-3", size="compact", duration_hours=3, preferred_level=0),
AllocationRequest(vehicle_id="SUV-4", size="large", duration_hours=4, preferred_level=2),
]
for gate, request in zip(gates * 2, requests):
gate.enqueue(request).execute()
bus.pump()
allocated = list(projector.assignments.items())
if allocated:
_, spot_id = allocated[0]
ReleaseCommand(bus, spot_id).execute()
bus.pump()
print("Assignments:", projector.assignments)
print("Rejections:", projector.rejections)
if __name__ == "__main__":
main()
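Because every state change flows through the bus, new read models can be attached without touching the service. A hedged sketch of one such subscriber; OccupancyMetrics is hypothetical and not part of the listing above:
class OccupancyMetrics:
    # Hypothetical read model: it stays in sync purely by observing events,
    # never by querying the repository.
    def __init__(self, bus: EventBus) -> None:
        self.in_use = 0
        bus.subscribe("parking.allocated", self._on_allocated)
        bus.subscribe("parking.released", self._on_released)
    def _on_allocated(self, payload: dict) -> None:
        self.in_use += 1
    def _on_released(self, payload: dict) -> None:
        self.in_use -= 1
Constructed next to the projector (e.g. metrics = OccupancyMetrics(bus)) before pumping, its in_use counter tracks live occupancy for free.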
Level 3 — Resilient Orchestration
Level 3 stress-tests the system by assuming external payments and ticketing can fail, so allocations flow into a saga that tracks every side effect. We guard the payment gateway with a circuit breaker and a retry policy, and the saga issues compensating commands, a manual ticket plus a release, that unwind reservations when the downstream dependency stays unhealthy. This level highlights how the event stream plus explicit state machines let us recover deterministically even when part of the campus is degraded.
from __future__ import annotations
import random
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Optional, Protocol, Tuple
class EventBus:
def __init__(self) -> None:
self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
self._queue: Deque[Tuple[str, dict]] = deque()
def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
self._subscribers[event].append(handler)
def publish(self, event: str, payload: dict) -> None:
self._queue.append((event, payload))
def pump(self) -> None:
while self._queue:
event, payload = self._queue.popleft()
for handler in list(self._subscribers.get(event, [])):
handler(payload)
class Command(Protocol):
def execute(self) -> None:
...
class PublishCommand:
def __init__(self, bus: EventBus, event: str, payload: dict) -> None:
self.bus = bus
self.event = event
self.payload = payload
def execute(self) -> None:
self.bus.publish(self.event, self.payload)
class ReleaseCommand(Command):
def __init__(self, bus: EventBus, spot_id: str):
self.bus = bus
self.spot_id = spot_id
def execute(self) -> None:
self.bus.publish("parking.checkout", {"spot_id": self.spot_id})
class SpotRepository(Protocol):
def available(self, *, level: Optional[int] = None) -> Iterable["Spot"]:
...
def get(self, spot_id: str) -> Optional["Spot"]:
...
def update(self, spot_id: str, spot: "Spot") -> None:
...
def levels(self) -> Iterable[int]:
...
@dataclass
class Spot:
spot_id: str
level: int
size: str
state: str = "VACANT"
vehicle_id: Optional[str] = None
@dataclass
class AllocationRequest:
vehicle_id: str
size: str
duration_hours: int
preferred_level: Optional[int] = None
@dataclass
class AllocationReceipt:
spot_id: str
fee: float
class BaseGuard:
def __init__(self) -> None:
self._next: Optional["BaseGuard"] = None
def set_next(self, nxt: "BaseGuard") -> "BaseGuard":
self._next = nxt
return nxt
def check(self, context: dict) -> Optional[str]:
return None
def handle(self, context: dict) -> Optional[str]:
failure = self.check(context)
if failure or not self._next:
return failure
return self._next.handle(context)
class VehicleSizeGuard(BaseGuard):
ORDER = {"compact": 0, "regular": 1, "large": 2}
def check(self, context: dict) -> Optional[str]:
request: AllocationRequest = context["request"]
spot: Spot = context["spot"]
try:
if self.ORDER[spot.size] < self.ORDER[request.size]:
return "SIZE_MISMATCH"
except KeyError:
return "UNKNOWN_SIZE"
return None
class SpotAvailabilityGuard(BaseGuard):
def check(self, context: dict) -> Optional[str]:
spot: Spot = context["spot"]
if spot.state != "VACANT":
return "TAKEN"
return None
class SpotLifecycle:
def __init__(self, spot: Spot) -> None:
self.spot = spot
def reserve(self) -> bool:
if self.spot.state != "VACANT":
return False
self.spot.state = "HELD"
return True
def occupy(self, vehicle_id: str) -> bool:
if self.spot.state != "HELD":
return False
self.spot.state = "OCCUPIED"
self.spot.vehicle_id = vehicle_id
return True
def vacate(self) -> bool:
if self.spot.state != "OCCUPIED":
return False
self.spot.state = "VACANT"
self.spot.vehicle_id = None
return True
class InMemorySpotRepository(SpotRepository):
def __init__(self, spots: Iterable[Spot]):
self._spots: Dict[str, Spot] = {spot.spot_id: spot for spot in spots}
def available(self, *, level: Optional[int] = None) -> Iterable[Spot]:
return [
spot
for spot in self._spots.values()
if spot.state == "VACANT" and (level is None or spot.level == level)
]
def get(self, spot_id: str) -> Optional[Spot]:
return self._spots.get(spot_id)
def update(self, spot_id: str, spot: Spot) -> None:
self._spots[spot_id] = spot
def levels(self) -> Iterable[int]:
return sorted({spot.level for spot in self._spots.values()})
class SpotSelector(Protocol):
def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
...
class RoundRobinLevelSelector:
def __init__(self, levels: Iterable[int]):
self.order = list(levels)
self._cursor = 0
def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
pools: DefaultDict[int, List[Spot]] = defaultdict(list)
for spot in candidates:
pools[spot.level].append(spot)
if not pools:
return None
if not self.order:
self.order = sorted(pools.keys())
for _ in range(len(self.order)):
level = self.order[self._cursor % len(self.order)]
self._cursor += 1
bucket = pools.get(level)
if bucket:
return bucket[0]
fallback_origin = request.preferred_level if request.preferred_level is not None else self.order[0]
fallback = sorted(
(spot for level_spots in pools.values() for spot in level_spots),
key=lambda s: abs(fallback_origin - s.level),
)
return fallback[0] if fallback else None
class PricingStrategy(Protocol):
def compute(self, request: AllocationRequest) -> float:
...
class FlatRatePricing(PricingStrategy):
RATES = {"compact": 5, "regular": 8, "large": 12}
def compute(self, request: AllocationRequest) -> float:
return self.RATES[request.size] * max(1, request.duration_hours)
class ParkingCampusService:
def __init__(self, repo: SpotRepository, selector: SpotSelector, pricing: PricingStrategy, bus: EventBus):
self.repo = repo
self.selector = selector
self.pricing = pricing
self.bus = bus
self.guard = VehicleSizeGuard()
self.guard.set_next(SpotAvailabilityGuard())
bus.subscribe("parking.requested", self._handle_request)
bus.subscribe("parking.checkout", self._handle_checkout)
def _handle_request(self, payload: dict) -> None:
request: AllocationRequest = payload["request"]
candidates = list(self.repo.available(level=request.preferred_level))
spot = self.selector.choose(request, candidates)
if not spot:
self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": "NO_SPOT"})
return
failure = self.guard.handle({"request": request, "spot": spot})
if failure:
self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": failure})
return
lifecycle = SpotLifecycle(spot)
if not lifecycle.reserve() or not lifecycle.occupy(request.vehicle_id):
self.bus.publish("parking.rejected", {"vehicle": request.vehicle_id, "reason": "STATE"})
return
self.repo.update(spot.spot_id, spot)
fee = self.pricing.compute(request)
receipt = AllocationReceipt(spot_id=spot.spot_id, fee=fee)
self.bus.publish(
"parking.allocated",
{"vehicle": request.vehicle_id, "spot": spot.spot_id, "fee": fee, "gate": payload.get("gate"), "receipt": receipt},
)
def _handle_checkout(self, payload: dict) -> None:
spot = self.repo.get(payload["spot_id"])
if not spot:
return
lifecycle = SpotLifecycle(spot)
if lifecycle.vacate():
self.repo.update(spot.spot_id, spot)
self.bus.publish("parking.released", {"spot": spot.spot_id})
class PaymentGateway:
def __init__(self, failure_rate: float = 0.35):
self.failure_rate = failure_rate
def charge(self, vehicle_id: str, amount: float) -> str:
if random.random() < self.failure_rate:
raise RuntimeError("payment timeout")
return f"receipt::{vehicle_id}::{int(time.time() * 1000)}::{amount:.2f}"
class CircuitBreaker:
def __init__(self, failure_threshold: int, recovery_timeout: float):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0.0
self.state = "CLOSED"
def call(self, func: Callable[[], str]) -> str:
now = time.time()
if self.state == "OPEN":
if now - self.last_failure_time < self.recovery_timeout:
raise RuntimeError("Circuit open")
self.state = "HALF_OPEN"
try:
result = func()
except Exception:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
raise
else:
self.failure_count = 0
self.state = "CLOSED"
return result
class RetryPolicy:
def __init__(self, attempts: int, backoff: float):
self.attempts = attempts
self.backoff = backoff
    def call(self, func: Callable[[], str]) -> str:
        last_error: Optional[Exception] = None
        for attempt in range(self.attempts):
            try:
                return func()
            except Exception as exc:
                last_error = exc
                if attempt < self.attempts - 1:
                    # Linear backoff between attempts; no sleep after the final failure.
                    time.sleep(self.backoff * (attempt + 1))
        if last_error:
            raise last_error
        raise RuntimeError("retry exhausted")
class PaymentProcessor:
def __init__(self, bus: EventBus, gateway: PaymentGateway, breaker: CircuitBreaker, retry: RetryPolicy):
self.bus = bus
self.gateway = gateway
self.breaker = breaker
self.retry = retry
bus.subscribe("parking.allocated", self._handle_allocated)
def _handle_allocated(self, payload: dict) -> None:
vehicle = payload["vehicle"]
spot_id = payload["spot"]
amount = payload["fee"]
def attempt() -> str:
return self.breaker.call(lambda: self.gateway.charge(vehicle, amount))
try:
receipt = self.retry.call(attempt)
self.bus.publish("parking.confirmed", {"vehicle": vehicle, "spot": spot_id, "receipt": receipt})
except Exception as exc:
self.bus.publish("parking.payment_failed", {"vehicle": vehicle, "spot": spot_id, "reason": str(exc)})
class ManualTicketCommand(Command):
def __init__(self, vehicle_id: str, spot_id: str):
self.vehicle_id = vehicle_id
self.spot_id = spot_id
self.ticket: Optional[str] = None
def execute(self) -> None:
self.ticket = f"manual-ticket::{self.vehicle_id}@{self.spot_id}"
class ParkingSaga:
def __init__(self, bus: EventBus):
self.bus = bus
bus.subscribe("parking.payment_failed", self._compensate)
def _compensate(self, payload: dict) -> None:
vehicle = payload["vehicle"]
spot_id = payload["spot"]
ticket_cmd = ManualTicketCommand(vehicle, spot_id)
ticket_cmd.execute()
ReleaseCommand(self.bus, spot_id).execute()
self.bus.publish(
"parking.fallback",
{"vehicle": vehicle, "spot": spot_id, "ticket": ticket_cmd.ticket, "reason": payload["reason"]},
)
class EntryGate:
def __init__(self, gate_id: str, bus: EventBus) -> None:
self.gate_id = gate_id
self.bus = bus
def enqueue(self, request: AllocationRequest) -> Command:
payload = {"request": request, "gate": self.gate_id}
return PublishCommand(self.bus, "parking.requested", payload)
class AllocationProjection:
def __init__(self, bus: EventBus) -> None:
self.pending: Dict[str, str] = {}
self.confirmed: Dict[str, str] = {}
self.fallbacks: Dict[str, str] = {}
self.rejections: List[Tuple[str, str]] = []
bus.subscribe("parking.allocated", self._on_allocated)
bus.subscribe("parking.confirmed", self._on_confirmed)
bus.subscribe("parking.fallback", self._on_fallback)
bus.subscribe("parking.released", self._on_released)
bus.subscribe("parking.rejected", self._on_rejected)
def _on_allocated(self, payload: dict) -> None:
self.pending[payload["vehicle"]] = payload["spot"]
def _on_confirmed(self, payload: dict) -> None:
vehicle = payload["vehicle"]
spot = payload["spot"]
self.confirmed[vehicle] = spot
self.pending.pop(vehicle, None)
def _on_fallback(self, payload: dict) -> None:
vehicle = payload["vehicle"]
self.fallbacks[vehicle] = payload["ticket"]
self.pending.pop(vehicle, None)
def _on_released(self, payload: dict) -> None:
released_spot = payload["spot"]
for bucket in (self.pending, self.confirmed):
for vehicle, spot in list(bucket.items()):
if spot == released_spot:
del bucket[vehicle]
def _on_rejected(self, payload: dict) -> None:
self.rejections.append((payload["vehicle"], payload["reason"]))
class ParkingMetrics:
def __init__(self, bus: EventBus) -> None:
self.snapshot: Dict[str, int] = {"confirmed": 0, "fallback": 0, "rejected": 0}
bus.subscribe("parking.confirmed", self._on_confirmed)
bus.subscribe("parking.fallback", self._on_fallback)
bus.subscribe("parking.rejected", self._on_rejected)
def _on_confirmed(self, _: dict) -> None:
self.snapshot["confirmed"] += 1
def _on_fallback(self, _: dict) -> None:
self.snapshot["fallback"] += 1
def _on_rejected(self, _: dict) -> None:
self.snapshot["rejected"] += 1
def main() -> None:
random.seed(42)
spots = [
Spot("C1", level=0, size="compact"),
Spot("R1", level=1, size="regular"),
Spot("R2", level=1, size="regular"),
Spot("L1", level=2, size="large"),
Spot("L2", level=2, size="large"),
]
repo = InMemorySpotRepository(spots)
bus = EventBus()
selector = RoundRobinLevelSelector(repo.levels())
pricing = FlatRatePricing()
ParkingCampusService(repo, selector, pricing, bus)
PaymentProcessor(bus, PaymentGateway(failure_rate=0.4), CircuitBreaker(3, 1.5), RetryPolicy(3, 0.05))
ParkingSaga(bus)
projection = AllocationProjection(bus)
metrics = ParkingMetrics(bus)
gates = [EntryGate("Gate-A", bus), EntryGate("Gate-B", bus)]
requests = [
AllocationRequest("CAR-101", "regular", 2, preferred_level=1),
AllocationRequest("SUV-202", "large", 4, preferred_level=2),
AllocationRequest("BIKE-303", "compact", 1, preferred_level=0),
AllocationRequest("CAR-404", "regular", 3),
AllocationRequest("SUV-505", "large", 5, preferred_level=2),
AllocationRequest("TRUCK-606", "large", 2),
]
for gate, request in zip(gates * 3, requests):
gate.enqueue(request).execute()
bus.pump()
confirmed_snapshot = dict(projection.confirmed)
for _, spot in list(projection.confirmed.items())[:1]:
ReleaseCommand(bus, spot).execute()
bus.pump()
print("Confirmed:", confirmed_snapshot)
print("Fallback:", projection.fallbacks)
print("Pending:", projection.pending)
print("Rejections:", projection.rejections)
print("Metrics:", metrics.snapshot)
if __name__ == "__main__":
main()
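The breaker's state machine is easy to exercise directly, assuming the CircuitBreaker above; demo_breaker_states is a hypothetical walk-through, not part of the listing:
def demo_breaker_states() -> None:
    breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05)
    def always_fail() -> str:
        raise RuntimeError("payment timeout")
    for _ in range(2):  # two consecutive failures reach the threshold...
        try:
            breaker.call(always_fail)
        except RuntimeError:
            pass
    assert breaker.state == "OPEN"  # ...so the breaker trips open
    try:
        breaker.call(lambda: "ok")  # rejected fast while open, without touching the gateway
    except RuntimeError:
        pass
    time.sleep(0.06)  # wait out recovery_timeout
    assert breaker.call(lambda: "ok") == "ok"  # the half-open probe succeeds...
    assert breaker.state == "CLOSED"  # ...and the breaker closes again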
Incrementally deliver a food ordering platform: start with observer-driven notifications, scale to prioritized concurrent dispatch, and finally orchestrate resilient fulfillment with sagas and circuit breakers.
Food Delivery Platform
├─ Level 1: OrderService → Observers
├─ Level 2: Priority Queue Dispatchers → Partner Utilization
└─ Level 3: Saga Orchestrator → Payment Breaker → Courier Assignment
Level 1 — Core Ordering Flow
The introductory food-delivery level focuses on getting the happy path crisp: capture orders in a repository abstraction and broadcast updates through observer callbacks. We track status transitions such as PLACED and PICKED explicitly, which keeps restaurant and partner notifications loosely coupled. Because dependencies sit behind protocols, the core service stays easy to test, and that primes us for the scheduling upgrades later.
from dataclasses import dataclass, field
from typing import Dict, List, Protocol
class DeliveryObserver(Protocol):
def notify(self, order_id: str, details: str) -> None:
...
@dataclass
class DeliveryPartner:
partner_id: str
notifications: List[str] = field(default_factory=list)
def notify(self, order_id: str, details: str) -> None:
message = f"Order {order_id}: {details}"
self.notifications.append(message)
print(f"Notify {self.partner_id}: {message}")
@dataclass
class Order:
order_id: str
restaurant: str
status: str = "PLACED"
class OrderRepository:
def __init__(self):
self.orders: Dict[str, Order] = {}
def save(self, order: Order) -> None:
self.orders[order.order_id] = order
def update_status(self, order_id: str, status: str) -> None:
self.orders[order_id].status = status
class OrderService:
def __init__(self, repository: OrderRepository):
self.repo = repository
self.observers: List[DeliveryObserver] = []
def register_partner(self, partner: DeliveryObserver) -> None:
self.observers.append(partner)
def place_order(self, order_id: str, restaurant: str) -> Order:
order = Order(order_id, restaurant)
self.repo.save(order)
for observer in self.observers:
observer.notify(order_id, f"from {restaurant} awaiting pickup")
return order
def mark_picked(self, order_id: str) -> None:
self.repo.update_status(order_id, "PICKED")
for observer in self.observers:
observer.notify(order_id, "picked up")
if __name__ == "__main__":
repo = OrderRepository()
service = OrderService(repo)
p1, p2 = DeliveryPartner("DP1"), DeliveryPartner("DP2")
service.register_partner(p1)
service.register_partner(p2)
service.place_order("O1", "Spicy Bites")
service.mark_picked("O1")
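Since observers are plain callables behind a Protocol and each partner records every message, the flow is assertable without any delivery infrastructure. A minimal check reusing the classes above; demo_observer_flow is a hypothetical helper:
def demo_observer_flow() -> None:
    repo = OrderRepository()
    service = OrderService(repo)
    partner = DeliveryPartner("DP-TEST")
    service.register_partner(partner)
    service.place_order("O9", "Test Kitchen")
    service.mark_picked("O9")
    assert repo.orders["O9"].status == "PICKED"
    assert partner.notifications == [
        "Order O9: from Test Kitchen awaiting pickup",
        "Order O9: picked up",
    ]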
Level 2 — Concurrent Dispatch
Level 2 keeps the same order service but introduces a dispatcher that accepts prioritized jobs via a heap so urgent tickets pre-empt routine ones. Worker threads pull from the queue under a condition variable while a least-loaded strategy spreads assignments across available partners, demonstrating how to add concurrency without violating encapsulation. Per-partner load counters, status tracking, and lock-guarded registries make sure assignment and acknowledgement race conditions are surfaced during practice.
from __future__ import annotations
import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Protocol
class DeliveryObserver(Protocol):
def notify(self, order_id: str, details: str) -> None:
...
@dataclass
class DeliveryPartner:
partner_id: str
notifications: List[str] = field(default_factory=list)
def notify(self, order_id: str, details: str) -> None:
message = f"Order {order_id}: {details}"
self.notifications.append(message)
print(f"Notify {self.partner_id}: {message}")
@dataclass
class Order:
order_id: str
restaurant: str
priority: int = 1
status: str = "PLACED"
@dataclass(order=True)
class DispatchItem:
sort_key: int
created_at: float
order: Order = field(compare=False)
class OrderRepository:
def __init__(self) -> None:
self.orders: Dict[str, Order] = {}
def save(self, order: Order) -> None:
self.orders[order.order_id] = order
def update_status(self, order_id: str, status: str) -> None:
self.orders[order_id].status = status
def get(self, order_id: str) -> Order:
return self.orders[order_id]
class OrderService:
def __init__(self, repository: OrderRepository) -> None:
self.repo = repository
self.observers: List[DeliveryObserver] = []
self.dispatcher: Optional["PriorityDispatchCoordinator"] = None
def register_partner(self, partner: DeliveryObserver) -> None:
self.observers.append(partner)
def attach_dispatcher(self, dispatcher: "PriorityDispatchCoordinator") -> None:
self.dispatcher = dispatcher
def place_order(self, order_id: str, restaurant: str, priority: int = 1) -> Order:
order = Order(order_id, restaurant, priority)
self.repo.save(order)
for observer in self.observers:
observer.notify(order_id, f"from {restaurant} awaiting pickup")
if self.dispatcher:
self.dispatcher.enqueue(order)
return order
def mark_picked(self, order_id: str) -> None:
self.repo.update_status(order_id, "PICKED")
for observer in self.observers:
observer.notify(order_id, "picked up")
if self.dispatcher:
self.dispatcher.complete(order_id)
def cancel_order(self, order_id: str, reason: str = "CANCELLED") -> None:
self.repo.update_status(order_id, reason)
for observer in self.observers:
observer.notify(order_id, f"cancelled: {reason}")
if self.dispatcher:
self.dispatcher.complete(order_id)
class PriorityDispatchCoordinator:
def __init__(self, service: OrderService, partners: Dict[str, DeliveryPartner], worker_count: int = 3):
self.service = service
self.partners = partners
self.partner_load: Dict[str, int] = {pid: 0 for pid in partners}
self.queue: List[DispatchItem] = []
self.queue_cond = threading.Condition()
self.load_lock = threading.Lock()
self.assignments: Dict[str, str] = {}
self.running = True
self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(worker_count)]
for worker in self.workers:
worker.start()
def enqueue(self, order: Order) -> None:
with self.queue_cond:
sort_key = -order.priority
heapq.heappush(self.queue, DispatchItem(sort_key, time.time(), order))
self.queue_cond.notify()
def _pick_partner(self) -> DeliveryPartner:
with self.load_lock:
partner_id = min(self.partner_load, key=self.partner_load.get)
self.partner_load[partner_id] += 1
return self.partners[partner_id]
def complete(self, order_id: str) -> None:
partner_id = self.assignments.pop(order_id, None)
if partner_id:
with self.load_lock:
self.partner_load[partner_id] -= 1
def _worker(self) -> None:
while self.running:
with self.queue_cond:
while not self.queue and self.running:
self.queue_cond.wait()
if not self.running:
return
item = heapq.heappop(self.queue)
partner = self._pick_partner()
self.assignments[item.order.order_id] = partner.partner_id
partner.notify(item.order.order_id, f"pickup ready from {item.order.restaurant}")
time.sleep(0.1)
def shutdown(self) -> None:
self.running = False
with self.queue_cond:
self.queue_cond.notify_all()
for worker in self.workers:
worker.join(timeout=0.2)
def main() -> None:
repo = OrderRepository()
service = OrderService(repo)
partners = {
"DP1": DeliveryPartner("DP1"),
"DP2": DeliveryPartner("DP2"),
"DP3": DeliveryPartner("DP3"),
}
for partner in partners.values():
service.register_partner(partner)
dispatcher = PriorityDispatchCoordinator(service, partners, worker_count=2)
service.attach_dispatcher(dispatcher)
for priority, order_id in enumerate(["O1", "O2", "O3", "O4"], start=1):
service.place_order(order_id, "Fusion Kitchen", priority)
time.sleep(0.4)
for order_id in ["O1", "O2", "O3", "O4"]:
service.mark_picked(order_id)
dispatcher.shutdown()
if __name__ == "__main__":
main()
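The heap ordering deserves a close look: sort_key is the negated priority, so higher-priority orders pop first, and equal priorities fall back to created_at, earliest wins. A small check reusing DispatchItem; demo_priority_order is a hypothetical helper:
def demo_priority_order() -> None:
    heap: List[DispatchItem] = []
    for priority, order_id in [(1, "low"), (3, "urgent"), (2, "mid")]:
        order = Order(order_id, "Demo Diner", priority)
        heapq.heappush(heap, DispatchItem(-order.priority, time.time(), order))
    popped = [heapq.heappop(heap).order.order_id for _ in range(len(heap))]
    assert popped == ["urgent", "mid", "low"]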
Level 3 — Resilient Fulfillment
The final food-delivery level treats external partners as unreliable, so each order now flows through a saga that records every step and its compensation. We wrap the payment integration with a circuit breaker and bounded retries, track intent in a per-order context so partial work can be unwound safely, and park irrecoverable orders in a dead-letter list. Learners can observe how the same domain events from earlier levels now feed resilience concerns without rewriting the ordering pipeline.
from __future__ import annotations
import heapq
import random
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Deque, Dict, List, Optional, Protocol
class DeliveryObserver(Protocol):
def notify(self, order_id: str, details: str) -> None:
...
@dataclass
class DeliveryPartner:
partner_id: str
notifications: List[str] = field(default_factory=list)
def notify(self, order_id: str, details: str) -> None:
message = f"Order {order_id}: {details}"
self.notifications.append(message)
print(f"Notify {self.partner_id}: {message}")
@dataclass
class Order:
order_id: str
restaurant: str
priority: int = 1
status: str = "PLACED"
@dataclass(order=True)
class DispatchItem:
sort_key: int
created_at: float
order: Order = field(compare=False)
class OrderRepository:
def __init__(self) -> None:
self.orders: Dict[str, Order] = {}
def save(self, order: Order) -> None:
self.orders[order.order_id] = order
def update_status(self, order_id: str, status: str) -> None:
self.orders[order_id].status = status
def get(self, order_id: str) -> Order:
return self.orders[order_id]
class OrderService:
def __init__(self, repository: OrderRepository) -> None:
self.repo = repository
self.observers: List[DeliveryObserver] = []
self.dispatcher: Optional["PriorityDispatchCoordinator"] = None
def register_partner(self, partner: DeliveryObserver) -> None:
self.observers.append(partner)
def attach_dispatcher(self, dispatcher: "PriorityDispatchCoordinator") -> None:
self.dispatcher = dispatcher
def place_order(self, order_id: str, restaurant: str, priority: int = 1) -> Order:
order = Order(order_id, restaurant, priority)
self.repo.save(order)
for observer in self.observers:
observer.notify(order_id, f"from {restaurant} awaiting pickup")
if self.dispatcher:
self.dispatcher.enqueue(order)
return order
def mark_picked(self, order_id: str) -> None:
self.repo.update_status(order_id, "PICKED")
for observer in self.observers:
observer.notify(order_id, "picked up")
if self.dispatcher:
self.dispatcher.complete(order_id)
def cancel_order(self, order_id: str, reason: str = "CANCELLED") -> None:
self.repo.update_status(order_id, reason)
for observer in self.observers:
observer.notify(order_id, f"cancelled: {reason}")
if self.dispatcher:
self.dispatcher.complete(order_id)
class PriorityDispatchCoordinator:
def __init__(self, service: OrderService, partners: Dict[str, DeliveryPartner], worker_count: int = 3):
self.service = service
self.partners = partners
self.partner_load: Dict[str, int] = {pid: 0 for pid in partners}
self.queue: List[DispatchItem] = []
self.queue_cond = threading.Condition()
self.load_lock = threading.Lock()
self.assignments: Dict[str, str] = {}
self.running = True
self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(worker_count)]
for worker in self.workers:
worker.start()
def enqueue(self, order: Order) -> None:
with self.queue_cond:
sort_key = -order.priority
heapq.heappush(self.queue, DispatchItem(sort_key, time.time(), order))
self.queue_cond.notify()
def _pick_partner(self) -> DeliveryPartner:
with self.load_lock:
partner_id = min(self.partner_load, key=self.partner_load.get)
self.partner_load[partner_id] += 1
return self.partners[partner_id]
def complete(self, order_id: str) -> None:
partner_id = self.assignments.pop(order_id, None)
if partner_id:
with self.load_lock:
self.partner_load[partner_id] -= 1
def _worker(self) -> None:
while self.running:
with self.queue_cond:
while not self.queue and self.running:
self.queue_cond.wait()
if not self.running:
return
item = heapq.heappop(self.queue)
partner = self._pick_partner()
self.assignments[item.order.order_id] = partner.partner_id
partner.notify(item.order.order_id, f"pickup ready from {item.order.restaurant}")
time.sleep(0.1)
def shutdown(self) -> None:
self.running = False
with self.queue_cond:
self.queue_cond.notify_all()
for worker in self.workers:
worker.join(timeout=0.2)
class CircuitBreaker:
def __init__(self, failure_threshold: int, recovery_timeout: float):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0.0
self.state = "CLOSED"
self.lock = threading.Lock()
def call(self, func: Callable[[], str]) -> str:
with self.lock:
if self.state == "OPEN":
if time.time() - self.last_failure_time >= self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise RuntimeError("Payment service unavailable (circuit open)")
try:
result = func()
except Exception:
with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
raise
else:
with self.lock:
self.failure_count = 0
self.state = "CLOSED"
return result
class RetryPolicy:
def __init__(self, attempts: int, base_delay: float):
self.attempts = attempts
self.base_delay = base_delay
    def call(self, func: Callable[[], str]) -> str:
        last_err: Optional[Exception] = None
        for attempt in range(self.attempts):
            try:
                return func()
            except Exception as exc:
                last_err = exc
                if attempt < self.attempts - 1:
                    # Exponential backoff between attempts; skip the sleep after the final try.
                    time.sleep(self.base_delay * (2 ** attempt))
        raise last_err if last_err else RuntimeError("Unknown failure")
class SagaStep:
def __init__(self, action: Callable[[], str], compensate: Callable[[], None]):
self.action = action
self.compensate = compensate
class SagaOrchestrator:
def __init__(self, steps: List[SagaStep]):
self.steps = steps
def execute(self) -> str:
executed: List[SagaStep] = []
try:
for step in self.steps:
result = step.action()
executed.append(step)
print(result)
return "SAGA_COMPLETED"
except Exception as exc:
print(f"Saga failed: {exc}. Rolling back…")
for step in reversed(executed):
step.compensate()
return "SAGA_ROLLED_BACK"
class PaymentGateway:
def __init__(self, failure_rate: float = 0.5):
self.failure_rate = failure_rate
def charge(self, amount: float) -> str:
if random.random() < self.failure_rate:
raise RuntimeError("Payment declined")
return f"receipt-{int(time.time() * 1000)}"
class CourierPool:
def __init__(self, couriers: List[str]):
self.available: Deque[str] = deque(couriers)
self.lock = threading.Lock()
def acquire(self) -> str:
with self.lock:
if not self.available:
raise RuntimeError("No courier available")
return self.available.popleft()
def release(self, courier_id: str) -> None:
with self.lock:
self.available.append(courier_id)
@dataclass
class FulfillmentContext:
order_id: str
restaurant_reserved: bool = False
courier_id: Optional[str] = None
payment_receipt: Optional[str] = None
class ResilientFoodCoordinator:
def __init__(self,
service: OrderService,
dispatcher: PriorityDispatchCoordinator,
payment_gateway: PaymentGateway,
breaker: CircuitBreaker,
retry_policy: RetryPolicy,
couriers: CourierPool):
self.service = service
self.dispatcher = dispatcher
self.payment_gateway = payment_gateway
self.breaker = breaker
self.retry_policy = retry_policy
self.couriers = couriers
self.metrics_lock = threading.Lock()
self.metrics = {"success": 0, "rolled_back": 0}
self.dead_letter: List[str] = []
def fulfill(self, order_id: str, restaurant: str, priority: int, amount: float) -> str:
order = self.service.place_order(order_id, restaurant, priority)
ctx = FulfillmentContext(order_id=order_id)
def reserve_restaurant() -> str:
ctx.restaurant_reserved = True
return f"Restaurant reserved for {order_id}"
def cancel_restaurant() -> None:
if ctx.restaurant_reserved:
print(f"Compensate: release restaurant for {order_id}")
ctx.restaurant_reserved = False
def charge_payment() -> str:
receipt = self.retry_policy.call(
lambda: self.breaker.call(lambda: self.payment_gateway.charge(amount))
)
ctx.payment_receipt = receipt
return f"Payment captured {receipt}"
def refund_payment() -> None:
if ctx.payment_receipt:
print(f"Compensate: refund {ctx.payment_receipt} for {order_id}")
ctx.payment_receipt = None
def assign_courier() -> str:
courier_id = self.couriers.acquire()
ctx.courier_id = courier_id
return f"Courier {courier_id} assigned to {order_id}"
def release_courier() -> None:
if ctx.courier_id:
print(f"Compensate: release courier {ctx.courier_id} for {order_id}")
self.couriers.release(ctx.courier_id)
ctx.courier_id = None
saga = SagaOrchestrator([
SagaStep(reserve_restaurant, cancel_restaurant),
SagaStep(charge_payment, refund_payment),
SagaStep(assign_courier, release_courier),
])
outcome = saga.execute()
if outcome == "SAGA_COMPLETED":
self.service.mark_picked(order_id)
if ctx.courier_id:
self.couriers.release(ctx.courier_id)
ctx.courier_id = None
with self.metrics_lock:
self.metrics["success"] += 1
else:
self.service.cancel_order(order_id, "FAILED")
self.dead_letter.append(order_id)
with self.metrics_lock:
self.metrics["rolled_back"] += 1
return outcome
def snapshot(self) -> Dict[str, int]:
with self.metrics_lock:
return dict(self.metrics)
def main() -> None:
random.seed(9)
repo = OrderRepository()
service = OrderService(repo)
partners = {
"DP1": DeliveryPartner("DP1"),
"DP2": DeliveryPartner("DP2"),
}
for partner in partners.values():
service.register_partner(partner)
dispatcher = PriorityDispatchCoordinator(service, partners, worker_count=2)
service.attach_dispatcher(dispatcher)
coordinator = ResilientFoodCoordinator(
service=service,
dispatcher=dispatcher,
payment_gateway=PaymentGateway(failure_rate=0.4),
breaker=CircuitBreaker(failure_threshold=2, recovery_timeout=0.5),
retry_policy=RetryPolicy(attempts=3, base_delay=0.05),
couriers=CourierPool(["C1", "C2"])
)
orders = [("FO-101", "Spicy Bites", 3, 25.0), ("FO-102", "Fusion Kitchen", 1, 18.0), ("FO-103", "Veg Delight", 2, 22.0)]
for order_id, restaurant, priority, amount in orders:
outcome = coordinator.fulfill(order_id, restaurant, priority, amount)
print(f"Outcome for {order_id}: {outcome}")
time.sleep(0.1)
print("Metrics:", coordinator.snapshot())
if coordinator.dead_letter:
print("Dead letters:", coordinator.dead_letter)
dispatcher.shutdown()
if __name__ == "__main__":
main()
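Compensation order matters: completed steps unwind in reverse, so later side effects are undone before earlier ones. A sketch with stub steps, assuming the SagaOrchestrator above; demo_saga_rollback and its stubs are hypothetical:
def demo_saga_rollback() -> None:
    log: List[str] = []
    def step(name: str) -> SagaStep:
        # The action logs "do <name>" and returns a status string; compensation logs "undo <name>".
        return SagaStep(lambda: log.append(f"do {name}") or f"did {name}",
                        lambda: log.append(f"undo {name}"))
    def failing() -> str:
        raise RuntimeError("third step fails")
    saga = SagaOrchestrator([step("reserve"), step("charge"), SagaStep(failing, lambda: None)])
    assert saga.execute() == "SAGA_ROLLED_BACK"
    assert log == ["do reserve", "do charge", "undo charge", "undo reserve"]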
Evolve the ride sharing platform from nearest-driver matching to surge-aware concurrent dispatch and finally wrap it with resilience patterns for mobility services.
Ride Sharing Platform
├─ Level 1: Matching Strategy → Driver Repository
├─ Level 2: Zone Locks → Surge Manager
└─ Level 3: Resilient Dispatcher → Cache Fallback
Level 1 — Matching Engine
Level 1 for ride sharing emphasises a clean separation between the repository of drivers and the matching heuristic so we can experiment without rewriting infrastructure. We compute nearest drivers by delegating to a pluggable strategy and return plain driver identifiers rather than raw storage objects, which clarifies the boundary. By practising with deterministic in-memory state, learners experience how to unit test the matching flow before concurrency enters the picture.
from dataclasses import dataclass
from typing import Dict, Protocol, Tuple, Optional
import math
@dataclass
class Driver:
driver_id: str
location: Tuple[float, float]
available: bool = True
zone: str = "default"
class MatchStrategy(Protocol):
def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
...
class NearestMatch(MatchStrategy):
def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
best_driver = None
best_distance = float('inf')
rx, ry = rider_loc
for driver in drivers.values():
if not driver.available:
continue
dx, dy = driver.location
dist = math.hypot(rx - dx, ry - dy)
if dist < best_distance:
best_distance, best_driver = dist, driver
return best_driver
class RideSharingService:
def __init__(self, strategy: MatchStrategy):
self.strategy = strategy
self.drivers: Dict[str, Driver] = {}
self.assignments: Dict[str, str] = {}
def add_driver(self, driver: Driver) -> None:
self.drivers[driver.driver_id] = driver
def request_ride(self, rider_id: str, location: Tuple[float, float]) -> str:
driver = self.strategy.pick(self.drivers, location)
if not driver:
raise RuntimeError('No drivers available')
driver.available = False
self.assignments[rider_id] = driver.driver_id
return driver.driver_id
def complete_ride(self, rider_id: str) -> None:
driver_id = self.assignments.pop(rider_id)
self.drivers[driver_id].available = True
def cancel_ride(self, rider_id: str) -> None:
driver_id = self.assignments.pop(rider_id, None)
if driver_id:
self.drivers[driver_id].available = True
if __name__ == "__main__":
service = RideSharingService(NearestMatch())
service.add_driver(Driver('D1', (12.9, 77.6), zone="north"))
service.add_driver(Driver('D2', (12.95, 77.58), zone="south"))
assigned = service.request_ride('R1', (12.92, 77.59))
print('Assigned driver:', assigned)
service.complete_ride('R1')
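Because the driver registry is a plain dict and NearestMatch is deterministic, the matching heuristic is unit-testable on its own, exactly as promised above. A minimal check; demo_nearest_match is a hypothetical helper:
def demo_nearest_match() -> None:
    drivers = {
        "D1": Driver("D1", (0.0, 0.0)),
        "D2": Driver("D2", (5.0, 5.0)),
    }
    strategy = NearestMatch()
    chosen = strategy.pick(drivers, (1.0, 1.0))
    assert chosen is not None and chosen.driver_id == "D1"
    drivers["D1"].available = False  # busy drivers must be skipped
    chosen = strategy.pick(drivers, (1.0, 1.0))
    assert chosen is not None and chosen.driver_id == "D2"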
Level 2 — Concurrent Surge Dispatch
Level 2 acknowledges that requests arrive simultaneously, so we partition the city into zones guarded by locks and audit the flow for potential races. A surge calculator feeds into the assignment process, demonstrating how to enrich pricing while still respecting isolation between zones. The result shows how to stabilise throughput by combining contention control with clear separation of immutable ride requests and mutable driver state.
from __future__ import annotations
import math
import threading
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Optional, Tuple
@dataclass
class Driver:
driver_id: str
location: Tuple[float, float]
available: bool = True
zone: str = "default"
class MatchStrategy:
def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
raise NotImplementedError
class NearestMatch(MatchStrategy):
def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
best_driver = None
best_distance = float("inf")
rx, ry = rider_loc
for driver in drivers.values():
if not driver.available:
continue
dx, dy = driver.location
dist = math.hypot(rx - dx, ry - dy)
if dist < best_distance:
best_distance, best_driver = dist, driver
return best_driver
class RideSharingService:
def __init__(self, strategy: MatchStrategy):
self.strategy = strategy
self.drivers: Dict[str, Driver] = {}
self.assignments: Dict[str, str] = {}
def add_driver(self, driver: Driver) -> None:
self.drivers[driver.driver_id] = driver
def request_ride(self, rider_id: str, location: Tuple[float, float]) -> str:
driver = self.strategy.pick(self.drivers, location)
if not driver:
raise RuntimeError("No drivers available")
driver.available = False
self.assignments[rider_id] = driver.driver_id
return driver.driver_id
def complete_ride(self, rider_id: str) -> None:
driver_id = self.assignments.pop(rider_id)
self.drivers[driver_id].available = True
class ZoneLockManager:
def __init__(self) -> None:
self._locks: Dict[str, threading.RLock] = defaultdict(threading.RLock)
def lock(self, zone: str) -> threading.RLock:
return self._locks[zone]
class SurgeManager:
def __init__(self, base_multiplier: float = 1.0, step: float = 0.2) -> None:
self.base_multiplier = base_multiplier
self.step = step
self.multipliers: Dict[str, float] = defaultdict(lambda: self.base_multiplier)
self.lock = threading.Lock()
def bump(self, zone: str) -> float:
with self.lock:
self.multipliers[zone] += self.step
return round(self.multipliers[zone], 2)
def relax(self, zone: str) -> float:
with self.lock:
self.multipliers[zone] = max(self.base_multiplier, self.multipliers[zone] - self.step)
return round(self.multipliers[zone], 2)
def current(self, zone: str) -> float:
with self.lock:
return round(self.multipliers[zone], 2)
class ConcurrentRideSharingService(RideSharingService):
def __init__(self, strategy: MatchStrategy, zone_locks: ZoneLockManager, surge: SurgeManager):
super().__init__(strategy)
self.zone_locks = zone_locks
self.surge = surge
self.assignment_zones: Dict[str, str] = {}
self.metrics_lock = threading.Lock()
self.metrics = {"completed": 0, "contention": 0}
def request_ride(self, rider_id: str, location: Tuple[float, float], zone: str) -> Tuple[str, float]:
lock = self.zone_locks.lock(zone)
acquired = lock.acquire(timeout=0.1)
if not acquired:
with self.metrics_lock:
self.metrics["contention"] += 1
lock.acquire()
try:
driver_id = super().request_ride(rider_id, location)
self.assignment_zones[rider_id] = zone
surge_multiplier = self.surge.bump(zone)
return driver_id, surge_multiplier
finally:
lock.release()
def complete_ride(self, rider_id: str) -> None:
zone = self.assignment_zones.pop(rider_id, "default")
super().complete_ride(rider_id)
self.surge.relax(zone)
with self.metrics_lock:
self.metrics["completed"] += 1
def snapshot(self) -> Dict[str, int]:
with self.metrics_lock:
return dict(self.metrics)
def main() -> None:
zone_locks = ZoneLockManager()
surge = SurgeManager()
service = ConcurrentRideSharingService(NearestMatch(), zone_locks, surge)
service.add_driver(Driver("D1", (12.9, 77.6), zone="central"))
service.add_driver(Driver("D2", (12.91, 77.61), zone="central"))
service.add_driver(Driver("D3", (12.95, 77.58), zone="north"))
def rider_task(rider_id: str) -> None:
try:
driver_id, surge_multiplier = service.request_ride(rider_id, (12.9, 77.6), "central")
print(f"{rider_id} → {driver_id} at surge {surge_multiplier}x")
time.sleep(0.05)
service.complete_ride(rider_id)
except RuntimeError as exc:
print(f"{rider_id} failed: {exc}")
threads = [threading.Thread(target=rider_task, args=(f"R{i}",)) for i in range(1, 6)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("Metrics:", service.snapshot())
print("Current surge (central):", surge.current("central"))
if __name__ == "__main__":
main()
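The surge arithmetic is worth pinning down in isolation: each bump adds step, each relax subtracts it, and the multiplier never drops below the base. A small check reusing SurgeManager; demo_surge_bounds is a hypothetical helper:
def demo_surge_bounds() -> None:
    surge = SurgeManager(base_multiplier=1.0, step=0.2)
    assert surge.bump("central") == 1.2
    assert surge.bump("central") == 1.4
    assert surge.relax("central") == 1.2
    assert surge.relax("central") == 1.0
    assert surge.relax("central") == 1.0  # clamped at the base multiplier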
Level 3 — Resilient Mobility
Level 3 wraps the concurrent matcher with caching layers and health probes because live systems must ride out partial outages. We front-load driver availability into a read-through cache, invalidate cautiously, and detect stale regions so dispatch can fall back to a slower, durable store when needed. Telemetry hooks and explicit failure modes encourage learners to think about observability alongside correctness.
from __future__ import annotations
import math
import random
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict, Optional, Tuple
@dataclass
class Driver:
driver_id: str
location: Tuple[float, float]
available: bool = True
zone: str = "default"
class MatchStrategy:
def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
raise NotImplementedError
class NearestMatch(MatchStrategy):
def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
best_driver = None
best_distance = float("inf")
rx, ry = rider_loc
for driver in drivers.values():
if not driver.available:
continue
dx, dy = driver.location
dist = math.hypot(rx - dx, ry - dy)
if dist < best_distance:
best_distance, best_driver = dist, driver
return best_driver
class RideSharingService:
def __init__(self, strategy: MatchStrategy):
self.strategy = strategy
self.drivers: Dict[str, Driver] = {}
self.assignments: Dict[str, str] = {}
def add_driver(self, driver: Driver) -> None:
self.drivers[driver.driver_id] = driver
def request_ride(self, rider_id: str, location: Tuple[float, float]) -> str:
driver = self.strategy.pick(self.drivers, location)
if not driver:
raise RuntimeError("No drivers available")
driver.available = False
self.assignments[rider_id] = driver.driver_id
return driver.driver_id
def complete_ride(self, rider_id: str) -> None:
driver_id = self.assignments.pop(rider_id)
self.drivers[driver_id].available = True
def cancel_ride(self, rider_id: str) -> None:
driver_id = self.assignments.pop(rider_id, None)
if driver_id:
self.drivers[driver_id].available = True
class ZoneLockManager:
def __init__(self) -> None:
self._locks: Dict[str, threading.RLock] = defaultdict(threading.RLock)
def lock(self, zone: str) -> threading.RLock:
return self._locks[zone]
class SurgeManager:
def __init__(self, base_multiplier: float = 1.0, step: float = 0.2) -> None:
self.base_multiplier = base_multiplier
self.step = step
self.multipliers: Dict[str, float] = defaultdict(lambda: self.base_multiplier)
self.lock = threading.Lock()
def bump(self, zone: str) -> float:
with self.lock:
self.multipliers[zone] += self.step
return round(self.multipliers[zone], 2)
def relax(self, zone: str) -> float:
with self.lock:
self.multipliers[zone] = max(self.base_multiplier, self.multipliers[zone] - self.step)
return round(self.multipliers[zone], 2)
def current(self, zone: str) -> float:
with self.lock:
return round(self.multipliers[zone], 2)
class ConcurrentRideSharingService(RideSharingService):
def __init__(self, strategy: MatchStrategy, zone_locks: ZoneLockManager, surge: SurgeManager):
super().__init__(strategy)
self.zone_locks = zone_locks
self.surge = surge
self.assignment_zones: Dict[str, str] = {}
def request_ride(self, rider_id: str, location: Tuple[float, float], zone: str) -> Tuple[str, float]:
lock = self.zone_locks.lock(zone)
with lock:
driver_id = super().request_ride(rider_id, location)
self.assignment_zones[rider_id] = zone
surge_multiplier = self.surge.bump(zone)
return driver_id, surge_multiplier
def complete_ride(self, rider_id: str) -> None:
zone = self.assignment_zones.pop(rider_id, "default")
super().complete_ride(rider_id)
self.surge.relax(zone)
def cancel_ride(self, rider_id: str) -> None:
zone = self.assignment_zones.pop(rider_id, None)
super().cancel_ride(rider_id)
if zone:
self.surge.relax(zone)
class AvailabilityCache:
def __init__(self) -> None:
self.store: Dict[str, Driver] = {}
self.lock = threading.Lock()
def upsert(self, driver: Driver) -> None:
with self.lock:
self.store[driver.driver_id] = Driver(driver.driver_id, driver.location, driver.available, driver.zone)
def reserve_by_id(self, driver_id: str) -> None:
with self.lock:
if driver_id in self.store:
self.store[driver_id].available = False
def reserve_best(self, zone: str, rider_loc: Tuple[float, float]) -> Optional[Driver]:
with self.lock:
best_id = None
best_distance = float("inf")
rx, ry = rider_loc
for driver in self.store.values():
if driver.zone != zone or not driver.available:
continue
dx, dy = driver.location
dist = math.hypot(rx - dx, ry - dy)
if dist < best_distance:
best_distance, best_id = dist, driver.driver_id
if best_id is None:
return None
driver = self.store[best_id]
driver.available = False
return Driver(driver.driver_id, driver.location, False, driver.zone)
def release(self, driver_id: str) -> None:
with self.lock:
if driver_id in self.store:
self.store[driver_id].available = True
class CircuitBreaker:
def __init__(self, failure_threshold: int, recovery_timeout: float):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0.0
self.state = "CLOSED"
self.lock = threading.Lock()
def allow(self) -> bool:
with self.lock:
if self.state == "OPEN":
if time.time() - self.last_failure_time >= self.recovery_timeout:
self.state = "HALF_OPEN"
return True
return False
return True
def record_failure(self) -> None:
with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
self.failure_count = 0
def record_success(self) -> None:
with self.lock:
self.failure_count = 0
self.state = "CLOSED"
class MobilityGateway:
def __init__(self, failure_rate: float = 0.3) -> None:
self.failure_rate = failure_rate
def reserve(self, zone: str) -> None:
if random.random() < self.failure_rate:
raise RuntimeError(f"Mobility gateway timeout for zone {zone}")
class ResilientRideOrchestrator:
def __init__(self,
service: ConcurrentRideSharingService,
primary_cache: AvailabilityCache,
fallback_cache: AvailabilityCache,
breaker: CircuitBreaker,
gateway: MobilityGateway):
self.service = service
self.primary_cache = primary_cache
self.fallback_cache = fallback_cache
self.breaker = breaker
self.gateway = gateway
self.assignments: Dict[str, Tuple[str, str, str]] = {}  # (source, driver_id, zone)
self.metrics_lock = threading.Lock()
self.metrics = {"primary": 0, "fallback": 0, "rejected": 0}
self.dead_letters: Deque[str] = deque()
def register_driver(self, driver: Driver, *, primary: bool = True) -> None:
if primary:
self.service.add_driver(driver)
self.primary_cache.upsert(driver)
self.fallback_cache.upsert(driver)
def request_ride(self, rider_id: str, location: Tuple[float, float], zone: str) -> Tuple[str, float, str]:
if self.breaker.allow():
try:
self.gateway.reserve(zone)
driver_id, surge = self.service.request_ride(rider_id, location, zone)
self.primary_cache.reserve_by_id(driver_id)
self.breaker.record_success()
with self.metrics_lock:
self.metrics["primary"] += 1
self.assignments[rider_id] = ("primary", driver_id, zone)
return driver_id, surge, "primary"
except Exception:
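# Gateway timeouts and "No drivers available" both land here; cancel_ride is a
# no-op when the failure happened before any driver was assigned.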
self.breaker.record_failure()
self.service.cancel_ride(rider_id)
fallback_driver = self.fallback_cache.reserve_best(zone, location)
if fallback_driver:
with self.metrics_lock:
self.metrics["fallback"] += 1
self.assignments[rider_id] = ("fallback", fallback_driver.driver_id, zone)
return fallback_driver.driver_id, self.service.surge.current(zone), "fallback"
with self.metrics_lock:
self.metrics["rejected"] += 1
self.dead_letters.append(rider_id)
raise RuntimeError("No drivers available in any pool")
def complete_ride(self, rider_id: str) -> None:
source, driver_id, zone = self.assignments.pop(rider_id)
if source == "primary":
self.service.complete_ride(rider_id)
self.primary_cache.release(driver_id)
else:
self.fallback_cache.release(driver_id)
self.service.surge.relax(zone)
def cancel_ride(self, rider_id: str) -> None:
record = self.assignments.pop(rider_id, None)
if not record:
return
source, driver_id, zone = record
if source == "primary":
self.service.cancel_ride(rider_id)
self.primary_cache.release(driver_id)
else:
self.fallback_cache.release(driver_id)
self.service.surge.relax(zone)
def snapshot(self) -> Dict[str, int]:
with self.metrics_lock:
return dict(self.metrics)
def main() -> None:
random.seed(7)
zone_locks = ZoneLockManager()
surge = SurgeManager()
service = ConcurrentRideSharingService(NearestMatch(), zone_locks, surge)
primary_cache = AvailabilityCache()
fallback_cache = AvailabilityCache()
breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.4)
gateway = MobilityGateway(failure_rate=0.35)
orchestrator = ResilientRideOrchestrator(service, primary_cache, fallback_cache, breaker, gateway)
orchestrator.register_driver(Driver("D1", (12.9, 77.6), zone="central"))
orchestrator.register_driver(Driver("D2", (12.91, 77.61), zone="central"))
orchestrator.register_driver(Driver("FD1", (12.88, 77.58), zone="central"), primary=False)
def rider_task(rider_id: str) -> None:
try:
driver_id, surge_multiplier, source = orchestrator.request_ride(rider_id, (12.9, 77.6), "central")
print(f"{rider_id} → {driver_id} via {source} at {surge_multiplier}x")
time.sleep(random.uniform(0.05, 0.15))
orchestrator.complete_ride(rider_id)
except RuntimeError as exc:
print(f"{rider_id} failed: {exc}")
threads = [threading.Thread(target=rider_task, args=(f"R{i}",)) for i in range(1, 7)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("Metrics:", orchestrator.snapshot())
if orchestrator.dead_letters:
print("Dead letters:", list(orchestrator.dead_letters))
if __name__ == "__main__":
main()
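The breaker's CLOSED → OPEN → HALF_OPEN cycle can also be exercised deterministically in isolation; a small sketch against the CircuitBreaker above (timings chosen for illustration):
breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.2)
assert breaker.allow() and breaker.state == "CLOSED"
breaker.record_failure()
breaker.record_failure()           # second failure trips the breaker
assert breaker.state == "OPEN" and not breaker.allow()
time.sleep(0.25)                   # wait past recovery_timeout
assert breaker.allow() and breaker.state == "HALF_OPEN"
breaker.record_success()           # successful probe closes the circuit again
assert breaker.state == "CLOSED"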
Progressively build the notification platform from synchronous fan-out to queued asynchronous delivery and finally to a resilient multi-provider gateway.
Notification Gateway ├─ Level 1: Publisher → Channel Observers ├─ Level 2: Queue → Worker Pool → Dead Letters └─ Level 3: Provider Bulkheads → Circuit Breakers
Level 1 — Event Fan-out
The foundational notification service demonstrates how to keep a publisher ignorant of actual delivery rules by leaning on the observer pattern. We register strategies for email, SMS, or push that translate a common message contract into channel-specific payloads while tracking delivery state. By running through happy-path flows, developers learn to reason about fan-out without premature concerns about throughput.
from typing import Protocol, List
class ChannelStrategy(Protocol):
def send(self, recipient: str, message: str) -> None:
...
class EmailStrategy:
def send(self, recipient: str, message: str) -> None:
print(f"EMAIL->{recipient}: {message}")
class SmsStrategy:
def send(self, recipient: str, message: str) -> None:
print(f"SMS->{recipient}: {message}")
class NotificationService:
def __init__(self, strategies: List[ChannelStrategy]):
self.strategies = strategies
def notify(self, recipient: str, message: str) -> None:
for strategy in self.strategies:
strategy.send(recipient, message)
if __name__ == "__main__":
service = NotificationService([EmailStrategy(), SmsStrategy()])
service.notify('user@example.com', 'Welcome to the platform!')
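The description above mentions push alongside email and SMS; a minimal PushStrategy sketch that satisfies the same ChannelStrategy protocol (the device-token detail is illustrative):
class PushStrategy:
    def __init__(self, device_token: str = "token-123") -> None:  # illustrative token
        self.device_token = device_token
    def send(self, recipient: str, message: str) -> None:
        print(f"PUSH[{self.device_token}]->{recipient}: {message}")

# service = NotificationService([EmailStrategy(), SmsStrategy(), PushStrategy()])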
Level 2 — Asynchronous Delivery Workers
At Level 2 we surface the realities of bursty traffic, so a queue absorbs inbound notifications and a worker pool pulls tasks to deliver them asynchronously. Retries with exponential backoff and poison-message handling illustrate how to preserve durability when downstream channels misbehave; note that re-enqueued retries give up strict ordering. We keep instrumentation close to enqueue/dequeue points, which makes debugging and capacity planning part of the exercise.
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from queue import Empty, Queue
from typing import List, Optional, Protocol
class ChannelStrategy(Protocol):
def send(self, recipient: str, message: str) -> None:
...
class EmailStrategy:
def send(self, recipient: str, message: str) -> None:
print(f"EMAIL->{recipient}: {message}")
class SmsStrategy:
def send(self, recipient: str, message: str) -> None:
print(f"SMS->{recipient}: {message}")
class NotificationService:
def __init__(self, strategies: List[ChannelStrategy]):
self.strategies = strategies
def notify(self, recipient: str, message: str) -> None:
for strategy in self.strategies:
strategy.send(recipient, message)
@dataclass
class NotificationJob:
recipient: str
message: str
attempt: int = 0
class AsyncNotificationGateway:
def __init__(self, service: NotificationService, max_workers: int = 3, max_retries: int = 2):
self.service = service
self.max_retries = max_retries
self.queue: Queue[NotificationJob] = Queue()
self.dead_letters: List[NotificationJob] = []
self.running = True
self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(max_workers)]
for worker in self.workers:
worker.start()
def enqueue(self, recipient: str, message: str) -> None:
self.queue.put(NotificationJob(recipient, message))
def _dispatch(self, job: NotificationJob) -> None:
if "FAIL" in job.message:
raise RuntimeError("channel failure")
self.service.notify(job.recipient, job.message)
def _worker(self) -> None:
while self.running:
try:
job = self.queue.get(timeout=0.5)
except Empty:
continue
try:
self._dispatch(job)
except Exception as exc:
job.attempt += 1
if job.attempt <= self.max_retries:
print(f"Retry {job.attempt} for {job.recipient}: {exc}")
time.sleep(0.1 * (2 ** (job.attempt - 1)))  # exponential backoff: 0.1s, 0.2s, ...
self.queue.put(job)
else:
print(f"Dead-lettering {job.recipient}: {job.message}")
self.dead_letters.append(job)
finally:
self.queue.task_done()
def shutdown(self) -> None:
self.running = False
for worker in self.workers:
worker.join(timeout=0.2)
def main() -> None:
base_service = NotificationService([EmailStrategy(), SmsStrategy()])
gateway = AsyncNotificationGateway(base_service, max_workers=2, max_retries=2)
gateway.enqueue("user1", "Welcome!")
gateway.enqueue("user2", "FAIL-SEND")
gateway.enqueue("user3", "Daily digest")
time.sleep(1.5)
gateway.shutdown()
print("Dead letters:", [(job.recipient, job.message) for job in gateway.dead_letters])
if __name__ == "__main__":
main()
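The sleep(1.5) in main is a timing guess. Because every get() is paired with task_done() and retries re-enqueue the job before finishing the original task, Queue.join() drains deterministically; a sketch of the alternative shutdown sequence:
gateway = AsyncNotificationGateway(NotificationService([EmailStrategy(), SmsStrategy()]), max_workers=2)
gateway.enqueue("user1", "Welcome!")
gateway.enqueue("user2", "FAIL-SEND")
gateway.queue.join()   # returns once every job is delivered or dead-lettered
gateway.shutdown()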
Level 3 — Multi-Provider Resiliency
Level 3 assumes providers fail independently, so we introduce provider pools isolated by bulkhead limits and guarded by circuit breakers. Fallback routing policies decide when to drain to secondary vendors, and metrics feed dashboards that surface saturation early. The walkthrough shows how to combine structural patterns with runtime safeguards instead of bolting resilience on afterwards.
import threading
import time
import random
from collections import deque
from typing import Callable, Protocol
class Provider(Protocol):
def send(self, recipient: str, message: str) -> None:
...
class PrimaryProvider:
def __init__(self):
self.counter = 0
def send(self, recipient: str, message: str) -> None:
self.counter += 1
if self.counter % 2 == 0:
raise RuntimeError('Primary provider outage')
print(f"PRIMARY->{recipient}: {message}")
class BackupProvider:
def send(self, recipient: str, message: str) -> None:
print(f"BACKUP->{recipient}: {message}")
class Bulkhead:
def __init__(self, provider: Provider, capacity: int):
self.provider = provider
self.semaphore = threading.Semaphore(capacity)
def execute(self, recipient: str, message: str) -> None:
with self.semaphore:
self.provider.send(recipient, message)
class Breaker:
def __init__(self, failure_threshold: int, recovery_time: float):
self.failure_threshold = failure_threshold
self.recovery_time = recovery_time
self.failures = 0
self.last_failure = 0.0
self.state = 'CLOSED'
def invoke(self, func: Callable[[], None]) -> None:
now = time.time()
if self.state == 'OPEN' and now - self.last_failure < self.recovery_time:
raise RuntimeError('Breaker open')
try:
func()
self.failures = 0
self.state = 'CLOSED'
except Exception as exc:
self.failures += 1
self.last_failure = now
if self.failures >= self.failure_threshold:
self.state = 'OPEN'
raise
class ResilientGateway:
def __init__(self):
self.primary = Bulkhead(PrimaryProvider(), capacity=2)
self.backup = Bulkhead(BackupProvider(), capacity=4)
self.breaker = Breaker(2, 2.0)
def send(self, recipient: str, message: str) -> None:
try:
self.breaker.invoke(lambda: self.primary.execute(recipient, message))
except Exception:
print('Primary failed, using backup…')
self.backup.execute(recipient, message)
if __name__ == "__main__":
gateway = ResilientGateway()
for idx in range(1, 7):
gateway.send(f'user{idx}', 'Security alert')
time.sleep(0.4)
Progressively enhance the rate limiting system from an in-process token bucket to a distributed sliding window and finally a resilient API gateway that degrades gracefully under failure.
Rate Limiter Stack ├─ Level 1: Token Bucket Evaluator ├─ Level 2: Distributed Sliding Window + Coordination └─ Level 3: Gateway Orchestrator with Circuit Breaker & Fallback
Level 1 — Core Token Bucket
We start the rate limiter series by implementing the classic token bucket and instrumenting it so bursts can be visualised. The design emphasises separating the clock, storage, and policy, which makes it easier to validate behaviour with deterministic tests. By replaying the provided sample workload learners can watch how refills, token draws, and denials interplay over time.
import threading
import time
class TokenBucket:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.tokens = capacity
self.refill_rate = refill_rate
self.last_refill = time.time()
self.lock = threading.Lock()
def _refill(self) -> None:
now = time.time()
elapsed = now - self.last_refill
tokens_to_add = int(elapsed * self.refill_rate)
if tokens_to_add > 0:
self.tokens = min(self.capacity, self.tokens + tokens_to_add)
self.last_refill = now
def try_consume(self) -> bool:
with self.lock:
self._refill()
if self.tokens > 0:
self.tokens -= 1
return True
return False
class RateLimiter:
def __init__(self, bucket: TokenBucket):
self.bucket = bucket
def allow(self) -> bool:
return self.bucket.try_consume()
if __name__ == "__main__":
limiter = RateLimiter(TokenBucket(capacity=5, refill_rate=2))
for _ in range(10):
print("Allowed" if limiter.allow() else "Throttled")
time.sleep(0.3)
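The notes above mention separating the clock from the policy, but the bucket calls time.time() directly. A sketch with an injectable clock so refill behaviour can be tested deterministically — InjectableClockBucket and the fake clock are our additions:
import threading
import time

class InjectableClockBucket:
    """Same refill arithmetic as TokenBucket, but the clock is a constructor argument."""
    def __init__(self, capacity: int, refill_rate: float, clock=time.time):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.clock = clock
        self.last_refill = clock()
        self.lock = threading.Lock()
    def try_consume(self) -> bool:
        with self.lock:
            now = self.clock()
            tokens_to_add = int((now - self.last_refill) * self.refill_rate)
            if tokens_to_add > 0:
                self.tokens = min(self.capacity, self.tokens + tokens_to_add)
                self.last_refill = now
            if self.tokens > 0:
                self.tokens -= 1
                return True
            return False

current = [0.0]  # fake clock advanced by hand
bucket = InjectableClockBucket(capacity=2, refill_rate=1.0, clock=lambda: current[0])
assert bucket.try_consume() and bucket.try_consume() and not bucket.try_consume()
current[0] += 1.0  # one simulated second refills one token
assert bucket.try_consume()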
Level 2 — Distributed Sliding Window
Level 2 moves from single-threaded arithmetic to shared state by spreading counters across threads and synchronising with locks. We model the rolling window as timestamped buckets so we can evict old events quickly and avoid unbounded memory growth. The sample harness demonstrates how to reason about fairness when multiple application servers rely on the same limiter view.
from __future__ import annotations
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict
class TokenBucket:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.tokens = capacity
self.refill_rate = refill_rate
self.last_refill = time.time()
self.lock = threading.Lock()
def _refill(self) -> None:
now = time.time()
elapsed = now - self.last_refill
tokens_to_add = int(elapsed * self.refill_rate)
if tokens_to_add > 0:
self.tokens = min(self.capacity, self.tokens + tokens_to_add)
self.last_refill = now
def try_consume(self) -> bool:
with self.lock:
self._refill()
if self.tokens > 0:
self.tokens -= 1
return True
return False
class RateLimiter:
def __init__(self, name: str, bucket: TokenBucket):
self.name = name
self.bucket = bucket
def allow(self) -> bool:
return self.bucket.try_consume()
@dataclass
class WindowState:
events: Deque[float]
class SlidingWindowCoordinator:
def __init__(self, window_seconds: float, max_requests: int, replicas: Dict[str, RateLimiter]):
self.window_seconds = window_seconds
self.max_requests = max_requests
self.replicas = replicas
self.window: Dict[str, WindowState] = defaultdict(lambda: WindowState(deque()))
self.window_lock = threading.Lock()
def allow(self, tenant: str) -> bool:
now = time.time()
with self.window_lock:
state = self.window[tenant]
while state.events and now - state.events[0] > self.window_seconds:
state.events.popleft()
if len(state.events) >= self.max_requests:
return False
limiter = self.replicas[tenant]
if not limiter.allow():
return False
with self.window_lock:
state = self.window[tenant]
state.events.append(now)
return True
def release(self, tenant: str) -> None:
limiter = self.replicas[tenant]
with self.window_lock:
state = self.window[tenant]
if state.events:
state.events.pop()
with limiter.bucket.lock:
limiter.bucket.tokens = min(limiter.bucket.capacity, limiter.bucket.tokens + 1)
def main() -> None:
replicas = {
"tenant-a": RateLimiter("tenant-a", TokenBucket(capacity=5, refill_rate=3)),
"tenant-b": RateLimiter("tenant-b", TokenBucket(capacity=3, refill_rate=2)),
}
coordinator = SlidingWindowCoordinator(window_seconds=1.0, max_requests=4, replicas=replicas)
decisions: list[tuple[str, bool]] = []
def fire(tenant: str, idx: int) -> None:
allowed = coordinator.allow(tenant)
decisions.append((f"{tenant}-{idx}", allowed))
threads = [
threading.Thread(target=fire, args=("tenant-a", i))
for i in range(8)
] + [
threading.Thread(target=fire, args=("tenant-b", i))
for i in range(6)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for decision in sorted(decisions):
print(decision)
if __name__ == "__main__":
main()
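Note that in allow() the length check and the append run under two separate lock acquisitions, so two racing threads can both pass the check and push the window past max_requests. A sketch of a race-free variant (the subclass name is ours) that reserves the slot while holding the lock and rolls back on bucket denial:
class AtomicSlidingWindowCoordinator(SlidingWindowCoordinator):
    def allow(self, tenant: str) -> bool:
        now = time.time()
        with self.window_lock:
            state = self.window[tenant]
            while state.events and now - state.events[0] > self.window_seconds:
                state.events.popleft()
            if len(state.events) >= self.max_requests:
                return False
            state.events.append(now)  # reserve the slot before releasing the lock
        if self.replicas[tenant].allow():
            return True
        with self.window_lock:  # bucket denied: undo the reservation
            events = self.window[tenant].events
            if events:
                events.pop()  # the count is what matters; timestamp order may shuffle
        return False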
Level 3 — Resilient Gateway
The advanced rate limiter treats downstream failures as a signal: a circuit breaker trips when the dependency degrades, and failed calls release their window slot so they do not consume quota. The gateway tracks throttled, successful, and fallback outcomes so operators can see when it enters degraded mode. By stepping through the logs you can see how protecting a partner involves both controlling volume and surfacing intent to operators.
from __future__ import annotations
import random
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, Dict, Optional
class TokenBucket:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.tokens = capacity
self.refill_rate = refill_rate
self.last_refill = time.time()
self.lock = threading.Lock()
def _refill(self) -> None:
now = time.time()
elapsed = now - self.last_refill
tokens_to_add = int(elapsed * self.refill_rate)
if tokens_to_add > 0:
self.tokens = min(self.capacity, self.tokens + tokens_to_add)
self.last_refill = now
def try_consume(self) -> bool:
with self.lock:
self._refill()
if self.tokens > 0:
self.tokens -= 1
return True
return False
class RateLimiter:
def __init__(self, name: str, bucket: TokenBucket):
self.name = name
self.bucket = bucket
def allow(self) -> bool:
return self.bucket.try_consume()
@dataclass
class WindowState:
events: Deque[float]
class SlidingWindowCoordinator:
def __init__(self, window_seconds: float, max_requests: int, replicas: Dict[str, RateLimiter]):
self.window_seconds = window_seconds
self.max_requests = max_requests
self.replicas = replicas
self.window: Dict[str, WindowState] = defaultdict(lambda: WindowState(deque()))
self.window_lock = threading.Lock()
def allow(self, tenant: str) -> bool:
now = time.time()
with self.window_lock:
state = self.window[tenant]
while state.events and now - state.events[0] > self.window_seconds:
state.events.popleft()
if len(state.events) >= self.max_requests:
return False
limiter = self.replicas[tenant]
if not limiter.allow():
return False
with self.window_lock:
self.window[tenant].events.append(now)
return True
def release(self, tenant: str) -> None:
limiter = self.replicas[tenant]
with self.window_lock:
state = self.window[tenant]
if state.events:
state.events.pop()
with limiter.bucket.lock:
limiter.bucket.tokens = min(limiter.bucket.capacity, limiter.bucket.tokens + 1)
class CircuitBreaker:
def __init__(self, failure_threshold: int, recovery_timeout: float):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0.0
self.state = "CLOSED"
self.lock = threading.Lock()
def call(self, func: Callable[[], str]) -> str:
with self.lock:
if self.state == "OPEN":
if time.time() - self.last_failure_time >= self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise RuntimeError("Downstream circuit open")
try:
result = func()
except Exception:
with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
self.failure_count = 0
raise
else:
with self.lock:
self.failure_count = 0
self.state = "CLOSED"
return result
class DownstreamService:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
def call(self, payload: str) -> str:
if random.random() < self.failure_rate:
raise RuntimeError("backend timeout")
return f"200 OK for {payload}"
class ProtectiveGateway:
def __init__(self, coordinator: SlidingWindowCoordinator, breaker: CircuitBreaker, service: DownstreamService):
self.coordinator = coordinator
self.breaker = breaker
self.service = service
self.metrics_lock = threading.Lock()
self.metrics = {"throttled": 0, "success": 0, "fallback": 0}
def handle_request(self, tenant: str, payload: str) -> str:
if not self.coordinator.allow(tenant):
with self.metrics_lock:
self.metrics["throttled"] += 1
return f"{tenant}/{payload}: THROTTLED"
try:
response = self.breaker.call(lambda: self.service.call(payload))
with self.metrics_lock:
self.metrics["success"] += 1
return f"{tenant}/{payload}: {response}"
except Exception as exc:
self.coordinator.release(tenant)
with self.metrics_lock:
self.metrics["fallback"] += 1
return f"{tenant}/{payload}: FALLBACK ({exc})"
def snapshot(self) -> Dict[str, int]:
with self.metrics_lock:
return dict(self.metrics)
def main() -> None:
random.seed(17)
replicas = {
"tenant-a": RateLimiter("tenant-a", TokenBucket(capacity=6, refill_rate=4)),
"tenant-b": RateLimiter("tenant-b", TokenBucket(capacity=4, refill_rate=3)),
}
coordinator = SlidingWindowCoordinator(window_seconds=1.5, max_requests=5, replicas=replicas)
gateway = ProtectiveGateway(coordinator, CircuitBreaker(failure_threshold=3, recovery_timeout=0.8), DownstreamService(failure_rate=0.45))
for idx in range(12):
tenant = "tenant-a" if idx % 3 else "tenant-b"
result = gateway.handle_request(tenant, f"req-{idx}")
print(result)
time.sleep(0.2)
print("Gateway metrics:", gateway.snapshot())
if __name__ == "__main__":
main()
Incrementally build the URL shortener from a single-node service to a concurrent cache-backed implementation and finally a resilient multi-region deployment.
URL Shortener Stack ├─ Level 1: Encoder Strategy → Repository ├─ Level 2: Thread-Safe Cache + Persistence └─ Level 3: Multi-Region Replication & Resiliency
Level 1 — Core Service
The baseline shortener focuses on clean domain boundaries: a code generator, a repository, and a service that enforces idempotent lookups. We track hit counts and provide a small demo so you can exercise the flow end to end and explore edge cases such as duplicate URLs. Attention is paid to deterministic code generation so later replication scenarios have something solid to build on.
from __future__ import annotations
import hashlib
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Optional
import threading
@dataclass
class UrlRecord:
short_code: str
long_url: str
created_at: float
hits: int = 0
class CodeEncoder(ABC):
@abstractmethod
def encode(self, long_url: str) -> str:
...
class Md5Encoder(CodeEncoder):
def encode(self, long_url: str) -> str:
digest = hashlib.md5(long_url.encode("utf-8")).hexdigest()
return digest[:7]
class UrlRepository:
def __init__(self) -> None:
self._records: Dict[str, UrlRecord] = {}
self._lock = threading.RLock()
def save(self, record: UrlRecord) -> None:
with self._lock:
self._records[record.short_code] = record
def get(self, short_code: str) -> Optional[UrlRecord]:
with self._lock:
return self._records.get(short_code)
class UrlShortener:
def __init__(self, encoder: CodeEncoder, repository: UrlRepository) -> None:
self.encoder = encoder
self.repository = repository
def shorten(self, long_url: str) -> str:
code = self.encoder.encode(long_url)
if not self.repository.get(code):
self.repository.save(UrlRecord(code, long_url, time.time()))
return code
def resolve(self, code: str) -> str:
record = self.repository.get(code)
if not record:
raise KeyError("Unknown short code")
record.hits += 1
return record.long_url
def stats(self, code: str) -> UrlRecord:
record = self.repository.get(code)
if not record:
raise KeyError("Unknown short code")
return record
def main() -> None:
repo = UrlRepository()
service = UrlShortener(Md5Encoder(), repo)
code = service.shorten("https://example.com/docs")
print("Short code:", code)
print("Redirect to:", service.resolve(code))
print("Stats:", service.stats(code))
if __name__ == "__main__":
main()
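A quick check of the idempotency claim: shortening the same URL twice reuses the deterministic code and leaves the original record untouched.
repo = UrlRepository()
service = UrlShortener(Md5Encoder(), repo)
first = service.shorten("https://example.com/docs")
second = service.shorten("https://example.com/docs")
assert first == second                    # MD5 prefix is deterministic
assert repo.get(first).created_at == repo.get(second).created_at  # no overwrite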
Level 2 — Concurrent Cache
Level 2 introduces shared mutable state by letting several worker threads shorten URLs simultaneously while relying on a write-through cache. We guard the repository with locks, layer a thread-safe LRU over hot reads, and expose a snapshot that proves the records remain consistent. Running the demo shows how caching can boost reads yet still tolerate bursts of writes when structured carefully.
from __future__ import annotations
import threading
from collections import OrderedDict
from typing import Dict, Optional
import hashlib
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class UrlRecord:
short_code: str
long_url: str
created_at: float
hits: int = 0
class CodeEncoder(ABC):
@abstractmethod
def encode(self, long_url: str) -> str:
...
class Md5Encoder(CodeEncoder):
def encode(self, long_url: str) -> str:
digest = hashlib.md5(long_url.encode("utf-8")).hexdigest()
return digest[:7]
class UrlRepository:
def __init__(self) -> None:
self._records: Dict[str, UrlRecord] = {}
self._lock = threading.RLock()
def save(self, record: UrlRecord) -> None:
with self._lock:
self._records[record.short_code] = record
def get(self, short_code: str) -> Optional[UrlRecord]:
with self._lock:
return self._records.get(short_code)
def snapshot(self) -> Dict[str, UrlRecord]:
with self._lock:
return {code: record for code, record in self._records.items()}
class UrlShortener:
def __init__(self, encoder: CodeEncoder, repository: UrlRepository) -> None:
self.encoder = encoder
self.repository = repository
def shorten(self, long_url: str) -> str:
code = self.encoder.encode(long_url)
if not self.repository.get(code):
self.repository.save(UrlRecord(code, long_url, time.time()))
return code
def resolve(self, code: str) -> str:
record = self.repository.get(code)
if not record:
raise KeyError("Unknown short code")
record.hits += 1
return record.long_url
def stats(self, code: str) -> UrlRecord:
record = self.repository.get(code)
if not record:
raise KeyError("Unknown short code")
return record
class ThreadSafeLRUCache:
def __init__(self, capacity: int) -> None:
self.capacity = capacity
self._data: OrderedDict[str, UrlRecord] = OrderedDict()
self._lock = threading.RLock()
def get(self, key: str) -> Optional[UrlRecord]:
with self._lock:
record = self._data.get(key)
if record:
self._data.move_to_end(key)
return record
def put(self, key: str, record: UrlRecord) -> None:
with self._lock:
self._data[key] = record
self._data.move_to_end(key)
if len(self._data) > self.capacity:
self._data.popitem(last=False)
class CachedUrlShortener(UrlShortener):
def __init__(self, encoder: CodeEncoder, repository: UrlRepository, cache_capacity: int = 256) -> None:
super().__init__(encoder, repository)
self.cache = ThreadSafeLRUCache(cache_capacity)
self._locks: Dict[str, threading.Lock] = {}
self._locks_guard = threading.Lock()
def _lock_for(self, key: str) -> threading.Lock:
with self._locks_guard:
return self._locks.setdefault(key, threading.Lock())
def shorten(self, long_url: str) -> str:
code = super().shorten(long_url)
record = self.repository.get(code)
if record:
self.cache.put(code, record)
return code
def resolve(self, code: str) -> str:
lock = self._lock_for(code)
with lock:
cached = self.cache.get(code)
if cached:
cached.hits += 1
return cached.long_url
record = self.repository.get(code)
if not record:
raise KeyError("Unknown short code")
record.hits += 1
self.cache.put(code, record)
return record.long_url
def worker(shortener: CachedUrlShortener, url: str) -> None:
code = shortener.shorten(url)
resolved = shortener.resolve(code)
print(f"{url} -> {code} -> {resolved}")
def main() -> None:
repo = UrlRepository()
shortener = CachedUrlShortener(Md5Encoder(), repo, cache_capacity=4)
urls = [f"https://service.local/resource/{i}" for i in range(6)]
threads = [threading.Thread(target=worker, args=(shortener, url)) for url in urls]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("Repository snapshot:")
for code, record in repo.snapshot().items():
print(code, "->", record.long_url, "hits:", record.hits)
if __name__ == "__main__":
main()
Level 3 — Resilient Multi-Region
The advanced level assumes the canonical store can fail, so the service now fronts two repositories and monitors the primary with circuit breakers. When the main store is unhealthy we serve reads from a warm replica and append intents to a replication log that can replay once the primary recovers. Learners get a feel for eventual consistency trade-offs by comparing timestamps of writes versus replay confirmations.
from __future__ import annotations
import random
import threading
import time
from queue import Queue
from typing import Optional
class UnstableRepository(UrlRepository):
def __init__(self, failure_rate: float = 0.35) -> None:
super().__init__()
self.failure_rate = failure_rate
def _maybe_fail(self) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("primary-region-unavailable")
def save(self, record: UrlRecord) -> None:
self._maybe_fail()
super().save(record)
def get(self, short_code: str) -> Optional[UrlRecord]:
self._maybe_fail()
return super().get(short_code)
class CircuitBreaker:
def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failures = 0
self.last_failure = 0.0
self.state = "CLOSED"
self.lock = threading.Lock()
def call(self, func, *args, **kwargs):
with self.lock:
if self.state == "OPEN":
if time.time() - self.last_failure >= self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise RuntimeError("circuit-open")
try:
result = func(*args, **kwargs)
except Exception:
with self.lock:
self.failures += 1
self.last_failure = time.time()
if self.failures >= self.failure_threshold:
self.state = "OPEN"
self.failures = 0
raise
else:
with self.lock:
self.failures = 0
self.state = "CLOSED"
return result
class MultiRegionShortener:
def __init__(self, primary_repo: UnstableRepository, secondary_repo: UrlRepository) -> None:
self.primary = CachedUrlShortener(Md5Encoder(), primary_repo, cache_capacity=256)
self.secondary = CachedUrlShortener(Md5Encoder(), secondary_repo, cache_capacity=512)
self.breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1.0)
self.replay_queue: Queue[UrlRecord] = Queue()
threading.Thread(target=self._replay_loop, daemon=True).start()
def _replay_loop(self) -> None:
while True:
record = self.replay_queue.get()
try:
self.breaker.call(self.primary.repository.save, record)
print("Replayed to primary:", record.short_code)
except Exception:
time.sleep(0.4)
self.replay_queue.put(record)
def shorten(self, url: str) -> str:
code = self.secondary.shorten(url)
record = self.secondary.repository.get(code)
if not record:
raise RuntimeError("unexpected-missing-record")
try:
self.breaker.call(self.primary.repository.save, record)
except Exception:
print("Primary write failed, queueing for replay:", code)
self.replay_queue.put(record)
return code
def resolve(self, code: str) -> str:
try:
return self.breaker.call(self.primary.resolve, code)
except Exception as exc:
print("Primary resolve failed:", exc, "- using secondary")
return self.secondary.resolve(code)
def main() -> None:
random.seed(21)
service = MultiRegionShortener(UnstableRepository(0.4), UrlRepository())
urls = [f"https://docs.product/{i}" for i in range(6)]
codes = [service.shorten(url) for url in urls]
for code in codes:
print("Resolve", code, "->", service.resolve(code))
time.sleep(0.2)
if __name__ == "__main__":
main()
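The notes suggest comparing write timestamps with replay confirmations; a sketch of a replay-loop variant that logs the lag (the method name is ours, and it assumes the classes above are in scope):
def _replay_loop_with_lag(self: MultiRegionShortener) -> None:
    while True:
        record = self.replay_queue.get()
        try:
            self.breaker.call(self.primary.repository.save, record)
            lag = time.time() - record.created_at  # eventual-consistency window
            print(f"Replayed {record.short_code} to primary after {lag:.2f}s")
        except Exception:
            time.sleep(0.4)
            self.replay_queue.put(record)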
Grow the ticketing service from basic seat allocation to concurrent hold handling and finally a resilient saga-based workflow.
Ticketing Stack ├─ Level 1: Seat Selector → Repository ├─ Level 2: Hold Manager + Concurrent Workers └─ Level 3: Saga Orchestrator with Payment Integration
Level 1 — Core Seat Booking
The introductory ticketing exercise models shows, seats, and reservations explicitly so learners can reason about invariants like availability. We keep allocation logic in a dedicated service that selects the best contiguous seats and records reservations atomically. Walking through the sample I/O highlights how to maintain audit trails for bookings even before concurrency is introduced.
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional
import threading
@dataclass
class Seat:
seat_id: str
category: str
is_booked: bool = False
class SeatRepository:
def __init__(self) -> None:
self._shows: Dict[str, Dict[str, Seat]] = {}
self._lock = threading.RLock()
def add_show(self, show_id: str, seats: List[Seat]) -> None:
with self._lock:
self._shows[show_id] = {seat.seat_id: seat for seat in seats}
def list_available(self, show_id: str, category: Optional[str] = None) -> List[Seat]:
with self._lock:
seats = self._shows.get(show_id, {})
return [
seat for seat in seats.values()
if not seat.is_booked and (category is None or seat.category == category)
]
def mark_booked(self, show_id: str, seat_ids: List[str]) -> None:
with self._lock:
seats = self._shows.get(show_id)
if not seats:
raise KeyError(f"Unknown show {show_id}")
for seat_id in seat_ids:
seat = seats.get(seat_id)
if not seat:
raise KeyError(f"Unknown seat {seat_id}")
if seat.is_booked:
raise RuntimeError(f"Seat {seat_id} already booked")
seat.is_booked = True
class SeatSelector(ABC):
@abstractmethod
def select(self, seats: List[Seat], quantity: int) -> List[Seat]:
...
class BestAvailableSelector(SeatSelector):
def select(self, seats: List[Seat], quantity: int) -> List[Seat]:
seats_sorted = sorted(seats, key=lambda seat: seat.seat_id)
return seats_sorted[:quantity]
class BookingService:
def __init__(self, repository: SeatRepository, selector: SeatSelector) -> None:
self.repository = repository
self.selector = selector
def book(self, show_id: str, category: str, quantity: int) -> List[str]:
available = self.repository.list_available(show_id, category)
chosen = self.selector.select(available, quantity)
if len(chosen) < quantity:
raise RuntimeError("Insufficient seats")
seat_ids = [seat.seat_id for seat in chosen]
self.repository.mark_booked(show_id, seat_ids)
return seat_ids
def main() -> None:
repo = SeatRepository()
repo.add_show("S1", [
Seat("P1", "PREMIUM"),
Seat("P2", "PREMIUM"),
Seat("P3", "PREMIUM"),
Seat("G1", "GOLD"),
Seat("S1", "SILVER"),
])
service = BookingService(repo, BestAvailableSelector())
booked = service.book("S1", "PREMIUM", 2)
print("Booked seats:", booked)
print("Remaining premium seats:", [seat.seat_id for seat in repo.list_available("S1", "PREMIUM")])
if __name__ == "__main__":
main()
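The description mentions audit trails, which the repository above does not yet keep; a minimal sketch that appends an audit entry on every confirmed booking (AuditedSeatRepository is an illustrative name):
import time

class AuditedSeatRepository(SeatRepository):
    def __init__(self) -> None:
        super().__init__()
        self.audit_log: List[tuple] = []  # (timestamp, show_id, seat_ids)
    def mark_booked(self, show_id: str, seat_ids: List[str]) -> None:
        super().mark_booked(show_id, seat_ids)  # raises before logging if invalid
        with self._lock:
            self.audit_log.append((time.time(), show_id, list(seat_ids)))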
Level 2 — Concurrent Seat Holds
Level 2 expects multiple agents to compete for seats, so we add seat holds with expiry timers and guard mutations with locks. A background sweeper purges lapsed holds so contested seats return to the pool, proving all paths eventually progress without deadlocks. Instrumentation around holds, confirmations, and releases aids in debugging race conditions during practice.
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional
@dataclass
class SeatHold:
show_id: str
seat_ids: List[str]
customer_id: str
expires_at: float
class SeatHoldManager:
def __init__(self, repository: SeatRepository, hold_seconds: float = 3.0) -> None:
self.repository = repository
self.hold_seconds = hold_seconds
self._holds: Dict[tuple[str, str], SeatHold] = {}
self._lock = threading.RLock()
self._sweeper = threading.Thread(target=self._sweep_loop, daemon=True)
self._sweeper.start()
def _sweep_loop(self) -> None:
while True:
time.sleep(self.hold_seconds / 2)
self._purge_expired()
def _purge_expired(self) -> None:
now = time.time()
with self._lock:
expired = [key for key, hold in self._holds.items() if hold.expires_at <= now]
for key in expired:
del self._holds[key]
def available(self, show_id: str, category: str) -> List[Seat]:
self._purge_expired()
with self._lock:
held = {
seat_id for (s_id, seat_id), hold in self._holds.items()
if s_id == show_id and hold.expires_at > time.time()
}
return [
seat for seat in self.repository.list_available(show_id, category)
if seat.seat_id not in held
]
def hold(self, show_id: str, seat_id: str, customer_id: str) -> bool:
now = time.time()
expires_at = now + self.hold_seconds
key = (show_id, seat_id)
with self._lock:
self._purge_expired()
existing = self._holds.get(key)
if existing and existing.expires_at > now:
return False
self._holds[key] = SeatHold(show_id, [seat_id], customer_id, expires_at)
return True
def release(self, show_id: str, seat_ids: List[str]) -> None:
with self._lock:
for seat_id in seat_ids:
self._holds.pop((show_id, seat_id), None)
class ConcurrentBookingService:
def __init__(self, repository: SeatRepository, holds: SeatHoldManager, selector: SeatSelector) -> None:
self.repository = repository
self.holds = holds
self.selector = selector
def hold(self, show_id: str, customer_id: str, category: str, quantity: int) -> SeatHold:
acquired: List[str] = []
for seat in self.selector.select(self.holds.available(show_id, category), quantity):
if self.holds.hold(show_id, seat.seat_id, customer_id):
acquired.append(seat.seat_id)
if len(acquired) == quantity:
break
if len(acquired) < quantity:
if acquired:
self.holds.release(show_id, acquired)
raise RuntimeError("Insufficient seats")
return SeatHold(show_id, acquired, customer_id, time.time() + self.holds.hold_seconds)
def confirm(self, hold: SeatHold) -> List[str]:
self.repository.mark_booked(hold.show_id, hold.seat_ids)
self.holds.release(hold.show_id, hold.seat_ids)
return hold.seat_ids
def release(self, hold: SeatHold) -> None:
self.holds.release(hold.show_id, hold.seat_ids)
def book(self, show_id: str, customer_id: str, category: str, quantity: int) -> List[str]:
hold = self.hold(show_id, customer_id, category, quantity)
try:
return self.confirm(hold)
except Exception:
self.release(hold)
raise
def worker(service: ConcurrentBookingService, customer: str, qty: int) -> None:
try:
seats = service.book("S1", customer, "PREMIUM", qty)
print(customer, "got", seats)
except Exception as exc:
print(customer, "failed:", exc)
def main() -> None:
repo = SeatRepository()
repo.add_show("S1", [Seat(f"P{i}", "PREMIUM") for i in range(1, 7)])
holds = SeatHoldManager(repo, hold_seconds=1.5)
service = ConcurrentBookingService(repo, holds, BestAvailableSelector())
threads = [threading.Thread(target=worker, args=(service, f"C{i}", 2)) for i in range(1, 5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
main()
Level 3 — Resilient Booking Saga
The advanced level layers a saga that coordinates seat holds, payment, and confirmation steps, recording every action so failures are traceable. Compensation logic releases seats and adds the order to a waitlist whenever downstream calls fail, demonstrating how to restore invariants without manual cleanup. Learners gain confidence by replaying the workflow logs and seeing how each step records both intent and outcome.
from __future__ import annotations
import random
import time
import threading
from dataclasses import dataclass, field
from typing import List
class PaymentGateway:
def __init__(self, failure_rate: float = 0.35) -> None:
self.failure_rate = failure_rate
def charge(self, order_id: str, amount: float) -> str:
if random.random() < self.failure_rate:
raise RuntimeError("payment-declined")
return f"rcpt-{order_id}-{int(time.time() * 1000)}"
class CircuitBreaker:
def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failures = 0
self.last_failure = 0.0
self.state = "CLOSED"
self.lock = threading.Lock()
def call(self, func, *args, **kwargs):
with self.lock:
if self.state == "OPEN":
if time.time() - self.last_failure >= self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise RuntimeError("payment-circuit-open")
try:
result = func(*args, **kwargs)
except Exception:
with self.lock:
self.failures += 1
self.last_failure = time.time()
if self.failures >= self.failure_threshold:
self.state = "OPEN"
self.failures = 0
raise
else:
with self.lock:
self.failures = 0
self.state = "CLOSED"
return result
@dataclass
class OrderResult:
order_id: str
status: str
seats: List[str] = field(default_factory=list)
class Waitlist:
def __init__(self) -> None:
self._orders: List[str] = []
self._lock = threading.Lock()
def add(self, order_id: str) -> None:
with self._lock:
self._orders.append(order_id)
def snapshot(self) -> List[str]:
with self._lock:
return list(self._orders)
class BookingSaga:
def __init__(self,
ticketing: ConcurrentBookingService,
payment: PaymentGateway,
breaker: CircuitBreaker,
waitlist: Waitlist,
max_payment_attempts: int = 3) -> None:
self.ticketing = ticketing
self.payment = payment
self.breaker = breaker
self.waitlist = waitlist
self.max_payment_attempts = max_payment_attempts
def execute(self, order_id: str, show_id: str, category: str, quantity: int, amount: float) -> OrderResult:
try:
hold = self.ticketing.hold(show_id, order_id, category, quantity)
except RuntimeError as exc:
self.waitlist.add(order_id)
return OrderResult(order_id, f"WAITLISTED ({exc})")
try:
attempt = 0
while attempt < self.max_payment_attempts:
try:
receipt = self.breaker.call(self.payment.charge, order_id, amount)
seats = self.ticketing.confirm(hold)
return OrderResult(order_id, f"CONFIRMED receipt={receipt}", seats)
except Exception as exc:
attempt += 1
if attempt >= self.max_payment_attempts:
raise
backoff = 0.2 * attempt
print(f"{order_id}: retrying payment in {backoff:.2f}s ({exc})")
time.sleep(backoff)
except Exception as exc:
self.ticketing.release(hold)
self.waitlist.add(order_id)
return OrderResult(order_id, f"FAILED payment ({exc})")
def main() -> None:
random.seed(10)
repo = SeatRepository()
repo.add_show("S1", [Seat(f"P{i}", "PREMIUM") for i in range(1, 7)])
holds = SeatHoldManager(repo, hold_seconds=2.0)
ticketing = ConcurrentBookingService(repo, holds, BestAvailableSelector())
saga = BookingSaga(
ticketing=ticketing,
payment=PaymentGateway(failure_rate=0.4),
breaker=CircuitBreaker(failure_threshold=2, recovery_timeout=1.0),
waitlist=Waitlist()
)
orders = [
("O1", "S1", "PREMIUM", 2, 120.0),
("O2", "S1", "PREMIUM", 2, 150.0),
("O3", "S1", "PREMIUM", 2, 110.0),
]
for order_id, show_id, category, qty, amount in orders:
result = saga.execute(order_id, show_id, category, qty, amount)
print(result)
print("Waitlist:", saga.waitlist.snapshot())
if __name__ == "__main__":
main()
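The saga above only prints its progress; recording each step's intent and outcome could look like this append-only log — a sketch (SagaLog is our name), with durable storage assumed to be swapped in later:
import json
import threading
import time

class SagaLog:
    """Append-only intent/outcome log; a real system would flush to durable storage."""
    def __init__(self) -> None:
        self._entries: list = []
        self._lock = threading.Lock()
    def append(self, order_id: str, step: str, status: str) -> None:
        entry = json.dumps({"ts": time.time(), "order": order_id, "step": step, "status": status})
        with self._lock:
            self._entries.append(entry)
    def replay(self) -> list:
        with self._lock:
            return list(self._entries)

# Usage sketch inside BookingSaga.execute:
# log.append(order_id, "hold", "intent") before holding seats,
# log.append(order_id, "payment", "confirmed") after a successful charge.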
Progressively enhance the library platform from core circulation to concurrent reservations and finally a resilient catalog tier with caching.
Library System Stack ├─ Level 1: Circulation Service → Repository ├─ Level 2: Reservation Manager with Locks └─ Level 3: Resilient Catalog & Cache
Level 1 — Core Circulation
We begin the library system by decomposing responsibilities into catalog search strategies, an inventory repository, and a circulation service. Multiple search adapters demonstrate how to swap implementations while keeping the checkout flow stable. By orchestrating due dates and availability updates in one transaction the learner sees how to keep domain rules enforceable.
from __future__ import annotations
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional
@dataclass
class Book:
isbn: str
title: str
author: str
copies: int
class CatalogRepository:
def __init__(self) -> None:
self._books: Dict[str, Book] = {}
self._lock = threading.RLock()
def add(self, book: Book) -> None:
with self._lock:
self._books[book.isbn] = book
def get(self, isbn: str) -> Optional[Book]:
with self._lock:
return self._books.get(isbn)
def all_books(self) -> List[Book]:
with self._lock:
return list(self._books.values())
class SearchStrategy(ABC):
@abstractmethod
def matches(self, book: Book, query: str) -> bool:
...
class TitleSearch(SearchStrategy):
def matches(self, book: Book, query: str) -> bool:
return query.lower() in book.title.lower()
class AuthorSearch(SearchStrategy):
def matches(self, book: Book, query: str) -> bool:
return query.lower() in book.author.lower()
@dataclass
class Loan:
isbn: str
patron: str
due_date: datetime
class LibraryService:
def __init__(self, catalog: CatalogRepository) -> None:
self.catalog = catalog
self.loans: Dict[str, Loan] = {}
def search(self, strategy: SearchStrategy, query: str) -> List[Book]:
return [book for book in self.catalog.all_books() if strategy.matches(book, query)]
def checkout(self, isbn: str, patron: str) -> Loan:
book = self.catalog.get(isbn)
if not book or book.copies == 0:
raise RuntimeError("Unavailable")
book.copies -= 1
loan = Loan(isbn, patron, datetime.utcnow() + timedelta(days=14))
self.loans[f"{isbn}:{patron}"] = loan
return loan
def checkin(self, isbn: str, patron: str) -> None:
key = f"{isbn}:{patron}"
loan = self.loans.pop(key, None)
if loan:
book = self.catalog.get(isbn)
if book:
book.copies += 1
def main() -> None:
catalog = CatalogRepository()
catalog.add(Book("9780132350884", "Clean Code", "Robert C. Martin", 2))
catalog.add(Book("9781617296086", "System Design Interview", "Alex Xu", 1))
service = LibraryService(catalog)
results = service.search(TitleSearch(), "clean")
print("Search results:", results)
loan = service.checkout("9780132350884", "patronA")
print("Loan:", loan)
print("Remaining copies:", catalog.get("9780132350884"))
if __name__ == "__main__":
main()
Level 2 — Concurrent Reservations
Level 2 adds contested reservations, guarding each book with its own lock so popular titles do not block the whole catalog. We introduce a waitlist queue plus notification callbacks, illustrating how to recover gracefully when demand outstrips supply. Threaded tests showcase how to observe race conditions and ensure only one borrower owns a copy at any instant.
from __future__ import annotations
import threading
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Optional
class ConcurrentReservationService:
def __init__(self, library: LibraryService, notifier: Callable[[str, str], None]) -> None:
self.library = library
self.notifier = notifier
self._locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)
self._waitlists: Dict[str, Deque[str]] = defaultdict(deque)
def reserve(self, isbn: str, patron: str) -> Optional[Loan]:
lock = self._locks[isbn]
with lock:
available = self.library.catalog.get(isbn)
if available and available.copies > 0:
loan = self.library.checkout(isbn, patron)
print(f"{patron} checked out {isbn}")
return loan
self._waitlists[isbn].append(patron)
print(f"{patron} waitlisted for {isbn}")
return None
def return_copy(self, loan: Loan) -> None:
lock = self._locks[loan.isbn]
next_patron: Optional[str] = None
with lock:
self.library.checkin(loan.isbn, loan.patron)
if self._waitlists[loan.isbn]:
next_patron = self._waitlists[loan.isbn].popleft()
if next_patron:
self.notifier(loan.isbn, next_patron)
def notifier(isbn: str, patron: str) -> None:
print(f"Notify {patron}: {isbn} is available")
def worker(service: ConcurrentReservationService, isbn: str, patron: str) -> None:
loan = service.reserve(isbn, patron)
if loan:
time.sleep(0.2)
service.return_copy(loan)
def main() -> None:
catalog = CatalogRepository()
catalog.add(Book("9780132350884", "Clean Code", "Robert C. Martin", 1))
catalog.add(Book("9781491950357", "Site Reliability Engineering", "Betsy Beyer", 2))
library = LibraryService(catalog)
service = ConcurrentReservationService(library, notifier)
threads = [
threading.Thread(target=worker, args=(service, "9780132350884", f"user{i}"))
for i in range(3)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
main()
Level 3 — Resilient Catalog Service
The final level wraps catalog lookups with an in-memory cache, a circuit breaker, and fallback to stale-but-safe responses when the primary store is down. We log cache hits and breaker trips so operators can see when the system has entered degraded mode. Replaying the sample scenario helps learners appreciate how read-only use cases can remain useful even during partial outages.
from __future__ import annotations
import random
import threading
import time
from typing import Callable, Dict, List, Optional, Tuple
class FlakyCatalogRepository(CatalogRepository):
def __init__(self, failure_rate: float = 0.3) -> None:
super().__init__()
self.failure_rate = failure_rate
def _maybe_fail(self) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("primary-unavailable")
def add(self, book: Book) -> None:
self._maybe_fail()
super().add(book)
def get(self, isbn: str) -> Optional[Book]:
self._maybe_fail()
return super().get(isbn)
def all_books(self) -> List[Book]:
self._maybe_fail()
return super().all_books()
class CircuitBreaker:
def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failures = 0
self.last_failure = 0.0
self.state = "CLOSED"
self.lock = threading.Lock()
def call(self, func, *args, **kwargs):
with self.lock:
if self.state == "OPEN":
if time.time() - self.last_failure >= self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise RuntimeError("circuit-open")
try:
result = func(*args, **kwargs)
except Exception:
with self.lock:
self.failures += 1
self.last_failure = time.time()
if self.failures >= self.failure_threshold:
self.state = "OPEN"
self.failures = 0
raise
else:
with self.lock:
self.failures = 0
self.state = "CLOSED"
return result
class ResilientLibraryGateway:
def __init__(self, primary_repo: FlakyCatalogRepository, secondary_repo: CatalogRepository, notifier: Callable[[str, str], None]) -> None:
self.primary_repo = primary_repo
self.secondary_repo = secondary_repo
self.primary_service = LibraryService(self.primary_repo)
self.secondary_service = LibraryService(self.secondary_repo)
self.primary_reservations = ConcurrentReservationService(self.primary_service, notifier)
self.secondary_reservations = ConcurrentReservationService(self.secondary_service, notifier)
self.breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1.0)
self.cache: Dict[Tuple[str, str], List[Book]] = {}
def add_book(self, book: Book) -> None:
primary_copy = Book(book.isbn, book.title, book.author, book.copies)
secondary_copy = Book(book.isbn, book.title, book.author, book.copies)
try:
self.breaker.call(self.primary_repo.add, primary_copy)
except Exception as exc:
print("Primary add failed:", exc)
self.secondary_repo.add(secondary_copy)
def search(self, strategy: SearchStrategy, query: str) -> List[Book]:
key = (strategy.__class__.__name__, query.lower())
try:
results = self.breaker.call(self.primary_service.search, strategy, query)
self.cache[key] = results
return results
except Exception as exc:
print("Primary search failed:", exc)
if key in self.cache:
print("Serving cached results for", query)
return self.cache[key]
return self.secondary_service.search(strategy, query)
def checkout(self, isbn: str, patron: str) -> Optional[Loan]:
try:
loan = self.primary_reservations.reserve(isbn, patron)
if loan:
return loan
except Exception as exc:
print("Primary checkout error:", exc)
return self.secondary_reservations.reserve(isbn, patron)
def checkin(self, loan: Loan) -> None:
try:
self.primary_reservations.return_copy(loan)
except Exception as exc:
print("Primary checkin error:", exc)
self.secondary_reservations.return_copy(loan)
def main() -> None:
random.seed(5)
primary_repo = FlakyCatalogRepository(0.4)
secondary_repo = CatalogRepository()
gateway = ResilientLibraryGateway(primary_repo, secondary_repo, notifier)
gateway.add_book(Book("9780132350884", "Clean Code", "Robert C. Martin", 1))
gateway.add_book(Book("9781491950357", "Site Reliability Engineering", "Betsy Beyer", 2))
gateway.add_book(Book("9780134494166", "Clean Architecture", "Robert C. Martin", 1))
for term in ["clean", "reliability", "clean"]:
results = gateway.search(TitleSearch(), term)
print(term, "->", [book.title for book in results])
loans: List[Loan] = []
for patron in ["patronA", "patronB", "patronC"]:
loan = gateway.checkout("9780132350884", patron)
if loan:
loans.append(loan)
for loan in loans:
gateway.checkin(loan)
if __name__ == "__main__":
main()
Build the Splitwise-like system progressively from a basic ledger to concurrent settlement handling and resilient reconciliation.
Splitwise Platform ├─ Level 1: Ledger + Expense Aggregation ├─ Level 2: Concurrent Settlement Engine └─ Level 3: Resilient Settlement Coordinator
Level 1 — Core Implementation
We start by modelling groups, participants, and ledger entries so expense ingestion is explicit and auditable. Settlement suggestions are derived from the ledger and published through notifiers, demonstrating the separation between computation and delivery. This level encourages practising idempotent writes and clear data structures before concurrency is involved.
from __future__ import annotations
import threading
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
@dataclass
class Expense:
title: str
amount: float
participants: List[str]
class BalanceLedger:
def __init__(self) -> None:
self.expenses: List[Expense] = []
self.balances: Dict[str, float] = defaultdict(float)
self._lock = threading.RLock()
def record(self, expense: Expense) -> None:
with self._lock:
share = expense.amount / len(expense.participants)
payer = expense.participants[0]
for user in expense.participants:
if user == payer:
self.balances[user] += expense.amount - share
else:
self.balances[user] -= share
self.expenses.append(expense)
def summary(self) -> Dict[str, float]:
with self._lock:
return dict(self.balances)
def simplify(self) -> List[Tuple[str, str, float]]:
with self._lock:
debtors = [(user, -amount) for user, amount in self.balances.items() if amount < 0]
creditors = [(user, amount) for user, amount in self.balances.items() if amount > 0]
debtors.sort(key=lambda x: x[1], reverse=True)
creditors.sort(key=lambda x: x[1], reverse=True)
settlements: List[Tuple[str, str, float]] = []
i = j = 0
while i < len(debtors) and j < len(creditors):
debtor, debt = debtors[i]
creditor, credit = creditors[j]
pay = min(debt, credit)
settlements.append((debtor, creditor, pay))
debt -= pay
credit -= pay
if debt == 0:
i += 1
else:
debtors[i] = (debtor, debt)
if credit == 0:
j += 1
else:
creditors[j] = (creditor, credit)
return settlements
class Notifier:
def notify(self, message: str) -> None:
print("Notify:", message)
class SplitwiseService:
def __init__(self, ledger: BalanceLedger, notifier: Notifier) -> None:
self.ledger = ledger
self.notifier = notifier
def add_expense(self, title: str, amount: float, participants: List[str]) -> None:
expense = Expense(title, amount, participants)
self.ledger.record(expense)
self.notifier.notify(f"Expense {title} recorded for {participants}")
def balances(self) -> Dict[str, float]:
return self.ledger.summary()
def main() -> None:
service = SplitwiseService(BalanceLedger(), Notifier())
service.add_expense("Dinner", 120.0, ["A", "B", "C"])
service.add_expense("Cab", 60.0, ["B", "A"])
print("Balances:", service.balances())
if __name__ == "__main__":
main()
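To make the ledger arithmetic concrete, here is a quick worked check that reuses the classes above (the expected figures follow from the equal-split rule, with the first participant treated as the payer):
# Worked check of the ledger maths, reusing BalanceLedger and Expense.
# Dinner 120 split 3 ways: payer A nets +80, B and C each owe 40.
# Cab 60 split 2 ways: payer B nets +30, A owes 30. Net: A +50, B -10, C -40.
ledger = BalanceLedger()
ledger.record(Expense("Dinner", 120.0, ["A", "B", "C"]))
ledger.record(Expense("Cab", 60.0, ["B", "A"]))
assert ledger.summary() == {"A": 50.0, "B": -10.0, "C": -40.0}
# The greedy simplifier pairs the largest debtor with the largest creditor,
# settling everything in two transfers instead of a web of IOUs.
assert ledger.simplify() == [("C", "A", 40.0), ("B", "A", 10.0)]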
Level 2 — Concurrent Enhancements
Level 2 invites multiple users to post at once, so we layer per-user locks, always acquired in sorted order, around ledger updates and settlement runs. We validate that aggregate balances remain consistent even when expenses and repayments interleave. The notifier output makes it easy to trace interleavings and confirm the invariants hold.
from __future__ import annotations
import threading
from typing import Dict, List, Optional, Tuple
class AccountLockManager:
def __init__(self) -> None:
self._locks: Dict[str, threading.Lock] = {}
self._guard = threading.Lock()
def acquire(self, users: List[str]) -> List[threading.Lock]:
ordered = sorted(set(users))
locks: List[threading.Lock] = []
with self._guard:
for user in ordered:
locks.append(self._locks.setdefault(user, threading.Lock()))
for lock in locks:
lock.acquire()
return locks
@staticmethod
def release(locks: List[threading.Lock]) -> None:
for lock in reversed(locks):
lock.release()
class ConcurrentSplitwiseService:
def __init__(self, service: SplitwiseService) -> None:
self.service = service
self.lock_manager = AccountLockManager()
def add_expense(self, title: str, amount: float, participants: List[str]) -> None:
locks = self.lock_manager.acquire(participants)
try:
self.service.add_expense(title, amount, participants)
finally:
AccountLockManager.release(locks)
def settlements(self) -> List[Tuple[str, str, float]]:
return self.service.ledger.simplify()
def worker(service: ConcurrentSplitwiseService, idx: int) -> None:
participants = ["A", "B", "C"]
service.add_expense(f"expense-{idx}", 40.0 + idx * 5, participants)
def main() -> None:
base_service = SplitwiseService(BalanceLedger(), Notifier())
concurrent_service = ConcurrentSplitwiseService(base_service)
threads = [threading.Thread(target=worker, args=(concurrent_service, i)) for i in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("Balances:", base_service.balances())
print("Settlements:", concurrent_service.settlements())
if __name__ == "__main__":
main()
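The deadlock-freedom argument rests entirely on sorted acquisition: however callers list the participants, every thread takes the per-user locks in the same global order, so a circular wait cannot form. A minimal sketch of that property (the transfer helper is illustrative, not part of the service):
# Both threads touch overlapping users listed in different orders; because
# AccountLockManager.acquire sorts the ids first, neither can hold "B" while
# waiting for "A" at the same time as the other holds "A" waiting for "B".
manager = AccountLockManager()
def transfer(users: List[str]) -> None:
    locks = manager.acquire(users)
    try:
        pass  # critical section touching all listed balances
    finally:
        AccountLockManager.release(locks)
t1 = threading.Thread(target=transfer, args=(["B", "A"],))
t2 = threading.Thread(target=transfer, args=(["A", "C", "B"],))
t1.start(); t2.start(); t1.join(); t2.join()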
Level 3 — Resilient Settlement Coordinator
The advanced Splitwise level assumes cash-outs talk to unreliable processors, so we buffer transfer commands in an outbox, modelled here as an in-memory queue (a production build would persist it so it survives restarts). A circuit breaker prevents hammering a failing provider, while bounded retries with exponential backoff replay buffered commands, requeueing anything that exhausts its attempts. The exercise demonstrates how durability, retries, and monitoring combine to keep group balances trustworthy.
from __future__ import annotations
import random
import threading
import time
from dataclasses import dataclass
from queue import Queue
from typing import List, Tuple
@dataclass
class SettlementCommand:
debtor: str
creditor: str
amount: float
class PaymentGateway:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
def transfer(self, command: SettlementCommand) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("processor-down")
print(f"Transferred {command.amount:.2f} from {command.debtor} to {command.creditor}")
class CircuitBreaker:
def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failures = 0
self.last_failure = 0.0
self.state = "CLOSED"
self._lock = threading.Lock()
def call(self, func, *args, **kwargs):
with self._lock:
if self.state == "OPEN":
if time.time() - self.last_failure >= self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise RuntimeError("circuit-open")
try:
result = func(*args, **kwargs)
except Exception:
with self._lock:
self.failures += 1
self.last_failure = time.time()
if self.failures >= self.failure_threshold:
self.state = "OPEN"
self.failures = 0
raise
else:
with self._lock:
self.failures = 0
self.state = "CLOSED"
return result
class ResilientSettlementCoordinator:
def __init__(self, service: ConcurrentSplitwiseService, gateway: PaymentGateway, breaker: CircuitBreaker) -> None:
self.service = service
self.gateway = gateway
self.breaker = breaker
self.outbox: "Queue[SettlementCommand]" = Queue()
def enqueue_settlements(self) -> None:
for debtor, creditor, amount in self.service.settlements():
self.outbox.put(SettlementCommand(debtor, creditor, amount))
def process(self) -> None:
while not self.outbox.empty():
command = self.outbox.get()
attempt = 0
while attempt < 3:
try:
self.breaker.call(self.gateway.transfer, command)
break
except Exception as exc:
attempt += 1
backoff = 0.1 * (2 ** attempt)
print(f"Retrying {command} after {backoff:.2f}s ({exc})")
time.sleep(backoff)
else:
print("Requeue failed command:", command)
self.outbox.put(command)
break
def main() -> None:
random.seed(7)
base_service = SplitwiseService(BalanceLedger(), Notifier())
concurrent_service = ConcurrentSplitwiseService(base_service)
for idx in range(3):
concurrent_service.add_expense(f"group-{idx}", 100 + idx * 20, ["A", "B", "C"])
coordinator = ResilientSettlementCoordinator(
concurrent_service,
PaymentGateway(0.5),
CircuitBreaker(failure_threshold=2, recovery_timeout=0.5),
)
coordinator.enqueue_settlements()
coordinator.process()
if __name__ == "__main__":
main()
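To observe the breaker's state machine in isolation, a small experiment (reusing the CircuitBreaker above; the boom helper is illustrative) forces two failures, confirms the open state rejects calls, then lets the recovery timeout elapse:
# Drive the breaker through CLOSED -> OPEN -> HALF_OPEN -> CLOSED.
breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.2)
def boom() -> None:
    raise RuntimeError("downstream-error")
for _ in range(2):  # two consecutive failures trip the breaker
    try:
        breaker.call(boom)
    except RuntimeError:
        pass
print("State after failures:", breaker.state)  # OPEN
try:
    breaker.call(lambda: None)
except RuntimeError as exc:
    print("Rejected while open:", exc)  # circuit-open
time.sleep(0.25)  # wait out the recovery timeout
breaker.call(lambda: None)  # the HALF_OPEN probe succeeds
print("State after probe:", breaker.state)  # CLOSED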
Progressively enhance hotel booking from simple inventory control to concurrent allocation management and a resilient saga-based flow.
Hotel Booking Stack ├─ Level 1: Inventory Repository ├─ Level 2: Concurrent Allocation Locks └─ Level 3: Resilient Microservice Saga
Level 1 — Core Implementation
The opening hotel scenario models room types, inventory, and reservations so capacity can be computed deterministically. We carve out a service that checks availability, books stays, and tracks confirmations, leaving room to emit events for downstream systems later. The baseline emphasises clear separation between querying availability and performing the actual mutation.
from __future__ import annotations
import threading
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Dict, List, Optional
@dataclass
class RoomType:
code: str
total_rooms: int
@dataclass
class Reservation:
reservation_id: str
room_type: str
check_in: date
check_out: date
class RoomInventory:
def __init__(self) -> None:
self.availability: Dict[str, int] = {}
self._lock = threading.RLock()
def add_room_type(self, room_type: RoomType) -> None:
with self._lock:
self.availability[room_type.code] = room_type.total_rooms
def reserve(self, room_type: str, rooms: int) -> None:
with self._lock:
available = self.availability.get(room_type, 0)
if available < rooms:
raise RuntimeError("insufficient-inventory")
self.availability[room_type] = available - rooms
def release(self, room_type: str, rooms: int) -> None:
with self._lock:
self.availability[room_type] = self.availability.get(room_type, 0) + rooms
def snapshot(self) -> Dict[str, int]:
with self._lock:
return dict(self.availability)
class ReservationIdFactory:
    def __init__(self) -> None:
        self.counter = 0
        self._lock = threading.Lock()  # Level 2 issues ids from many threads
    def next_id(self) -> str:
        with self._lock:
            self.counter += 1
            return f"R{self.counter:04d}"
class HotelBookingService:
def __init__(self, inventory: RoomInventory, id_factory: ReservationIdFactory) -> None:
self.inventory = inventory
self.id_factory = id_factory
self.reservations: Dict[str, Reservation] = {}
def reserve(self, room_type: str, nights: int, start: date) -> Reservation:
self.inventory.reserve(room_type, 1)
reservation_id = self.id_factory.next_id()
reservation = Reservation(
reservation_id,
room_type,
start,
start + timedelta(days=nights),
)
self.reservations[reservation_id] = reservation
return reservation
def main() -> None:
inventory = RoomInventory()
inventory.add_room_type(RoomType("DLX", 5))
service = HotelBookingService(inventory, ReservationIdFactory())
reservation = service.reserve("DLX", 2, date.today())
print("Reservation:", reservation)
print("Remaining DLX:", inventory.availability["DLX"])
if __name__ == "__main__":
main()
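The reserve path raises instead of over-allocating. A quick sketch, reusing the Level 1 classes, exercises that guard:
# Exhausting a room type raises rather than letting availability go negative.
inventory = RoomInventory()
inventory.add_room_type(RoomType("STD", 1))
service = HotelBookingService(inventory, ReservationIdFactory())
service.reserve("STD", 1, date.today())  # consumes the only room
try:
    service.reserve("STD", 1, date.today())
except RuntimeError as exc:
    print("Second booking rejected:", exc)  # insufficient-inventory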
Level 2 — Concurrent Enhancements
Level 2 assumes guests book in parallel, so each room type's calendar partitions locks by date and validates availability only after every night's lock is held. Failed attempts return immediately instead of over-committing, and an explicit release returns capacity to both the calendar and the base inventory. The logs included in the exercise show exactly when and why a booking is declined, reinforcing correctness reasoning.
from __future__ import annotations
import threading
from collections import defaultdict
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Dict, List, Optional
class AvailabilityCalendar:
def __init__(self, room_type: str, total_rooms: int) -> None:
self.room_type = room_type
self.total_rooms = total_rooms
self._locks: Dict[date, threading.Lock] = defaultdict(threading.Lock)
self._availability: Dict[date, int] = defaultdict(lambda: total_rooms)
def try_reserve(self, start: date, nights: int) -> bool:
dates = [start + timedelta(days=i) for i in range(nights)]
locks = [self._locks[d] for d in dates]
for lock in locks:
lock.acquire()
try:
if any(self._availability[d] <= 0 for d in dates):
return False
for d in dates:
self._availability[d] -= 1
return True
finally:
for lock in reversed(locks):
lock.release()
def release(self, start: date, nights: int) -> None:
dates = [start + timedelta(days=i) for i in range(nights)]
locks = [self._locks[d] for d in dates]
for lock in locks:
lock.acquire()
try:
for d in dates:
self._availability[d] += 1
finally:
for lock in reversed(locks):
lock.release()
class ConcurrentHotelBookingService:
def __init__(self, service: HotelBookingService, calendar: AvailabilityCalendar) -> None:
self.service = service
self.calendar = calendar
def reserve(self, guest: str, room_type: str, nights: int, start: date) -> Optional[Reservation]:
if not self.calendar.try_reserve(start, nights):
print(f"{guest} reservation failed")
return None
reservation = self.service.reserve(room_type, nights, start)
print(f"{guest} reservation succeeded: {reservation.reservation_id}")
return reservation
def release(self, reservation: Reservation) -> None:
nights = (reservation.check_out - reservation.check_in).days
self.calendar.release(reservation.check_in, nights)
self.service.inventory.release(reservation.room_type, 1)
self.service.reservations.pop(reservation.reservation_id, None)
def worker(service: ConcurrentHotelBookingService, guest: str) -> None:
service.reserve(guest, "DLX", 2, date.today())
def main() -> None:
inventory = RoomInventory()
inventory.add_room_type(RoomType("DLX", 3))
booking_service = HotelBookingService(inventory, ReservationIdFactory())
calendar = AvailabilityCalendar("DLX", total_rooms=3)
concurrent_service = ConcurrentHotelBookingService(booking_service, calendar)
threads = [threading.Thread(target=worker, args=(concurrent_service, f"G{i}")) for i in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
main()
Level 3 — Resilient Architecture
The advanced level coordinates reservation, pricing, and payment through a saga so each side effect is recorded and compensated if needed. A circuit breaker around pricing plus a fallback price calculator helps the service downgrade gracefully instead of failing mid-stay. Learners follow the saga log to understand how each step either commits or applies compensation to restore inventory.
from __future__ import annotations
import random
import time
from dataclasses import dataclass
from datetime import date
from typing import List, Optional
@dataclass
class BookingCommand:
guest: str
room_type: str
nights: int
class PricingService:
def __init__(self, failure_rate: float = 0.3) -> None:
self.failure_rate = failure_rate
def quote(self, room_type: str, nights: int) -> float:
if random.random() < self.failure_rate:
raise RuntimeError("pricing-down")
base = 120 if room_type == "DLX" else 90
return base * nights
class PaymentService:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
def charge(self, guest: str, amount: float) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("payment-failed")
print(f"Charged {guest} amount {amount}")
class SimpleCircuitBreaker:
def __init__(self, threshold: int, cool_down: float) -> None:
self.threshold = threshold
self.cool_down = cool_down
self.failures = 0
self.open_until = 0.0
def call(self, func, *args, **kwargs):
now = time.time()
if now < self.open_until:
raise RuntimeError("circuit-open")
try:
result = func(*args, **kwargs)
self.failures = 0
return result
except Exception as exc:
self.failures += 1
if self.failures >= self.threshold:
self.open_until = now + self.cool_down
self.failures = 0
raise exc
class BookingSagaCoordinator:
def __init__(self, booking: ConcurrentHotelBookingService, pricing: PricingService, payment: PaymentService) -> None:
self.booking = booking
self.pricing = pricing
self.payment = payment
self.breaker = SimpleCircuitBreaker(threshold=2, cool_down=1.0)
def execute(self, command: BookingCommand) -> None:
try:
price = self.breaker.call(self.pricing.quote, command.room_type, command.nights)
except Exception:
price = 100.0 * command.nights
print("Pricing fallback for", command.guest)
reservation = self.booking.reserve(command.guest, command.room_type, command.nights, date.today())
if not reservation:
print("Reservation failed for", command.guest)
return
try:
self.payment.charge(command.guest, price)
print(f"Reservation {reservation.reservation_id} confirmed for {command.guest}")
except Exception as exc:
print(f"Payment failed for {command.guest}: {exc}")
self.booking.release(reservation)
def main() -> None:
random.seed(9)
inventory = RoomInventory()
inventory.add_room_type(RoomType("DLX", 3))
service = HotelBookingService(inventory, ReservationIdFactory())
calendar = AvailabilityCalendar("DLX", total_rooms=3)
concurrent_service = ConcurrentHotelBookingService(service, calendar)
saga = BookingSagaCoordinator(concurrent_service, PricingService(0.4), PaymentService(0.5))
commands = [
BookingCommand("Alice", "DLX", 2),
BookingCommand("Bob", "DLX", 3),
BookingCommand("Carol", "DLX", 1),
BookingCommand("Dave", "DLX", 2),
]
for command in commands:
saga.execute(command)
if __name__ == "__main__":
main()
Grow the movie ticketing service from layout modeling to seat hold concurrency and resilient payment orchestration.
Movie Ticketing Stack ├─ Level 1: Theatre Layout Builder ├─ Level 2: Concurrent Hold/Release └─ Level 3: Resilient Payment Orchestration
Level 1 — Core Implementation
This theatre build begins by defining seat maps, categories, and hold records so pre-payment reservations are first-class citizens. We expose APIs to place holds, confirm purchases, and release inventory, showing how to balance expressiveness with simple state machines. Running the sample makes the lifecycle tangible before concurrency is introduced.
from __future__ import annotations
from dataclasses import dataclass
import threading
from typing import Dict, List, Optional, Tuple
@dataclass
class SeatCell:
row: str
number: int
category: str
status: str = "AVAILABLE"
class TheatreLayout:
def __init__(self) -> None:
self.grid: Dict[str, Dict[int, SeatCell]] = {}
self._lock = threading.RLock()
def add_row(self, row: str, count: int, category: str) -> None:
with self._lock:
self.grid[row] = {col: SeatCell(row, col, category) for col in range(1, count + 1)}
def available(self, category: str) -> List[SeatCell]:
with self._lock:
return [
seat for row in self.grid.values() for seat in row.values()
if seat.category == category and seat.status == "AVAILABLE"
]
def mark(self, seats: List[Tuple[str, int]], status: str) -> None:
with self._lock:
for row, number in seats:
self.grid[row][number].status = status
class SeatHoldService:
def __init__(self, layout: TheatreLayout) -> None:
self.layout = layout
def hold(self, category: str, quantity: int) -> List[Tuple[str, int]]:
seats = self.layout.available(category)[:quantity]
if len(seats) < quantity:
raise RuntimeError("not enough seats")
chosen = [(seat.row, seat.number) for seat in seats]
self.layout.mark(chosen, "HOLD")
return chosen
def release(self, seats: List[Tuple[str, int]]) -> None:
self.layout.mark(seats, "AVAILABLE")
def confirm(self, seats: List[Tuple[str, int]]) -> None:
self.layout.mark(seats, "BOOKED")
def main() -> None:
layout = TheatreLayout()
layout.add_row("A", 10, "PREMIUM")
layout.add_row("B", 10, "PREMIUM")
layout.add_row("C", 10, "REGULAR")
hold_service = SeatHoldService(layout)
held = hold_service.hold("PREMIUM", 3)
print("Held seats:", held)
print("Remaining premium:", len(layout.available("PREMIUM")))
if __name__ == "__main__":
main()
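The seat lifecycle is a tiny state machine: AVAILABLE → HOLD → BOOKED, with release sending a held seat back to AVAILABLE. A short check, reusing the Level 1 classes, walks seats through every edge:
# Walk two seats through the full hold lifecycle and verify each state.
layout = TheatreLayout()
layout.add_row("D", 4, "REGULAR")
service = SeatHoldService(layout)
seats = service.hold("REGULAR", 2)            # AVAILABLE -> HOLD
assert len(layout.available("REGULAR")) == 2  # two of four remain
service.confirm([seats[0]])                   # HOLD -> BOOKED
service.release([seats[1]])                   # HOLD -> AVAILABLE
assert layout.grid[seats[0][0]][seats[0][1]].status == "BOOKED"
assert len(layout.available("REGULAR")) == 3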
Level 2 — Concurrent Enhancements
Level 2 simulates the rush for popular shows, coordinating holds through a shared lock and timed expiries so no two patrons grab the same seat. A background reaper thread purges stale holds on a fixed cadence, returning capacity without busy waiting, and every hold request purges eagerly as well. The trace output encourages learners to inspect interleavings and confirm that expired holds truly free inventory.
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
@dataclass
class SeatHoldTicket:
hold_id: str
seats: List[Tuple[str, int]]
customer: str
expires_at: float
class ConcurrentSeatHoldService:
def __init__(self, hold_service: SeatHoldService, ttl: float = 1.5) -> None:
self.hold_service = hold_service
self.ttl = ttl
self._holds: Dict[str, SeatHoldTicket] = {}
self._lock = threading.RLock()
self._counter = 0
threading.Thread(target=self._reaper, daemon=True).start()
def _next_id(self) -> str:
with self._lock:
self._counter += 1
return f"H{self._counter:04d}"
def _purge_expired(self) -> None:
now = time.time()
expired = [hold_id for hold_id, ticket in self._holds.items() if ticket.expires_at <= now]
for hold_id in expired:
ticket = self._holds.pop(hold_id)
self.hold_service.release(ticket.seats)
print("Expired hold released:", hold_id)
def _reaper(self) -> None:
while True:
time.sleep(self.ttl / 2)
with self._lock:
self._purge_expired()
def hold(self, category: str, quantity: int, customer: str) -> Optional[SeatHoldTicket]:
with self._lock:
self._purge_expired()
try:
seats = self.hold_service.hold(category, quantity)
except RuntimeError:
return None
ticket = SeatHoldTicket(self._next_id(), seats, customer, time.time() + self.ttl)
self._holds[ticket.hold_id] = ticket
return ticket
def release(self, hold_id: str) -> None:
with self._lock:
ticket = self._holds.pop(hold_id, None)
if ticket:
self.hold_service.release(ticket.seats)
def confirm(self, hold_id: str) -> Optional[List[Tuple[str, int]]]:
with self._lock:
ticket = self._holds.pop(hold_id, None)
if not ticket:
return None
self.hold_service.confirm(ticket.seats)
return ticket.seats
def worker(service: ConcurrentSeatHoldService, customer: str) -> None:
ticket = service.hold("PREMIUM", 1, customer)
print(customer, "hold ->", bool(ticket))
time.sleep(0.5)
if ticket:
service.release(ticket.hold_id)
def main() -> None:
layout = TheatreLayout()
layout.add_row("A", 3, "PREMIUM")
hold_service = SeatHoldService(layout)
concurrent_service = ConcurrentSeatHoldService(hold_service, ttl=0.8)
threads = [threading.Thread(target=worker, args=(concurrent_service, f"C{i}")) for i in range(4)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
main()
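To convince yourself that expiry really returns capacity, a short experiment (timings chosen loosely; real tests would inject a clock) holds every premium seat, waits past the TTL, and retries:
# An expired hold is reclaimed either by the reaper or by the eager purge
# that runs at the start of every hold() call.
layout = TheatreLayout()
layout.add_row("A", 2, "PREMIUM")
service = ConcurrentSeatHoldService(SeatHoldService(layout), ttl=0.3)
assert service.hold("PREMIUM", 2, "walk-in") is not None
assert service.hold("PREMIUM", 1, "latecomer") is None  # sold out
time.sleep(0.7)  # let the TTL lapse
assert service.hold("PREMIUM", 1, "latecomer") is not None  # reclaimed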
Level 3 — Resilient Architecture
The advanced level assumes payments can fail, so seat confirmation runs inside a saga that records every mutation and compensation. We wrap the pricing call in a circuit breaker with a flat-rate fallback, and failed payments release their holds so seats return to the pool automatically. Following the logs shows how the system maintains customer experience without manual intervention.
from __future__ import annotations
import random
import threading
import time
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class TicketOrder:
customer: str
category: str
quantity: int
class TicketPricingService:
def __init__(self, failure_rate: float = 0.3) -> None:
self.failure_rate = failure_rate
def quote(self, category: str, quantity: int) -> float:
if random.random() < self.failure_rate:
raise RuntimeError("pricing-down")
base = 150 if category == "PREMIUM" else 90
return base * quantity
class TicketPaymentService:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
def charge(self, customer: str, amount: float) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("payment-failed")
print(f"Charged {customer} amount {amount}")
class TicketCircuitBreaker:
def __init__(self, threshold: int, cool_down: float) -> None:
self.threshold = threshold
self.cool_down = cool_down
self.failures = 0
self.open_until = 0.0
self.lock = threading.Lock()
def call(self, func, *args, **kwargs):
with self.lock:
now = time.time()
if now < self.open_until:
raise RuntimeError("circuit-open")
try:
result = func(*args, **kwargs)
except Exception:
with self.lock:
self.failures += 1
if self.failures >= self.threshold:
self.open_until = time.time() + self.cool_down
self.failures = 0
raise
else:
with self.lock:
self.failures = 0
self.open_until = 0.0
return result
class TicketingSaga:
def __init__(self,
hold_service: ConcurrentSeatHoldService,
pricing: TicketPricingService,
payment: TicketPaymentService,
breaker: TicketCircuitBreaker) -> None:
self.hold_service = hold_service
self.pricing = pricing
self.payment = payment
self.breaker = breaker
def execute(self, order: TicketOrder) -> None:
ticket = self.hold_service.hold(order.category, order.quantity, order.customer)
if not ticket:
print(order.customer, "could not obtain hold")
return
try:
amount = self.breaker.call(self.pricing.quote, order.category, order.quantity)
except Exception:
amount = 100.0 * order.quantity
print("Pricing fallback used for", order.customer)
try:
self.payment.charge(order.customer, amount)
seats = self.hold_service.confirm(ticket.hold_id)
print(f"Order confirmed for {order.customer}: {seats}")
except Exception as exc:
print(f"Payment failed for {order.customer}: {exc}")
self.hold_service.release(ticket.hold_id)
def main() -> None:
random.seed(11)
layout = TheatreLayout()
layout.add_row("A", 4, "PREMIUM")
layout.add_row("B", 4, "REGULAR")
base_hold_service = SeatHoldService(layout)
concurrent_service = ConcurrentSeatHoldService(base_hold_service, ttl=1.0)
saga = TicketingSaga(
hold_service=concurrent_service,
pricing=TicketPricingService(0.4),
payment=TicketPaymentService(0.5),
breaker=TicketCircuitBreaker(2, 0.8),
)
orders = [
TicketOrder("Alice", "PREMIUM", 2),
TicketOrder("Bob", "PREMIUM", 2),
TicketOrder("Carol", "REGULAR", 2),
TicketOrder("Dave", "PREMIUM", 1),
]
for order in orders:
saga.execute(order)
if __name__ == "__main__":
main()
Enhance the caching service from a basic LRU map to a concurrent implementation and finally a resilient multi-tier architecture.
Cache Hierarchy ├─ Level 1: LRU Map ├─ Level 2: Concurrent LRU └─ Level 3: Resilient Multi-Tier Cache
Level 1 — Core Implementation
The cache series opens by explaining how to combine a hashmap with a doubly linked list to achieve O(1) operations; Python's OrderedDict packages exactly that structure, so the sample leans on it. We walk through the handoff between inserts, promotions, and evictions so the eviction end always references the least recently used entry. The sample script prints internal state so you can verify the structure matches the mental model.
from collections import OrderedDict
import threading
from typing import Optional
class LRUCache:
def __init__(self, capacity: int):
self.capacity = capacity
self.store: OrderedDict[int, str] = OrderedDict()
self.lock = threading.RLock()
def get(self, key: int) -> Optional[str]:
with self.lock:
if key not in self.store:
return None
value = self.store.pop(key)
self.store[key] = value
return value
def put(self, key: int, value: str) -> None:
with self.lock:
if key in self.store:
self.store.pop(key)
self.store[key] = value
if len(self.store) > self.capacity:
self.store.popitem(last=False)
def items(self):
with self.lock:
return list(self.store.items())
def main() -> None:
cache = LRUCache(2)
cache.put(1, "A")
cache.put(2, "B")
print("Get 1:", cache.get(1))
cache.put(3, "C")
print("Cache items:", cache.items())
if __name__ == "__main__":
main()
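Eviction order is the detail most people trip over: a get counts as a use, so the untouched key is the one that falls out. A quick check against the LRUCache above:
# get() promotes key 1, so inserting key 3 evicts key 2, the LRU entry.
cache = LRUCache(2)
cache.put(1, "A")
cache.put(2, "B")
cache.get(1)       # promotes 1 to most recently used
cache.put(3, "C")  # over capacity: evicts 2
assert cache.get(2) is None
assert cache.items() == [(1, "A"), (3, "C")]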
Level 2 — Concurrent Enhancements
Level 2 considers contention, dividing the keyspace into shards that each carry their own lock to improve throughput. Promotion and eviction remain O(1), and because every operation touches exactly one shard there is no multi-lock acquisition order to reason about, which rules out deadlocks by construction. Running the threaded workload shows how to interpret interleaved log output to confirm correctness.
from __future__ import annotations
import threading
from typing import Dict
class SegmentedLRUCache:
def __init__(self, shard_count: int, capacity_per_shard: int) -> None:
self.shards: Dict[int, LRUCache] = {
shard: LRUCache(capacity_per_shard)
for shard in range(shard_count)
}
def _shard(self, key: int) -> LRUCache:
return self.shards[key % len(self.shards)]
def get(self, key: int):
return self._shard(key).get(key)
def put(self, key: int, value: str) -> None:
self._shard(key).put(key, value)
def snapshot(self):
return {idx: cache.items() for idx, cache in self.shards.items()}
def worker(cache: SegmentedLRUCache, idx: int) -> None:
cache.put(idx, f"V{idx}")
cache.get(idx)
def main() -> None:
cache = SegmentedLRUCache(2, 2)
threads = [threading.Thread(target=worker, args=(cache, i)) for i in range(6)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("Snapshot:", cache.snapshot())
if __name__ == "__main__":
main()
Level 3 — Resilient Architecture
The advanced cache design introduces a backing store that can fail, so failed writes are deferred (simply logged here; a production build would queue them for replay) and reads fall back to possibly stale L2 data. Logging hits, stale reads, and deferred writes makes degradation visible as it happens. Learners experience how layering caches in front of unreliable stores requires careful invalidation policies.
from __future__ import annotations
import random
import time
from typing import Dict, Optional
class FlakyDatastore:
def __init__(self, failure_rate: float = 0.3) -> None:
self.failure_rate = failure_rate
self.store: Dict[int, str] = {}
def get(self, key: int) -> Optional[str]:
if random.random() < self.failure_rate:
raise RuntimeError("datastore-down")
return self.store.get(key)
def put(self, key: int, value: str) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("datastore-down")
self.store[key] = value
class CircuitBreaker:
def __init__(self, threshold: int, cool_down: float) -> None:
self.threshold = threshold
self.cool_down = cool_down
self.failures = 0
self.open_until = 0.0
def call(self, func, *args, **kwargs):
now = time.time()
if now < self.open_until:
raise RuntimeError("circuit-open")
try:
result = func(*args, **kwargs)
self.failures = 0
return result
except Exception as exc:
self.failures += 1
if self.failures >= self.threshold:
self.open_until = now + self.cool_down
self.failures = 0
raise exc
class TieredCache:
def __init__(self, datastore: FlakyDatastore, l1_capacity: int = 2, l2_capacity: int = 4) -> None:
self.l1 = LRUCache(l1_capacity)
self.l2 = LRUCache(l2_capacity)
self.datastore = datastore
self.breaker = CircuitBreaker(2, 1.0)
def get(self, key: int) -> Optional[str]:
value = self.l1.get(key)
if value is not None:
return value
value = self.l2.get(key)
if value is not None:
self.l1.put(key, value)
return value
try:
value = self.breaker.call(self.datastore.get, key)
if value is not None:
self.l2.put(key, value)
self.l1.put(key, value)
return value
except Exception as exc:
print("Serving stale due to", exc)
return self.l2.get(key)
def put(self, key: int, value: str) -> None:
self.l1.put(key, value)
self.l2.put(key, value)
try:
self.breaker.call(self.datastore.put, key, value)
except Exception as exc:
print("Write deferred due to", exc)
def main() -> None:
random.seed(8)
cache = TieredCache(FlakyDatastore(0.5))
cache.put(1, "A")
cache.put(2, "B")
print("Get 1:", cache.get(1))
print("Get 2:", cache.get(2))
print("Get 3:", cache.get(3))
if __name__ == "__main__":
main()
Progressively enhance the elevator system from basic control to coordinated multi-elevator scheduling and resilient health-aware dispatch.
Elevator Control Stack ├─ Level 1: Basic Controller ├─ Level 2: Multi-Elevator Dispatch └─ Level 3: Resilient Dispatch with Health Checks
Level 1 — Eventful Single Cab
We start the elevator exercises by modelling the cab's lifecycle as a finite state machine that reacts to call requests and direction changes. Strategy interfaces decide which request to service next, while an EventBus broadcasts door and movement events for observability. This foundational level trains you to separate decision making from state transitions before scaling to fleets.
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
@dataclass
class CallRequest:
floor: int
direction: str = "IDLE"
class EventBus:
def __init__(self) -> None:
self._subscribers: Dict[str, List[Callable[[dict], None]]] = {}
def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
self._subscribers.setdefault(event, []).append(handler)
def publish(self, event: str, payload: dict) -> None:
for handler in self._subscribers.get(event, []):
handler(payload)
class StateMachine:
def __init__(self, initial_state: str):
self.state = initial_state
self._transitions: Dict[str, set[str]] = {}
def add_transition(self, origin: str, target: str) -> None:
self._transitions.setdefault(origin, set()).add(target)
def transition(self, target: str) -> None:
allowed = self._transitions.get(self.state, set())
if target != self.state and target not in allowed:
raise RuntimeError(f"Invalid transition {self.state} → {target}")
self.state = target
class ElevatorCab(StateMachine):
def __init__(self, cab_id: str, max_floor: int, bus: EventBus):
super().__init__("IDLE")
self.cab_id = cab_id
self.max_floor = max_floor
self.current_floor = 1
self.queue: List[int] = []
self.bus = bus
self.add_transition("IDLE", "MOVING")
self.add_transition("MOVING", "DOORS_OPEN")
self.add_transition("DOORS_OPEN", "IDLE")
def enqueue(self, floor: int) -> None:
if not 1 <= floor <= self.max_floor:
raise ValueError(f"Floor {floor} outside bounds")
if floor not in self.queue:
self.queue.append(floor)
self.bus.publish("cab.queue", {"cab": self.cab_id, "queue": list(self.queue)})
def step(self) -> None:
if not self.queue:
self.transition("IDLE")
self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
return
target = self.queue[0]
if self.current_floor < target:
self.current_floor += 1
self.transition("MOVING")
elif self.current_floor > target:
self.current_floor -= 1
self.transition("MOVING")
else:
self.queue.pop(0)
self.transition("DOORS_OPEN")
self.bus.publish("cab.arrived", {"cab": self.cab_id, "floor": self.current_floor})
self.transition("IDLE")
self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
class DispatchStrategy:
def choose(self, cab: ElevatorCab, request: CallRequest) -> ElevatorCab:
return cab
class ElevatorController:
def __init__(self, cab: ElevatorCab, strategy: DispatchStrategy, bus: EventBus):
self.cab = cab
self.strategy = strategy
self.bus = bus
self.bus.subscribe("call.requested", self._handle_request)
def request(self, floor: int, direction: str = "IDLE") -> None:
self.bus.publish("call.requested", {"request": CallRequest(floor, direction)})
def _handle_request(self, payload: dict) -> None:
request: CallRequest = payload["request"]
cab = self.strategy.choose(self.cab, request)
cab.enqueue(request.floor)
self.bus.publish("call.assigned", {"cab": cab.cab_id, "floor": request.floor})
def tick(self) -> None:
self.cab.step()
def main() -> None:
bus = EventBus()
bus.subscribe("cab.status", lambda event: print("[status]", event))
bus.subscribe("cab.arrived", lambda event: print("[arrived]", event))
cab = ElevatorCab("CAR-1", max_floor=12, bus=bus)
controller = ElevatorController(cab, DispatchStrategy(), bus)
controller.request(7, "UP")
controller.request(3, "DOWN")
for _ in range(8):
controller.tick()
if __name__ == "__main__":
main()
Level 2 — Fleet Dispatch Bus
Level 2 expands to multiple cars that subscribe to a shared EventBus, receiving commands from a central dispatcher. The dispatcher uses strategies to balance load and produces projections so you can inspect assignments historically. Even without threads the design reads like production code, making the eventual jump to concurrency straightforward.
from __future__ import annotations
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Protocol, Tuple
@dataclass
class CallRequest:
floor: int
direction: str
class EventBus:
def __init__(self) -> None:
self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
self._queue: Deque[Tuple[str, dict]] = deque()
def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
self._subscribers[event].append(handler)
def publish(self, event: str, payload: dict) -> None:
self._queue.append((event, payload))
def pump(self) -> None:
while self._queue:
event, payload = self._queue.popleft()
for handler in list(self._subscribers.get(event, [])):
handler(payload)
class StateMachine:
def __init__(self, initial_state: str):
self.state = initial_state
self._transitions: Dict[str, set[str]] = {}
def add_transition(self, origin: str, target: str) -> None:
self._transitions.setdefault(origin, set()).add(target)
def transition(self, target: str) -> None:
allowed = self._transitions.get(self.state, set())
if target != self.state and target not in allowed:
raise RuntimeError(f"Invalid transition {self.state} → {target}")
self.state = target
class ElevatorCab(StateMachine):
def __init__(self, cab_id: str, max_floor: int, bus: EventBus):
super().__init__("IDLE")
self.cab_id = cab_id
self.max_floor = max_floor
self.current_floor = 1
self.queue: List[int] = []
self.bus = bus
self.add_transition("IDLE", "MOVING")
self.add_transition("MOVING", "DOORS_OPEN")
self.add_transition("DOORS_OPEN", "IDLE")
def enqueue(self, floor: int) -> None:
if not 1 <= floor <= self.max_floor:
raise ValueError(f"Floor {floor} outside bounds")
if floor not in self.queue:
self.queue.append(floor)
self.bus.publish("cab.queue", {"cab": self.cab_id, "queue": list(self.queue)})
def step(self) -> None:
if not self.queue:
self.transition("IDLE")
self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
return
target = self.queue[0]
if self.current_floor < target:
self.current_floor += 1
self.transition("MOVING")
elif self.current_floor > target:
self.current_floor -= 1
self.transition("MOVING")
else:
self.queue.pop(0)
self.transition("DOORS_OPEN")
self.bus.publish("cab.arrived", {"cab": self.cab_id, "floor": self.current_floor})
self.transition("IDLE")
self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
class DispatchStrategy:
def choose(self, request: CallRequest, cabs: Iterable[ElevatorCab]) -> ElevatorCab:
def score(cab: ElevatorCab) -> Tuple[int, int, int, str]:
distance = abs(cab.current_floor - request.floor)
load = len(cab.queue)
direction_penalty = 0 if (request.direction == "UP" and cab.current_floor <= request.floor) or (
request.direction == "DOWN" and cab.current_floor >= request.floor
) else 1
return (distance, load, direction_penalty, cab.cab_id)
return min(cabs, key=score)
class Command(Protocol):
def execute(self) -> None:
...
class RequestElevatorCommand:
def __init__(self, bus: EventBus, request: CallRequest):
self.bus = bus
self.request = request
def execute(self) -> None:
self.bus.publish("call.received", {"request": self.request})
class ElevatorFleetController:
def __init__(self, cabs: List[ElevatorCab], strategy: DispatchStrategy, bus: EventBus):
self.cabs = {cab.cab_id: cab for cab in cabs}
self.strategy = strategy
self.bus = bus
self.bus.subscribe("call.received", self._assign_call)
def request(self, floor: int, direction: str) -> Command:
return RequestElevatorCommand(self.bus, CallRequest(floor, direction))
def _assign_call(self, payload: dict) -> None:
request: CallRequest = payload["request"]
cab = self.strategy.choose(request, self.cabs.values())
cab.enqueue(request.floor)
self.bus.publish("call.assigned", {"cab": cab.cab_id, "floor": request.floor, "direction": request.direction})
def step_all(self) -> None:
for cab in self.cabs.values():
cab.step()
class AssignmentBoard:
def __init__(self, bus: EventBus) -> None:
self.assignments: Dict[str, List[int]] = defaultdict(list)
self.served: List[Tuple[str, int]] = []
bus.subscribe("call.assigned", self._on_assigned)
bus.subscribe("cab.arrived", self._on_arrived)
def _on_assigned(self, payload: dict) -> None:
cab = payload["cab"]
floor = payload["floor"]
if floor not in self.assignments[cab]:
self.assignments[cab].append(floor)
def _on_arrived(self, payload: dict) -> None:
cab = payload["cab"]
floor = payload["floor"]
self.served.append((cab, floor))
if floor in self.assignments.get(cab, []):
self.assignments[cab].remove(floor)
def main() -> None:
bus = EventBus()
cabs = [
ElevatorCab("CAR-A", max_floor=20, bus=bus),
ElevatorCab("CAR-B", max_floor=20, bus=bus),
ElevatorCab("CAR-C", max_floor=20, bus=bus),
]
controller = ElevatorFleetController(cabs, DispatchStrategy(), bus)
board = AssignmentBoard(bus)
for floor, direction in [(7, "UP"), (3, "DOWN"), (14, "DOWN"), (9, "UP"), (2, "DOWN")]:
controller.request(floor, direction).execute()
bus.pump()
for _ in range(12):
controller.step_all()
bus.pump()
print("Outstanding assignments:", board.assignments)
print("Served stops:", board.served)
if __name__ == "__main__":
main()
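The scoring tuple is easiest to read with concrete numbers. For a call at floor 7 going UP, a cab two floors below with an empty queue scores (2, 0, 0, id), while an equidistant cab above the call pays the direction penalty and scores (2, 0, 1, id), so the lower tuple wins. A small hedged check (cab positions are set directly for illustration):
# Tie-break between two equidistant cabs for an UP call at floor 7.
bus = EventBus()
below = ElevatorCab("CAR-X", max_floor=20, bus=bus)
below.current_floor = 5   # below the call and heading the right way
above = ElevatorCab("CAR-Y", max_floor=20, bus=bus)
above.current_floor = 9   # equidistant, but pays the direction penalty
chosen = DispatchStrategy().choose(CallRequest(7, "UP"), [below, above])
print("Chosen cab:", chosen.cab_id)  # CAR-X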
Level 3 — Resilient Fleet Orchestrator
The final elevator level introduces fault scenarios such as stuck cars, so health monitors publish alerts and a repository tracks durable assignments. A saga coordinates requeueing requests when a car fails mid-service, ensuring passengers are not stranded. You learn how to extend existing event-driven flows with resilience features while keeping the model transparent.
from __future__ import annotations
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Protocol, Tuple
@dataclass
class CallRequest:
floor: int
direction: str = "IDLE"
class EventBus:
def __init__(self) -> None:
self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
self._queue: Deque[Tuple[str, dict]] = deque()
def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
self._subscribers[event].append(handler)
def publish(self, event: str, payload: dict) -> None:
self._queue.append((event, payload))
def pump(self) -> None:
while self._queue:
event, payload = self._queue.popleft()
for handler in list(self._subscribers.get(event, [])):
handler(payload)
class StateMachine:
def __init__(self, initial_state: str):
self.state = initial_state
self._transitions: Dict[str, set[str]] = {}
def add_transition(self, origin: str, target: str) -> None:
self._transitions.setdefault(origin, set()).add(target)
def transition(self, target: str) -> None:
allowed = self._transitions.get(self.state, set())
if target != self.state and target not in allowed:
raise RuntimeError(f"Invalid transition {self.state} → {target}")
self.state = target
class ElevatorCab(StateMachine):
def __init__(self, cab_id: str, max_floor: int, bus: EventBus):
super().__init__("IDLE")
self.cab_id = cab_id
self.max_floor = max_floor
self.current_floor = 1
self.queue: List[int] = []
self.bus = bus
self.add_transition("IDLE", "MOVING")
self.add_transition("MOVING", "DOORS_OPEN")
self.add_transition("DOORS_OPEN", "IDLE")
def enqueue(self, floor: int) -> None:
if not 1 <= floor <= self.max_floor:
raise ValueError(f"Floor {floor} outside bounds")
if floor not in self.queue:
self.queue.append(floor)
self.bus.publish("cab.queue", {"cab": self.cab_id, "queue": list(self.queue)})
def step(self) -> None:
if not self.queue:
self.transition("IDLE")
self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
return
target = self.queue[0]
if self.current_floor < target:
self.current_floor += 1
self.transition("MOVING")
elif self.current_floor > target:
self.current_floor -= 1
self.transition("MOVING")
else:
self.queue.pop(0)
self.transition("DOORS_OPEN")
self.bus.publish("cab.arrived", {"cab": self.cab_id, "floor": self.current_floor})
self.transition("IDLE")
self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
class DispatchStrategy:
def choose(self, request: CallRequest, cabs: Iterable[ElevatorCab]) -> ElevatorCab:
def score(cab: ElevatorCab) -> Tuple[int, int, int, str]:
distance = abs(cab.current_floor - request.floor)
load = len(cab.queue)
direction_penalty = 0 if (request.direction == "UP" and cab.current_floor <= request.floor) or (
request.direction == "DOWN" and cab.current_floor >= request.floor
) else 1
return (distance, load, direction_penalty, cab.cab_id)
return min(cabs, key=score)
class Command(Protocol):
def execute(self) -> None:
...
class RequestElevatorCommand:
def __init__(self, bus: EventBus, request: CallRequest):
self.bus = bus
self.request = request
def execute(self) -> None:
self.bus.publish("call.requested", {"request": self.request})
@dataclass
class CabRecord:
cab: ElevatorCab
online: bool = True
class CabRepository:
def __init__(self, records: Dict[str, CabRecord]):
self._records = records
def set_online(self, cab_id: str) -> None:
self._records[cab_id].online = True
def set_offline(self, cab_id: str) -> None:
self._records[cab_id].online = False
def online_cabs(self) -> List[ElevatorCab]:
return [record.cab for record in self._records.values() if record.online]
def snapshot(self) -> Dict[str, bool]:
return {cab_id: record.online for cab_id, record in self._records.items()}
class AssignmentJournal:
def __init__(self) -> None:
self._assignments: Dict[str, List[CallRequest]] = defaultdict(list)
def assign(self, cab_id: str, request: CallRequest) -> None:
if all(req.floor != request.floor or req.direction != request.direction for req in self._assignments[cab_id]):
self._assignments[cab_id].append(request)
def complete(self, cab_id: str, floor: int) -> None:
self._assignments[cab_id] = [req for req in self._assignments[cab_id] if req.floor != floor]
def failover(self, cab_id: str) -> List[CallRequest]:
return self._assignments.pop(cab_id, [])
def snapshot(self) -> Dict[str, List[int]]:
return {cab: [req.floor for req in requests] for cab, requests in self._assignments.items()}
class ElevatorOrchestrator:
def __init__(self, repository: CabRepository, journal: AssignmentJournal, strategy: DispatchStrategy, bus: EventBus):
self.repository = repository
self.journal = journal
self.strategy = strategy
self.bus = bus
self.pending: Deque[CallRequest] = deque()
bus.subscribe("call.requested", self._handle_request)
bus.subscribe("cab.arrived", self._handle_arrival)
bus.subscribe("cab.online", self._handle_online)
bus.subscribe("cab.offline", self._handle_offline)
def request(self, floor: int, direction: str) -> Command:
return RequestElevatorCommand(self.bus, CallRequest(floor, direction))
def _handle_request(self, payload: dict) -> None:
self.pending.append(payload["request"])
self._assign_pending()
def _handle_arrival(self, payload: dict) -> None:
cab_id = payload["cab"]
floor = payload["floor"]
self.journal.complete(cab_id, floor)
self.bus.publish("call.completed", {"cab": cab_id, "floor": floor})
self._assign_pending()
def _handle_online(self, _: dict) -> None:
self._assign_pending()
def _handle_offline(self, _: dict) -> None:
self._assign_pending()
def _assign_pending(self) -> None:
while self.pending:
candidates = self.repository.online_cabs()
if not candidates:
return
request = self.pending[0]
cab = self.strategy.choose(request, candidates)
if request.floor not in cab.queue:
cab.enqueue(request.floor)
self.journal.assign(cab.cab_id, request)
self.bus.publish("call.assigned", {"cab": cab.cab_id, "floor": request.floor, "direction": request.direction})
self.pending.popleft()
class HealthMonitor:
def __init__(self, repository: CabRepository, bus: EventBus):
self.repository = repository
self.bus = bus
self.last_floor: Dict[str, int] = {}
bus.subscribe("cab.status", self._capture_status)
bus.subscribe("cab.health", self._handle_health)
def _capture_status(self, payload: dict) -> None:
self.last_floor[payload["cab"]] = payload["floor"]
def _handle_health(self, payload: dict) -> None:
cab_id = payload["cab"]
online = payload["online"]
floor = self.last_floor.get(cab_id, 1)
if online:
self.repository.set_online(cab_id)
self.bus.publish("cab.online", {"cab": cab_id, "floor": floor})
else:
self.repository.set_offline(cab_id)
self.bus.publish("cab.offline", {"cab": cab_id, "floor": floor})
class FailoverSaga:
def __init__(self, bus: EventBus, journal: AssignmentJournal):
self.bus = bus
self.journal = journal
bus.subscribe("cab.offline", self._requeue)
def _requeue(self, payload: dict) -> None:
cab_id = payload["cab"]
floor = payload["floor"]
for request in self.journal.failover(cab_id):
self.bus.publish("call.requeued", {"from": cab_id, "floor": request.floor})
direction = request.direction
if direction == "IDLE":
direction = "UP" if request.floor >= floor else "DOWN"
self.bus.publish("call.requested", {"request": CallRequest(request.floor, direction)})
class MetricsProjection:
def __init__(self, bus: EventBus) -> None:
self.counters: Dict[str, int] = {"completed": 0, "requeued": 0, "degraded": 0}
self.degraded: set[str] = set()
bus.subscribe("call.completed", self._on_completed)
bus.subscribe("call.requeued", self._on_requeued)
bus.subscribe("cab.offline", self._on_offline)
def _on_completed(self, _: dict) -> None:
self.counters["completed"] += 1
def _on_requeued(self, _: dict) -> None:
self.counters["requeued"] += 1
def _on_offline(self, payload: dict) -> None:
cab = payload["cab"]
if cab not in self.degraded:
self.degraded.add(cab)
self.counters["degraded"] += 1
class FleetSupervisor:
def __init__(self, cabs: Iterable[ElevatorCab], bus: EventBus):
self.cabs = list(cabs)
self.bus = bus
def tick(self) -> None:
for cab in self.cabs:
cab.step()
self.bus.pump()
def main() -> None:
bus = EventBus()
cabs = [
ElevatorCab("CAR-A", max_floor=20, bus=bus),
ElevatorCab("CAR-B", max_floor=20, bus=bus),
ElevatorCab("CAR-C", max_floor=20, bus=bus),
]
repository = CabRepository({cab.cab_id: CabRecord(cab) for cab in cabs})
journal = AssignmentJournal()
orchestrator = ElevatorOrchestrator(repository, journal, DispatchStrategy(), bus)
HealthMonitor(repository, bus)
FailoverSaga(bus, journal)
metrics = MetricsProjection(bus)
supervisor = FleetSupervisor(cabs, bus)
bus.subscribe("call.assigned", lambda event: print("[assigned]", event))
bus.subscribe("call.completed", lambda event: print("[completed]", event))
bus.subscribe("call.requeued", lambda event: print("[requeued]", event))
for floor, direction in [(6, "UP"), (14, "DOWN"), (3, "UP"), (18, "DOWN")]:
orchestrator.request(floor, direction).execute()
bus.pump()
for _ in range(6):
supervisor.tick()
bus.publish("cab.health", {"cab": "CAR-B", "online": False})
bus.pump()
for _ in range(4):
supervisor.tick()
bus.publish("cab.health", {"cab": "CAR-B", "online": True})
bus.pump()
for _ in range(4):
supervisor.tick()
print("Assignments snapshot:", journal.snapshot())
print("Cab availability:", repository.snapshot())
print("Metrics:", metrics.counters)
if __name__ == "__main__":
main()
Progressively build the producer-consumer pipeline from basic bounded queues to worker pools and resilient retry/dead-letter handling.
Stream Processing Stack ├─ Level 1: Bounded Queue ├─ Level 2: Worker Pool └─ Level 3: Resilient Stream Processor
Level 1 — Core Implementation
We start the concurrency primitives by building a bounded queue that coordinates producers and consumers with condition variables. Logging around waits and notifications clarifies how threads coordinate access without busy looping. Running the sample shows exactly when producers block and consumers resume, anchoring the mental model.
import threading
from collections import deque
from typing import Deque, Optional
class BoundedQueue:
def __init__(self, capacity: int) -> None:
self.capacity = capacity
self.items: Deque[int] = deque()
self.lock = threading.Lock()
self.not_full = threading.Condition(self.lock)
self.not_empty = threading.Condition(self.lock)
def put(self, item: int) -> None:
with self.not_full:
while len(self.items) >= self.capacity:
self.not_full.wait()
self.items.append(item)
self.not_empty.notify()
def get(self) -> int:
with self.not_empty:
while not self.items:
self.not_empty.wait()
item = self.items.popleft()
self.not_full.notify()
return item
def producer(queue: BoundedQueue, name: str, numbers: range) -> None:
for number in numbers:
queue.put(number)
print(f"{name} produced {number}")
def consumer(queue: BoundedQueue, name: str, consume: int) -> None:
for _ in range(consume):
item = queue.get()
print(f"{name} consumed {item}")
def main() -> None:
queue = BoundedQueue(2)
prod = threading.Thread(target=producer, args=(queue, "P1", range(1, 6)))
cons = threading.Thread(target=consumer, args=(queue, "C1", 5))
prod.start()
cons.start()
prod.join()
cons.join()
if __name__ == "__main__":
main()
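Backpressure becomes visible if the consumer is made deliberately slow: with capacity 2, the producer's third put blocks until the consumer drains an item. A minimal sketch reusing BoundedQueue (timings are illustrative):
import threading
import time
# A slow consumer forces the producer to block once the queue is full.
queue = BoundedQueue(2)
def slow_consumer() -> None:
    for _ in range(4):
        time.sleep(0.2)  # simulate slow downstream work
        print("drained", queue.get())
drainer = threading.Thread(target=slow_consumer)
drainer.start()
start = time.time()
for n in range(4):
    queue.put(n)  # the third and fourth puts wait for free slots
print(f"producer finished after {time.time() - start:.2f}s")  # roughly 0.4s
drainer.join()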
Level 2 — Concurrent Enhancements
Level 2 layers a worker pool atop the blocking queue, highlighting how to reuse primitives instead of rewriting them. We implement graceful shutdown so queued tasks finish and new submissions are rejected cleanly, covering the full lifecycle. Instrumentation demonstrates how backpressure protects the system when tasks arrive faster than they can be processed.
import threading
import time
from queue import Queue, Empty
from typing import Callable
class WorkerPool:
def __init__(self, workers: int) -> None:
self.tasks: "Queue[Callable[[], None]]" = Queue()
self.threads = [threading.Thread(target=self._worker, daemon=True) for _ in range(workers)]
self.stop_flag = threading.Event()
for thread in self.threads:
thread.start()
def submit(self, task: Callable[[], None]) -> None:
self.tasks.put(task)
def _worker(self) -> None:
while not self.stop_flag.is_set():
try:
task = self.tasks.get(timeout=0.2)
except Empty:
continue
try:
task()
finally:
self.tasks.task_done()
def shutdown(self) -> None:
self.tasks.join()
self.stop_flag.set()
for thread in self.threads:
thread.join(timeout=0.1)
def main() -> None:
pool = WorkerPool(3)
    for idx in range(6):
        def job(idx: int = idx) -> None:  # default arg freezes each idx
            print(f"Worker executed job {idx}")
            time.sleep(0.1)  # simulate a short unit of work
        pool.submit(job)
pool.shutdown()
if __name__ == "__main__":
main()
Level 3 — Resilient Architecture
The advanced primitive stages messages through retries, exponential backoff, and a dead-letter queue to reflect real stream processing concerns. We separate retry policy, handler, and storage so each concern can be tested independently. Operators can follow the logs to see exactly why a message was retried or quarantined, reinforcing production-ready thinking.
import random
import time
from dataclasses import dataclass
from queue import Queue, Empty
from typing import Callable
@dataclass
class StreamMessage:
payload: str
attempts: int = 0
class DeadLetterQueue:
def __init__(self) -> None:
self.messages = []
def record(self, message: StreamMessage, reason: str) -> None:
print("Dead letter:", message.payload, reason)
self.messages.append((message, reason))
class ResilientStreamProcessor:
def __init__(self, handler: Callable[[StreamMessage], None], max_attempts: int = 3) -> None:
self.queue: "Queue[StreamMessage]" = Queue()
self.handler = handler
self.max_attempts = max_attempts
self.dead_letters = DeadLetterQueue()
def publish(self, payload: str) -> None:
self.queue.put(StreamMessage(payload))
def start(self) -> None:
while True:
try:
message = self.queue.get(timeout=0.2)
except Empty:
break
try:
self.handler(message)
print("Processed:", message.payload)
except Exception as exc:
message.attempts += 1
if message.attempts >= self.max_attempts:
self.dead_letters.record(message, str(exc))
else:
backoff = 0.1 * (2 ** message.attempts)
print(f"Retrying {message.payload} in {backoff:.2f}s")
time.sleep(backoff)
self.queue.put(message)
finally:
self.queue.task_done()
def flaky_handler(message: StreamMessage) -> None:
if random.random() < 0.4:
raise RuntimeError("transient failure")
def main() -> None:
random.seed(6)
processor = ResilientStreamProcessor(flaky_handler)
for payload in ["A", "B", "C", "D"]:
processor.publish(payload)
processor.start()
print("Dead letters:", processor.dead_letters.messages)
if __name__ == "__main__":
main()
Progressively enhance the task scheduler from delayed-job execution to recurring scheduling and a resilient cron engine with persistence.
Task Scheduler Stack ├─ Level 1: Delayed Job Scheduler ├─ Level 2: Recurring Jobs Coordinator └─ Level 3: Resilient Cron Engine
Level 1 — Core Implementation
The scheduling series begins with a min-heap of due times that wakes a worker thread only when the next job is ready. We abstract the clock for deterministic tests (a sketch follows the listing) and coalesce wake-ups with timed condition waits to avoid spurious work. The initial workload illustrates how precision and ordering play together when tasks share timestamps.
import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, List
@dataclass(order=True)
class ScheduledTask:
run_at: float
action: Callable[[], None] = field(compare=False)
class TaskScheduler:
def __init__(self) -> None:
self.tasks: List[ScheduledTask] = []
self.lock = threading.Lock()
self.condition = threading.Condition(self.lock)
self.worker = threading.Thread(target=self._run, daemon=True)
self.worker.start()
def schedule(self, delay: float, action: Callable[[], None]) -> None:
with self.condition:
heapq.heappush(self.tasks, ScheduledTask(time.time() + delay, action))
self.condition.notify()
def _run(self) -> None:
while True:
with self.condition:
while not self.tasks:
self.condition.wait()
task = self.tasks[0]
now = time.time()
if task.run_at > now:
self.condition.wait(task.run_at - now)
continue
heapq.heappop(self.tasks)
task.action()
def main() -> None:
scheduler = TaskScheduler()
scheduler.schedule(0.2, lambda: print("Task A at", time.time()))
scheduler.schedule(0.4, lambda: print("Task B at", time.time()))
time.sleep(1)
if __name__ == "__main__":
main()
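The clock abstraction promised above can be retrofitted without touching the heap logic: have TaskScheduler accept a clock callable defaulting to time.time, then substitute a fake in tests. A hedged sketch (FakeClock is a hypothetical test double, not part of the scheduler above):
# FakeClock stands in for time.time so tests control "now" explicitly;
# TaskScheduler would take a `clock: Callable[[], float] = time.time` arg.
class FakeClock:
    def __init__(self, start: float = 0.0) -> None:
        self.now = start
    def __call__(self) -> float:
        return self.now
    def advance(self, seconds: float) -> None:
        self.now += seconds
clock = FakeClock()
due_at = clock() + 5.0  # schedule "five seconds" ahead
clock.advance(5.0)      # the test advances time instantly
assert clock() >= due_at  # the task is due without any real sleeping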
Level 2 — Concurrent Enhancements
Level 2 adds recurring jobs that reschedule themselves after each run; the fixed-delay policy avoids double-booking a job with itself, though long executions will push later runs back. A small worker pool executes tasks in parallel while the shared heap remains responsible for sequencing, separating concerns cleanly. Watching the printed timestamps reveals when jobs start falling behind, an important signal for capacity planning.
import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, List, Optional
@dataclass(order=True)
class RecurringTask:
next_run: float
interval: float = field(compare=False)
action: Callable[[], None] = field(compare=False)
class RecurringScheduler:
def __init__(self, workers: int = 2) -> None:
self.queue: List[RecurringTask] = []
self.lock = threading.Lock()
self.condition = threading.Condition(self.lock)
self.stop = False
self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(workers)]
for worker in self.workers:
worker.start()
def schedule(self, interval: float, action: Callable[[], None]) -> None:
with self.condition:
heapq.heappush(self.queue, RecurringTask(time.time() + interval, interval, action))
self.condition.notify()
def _worker(self) -> None:
while True:
with self.condition:
while not self.queue and not self.stop:
self.condition.wait()
if self.stop and not self.queue:
return
task = self.queue[0]
now = time.time()
if task.next_run > now:
self.condition.wait(task.next_run - now)
continue
heapq.heappop(self.queue)
try:
task.action()
finally:
with self.condition:
heapq.heappush(self.queue, RecurringTask(time.time() + task.interval, task.interval, task.action))
self.condition.notify()
def shutdown(self) -> None:
with self.condition:
self.stop = True
self.condition.notify_all()
for worker in self.workers:
worker.join()
def main() -> None:
scheduler = RecurringScheduler(workers=2)
scheduler.schedule(0.3, lambda: print("Heartbeat at", time.time()))
scheduler.schedule(0.5, lambda: print("Cleanup at", time.time()))
time.sleep(1.5)
scheduler.shutdown()
if __name__ == "__main__":
main()
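As written, _worker reschedules from time.time() + task.interval after the action finishes, so slow actions push every subsequent run later. A fixed-rate helper — a sketch, not part of the scheduler above — anchors the cadence to the planned timestamps instead:
def next_fixed_rate(previous_run: float, interval: float, now: float) -> float:
    # Advance from the planned timestamp so slow actions do not drift;
    # if we fell behind, skip the missed slots instead of bursting.
    next_run = previous_run + interval
    if next_run <= now:
        missed = int((now - next_run) // interval) + 1
        next_run += missed * interval
    return next_run
Substituting RecurringTask(next_fixed_rate(task.next_run, task.interval, time.time()), task.interval, task.action) in the finally block would keep heartbeats on cadence under load.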
Level 3 — Resilient Architecture
The final scheduler level serialises the job queue to durable storage and rehydrates it on restart, treating persistence as a first-class feature. Failed executions are retried with jittered delays so the fleet does not stampede upon recovery. Learners witness how crash recovery and retry semantics intertwine when uptime is non-negotiable.
import json
import os
import random
import threading
import time
from dataclasses import dataclass, asdict
from typing import Callable, Dict, List
@dataclass
class PersistentJob:
job_id: str
interval: float
next_run: float
class JsonJobStore:
def __init__(self, path: str) -> None:
self.path = path
if not os.path.exists(path):
with open(path, "w", encoding="utf-8") as handle:
json.dump([], handle)
def load(self) -> List[PersistentJob]:
with open(self.path, "r", encoding="utf-8") as handle:
data = json.load(handle)
return [PersistentJob(**entry) for entry in data]
def save(self, jobs: List[PersistentJob]) -> None:
with open(self.path, "w", encoding="utf-8") as handle:
json.dump([asdict(job) for job in jobs], handle)
class ResilientCronEngine:
def __init__(self, store: JsonJobStore, handlers: Dict[str, Callable[[], None]]) -> None:
self.store = store
self.handlers = handlers
self.jobs = {job.job_id: job for job in store.load()}
self.lock = threading.Lock()
self.stop = False
threading.Thread(target=self._loop, daemon=True).start()
def register(self, job_id: str, interval: float) -> None:
with self.lock:
job = PersistentJob(job_id, interval, time.time() + interval)
self.jobs[job_id] = job
self.store.save(list(self.jobs.values()))
def _loop(self) -> None:
while not self.stop:
now = time.time()
due = []
with self.lock:
for job in self.jobs.values():
if job.next_run <= now:
due.append(job)
for job in due:
self._execute(job)
time.sleep(0.1)
    def _execute(self, job: PersistentJob) -> None:
        handler = self.handlers.get(job.job_id)
        if handler is None:
            return  # job rehydrated from disk but no handler registered this run
attempt = 0
while True:
try:
handler()
break
except Exception as exc:
attempt += 1
if attempt >= 3:
print(f"Job {job.job_id} failed permanently:", exc)
break
backoff = 0.2 * (2 ** attempt) + random.random() * 0.1
print(f"Retrying job {job.job_id} in {backoff:.2f}s due to {exc}")
time.sleep(backoff)
with self.lock:
job.next_run = time.time() + job.interval
self.store.save(list(self.jobs.values()))
def shutdown(self) -> None:
self.stop = True
def flaky_job() -> None:
if random.random() < 0.3:
raise RuntimeError("intermittent failure")
print("Flaky job executed at", time.time())
def heartbeat_job() -> None:
print("Heartbeat job at", time.time())
def main() -> None:
random.seed(1)
store = JsonJobStore("/tmp/cron-jobs.json")
engine = ResilientCronEngine(store, {"flaky": flaky_job, "heartbeat": heartbeat_job})
if "flaky" not in engine.jobs:
engine.register("flaky", 0.4)
engine.register("heartbeat", 0.6)
time.sleep(2)
engine.shutdown()
if __name__ == "__main__":
main()
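The engine retries with equal jitter (a fixed exponential term plus a small random offset). Full jitter, which samples uniformly up to the exponential cap, spreads a recovering fleet even further apart; a sketch, assuming the same attempt counter:
def full_jitter_backoff(attempt: int, base: float = 0.2, cap: float = 5.0) -> float:
    # Sample uniformly in [0, min(cap, base * 2**attempt)] so retries from
    # many workers do not synchronise into a thundering herd.
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))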
Grow the order management platform from basic state handling to concurrent orchestration and a resilient saga-based workflow.
Order Management Stack ├─ Level 1: State Machine ├─ Level 2: Concurrent Orchestrator └─ Level 3: Resilient Event-Sourced Saga
Level 1 — Core Implementation
This commerce workflow begins by expressing the order lifecycle as an explicit state machine so invalid transitions are impossible. The entity stays event-free here, but an observer hook for surfacing each transition to analytics or downstream systems is sketched after the listing. Practitioners learn to separate entities from services, making the model testable and extensible.
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum, auto
from typing import Dict, Set
class OrderState(Enum):
CREATED = auto()
PACKED = auto()
SHIPPED = auto()
DELIVERED = auto()
CANCELLED = auto()
ALLOWED: Dict[OrderState, Set[OrderState]] = {
OrderState.CREATED: {OrderState.PACKED, OrderState.CANCELLED},
OrderState.PACKED: {OrderState.SHIPPED},
OrderState.SHIPPED: {OrderState.DELIVERED},
}
@dataclass
class Order:
order_id: str
state: OrderState = OrderState.CREATED
def advance(self, next_state: OrderState) -> None:
allowed = ALLOWED.get(self.state, set())
if next_state not in allowed:
raise RuntimeError(f"Invalid transition {self.state} -> {next_state}")
self.state = next_state
def cancel(self) -> None:
if self.state in (OrderState.DELIVERED, OrderState.CANCELLED):
raise RuntimeError("Cannot cancel final order")
self.state = OrderState.CANCELLED
def main() -> None:
order = Order("O1")
print(order)
order.advance(OrderState.PACKED)
order.advance(OrderState.SHIPPED)
print("Current state:", order.state)
if __name__ == "__main__":
main()
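The observer hook promised above can bolt onto the entity without touching the transition table; ObservableOrder is an illustrative name, not part of the Level 1 model:
from typing import Callable, List
class ObservableOrder(Order):
    # Order variant that notifies listeners after each successful transition.
    def __init__(self, order_id: str) -> None:
        super().__init__(order_id)
        self.listeners: List[Callable[[OrderState, OrderState], None]] = []
    def advance(self, next_state: OrderState) -> None:
        previous = self.state
        super().advance(next_state)
        for listener in self.listeners:
            listener(previous, next_state)
Registering order.listeners.append(lambda old, new: print(old, "->", new)) would surface every transition to analytics.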
Level 2 — Concurrent Enhancements
Level 2 introduces concurrent updates, protecting each order with a per-order lock to block conflicting writes (an optimistic version-check variant is sketched after the listing). Events are published inside the guarded transition so notifications match the state that was actually committed. Threaded scenarios illustrate how to detect and resolve conflicts systematically.
from __future__ import annotations
import threading
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum, auto
from typing import Callable, DefaultDict, Dict, List
class OrderState(Enum):
CREATED = auto()
PACKED = auto()
SHIPPED = auto()
DELIVERED = auto()
TRANSITIONS = {
OrderState.CREATED: {OrderState.PACKED},
OrderState.PACKED: {OrderState.SHIPPED},
OrderState.SHIPPED: {OrderState.DELIVERED},
}
@dataclass
class Order:
order_id: str
state: OrderState = OrderState.CREATED
class EventBus:
def __init__(self) -> None:
self.subscribers: DefaultDict[str, List[Callable[[Order], None]]] = defaultdict(list)
def subscribe(self, event: str, handler: Callable[[Order], None]) -> None:
self.subscribers[event].append(handler)
def publish(self, event: str, order: Order) -> None:
for handler in self.subscribers[event]:
handler(order)
class OrderService:
def __init__(self, bus: EventBus) -> None:
self.bus = bus
self.orders: Dict[str, Order] = {}
self.locks: DefaultDict[str, threading.Lock] = defaultdict(threading.Lock)
def create(self, order_id: str) -> Order:
order = Order(order_id)
self.orders[order_id] = order
self.bus.publish("created", order)
return order
def transition(self, order_id: str, next_state: OrderState) -> None:
lock = self.locks[order_id]
with lock:
order = self.orders[order_id]
allowed = TRANSITIONS.get(order.state, set())
if next_state not in allowed:
raise RuntimeError("Invalid transition")
order.state = next_state
self.bus.publish(next_state.name.lower(), order)
def main() -> None:
bus = EventBus()
bus.subscribe("shipped", lambda order: print("Order shipped:", order.order_id))
service = OrderService(bus)
service.create("O1")
    def worker():
        try:
            service.transition("O1", OrderState.PACKED)
            service.transition("O1", OrderState.SHIPPED)
        except RuntimeError as exc:
            # The losing thread sees the transition another thread already made.
            print("Conflict detected:", exc)
threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
t.start()
for t in threads:
t.join()
print("Final state:", service.orders["O1"].state)
if __name__ == "__main__":
main()
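For completeness, here is the optimistic variant mentioned above — a sketch that adds a version counter so stale writers are rejected instead of serialised; VersionedOrder and OptimisticOrderService are illustrative names:
@dataclass
class VersionedOrder:
    order_id: str
    state: OrderState = OrderState.CREATED
    version: int = 0
class OptimisticOrderService:
    def __init__(self) -> None:
        self.orders: Dict[str, VersionedOrder] = {}
        self._lock = threading.Lock()
    def transition(self, order_id: str, expected_version: int, next_state: OrderState) -> None:
        with self._lock:  # short critical section just for the compare-and-set
            order = self.orders[order_id]
            if order.version != expected_version:
                raise RuntimeError("stale write: order changed since it was read")
            if next_state not in TRANSITIONS.get(order.state, set()):
                raise RuntimeError("Invalid transition")
            order.state = next_state
            order.version += 1
Callers read the order, do their work, and submit with the version they saw; a conflict surfaces as an error to retry rather than a silently lost update.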
Level 3 — Resilient Architecture
The advanced level links order, payment, and inventory through a saga orchestrator that records intent in an outbox. We implement compensation steps that restock items and cancel payments when any stage fails. Running the sample demonstrates how durable messaging plus clear state logs keep the business process auditable.
from __future__ import annotations
import random
import time
from dataclasses import dataclass
from enum import Enum, auto
from typing import List
class OrderState(Enum):
CREATED = auto()
CONFIRMED = auto()
FAILED = auto()
@dataclass
class Order:
order_id: str
state: OrderState = OrderState.CREATED
class PaymentService:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
def charge(self, order_id: str) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("payment failure")
class InventoryService:
def reserve(self, order_id: str) -> None:
print("Inventory reserved for", order_id)
def release(self, order_id: str) -> None:
print("Inventory released for", order_id)
class Outbox:
def __init__(self) -> None:
self.events: List[str] = []
def record(self, event: str) -> None:
self.events.append(event)
print("Recorded event:", event)
class OrderSaga:
def __init__(self, payments: PaymentService, inventory: InventoryService, outbox: Outbox) -> None:
self.payments = payments
self.inventory = inventory
self.outbox = outbox
    def execute(self, order: Order) -> None:
        # Record intent first so a crash mid-saga leaves an auditable trail.
        self.outbox.record(f"order.submitted:{order.order_id}")
        try:
            self.inventory.reserve(order.order_id)
attempts = 0
while attempts < 3:
try:
self.payments.charge(order.order_id)
break
except Exception as exc:
attempts += 1
if attempts >= 3:
raise
backoff = 0.2 * attempts
print("Retry payment in", backoff)
time.sleep(backoff)
order.state = OrderState.CONFIRMED
self.outbox.record(f"order.confirmed:{order.order_id}")
except Exception as exc:
order.state = OrderState.FAILED
self.inventory.release(order.order_id)
self.outbox.record(f"order.failed:{order.order_id}:{exc}")
def main() -> None:
random.seed(12)
saga = OrderSaga(PaymentService(0.5), InventoryService(), Outbox())
for idx in range(3):
order = Order(f"O{idx}")
saga.execute(order)
print(order)
if __name__ == "__main__":
main()
Progressively enhance the auction system from simple bid tracking to concurrent bidding with fallback flows.
Auction Platform ├─ Level 1: Highest Bid Wins ├─ Level 2: Concurrent Bid Engine └─ Level 3: Resilient Auction Orchestrator
Level 1 — Core Implementation
We kick off the auction series with a straightforward domain model: auctions, bids, and closing rules expressed through clear classes. The engine records provenance via timestamps and emits notifications when the auction closes; a minimum-increment guard is sketched after the listing. By stepping through the sample timeline you practise reasoning about ordering without concurrency yet.
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, List, Optional
@dataclass
class Bid:
bidder: str
amount: float
timestamp: float
class HighestBidStrategy:
def select(self, bids: List[Bid]) -> Optional[Bid]:
if not bids:
return None
return max(bids, key=lambda bid: (bid.amount, -bid.timestamp))
class Auction:
def __init__(self, strategy: HighestBidStrategy, notifier: Callable[[str], None]) -> None:
self.bids: List[Bid] = []
self.strategy = strategy
self.notifier = notifier
def place_bid(self, bid: Bid) -> None:
self.bids.append(bid)
def close(self) -> Optional[Bid]:
winner = self.strategy.select(self.bids)
if winner:
self.notifier(f"Auction won by {winner.bidder} for {winner.amount}")
return winner
def main() -> None:
auction = Auction(HighestBidStrategy(), print)
auction.place_bid(Bid("Alice", 100.0, 1.0))
auction.place_bid(Bid("Bob", 120.0, 2.0))
auction.place_bid(Bid("Carol", 110.0, 3.0))
winner = auction.close()
print("Winner:", winner)
if __name__ == "__main__":
main()
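The increment guard mentioned above can wrap the same engine; IncrementAuction and min_increment are illustrative names:
class IncrementAuction(Auction):
    # Rejects bids that do not beat the current best by a minimum step.
    def __init__(self, strategy: HighestBidStrategy, notifier: Callable[[str], None], min_increment: float = 5.0) -> None:
        super().__init__(strategy, notifier)
        self.min_increment = min_increment
    def place_bid(self, bid: Bid) -> None:
        current = self.strategy.select(self.bids)
        if current and bid.amount < current.amount + self.min_increment:
            raise ValueError(f"bid {bid.amount} below required {current.amount + self.min_increment}")
        super().place_bid(bid)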
Level 2 — Concurrent Enhancements
Level 2 invites multiple bidders to compete at once, so we guard state with locks and use timed events to close auctions automatically. We capture concurrent placements and ensure only bids submitted before the deadline are accepted. Logging of timestamps and winning bids helps diagnose edge cases such as simultaneous offers.
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class Bid:
bidder: str
amount: float
class TimedAuction:
def __init__(self, duration: float) -> None:
self.duration = duration
self._lock = threading.Lock()
self._winner: Optional[Bid] = None
self._closed = False
threading.Thread(target=self._close_after_timeout, daemon=True).start()
def _close_after_timeout(self) -> None:
time.sleep(self.duration)
with self._lock:
self._closed = True
def place_bid(self, bid: Bid) -> None:
with self._lock:
if self._closed:
raise RuntimeError("Auction closed")
if not self._winner or bid.amount > self._winner.amount:
self._winner = bid
def winner(self) -> Optional[Bid]:
with self._lock:
return self._winner
def bidder_thread(auction: TimedAuction, name: str, amount: float) -> None:
try:
auction.place_bid(Bid(name, amount))
except RuntimeError as exc:
print(name, "failed:", exc)
def main() -> None:
auction = TimedAuction(0.5)
threads = [threading.Thread(target=bidder_thread, args=(auction, f"B{i}", 100 + i * 10)) for i in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
time.sleep(0.6)
print("Auction winner:", auction.winner())
if __name__ == "__main__":
main()
Level 3 — Resilient Architecture
The advanced auction layer distributes the engine across replicas that agree via quorum writes and keep write-ahead logs. We simulate node failures and recovery to illustrate how replication maintains consistency and availability. Learners connect how consensus protocols and durable logs keep financial workflows reliable.
from __future__ import annotations
import random
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class Bid:
bidder: str
amount: float
class Replica:
def __init__(self, name: str, failure_rate: float = 0.2) -> None:
self.name = name
self.failure_rate = failure_rate
self.log: List[Bid] = []
self.available = True
def append(self, bid: Bid) -> None:
if not self.available or random.random() < self.failure_rate:
self.available = False
raise RuntimeError(f"{self.name} unavailable")
self.log.append(bid)
def recover(self) -> None:
self.available = True
class AuctionCoordinator:
def __init__(self, replicas: List[Replica], quorum: int) -> None:
self.replicas = replicas
self.quorum = quorum
def submit_bid(self, bid: Bid) -> None:
successes = 0
failures = []
for replica in self.replicas:
try:
replica.append(bid)
successes += 1
except Exception as exc:
failures.append((replica, exc))
if successes < self.quorum:
raise RuntimeError("Not enough replicas")
for replica, _ in failures:
replica.recover()
def winner(self) -> Optional[Bid]:
merged: List[Bid] = []
for replica in self.replicas:
merged.extend(replica.log)
if not merged:
return None
return max(merged, key=lambda bid: bid.amount)
def main() -> None:
random.seed(3)
replicas = [Replica("R1", 0.3), Replica("R2", 0.1), Replica("R3", 0.2)]
coordinator = AuctionCoordinator(replicas, quorum=2)
for amount in [120.0, 150.0, 140.0]:
try:
coordinator.submit_bid(Bid(f"Bidder{amount}", amount))
except Exception as exc:
print("Submit failed:", exc)
print("Winning bid:", coordinator.winner())
if __name__ == "__main__":
main()
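Note that recover() above flips a replica back to available but leaves its log short of the bids it missed. Real systems follow up with anti-entropy; below is a naive read-repair sketch — a hypothetical helper that uses the longest log as a stand-in for proper write-ahead-log comparison:
def read_repair(coordinator: AuctionCoordinator) -> None:
    # Treat the longest replica log as authoritative and copy it to the rest.
    canonical = max((replica.log for replica in coordinator.replicas), key=len)
    for replica in coordinator.replicas:
        if len(replica.log) < len(canonical):
            replica.log = list(canonical)
Running read_repair after each submit_bid keeps winner() consistent even when a replica dropped a write before recovering.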
Build the wallet service from ledger recording to concurrent transactions and resilient transfer orchestration.
Wallet Service Stack ├─ Level 1: Core Ledger ├─ Level 2: Concurrent Transactions └─ Level 3: Resilient Transfers
Level 1 — Core Implementation
This wallet implementation begins with ledgers and transaction records that enforce idempotency, preventing duplicates from skewing balances. We separate read models from command handlers so querying balances stays fast while writes remain auditable. Exercises show how replaying the same transaction ID leaves the ledger unchanged, cementing the concept.
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Set
@dataclass
class LedgerEntry:
user_id: str
amount: float
txn_id: str
class WalletService:
def __init__(self) -> None:
self.balances: Dict[str, float] = {}
self.seen_txn: Set[str] = set()
def credit(self, user_id: str, amount: float, txn_id: str) -> None:
if txn_id in self.seen_txn:
return
self.balances[user_id] = self.balances.get(user_id, 0.0) + amount
self.seen_txn.add(txn_id)
def debit(self, user_id: str, amount: float, txn_id: str) -> None:
if txn_id in self.seen_txn:
return
balance = self.balances.get(user_id, 0.0)
if balance < amount:
raise RuntimeError("insufficient funds")
self.balances[user_id] = balance - amount
self.seen_txn.add(txn_id)
def balance(self, user_id: str) -> float:
return self.balances.get(user_id, 0.0)
def main() -> None:
wallet = WalletService()
wallet.credit("U1", 100.0, "txn1")
wallet.debit("U1", 40.0, "txn2")
wallet.credit("U1", 100.0, "txn1")
print("Balance:", wallet.balance("U1"))
if __name__ == "__main__":
main()
Level 2 — Concurrent Enhancements
Level 2 considers simultaneous deposits and withdrawals, so each account receives its own lock and balances are checked under it. Starvation risk and contention windows matter for throughput; a wait-time measuring wrapper is sketched after the listing. The provided workload highlights how to verify invariants even when operations interleave aggressively.
from __future__ import annotations
import threading
from collections import defaultdict
from typing import Dict, Set
class AccountLocks:
def __init__(self) -> None:
self._locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)
def acquire(self, user_id: str) -> threading.Lock:
lock = self._locks[user_id]
lock.acquire()
return lock
class ConcurrentWallet:
def __init__(self) -> None:
self.balances: Dict[str, float] = defaultdict(float)
self.seen_txn: Set[str] = set()
self.account_locks = AccountLocks()
self.txn_lock = threading.Lock()
def credit(self, user_id: str, amount: float, txn_id: str) -> None:
with self.txn_lock:
if txn_id in self.seen_txn:
return
self.seen_txn.add(txn_id)
lock = self.account_locks.acquire(user_id)
try:
self.balances[user_id] += amount
finally:
lock.release()
    def debit(self, user_id: str, amount: float, txn_id: str) -> None:
        with self.txn_lock:
            if txn_id in self.seen_txn:
                return
            self.seen_txn.add(txn_id)
        lock = self.account_locks.acquire(user_id)
        try:
            if self.balances[user_id] < amount:
                # Roll back the idempotency marker: a failed debit must not
                # swallow a later retry that reuses the same txn_id.
                with self.txn_lock:
                    self.seen_txn.discard(txn_id)
                raise RuntimeError("insufficient funds")
            self.balances[user_id] -= amount
        finally:
            lock.release()
def worker(wallet: ConcurrentWallet, user_id: str, amount: float, txn_prefix: str) -> None:
wallet.credit(user_id, amount, f"{txn_prefix}-credit")
try:
wallet.debit(user_id, amount / 2, f"{txn_prefix}-debit")
except Exception as exc:
print("Debit failed:", exc)
def main() -> None:
wallet = ConcurrentWallet()
threads = [threading.Thread(target=worker, args=(wallet, "U1", 50.0, f"T{i}")) for i in range(4)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("Final balance:", wallet.balances["U1"])
if __name__ == "__main__":
main()
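The contention measurement mentioned above can be a thin wrapper — timed_acquire is a hypothetical helper, not part of the wallet:
import time
def timed_acquire(locks: AccountLocks, user_id: str) -> threading.Lock:
    # Record how long the caller waited; long waits flag a hot account.
    started = time.perf_counter()
    lock = locks.acquire(user_id)
    waited = time.perf_counter() - started
    if waited > 0.01:  # arbitrary demo threshold
        print(f"lock contention on {user_id}: waited {waited * 1000:.1f}ms")
    return lock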
Level 3 — Resilient Architecture
The advanced wallet connects to an unreliable payment network, so debits are recorded in an outbox before we attempt the external call. Circuit breakers stop repeated failures, while retry workers drain the outbox when the dependency recovers. Learners can inspect the logs to see exactly when a payout succeeded, retried, or was parked for later.
from __future__ import annotations
import random
import time
from dataclasses import dataclass
from queue import Queue, Empty
from typing import Dict
@dataclass
class Payout:
user_id: str
amount: float
txn_id: str
class ExternalNetwork:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
def payout(self, user_id: str, amount: float) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("network failure")
print(f"Payout to {user_id} for {amount}")
class CircuitBreaker:
def __init__(self, threshold: int, cool_down: float) -> None:
self.threshold = threshold
self.cool_down = cool_down
self.failures = 0
self.open_until = 0.0
def call(self, func, *args, **kwargs):
now = time.time()
if now < self.open_until:
raise RuntimeError("circuit-open")
try:
result = func(*args, **kwargs)
self.failures = 0
return result
except Exception as exc:
self.failures += 1
if self.failures >= self.threshold:
self.open_until = now + self.cool_down
self.failures = 0
raise exc
class ResilientWallet:
def __init__(self, network: ExternalNetwork) -> None:
self.balances: Dict[str, float] = {"U1": 200.0}
self.breaker = CircuitBreaker(2, 1.0)
self.outbox: "Queue[Payout]" = Queue()
self.network = network
def payout(self, user_id: str, amount: float, txn_id: str) -> None:
if self.balances.get(user_id, 0.0) < amount:
raise RuntimeError("insufficient funds")
self.balances[user_id] -= amount
self.outbox.put(Payout(user_id, amount, txn_id))
def process_outbox(self) -> None:
pending = []
while True:
try:
payout = self.outbox.get_nowait()
except Empty:
break
try:
self.breaker.call(self.network.payout, payout.user_id, payout.amount)
print("Payout success:", payout)
except Exception as exc:
print("Payout failed:", exc, "requeueing")
pending.append(payout)
finally:
self.outbox.task_done()
for payout in pending:
self.outbox.put(payout)
def main() -> None:
random.seed(10)
wallet = ResilientWallet(ExternalNetwork(0.5))
wallet.payout("U1", 50.0, "tx1")
wallet.payout("U1", 30.0, "tx2")
wallet.payout("U1", 40.0, "tx3")
for _ in range(5):
wallet.process_outbox()
time.sleep(0.2)
print("Remaining balance:", wallet.balances["U1"])
if __name__ == "__main__":
main()
Grow the tic-tac-toe system from a game engine to a concurrent match server with resilient matchmaking orchestration.
Tic-Tac-Toe Platform ├─ Level 1: Core Game Engine ├─ Level 2: Concurrent Match Server └─ Level 3: Resilient Matchmaking
Level 1 — Core Implementation
This classic kata reinforces modelling domain rules explicitly: boards, moves, and win detection algorithms live in focused classes. We validate coordinates, enforce turn order, and compute winners immediately after each move. Because the objects are pure and side-effect free, unit tests mirror the game logic one-to-one.
from typing import List, Optional
class TicTacToe:
def __init__(self) -> None:
self.board: List[List[str]] = [["" for _ in range(3)] for _ in range(3)]
self.current = "X"
    def move(self, row: int, col: int) -> Optional[str]:
        if not (0 <= row < 3 and 0 <= col < 3):
            raise ValueError("Coordinates out of range")
        if self.board[row][col]:
            raise RuntimeError("Cell occupied")
self.board[row][col] = self.current
winner = self._winner()
if winner or self._is_draw():
return winner or "DRAW"
self.current = "O" if self.current == "X" else "X"
return None
def _winner(self) -> Optional[str]:
lines = []
lines.extend(self.board)
lines.extend([[self.board[r][c] for r in range(3)] for c in range(3)])
lines.append([self.board[i][i] for i in range(3)])
lines.append([self.board[i][2 - i] for i in range(3)])
for line in lines:
if line[0] and line.count(line[0]) == 3:
return line[0]
return None
def _is_draw(self) -> bool:
return all(cell for row in self.board for cell in row)
def main() -> None:
game = TicTacToe()
moves = [(0, 0), (1, 0), (1, 1), (2, 0), (2, 2)]
for move in moves:
result = game.move(*move)
print("Move", move, "result:", result)
if result:
break
if __name__ == "__main__":
main()
Level 2 — Concurrent Enhancements
Level 2 spins up several matches simultaneously, teaching how to isolate game state per session while sharing the underlying engine. We synchronise access to shared registries while leaving per-game logic lock-free. Threaded simulations demonstrate how isolation keeps results deterministic even under concurrency.
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional
class TicTacToeEngine:
def __init__(self) -> None:
self.board = [["" for _ in range(3)] for _ in range(3)]
self.current = "X"
def move(self, row: int, col: int) -> Optional[str]:
if self.board[row][col]:
raise RuntimeError("Invalid move")
self.board[row][col] = self.current
if self._winner():
return self.current
if all(cell for row in self.board for cell in row):
return "DRAW"
self.current = "O" if self.current == "X" else "X"
return None
def _winner(self) -> bool:
lines = []
lines.extend(self.board)
lines.extend([[self.board[r][c] for r in range(3)] for c in range(3)])
lines.append([self.board[i][i] for i in range(3)])
lines.append([self.board[i][2 - i] for i in range(3)])
return any(line[0] and line.count(line[0]) == 3 for line in lines)
@dataclass
class MatchSession:
match_id: str
engine: TicTacToeEngine
lock: threading.Lock
class MatchServer:
def __init__(self) -> None:
self.sessions: Dict[str, MatchSession] = {}
def create_match(self, match_id: str) -> None:
self.sessions[match_id] = MatchSession(match_id, TicTacToeEngine(), threading.Lock())
def play(self, match_id: str, row: int, col: int) -> Optional[str]:
session = self.sessions[match_id]
with session.lock:
return session.engine.move(row, col)
def match_runner(server: MatchServer, match_id: str, moves: List[tuple[int, int]]) -> None:
for move in moves:
result = server.play(match_id, *move)
if result:
print(match_id, "ended with", result)
break
def main() -> None:
server = MatchServer()
server.create_match("M1")
server.create_match("M2")
thread1 = threading.Thread(target=match_runner, args=(server, "M1", [(0, 0), (0, 1), (1, 1), (0, 2), (2, 2)]))
thread2 = threading.Thread(target=match_runner, args=(server, "M2", [(1, 1), (0, 0), (2, 2), (0, 1), (0, 2), (2, 0), (1, 0)]))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
if __name__ == "__main__":
main()
Level 3 — Resilient Architecture
The advanced level records every move as an event stream and replays it to rebuild boards, illustrating event sourcing concepts. We persist logs, handle idempotent replays, and verify the restored game state matches expectations. This teaches how lightweight persistence can give small games durable history without complex infrastructure.
import json
import os
from typing import List, Optional, Tuple
class EventStore:
def __init__(self, path: str) -> None:
self.path = path
if not os.path.exists(path):
with open(path, "w", encoding="utf-8") as handle:
json.dump([], handle)
def append(self, match_id: str, move: Tuple[int, int]) -> None:
events = self.load_all()
events.append({"match_id": match_id, "move": move})
with open(self.path, "w", encoding="utf-8") as handle:
json.dump(events, handle)
def load_all(self) -> List[dict]:
with open(self.path, "r", encoding="utf-8") as handle:
return json.load(handle)
class ReplayableTicTacToe:
def __init__(self) -> None:
self.board = [["" for _ in range(3)] for _ in range(3)]
self.current = "X"
def apply(self, row: int, col: int) -> Optional[str]:
self.board[row][col] = self.current
winner = self._winner()
self.current = "O" if self.current == "X" else "X"
return winner
def _winner(self) -> Optional[str]:
lines = []
lines.extend(self.board)
lines.extend([[self.board[r][c] for r in range(3)] for c in range(3)])
lines.append([self.board[i][i] for i in range(3)])
lines.append([self.board[i][2 - i] for i in range(3)])
for line in lines:
if line[0] and line.count(line[0]) == 3:
return line[0]
return None
def main() -> None:
    store_path = "/tmp/tictactoe-events.json"
    if os.path.exists(store_path):
        os.remove(store_path)  # start from a clean log so the replay mirrors this run
    store = EventStore(store_path)
game = ReplayableTicTacToe()
moves = [(0, 0), (0, 1), (1, 1), (2, 0), (2, 2)]
for move in moves:
        game.apply(*move)
store.append("match-1", move)
replay = ReplayableTicTacToe()
    for event in store.load_all():
        if event["match_id"] == "match-1":  # replay only this match's stream
            replay.apply(*event["move"])
print("Original board:", game.board)
print("Replayed board:", replay.board)
if __name__ == "__main__":
main()
Enhance the chat system from simple chat rooms to concurrent messaging infrastructure with delivery guarantees.
Chat Service Stack ├─ Level 1: Basic Chat Room ├─ Level 2: Concurrent Messaging └─ Level 3: Resilient Message Delivery
Level 1 — Core Implementation
The messaging series opens with conversation and message entities along with a service that persists and emits notifications. The Message carries an optional message_id for idempotency (a dedupe sketch follows the listing), and the repository interface stays narrow so later scaling remains possible. The sample interaction proves that storage and notification concerns are already decoupled.
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List
@dataclass
class Message:
sender: str
recipient: str
body: str
message_id: str | None = None
channel: str | None = None
class ConversationRepository:
def __init__(self) -> None:
self._direct: DefaultDict[tuple[str, str], List[Message]] = defaultdict(list)
def add_direct(self, message: Message) -> None:
key = tuple(sorted([message.sender, message.recipient]))
self._direct[key].append(message)
def history(self, user_a: str, user_b: str) -> List[Message]:
key = tuple(sorted([user_a, user_b]))
return list(self._direct[key])
class ChatService:
def __init__(self, repo: ConversationRepository, notifier: Callable[[Message], None]) -> None:
self.repo = repo
self.notifier = notifier
def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
return Message(sender, recipient, body)
def send(self, sender: str, recipient: str, body: str) -> Message:
message = self._create_direct_message(sender, recipient, body)
self.repo.add_direct(message)
self.notifier(message)
return message
def main() -> None:
repo = ConversationRepository()
service = ChatService(repo, lambda msg: print("Notify", msg.recipient, ":", msg.body))
service.send("alice", "bob", "hello bob")
service.send("bob", "alice", "hey alice")
print("History:", [(item.sender, item.body) for item in repo.history("alice", "bob")])
if __name__ == "__main__":
main()
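The dedupe mentioned above could live in the repository once senders assign IDs; a sketch with a hypothetical seen-set:
class IdempotentConversationRepository(ConversationRepository):
    # Drops redeliveries of the same message_id so history never double-counts.
    def __init__(self) -> None:
        super().__init__()
        self._seen: set[str] = set()
    def add_direct(self, message: Message) -> None:
        if message.message_id is not None:
            if message.message_id in self._seen:
                return  # replaying the same ID leaves the conversation unchanged
            self._seen.add(message.message_id)
        super().add_direct(message)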
Level 2 — Concurrent Enhancements
Level 2 expands to group channels, introducing fan-out queues that deliver messages asynchronously to subscribers. Producers enqueue payloads without blocking while consumer coroutines drain their queues; the queues here are unbounded, so a bounded, backpressure-aware variant is sketched after the listing. Tracing deliveries helps learners understand exactly when a message leaves the channel log and reaches recipients.
from __future__ import annotations
import asyncio
import itertools
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, Dict, List
# --- Level 1 foundation ---
@dataclass
class Message:
sender: str
recipient: str | None
body: str
message_id: str | None = None
channel: str | None = None
class ConversationRepository:
def __init__(self) -> None:
self._direct: DefaultDict[tuple[str, str], List[Message]] = defaultdict(list)
def add_direct(self, message: Message) -> None:
assert message.recipient is not None
key = tuple(sorted([message.sender, message.recipient]))
self._direct[key].append(message)
def history(self, user_a: str, user_b: str) -> List[Message]:
key = tuple(sorted([user_a, user_b]))
return list(self._direct[key])
class ChatService:
def __init__(self, repo: ConversationRepository, notifier: Callable[[Message], None]) -> None:
self.repo = repo
self.notifier = notifier
def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
return Message(sender, recipient, body)
def send(self, sender: str, recipient: str, body: str) -> Message:
message = self._create_direct_message(sender, recipient, body)
self.repo.add_direct(message)
self.notifier(message)
return message
# --- Level 2 concurrency extensions ---
class ChannelRepository(ConversationRepository):
def __init__(self) -> None:
super().__init__()
self._channels: DefaultDict[str, List[Message]] = defaultdict(list)
def add_channel(self, channel: str, message: Message) -> None:
self._channels[channel].append(message)
def channel_history(self, channel: str) -> List[Message]:
return list(self._channels[channel])
class ConcurrentChatService(ChatService):
def __init__(self, repo: ChannelRepository, notifier: Callable[[Message], None]) -> None:
super().__init__(repo, notifier)
self.repo = repo
self._ids = (str(value) for value in itertools.count(1))
def _next_id(self) -> str:
return next(self._ids)
def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
return Message(sender, recipient, body, message_id=self._next_id())
def send_to_channel(self, sender: str, channel: str, body: str) -> Message:
message = Message(sender, None, body, message_id=self._next_id(), channel=channel)
self.repo.add_channel(channel, message)
self.notifier(message)
return message
class AsyncChannelDispatcher:
def __init__(self, service: ConcurrentChatService) -> None:
self.service = service
self.subscribers: Dict[str, Dict[str, asyncio.Queue[Message]]] = defaultdict(dict)
async def subscribe(self, user: str, channel: str) -> asyncio.Queue[Message]:
queue: asyncio.Queue[Message] = asyncio.Queue()
self.subscribers[channel][user] = queue
return queue
async def publish(self, sender: str, channel: str, body: str) -> Message:
message = self.service.send_to_channel(sender, channel, body)
for queue in self.subscribers[channel].values():
await queue.put(message)
return message
async def main_async() -> None:
repo = ChannelRepository()
dispatcher = AsyncChannelDispatcher(ConcurrentChatService(repo, lambda message: None))
general_alice = await dispatcher.subscribe("alice", "#general")
general_bob = await dispatcher.subscribe("bob", "#general")
async def alice_consumer() -> None:
msg = await general_alice.get()
print("alice received", msg.body)
async def bob_consumer() -> None:
msg = await general_bob.get()
print("bob received", msg.body)
await dispatcher.publish("service-bot", "#general", "Welcome to the channel!")
await asyncio.gather(alice_consumer(), bob_consumer())
if __name__ == "__main__":
asyncio.run(main_async())
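A bounded variant of the dispatcher, as promised above — a sketch assuming a drop-oldest shedding policy; BoundedChannelDispatcher is an illustrative name:
class BoundedChannelDispatcher(AsyncChannelDispatcher):
    # Caps each subscriber queue; when a consumer lags, the oldest undelivered
    # message is shed to make room rather than growing memory without bound.
    def __init__(self, service: ConcurrentChatService, maxsize: int = 100) -> None:
        super().__init__(service)
        self.maxsize = maxsize
    async def subscribe(self, user: str, channel: str) -> asyncio.Queue[Message]:
        queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=self.maxsize)
        self.subscribers[channel][user] = queue
        return queue
    async def publish(self, sender: str, channel: str, body: str) -> Message:
        message = self.service.send_to_channel(sender, channel, body)
        for queue in self.subscribers[channel].values():
            if queue.full():
                queue.get_nowait()  # shed the oldest message for this laggard
            queue.put_nowait(message)
        return message
Blocking with await queue.put(message) is the alternative policy: it propagates backpressure to publishers instead of shedding.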
Level 3 — Resilient Architecture
The advanced chat layer introduces acknowledgement tracking per subscriber, ensuring at-least-once delivery even when clients misbehave. Retries, dead-letter queues, and monitoring metrics make delivery guarantees explicit rather than aspirational. Learners can unplug a subscriber mid-run and watch how the dispatcher compensates until the message is acknowledged or quarantined.
from __future__ import annotations
import asyncio
import itertools
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, Dict, List
# --- Level 1 foundation ---
@dataclass
class Message:
sender: str
recipient: str | None
body: str
message_id: str | None = None
channel: str | None = None
class ConversationRepository:
def __init__(self) -> None:
self._direct: DefaultDict[tuple[str, str], List[Message]] = defaultdict(list)
def add_direct(self, message: Message) -> None:
assert message.recipient is not None
key = tuple(sorted([message.sender, message.recipient]))
self._direct[key].append(message)
def history(self, user_a: str, user_b: str) -> List[Message]:
key = tuple(sorted([user_a, user_b]))
return list(self._direct[key])
class ChatService:
def __init__(self, repo: ConversationRepository, notifier: Callable[[Message], None]) -> None:
self.repo = repo
self.notifier = notifier
def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
return Message(sender, recipient, body)
def send(self, sender: str, recipient: str, body: str) -> Message:
message = self._create_direct_message(sender, recipient, body)
self.repo.add_direct(message)
self.notifier(message)
return message
# --- Level 2 concurrency extensions ---
class ChannelRepository(ConversationRepository):
def __init__(self) -> None:
super().__init__()
self._channels: DefaultDict[str, List[Message]] = defaultdict(list)
def add_channel(self, channel: str, message: Message) -> None:
self._channels[channel].append(message)
def channel_history(self, channel: str) -> List[Message]:
return list(self._channels[channel])
class ConcurrentChatService(ChatService):
def __init__(self, repo: ChannelRepository, notifier: Callable[[Message], None]) -> None:
super().__init__(repo, notifier)
self.repo = repo
self._ids = (str(value) for value in itertools.count(1))
def _next_id(self) -> str:
return next(self._ids)
def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
return Message(sender, recipient, body, message_id=self._next_id())
def send_to_channel(self, sender: str, channel: str, body: str) -> Message:
message = Message(sender, None, body, message_id=self._next_id(), channel=channel)
self.repo.add_channel(channel, message)
self.notifier(message)
return message
class AsyncChannelDispatcher:
def __init__(self, service: ConcurrentChatService) -> None:
self.service = service
self.subscribers: Dict[str, Dict[str, asyncio.Queue[Message]]] = defaultdict(dict)
async def subscribe(self, user: str, channel: str) -> asyncio.Queue[Message]:
queue: asyncio.Queue[Message] = asyncio.Queue()
self.subscribers[channel][user] = queue
return queue
async def publish(self, sender: str, channel: str, body: str) -> Message:
message = self.service.send_to_channel(sender, channel, body)
for queue in self.subscribers[channel].values():
await queue.put(message)
return message
# --- Level 3 resiliency additions ---
@dataclass
class PendingDelivery:
message: Message
user: str
attempts: int = 0
class ReliableChannelDispatcher(AsyncChannelDispatcher):
def __init__(self, service: ConcurrentChatService, max_attempts: int = 3) -> None:
super().__init__(service)
self.max_attempts = max_attempts
self.pending: Dict[str, PendingDelivery] = {}
self.dead_letters: List[PendingDelivery] = []
def _key(self, message_id: str, user: str) -> str:
return f"{message_id}:{user}"
async def publish(self, sender: str, channel: str, body: str) -> Message:
message = await super().publish(sender, channel, body)
for user in self.subscribers[channel]:
if message.message_id is None:
continue
self.pending[self._key(message.message_id, user)] = PendingDelivery(message, user)
return message
def ack(self, message_id: str, user: str) -> None:
self.pending.pop(self._key(message_id, user), None)
async def deliver_pending(self) -> None:
for key, delivery in list(self.pending.items()):
channel = delivery.message.channel
assert channel is not None
queue = self.subscribers[channel][delivery.user]
await queue.put(delivery.message)
delivery.attempts += 1
if delivery.attempts >= self.max_attempts:
self.dead_letters.append(delivery)
del self.pending[key]
async def main_async() -> None:
    repo = ChannelRepository()
    dispatcher = ReliableChannelDispatcher(ConcurrentChatService(repo, lambda message: None))
    incidents_alice = await dispatcher.subscribe("alice", "#incidents")
    incidents_bob = await dispatcher.subscribe("bob", "#incidents")
    async def alice_consumer() -> None:
        msg = await incidents_alice.get()
        print("alice acked", msg.body)
        if msg.message_id is not None:
            dispatcher.ack(msg.message_id, "alice")
    async def bob_consumer() -> None:
        deliveries = 0
        while deliveries < 2:
            msg = await incidents_bob.get()
            deliveries += 1
            print("bob attempt", deliveries, "for", msg.body)
            if deliveries == 2 and msg.message_id is not None:
                dispatcher.ack(msg.message_id, "bob")
    async def redeliver() -> None:
        # Run redelivery concurrently: bob only acks on his second attempt,
        # so awaiting deliver_pending after gather() would deadlock his queue.
        await asyncio.sleep(0.1)
        await dispatcher.deliver_pending()
    await dispatcher.publish("service-bot", "#incidents", "incident #1234")
    await asyncio.gather(alice_consumer(), bob_consumer(), redeliver())
    if dispatcher.dead_letters:
        print("Dead letters:", [(d.user, d.message.body) for d in dispatcher.dead_letters])
if __name__ == "__main__":
asyncio.run(main_async())
Progressively build the file storage system from a core store to concurrent upload handling and resilient replication layers.
File Storage Stack ├─ Level 1: Core Store ├─ Level 2: Concurrent Uploads └─ Level 3: Resilient Replication
Level 1 — Core Implementation
We open the file-sync trilogy by modelling files, versions, and a repository so uploads, reads, and history queries are explicit. Version history already supports rollback; a checksum variant for tamper detection is sketched after the listing. The demo illustrates how users can retrieve any version without worrying about concurrency yet.
from __future__ import annotations
from dataclasses import dataclass
from collections import defaultdict
from typing import DefaultDict, Dict, List
@dataclass
class FileVersion:
version: int
content: str
class FileRepository:
def __init__(self) -> None:
        self.store: DefaultDict[str, List[FileVersion]] = defaultdict(list)  # typing.DefaultDict is only the annotation
def add_version(self, path: str, content: str) -> FileVersion:
versions = self.store[path]
version = FileVersion(len(versions) + 1, content)
versions.append(version)
return version
def latest(self, path: str) -> FileVersion:
return self.store[path][-1]
def history(self, path: str) -> List[FileVersion]:
return self.store[path]
def main() -> None:
repo = FileRepository()
repo.add_version("notes.txt", "Version 1")
repo.add_version("notes.txt", "Version 2")
print("Latest:", repo.latest("notes.txt"))
print("History:", repo.history("notes.txt"))
if __name__ == "__main__":
main()
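The checksum variant mentioned above — a sketch using sha256, with ChecksummedFileRepository as an illustrative name:
import hashlib
class ChecksummedFileRepository(FileRepository):
    # Stores a digest per version and verifies it on read, so tampering or
    # corruption is detected before content is trusted.
    def __init__(self) -> None:
        super().__init__()
        self.checksums: Dict[tuple[str, int], str] = {}
    def add_version(self, path: str, content: str) -> FileVersion:
        version = super().add_version(path, content)
        self.checksums[(path, version.version)] = hashlib.sha256(content.encode()).hexdigest()
        return version
    def verify(self, path: str, version: FileVersion) -> bool:
        expected = self.checksums.get((path, version.version))
        return expected == hashlib.sha256(version.content.encode()).hexdigest()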
Level 2 — Concurrent Enhancements
Level 2 brings in multiple devices editing the same document; appends are serialised under a lock, and an optimistic version check that rejects stale merges is sketched after the listing. We surface merge contexts and demonstrate how to append new versions while preserving history. Logging of version IDs lets learners follow how the server arbitrates between racing uploads.
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional
@dataclass
class VersionedDocument:
version: int
content: str
class VersionRepository:
def __init__(self) -> None:
self.versions: Dict[str, List[VersionedDocument]] = {"notes.txt": [VersionedDocument(1, "Base")]}
self.lock = threading.Lock()
def latest(self, path: str) -> VersionedDocument:
return self.versions[path][-1]
def append(self, path: str, content: str) -> VersionedDocument:
with self.lock:
latest = self.latest(path)
new_version = VersionedDocument(latest.version + 1, content)
self.versions[path].append(new_version)
return new_version
class ConflictResolver:
def merge(self, base: str, change_a: str, change_b: str) -> str:
return base + "\n" + change_a + "\n" + change_b
def worker(repo: VersionRepository, resolver: ConflictResolver, change: str) -> None:
latest = repo.latest("notes.txt")
merged = resolver.merge(latest.content, change, "")
repo.append("notes.txt", merged)
def main() -> None:
repo = VersionRepository()
resolver = ConflictResolver()
threads = [
threading.Thread(target=worker, args=(repo, resolver, "DeviceA change")),
threading.Thread(target=worker, args=(repo, resolver, "DeviceB change")),
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for doc in repo.versions["notes.txt"]:
print(doc)
if __name__ == "__main__":
main()
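The optimistic check mentioned above makes stale merges visible instead of silently interleaving them; append_if is an illustrative addition:
class OptimisticVersionRepository(VersionRepository):
    # Append succeeds only if the caller merged from the current head;
    # otherwise the writer must re-read, re-merge, and retry.
    def append_if(self, path: str, base_version: int, content: str) -> Optional[VersionedDocument]:
        with self.lock:
            latest = self.latest(path)
            if latest.version != base_version:
                return None  # conflict: the caller's base is stale
            new_version = VersionedDocument(latest.version + 1, content)
            self.versions[path].append(new_version)
            return new_version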
Level 3 — Resilient Architecture
The advanced file-sync level distributes versions across regions, verifying each transfer with checksums and retrying failed replications. The retry queue here is in-memory; a durable intent log that survives a crash is sketched after the listing. Observing the replication status output teaches how durability, validation, and backoff interact.
from __future__ import annotations
import hashlib
import random
import threading
import time
from dataclasses import dataclass
from queue import Queue
from typing import Dict
@dataclass
class FileBlob:
version: int
content: str
checksum: str
class PrimaryStore:
def __init__(self) -> None:
self.files: Dict[str, FileBlob] = {}
def save(self, path: str, content: str) -> FileBlob:
checksum = hashlib.sha256(content.encode()).hexdigest()
version = self.files.get(path, FileBlob(0, "", "")).version + 1
blob = FileBlob(version, content, checksum)
self.files[path] = blob
return blob
class ReplicaStore:
def __init__(self, name: str, failure_rate: float = 0.3) -> None:
self.name = name
self.failure_rate = failure_rate
self.files: Dict[str, FileBlob] = {}
    def replicate(self, path: str, blob: FileBlob) -> None:
        if random.random() < self.failure_rate:
            raise RuntimeError(f"{self.name} unavailable")
        # Verify the content hash before accepting the transfer.
        if hashlib.sha256(blob.content.encode()).hexdigest() != blob.checksum:
            raise RuntimeError(f"{self.name} checksum mismatch for {path}")
        self.files[path] = blob
class Replicator:
def __init__(self, replicas: Dict[str, ReplicaStore]) -> None:
self.replicas = replicas
self.queue: "Queue[tuple[str, FileBlob]]" = Queue()
self._start()
def _start(self) -> None:
def loop() -> None:
while True:
path, blob = self.queue.get()
for replica in self.replicas.values():
try:
replica.replicate(path, blob)
print(replica.name, "replicated", path, "v", blob.version)
except Exception as exc:
print("Replication failed:", exc)
time.sleep(0.2)
self.queue.put((path, blob))
self.queue.task_done()
threading.Thread(target=loop, daemon=True).start()
def enqueue(self, path: str, blob: FileBlob) -> None:
self.queue.put((path, blob))
def main() -> None:
random.seed(14)
primary = PrimaryStore()
replicator = Replicator({
"replica-a": ReplicaStore("ReplicaA", 0.5),
"replica-b": ReplicaStore("ReplicaB", 0.1),
})
blob = primary.save("notes.txt", "critical data v1")
replicator.enqueue("notes.txt", blob)
time.sleep(1)
if __name__ == "__main__":
main()
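The durable intent log promised above could be as small as a JSON file — a sketch; IntentLog, its path, and its schema are assumptions:
import json
import os
class IntentLog:
    # Append-only record of replication intents; on restart the replicator
    # re-enqueues whatever was never marked complete.
    def __init__(self, log_path: str = "/tmp/replication-intents.json") -> None:
        self.log_path = log_path
        if not os.path.exists(log_path):
            self._write([])
    def load(self) -> list:
        with open(self.log_path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    def _write(self, entries: list) -> None:
        with open(self.log_path, "w", encoding="utf-8") as handle:
            json.dump(entries, handle)
    def record(self, file_path: str, version: int) -> None:
        entries = self.load()
        entries.append({"path": file_path, "version": version})
        self._write(entries)
    def complete(self, file_path: str, version: int) -> None:
        self._write([e for e in self.load() if not (e["path"] == file_path and e["version"] == version)])
Replicator.enqueue would call record, and the worker loop would call complete once every replica holds the blob.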
Progressively enhance the logging framework from basic appenders to async logging with resilient fallback pipelines.
Logging Framework Stack ├─ Level 1: Core Logger ├─ Level 2: Asynchronous Logger └─ Level 3: Resilient Logging Pipeline
Level 1 — Core Implementation
The logging journey starts with a minimal core that filters by severity and fans out to pluggable appenders. Formatting is inlined in each appender for now; a formatter abstraction that lets structured and plain-text logs coexist is sketched after the listing. Sample usage shows how configuration drives behaviour rather than hard-coded branching.
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum, auto
from typing import List
class Level(Enum):
DEBUG = auto()
INFO = auto()
WARN = auto()
ERROR = auto()
@dataclass
class LogRecord:
level: Level
message: str
class Appender:
def append(self, record: LogRecord) -> None:
raise NotImplementedError
class ConsoleAppender(Appender):
def append(self, record: LogRecord) -> None:
print(f"[{record.level.name}] {record.message}")
class MemoryAppender(Appender):
def __init__(self) -> None:
self.records: List[LogRecord] = []
def append(self, record: LogRecord) -> None:
self.records.append(record)
class Logger:
def __init__(self, level: Level, appenders: List[Appender]) -> None:
self.level = level
self.appenders = appenders
def log(self, level: Level, message: str) -> None:
if level.value < self.level.value:
return
record = LogRecord(level, message)
for appender in self.appenders:
appender.append(record)
def info(self, message: str) -> None:
self.log(Level.INFO, message)
def main() -> None:
memory = MemoryAppender()
logger = Logger(Level.DEBUG, [ConsoleAppender(), memory])
logger.info("Hello logging")
print("Memory records:", memory.records)
if __name__ == "__main__":
main()
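The formatter seam mentioned above might look like this — Formatter and its subclasses are illustrative names:
import json
class Formatter:
    def format(self, record: LogRecord) -> str:
        raise NotImplementedError
class PlainFormatter(Formatter):
    def format(self, record: LogRecord) -> str:
        return f"[{record.level.name}] {record.message}"
class JsonFormatter(Formatter):
    def format(self, record: LogRecord) -> str:
        return json.dumps({"level": record.level.name, "message": record.message})
class FormattingAppender(Appender):
    # Delegates presentation to an injected formatter, so structured and
    # plain-text outputs coexist behind one appender interface.
    def __init__(self, formatter: Formatter) -> None:
        self.formatter = formatter
    def append(self, record: LogRecord) -> None:
        print(self.formatter.format(record))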
Level 2 — Concurrent Enhancements
Level 2 buffers log events in a queue and drains them with a worker thread to keep application threads responsive. We talk through shutdown hooks and flush semantics so logs are not lost on exit. The queue here is unbounded, so producers never actually block; a bounded variant that exerts backpressure is sketched after the listing.
import threading
from enum import Enum, auto
from queue import Empty, Queue
from typing import Callable
class Level(Enum):
DEBUG = auto()
INFO = auto()
WARN = auto()
ERROR = auto()
class AsyncLogger:
def __init__(self, sink: Callable[[Level, str], None]) -> None:
self.queue: "Queue[tuple[Level, str]]" = Queue()
self.sink = sink
self.stop = threading.Event()
threading.Thread(target=self._worker, daemon=True).start()
def log(self, level: Level, message: str) -> None:
self.queue.put((level, message))
def _worker(self) -> None:
while not self.stop.is_set():
try:
level, message = self.queue.get(timeout=0.2)
            except Empty:  # timeout expired; loop to re-check the stop flag
continue
self.sink(level, message)
self.queue.task_done()
def shutdown(self) -> None:
self.queue.join()
self.stop.set()
def sink(level: Level, message: str) -> None:
print(f"[{level.name}] {message}")
def worker(logger: AsyncLogger, index: int) -> None:
for i in range(3):
logger.log(Level.INFO, f"Thread {index} message {i}")
def main() -> None:
logger = AsyncLogger(sink)
threads = [threading.Thread(target=worker, args=(logger, i)) for i in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
logger.shutdown()
if __name__ == "__main__":
main()
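The bounded variant mentioned above — a sketch; note the worker re-reads self.queue on each loop iteration, which makes this constructor-time swap safe:
class BoundedAsyncLogger(AsyncLogger):
    # Once the dispatcher lags by maxsize events, log() blocks the producing
    # thread until space frees up, which is the backpressure signal.
    def __init__(self, sink: Callable[[Level, str], None], maxsize: int = 100) -> None:
        super().__init__(sink)
        self.queue = Queue(maxsize=maxsize)  # replace the unbounded queue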
Level 3 — Resilient Architecture
The advanced logging level assumes the main sink can fail, so we add a bounded buffer and a fallback sink that captures whatever the primary rejects (standing in for disk-backed storage). Because the buffer is bounded, the oldest events are dropped once it fills, a signal that producers should slow down. Watching the failover sequence teaches how reliability and observability work hand in hand.
import random
import time
from collections import deque
from enum import Enum, auto
class Level(Enum):
INFO = auto()
ERROR = auto()
class PrimarySink:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
def write(self, level: Level, message: str) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("primary sink down")
print("Primary:", level.name, message)
class FallbackSink:
def __init__(self) -> None:
self.storage = []
def write(self, level: Level, message: str) -> None:
print("Fallback:", level.name, message)
self.storage.append((level, message))
class ResilientLogger:
def __init__(self, primary: PrimarySink, fallback: FallbackSink) -> None:
self.primary = primary
self.fallback = fallback
self.buffer: deque[tuple[Level, str]] = deque(maxlen=50)
def log(self, level: Level, message: str) -> None:
self.buffer.append((level, message))
self._drain()
def _drain(self) -> None:
pending = len(self.buffer)
for _ in range(pending):
level, message = self.buffer.popleft()
try:
self.primary.write(level, message)
except Exception as exc:
print("Primary failed:", exc)
self.fallback.write(level, message)
def main() -> None:
random.seed(15)
logger = ResilientLogger(PrimarySink(0.5), FallbackSink())
for i in range(10):
logger.log(Level.INFO, f"event {i}")
time.sleep(0.05)
if __name__ == "__main__":
main()
Build out the event bus from simple pub/sub to concurrent handlers and resilient delivery with retries.
Event Bus Stack ├─ Level 1: Basic Publisher/Subscriber ├─ Level 2: Concurrent Event Processing └─ Level 3: Resilient Delivery Service
Level 1 — Core Implementation
The pub/sub sequence starts with a straightforward event bus that manages subscriptions and delivers synchronous callbacks. We explore how to prevent subscriber failures from crashing the bus, and document contracts clearly. Sample interactions underline the loose coupling benefits before durability enters the scene.
from collections import defaultdict
from typing import Callable, DefaultDict, List
class EventBus:
def __init__(self) -> None:
self.subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
self.subscribers[topic].append(handler)
    def publish(self, topic: str, payload: dict) -> None:
        for handler in self.subscribers[topic]:
            try:
                handler(payload)
            except Exception as exc:
                # Isolate failures so one bad subscriber cannot crash the bus
                # or starve the handlers registered after it.
                print("Subscriber error on", topic, ":", exc)
def main() -> None:
bus = EventBus()
bus.subscribe("order.created", lambda payload: print("Analytics saw", payload))
bus.subscribe("order.created", lambda payload: print("Billing saw", payload))
bus.publish("order.created", {"order_id": "O1", "amount": 100})
if __name__ == "__main__":
main()
Level 2 — Concurrent Enhancements
Level 2 adds durability by storing events per topic and requiring acknowledgements before deletion. We simulate failing subscribers to show how an unacked message blocks at the head of the queue until redelivered. An ack-latency instrumentation wrapper is sketched after the listing to help diagnose consumer behaviour.
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple
class DurableBus:
def __init__(self) -> None:
self.queues: Dict[str, Deque[Tuple[int, dict]]] = defaultdict(deque)
self.handlers: Dict[str, Callable[[int, dict], bool]] = {}
self.offsets: Dict[str, int] = defaultdict(int)
def subscribe(self, topic: str, handler: Callable[[int, dict], bool]) -> None:
self.handlers[topic] = handler
def publish(self, topic: str, payload: dict) -> None:
offset = self.offsets[topic]
self.offsets[topic] += 1
self.queues[topic].append((offset, payload))
self._dispatch(topic)
def _dispatch(self, topic: str) -> None:
handler = self.handlers.get(topic)
if not handler:
return
queue = self.queues[topic]
pending = len(queue)
for _ in range(pending):
offset, payload = queue[0]
ack = handler(offset, payload)
if ack:
queue.popleft()
else:
break
def main() -> None:
bus = DurableBus()
def handler(offset: int, payload: dict) -> bool:
print("Processing", offset, payload)
return offset % 2 == 0
bus.subscribe("metrics", handler)
for idx in range(5):
bus.publish("metrics", {"value": idx})
print("Remaining queue:", list(bus.queues["metrics"]))
if __name__ == "__main__":
main()
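The ack-latency instrumentation mentioned above can wrap any subscriber — instrumented is a hypothetical helper:
import time
from typing import Callable
def instrumented(handler: Callable[[int, dict], bool], latencies: list) -> Callable[[int, dict], bool]:
    # Records (offset, seconds, acked) for each delivery attempt.
    def wrapper(offset: int, payload: dict) -> bool:
        started = time.perf_counter()
        ack = handler(offset, payload)
        latencies.append((offset, time.perf_counter() - started, ack))
        return ack
    return wrapper
Subscribing with bus.subscribe("metrics", instrumented(handler, latencies)) leaves delivery semantics untouched while exposing slow or nacking consumers.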
Level 3 — Resilient Architecture
The advanced bus introduces retry policies and dead-letter topics for messages that repeatedly fail. We support exponential backoff so retries do not overwhelm handlers, and we surface metrics to alert on poison messages. By observing the event flow you see how to maintain both reliability and transparency.
import random
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple
class ResilientBroker:
def __init__(self) -> None:
self.handlers: Dict[str, Callable[[dict], None]] = {}
self.queues: Dict[str, Deque[Tuple[int, dict, int]]] = defaultdict(deque)
self.dead_letters: Dict[str, list] = defaultdict(list)
def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
self.handlers[topic] = handler
def publish(self, topic: str, payload: dict) -> None:
queue = self.queues[topic]
queue.append((len(queue), payload, 0))
self._drain(topic)
    def _drain(self, topic: str) -> None:
        queue = self.queues[topic]
        handler = self.handlers.get(topic)
        if handler is None:
            return  # no subscriber yet; events wait in the queue
        pending = len(queue)
for _ in range(pending):
offset, payload, attempt = queue.popleft()
try:
handler(payload)
except Exception as exc:
attempt += 1
if attempt >= 3:
print("Dead-lettering", payload)
self.dead_letters[topic].append((payload, str(exc)))
else:
backoff = 0.1 * (2 ** attempt)
print("Retrying", payload, "in", backoff)
time.sleep(backoff)
queue.append((offset, payload, attempt))
def main() -> None:
random.seed(16)
broker = ResilientBroker()
def handler(payload: dict) -> None:
if random.random() < 0.5:
raise RuntimeError("handler failure")
print("Handled", payload)
broker.subscribe("notifications", handler)
for idx in range(4):
broker.publish("notifications", {"id": idx})
print("Dead letters:", broker.dead_letters["notifications"])
if __name__ == "__main__":
main()
Grow the workflow orchestrator from basic sequencing to concurrent coordination and resilient recovery.
Workflow Orchestrator Stack ├─ Level 1: Core Orchestrator ├─ Level 2: Concurrent Workflow Engine └─ Level 3: Resilient Workflow Service
Level 1 — Core Implementation
The workflow series begins by representing tasks and dependencies as a directed acyclic graph and performing a topological traversal. The core runner stays silent; a wrapper that emits start and completion events for observability is sketched after the listing. Stepping through the sample illustrates how prerequisite tracking keeps execution honest.
from collections import defaultdict, deque
from typing import Callable, Dict, List, Set
class Workflow:
def __init__(self) -> None:
self.graph: Dict[str, Set[str]] = defaultdict(set)
self.reverse: Dict[str, Set[str]] = defaultdict(set)
self.tasks: Dict[str, Callable[[], None]] = {}
def add_task(self, name: str, fn: Callable[[], None]) -> None:
self.tasks[name] = fn
def add_edge(self, prereq: str, task: str) -> None:
self.graph[prereq].add(task)
self.reverse[task].add(prereq)
def run(self) -> None:
indegree: Dict[str, int] = {task: len(self.reverse[task]) for task in self.tasks}
ready = deque([task for task, deg in indegree.items() if deg == 0])
while ready:
task = ready.popleft()
self.tasks[task]()
for neighbor in self.graph[task]:
indegree[neighbor] -= 1
if indegree[neighbor] == 0:
ready.append(neighbor)
def main() -> None:
wf = Workflow()
wf.add_task("A", lambda: print("A"))
wf.add_task("B", lambda: print("B"))
wf.add_task("C", lambda: print("C"))
wf.add_task("D", lambda: print("D"))
wf.add_edge("A", "B")
wf.add_edge("A", "C")
wf.add_edge("B", "D")
wf.run()
if __name__ == "__main__":
main()
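The Level 1 listing runs tasks silently. One way to add the start/completion events mentioned above is a wrapper that decorates every registered task; the with_lifecycle_events helper below is a sketch with an assumed plain-callable emitter, not part of the Workflow class:
from typing import Callable

def with_lifecycle_events(wf: Workflow, emit: Callable[[dict], None]) -> None:
    # Wrap every registered task so "started"/"completed" events fire around it.
    for name, fn in list(wf.tasks.items()):
        def wrapped(name: str = name, fn: Callable[[], None] = fn) -> None:
            emit({"task": name, "event": "started"})
            fn()
            emit({"task": name, "event": "completed"})
        wf.tasks[name] = wrapped

# Usage: call with_lifecycle_events(wf, print) before wf.run().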
Level 2 — Concurrent Enhancements
Level 2 brings concurrency by dispatching ready tasks to a worker pool while a lock guards the shared completion bookkeeping, so no task is scheduled twice or before its prerequisites finish. A small tracing wrapper, sketched after the listing, records timings to visualise parallel execution. The exercise highlights how concurrency can coexist with determinism when shared state is synchronised carefully.
import threading
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
from typing import Callable, Dict, Set
class ConcurrentWorkflow:
def __init__(self) -> None:
self.dependencies: Dict[str, Set[str]] = defaultdict(set)
self.dependents: Dict[str, Set[str]] = defaultdict(set)
self.tasks: Dict[str, Callable[[], None]] = {}
def add_task(self, name: str, fn: Callable[[], None]) -> None:
self.tasks[name] = fn
def add_edge(self, prereq: str, task: str) -> None:
self.dependencies[task].add(prereq)
self.dependents[prereq].add(task)
    def run(self) -> None:
        if not self.tasks:
            return
        completed: Set[str] = set()
        scheduled: Set[str] = set()
        lock = threading.Lock()
        done = threading.Event()
        executor = ThreadPoolExecutor(max_workers=4)
        def schedule(task: str) -> None:
            executor.submit(execute_task, task)
        def execute_task(task: str) -> None:
            self.tasks[task]()
            ready = []
            with lock:
                completed.add(task)
                if len(completed) == len(self.tasks):
                    done.set()
                for dependent in self.dependents[task]:
                    # Check-and-mark under the lock so two finished prerequisites
                    # cannot both schedule the same dependent.
                    if dependent not in scheduled and self.dependencies[dependent].issubset(completed):
                        scheduled.add(dependent)
                        ready.append(dependent)
            for dependent in ready:
                schedule(dependent)
        initial = [task for task in self.tasks if not self.dependencies[task]]
        with lock:
            scheduled.update(initial)
        for task in initial:
            schedule(task)
        done.wait()  # wait for the final task before shutting the pool down
        executor.shutdown(wait=True)
def main() -> None:
wf = ConcurrentWorkflow()
wf.add_task("A", lambda: print("A"))
wf.add_task("B", lambda: print("B"))
wf.add_task("C", lambda: print("C"))
wf.add_task("D", lambda: print("D"))
wf.add_edge("A", "B")
wf.add_edge("A", "C")
wf.add_edge("B", "D")
wf.add_edge("C", "D")
wf.run()
if __name__ == "__main__":
main()
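To capture the traces the intro mentions, a small wrapper can timestamp each task without touching the engine. This is a sketch; traced and TRACE are illustrative names:
import threading
import time
from typing import Callable, List, Tuple

TRACE: List[Tuple[str, str, float, float]] = []
_TRACE_LOCK = threading.Lock()

def traced(name: str, fn: Callable[[], None]) -> Callable[[], None]:
    # Record (task, thread, start, end); sorting TRACE by start time shows
    # which tasks actually overlapped in wall-clock time.
    def run() -> None:
        start = time.perf_counter()
        fn()
        with _TRACE_LOCK:
            TRACE.append((name, threading.current_thread().name, start, time.perf_counter()))
    return run

# Usage: wf.add_task("A", traced("A", lambda: print("A"))), then inspect TRACE.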
Level 3 — Resilient Architecture
The advanced workflow layer persists execution state so runs can resume after a crash and keeps history for auditing. We add retry budgets and compensation handlers to roll back or remediate partial work. Learners see how durable state and careful bookkeeping turn an in-memory runner into an operations-friendly system.
import json
import os
import random
import time
from typing import Callable, Dict
class StateStore:
def __init__(self, path: str) -> None:
self.path = path
if not os.path.exists(path):
self.save({})
def load(self) -> Dict[str, str]:
with open(self.path, "r", encoding="utf-8") as handle:
return json.load(handle)
def save(self, state: Dict[str, str]) -> None:
with open(self.path, "w", encoding="utf-8") as handle:
json.dump(state, handle)
class SagaStep:
def __init__(self, execute: Callable[[], None], compensate: Callable[[], None]) -> None:
self.execute = execute
self.compensate = compensate
class ResilientSaga:
def __init__(self, steps: Dict[str, SagaStep], store: StateStore) -> None:
self.steps = steps
self.store = store
self.state = store.load()
    def run(self) -> None:
        completed = []  # step names that have finished, oldest first
        for name, step in self.steps.items():
            if self.state.get(name) == "done":
                completed.append(name)
                continue
            attempt = 0
            while attempt < 3:
                try:
                    step.execute()
                    self.state[name] = "done"
                    self.store.save(self.state)
                    completed.append(name)
                    break
                except Exception:
                    attempt += 1
                    if attempt >= 3:
                        self.state[name] = "failed"
                        self.store.save(self.state)
                        # Saga semantics: compensate the failed step, then every
                        # previously completed step in reverse order.
                        step.compensate()
                        for done_name in reversed(completed):
                            self.steps[done_name].compensate()
                        raise
                    time.sleep(0.2 * attempt)
def main() -> None:
random.seed(17)
def flaky_step():
if random.random() < 0.5:
raise RuntimeError("flaky")
print("Executed flaky step")
steps = {
"reserve": SagaStep(lambda: print("Reserved inventory"), lambda: print("Released inventory")),
"pay": SagaStep(flaky_step, lambda: print("Refunded payment")),
"notify": SagaStep(lambda: print("Notification sent"), lambda: print("Notification compensating")),
}
saga = ResilientSaga(steps, StateStore("/tmp/workflow-state.json"))
try:
saga.run()
except Exception as exc:
print("Saga ended with failure:", exc)
if __name__ == "__main__":
main()
Progressively enhance the feed system from simple timeline merging to concurrent fan-out pipelines and resilient aggregation with fallbacks.
Feed Aggregator Stack ├─ Level 1: Core Feed Merger ├─ Level 2: Concurrent Fan-out Pipeline └─ Level 3: Resilient Aggregation Service
Level 1 — Core Implementation
The feed exercises start by normalising posts into append-only timelines and merging them efficiently for each user. We focus on clear storage abstractions and deterministic ordering so pagination becomes predictable; a cursor sketch follows the listing. Sample data shows how following relationships translate into the final feed, with no caching yet.
import heapq
from typing import Dict, List, Tuple
class FeedService:
def __init__(self, follows: Dict[str, List[str]], posts: Dict[str, List[Tuple[int, str]]]) -> None:
self.follows = follows
self.posts = posts
    def feed(self, user: str) -> List[Tuple[int, str]]:
        # Merge every followee's posts and return them newest-first.
        heap: List[Tuple[int, str]] = []
        for followee in self.follows.get(user, []):
            for timestamp, content in self.posts.get(followee, []):
                heapq.heappush(heap, (-timestamp, content))
        return [(-neg_ts, content) for neg_ts, content in sorted(heap)]
def main() -> None:
follows = {"alice": ["bob", "carol"]}
posts = {
"bob": [(3, "bob post 1"), (1, "bob post 0")],
"carol": [(2, "carol post")],
}
service = FeedService(follows, posts)
print("Feed:", service.feed("alice"))
if __name__ == "__main__":
main()
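The merged feed above is returned in full. A cursor-based helper (the page function is hypothetical, assuming distinct timestamps) shows why deterministic newest-first ordering makes pagination predictable:
from typing import List, Optional, Tuple

def page(feed: List[Tuple[int, str]], cursor: Optional[int], limit: int) -> Tuple[List[Tuple[int, str]], Optional[int]]:
    # The cursor is the timestamp of the last item on the previous page, so
    # pages stay stable even as new posts arrive at the head of the feed.
    items = [item for item in feed if cursor is None or item[0] < cursor]
    window = items[:limit]
    next_cursor = window[-1][0] if len(window) == limit else None
    return window, next_cursor

# Usage: first_page, cursor = page(service.feed("alice"), None, 2)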
Level 2 — Concurrent Enhancements
Level 2 introduces fan-out-on-write: publishing a post updates follower caches immediately while a background refresher keeps them warm. We discuss cache invalidation, TTLs, and fallbacks so stale entries are minimised; a TTL sketch follows the listing. Observing cache behaviour highlights the trade-off between write amplification and read latency.
import threading
import time
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple
class FanoutFeed:
def __init__(self, follows: Dict[str, List[str]]) -> None:
self.follows = follows
self.cache: Dict[str, Deque[Tuple[int, str]]] = defaultdict(deque)
self.lock = threading.Lock()
def publish(self, author: str, content: str, timestamp: int) -> None:
followers = [user for user, followees in self.follows.items() if author in followees]
for follower in followers:
with self.lock:
self.cache[follower].appendleft((timestamp, content))
while len(self.cache[follower]) > 10:
self.cache[follower].pop()
def timeline(self, user: str) -> List[Tuple[int, str]]:
with self.lock:
return list(self.cache[user])
def main() -> None:
follows = {"alice": ["bob"], "eve": ["bob", "alice"]}
feed = FanoutFeed(follows)
feed.publish("bob", "hello world", 3)
feed.publish("alice", "hi there", 4)
print("Alice timeline:", feed.timeline("alice"))
print("Eve timeline:", feed.timeline("eve"))
if __name__ == "__main__":
main()
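The cache above trims by length only. A minimal sketch of the TTL idea, assuming wall-clock expiry is acceptable (TTLTimelineCache is an illustrative name, separate from FanoutFeed):
import time
from collections import defaultdict, deque
from typing import Any, Deque, Dict, Tuple

class TTLTimelineCache:
    # Entries expire ttl_seconds after insertion, bounding how stale a
    # fan-out copy can get when the background refresher falls behind.
    def __init__(self, ttl_seconds: float = 60.0) -> None:
        self.ttl = ttl_seconds
        self.entries: Dict[str, Deque[Tuple[float, Any]]] = defaultdict(deque)

    def add(self, user: str, item: Any) -> None:
        self.entries[user].appendleft((time.time() + self.ttl, item))

    def read(self, user: str) -> list:
        now = time.time()
        queue = self.entries[user]
        while queue and queue[-1][0] <= now:  # oldest entries live at the right
            queue.pop()
        return [item for _expires_at, item in queue]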
Level 3 — Resilient Architecture
The advanced feed level assumes outages, so requests hit caches first, fall back to stale-but-safe copies, and enqueue refresh jobs once storage recovers. We surface degradation warnings to operators and include reconciliation steps to heal caches afterwards. Learners understand how maintaining a good user experience often means transparently serving older data with clear telemetry.
import random
import time
from typing import Dict, List, Tuple
class PrimaryFeedStore:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
self.feeds: Dict[str, List[Tuple[int, str]]] = {}
def get(self, user: str) -> List[Tuple[int, str]]:
if random.random() < self.failure_rate:
raise RuntimeError("feed store down")
return self.feeds.get(user, [])
class ResilientFeedService:
    def __init__(self, primary: PrimaryFeedStore) -> None:
        self.primary = primary
        self.cache: Dict[str, List[Tuple[int, str]]] = {}
    def feed(self, user: str) -> List[Tuple[int, str]]:
        try:
            feed = self.primary.get(user)
            self.cache[user] = feed  # every success refreshes the stale copy
            return feed
        except Exception as exc:
            print("Primary failed, serving stale feed:", exc)
            return self.cache.get(user, [])
def main() -> None:
random.seed(19)
store = PrimaryFeedStore(0.6)
store.feeds["alice"] = [(5, "new post"), (3, "older post")]
service = ResilientFeedService(store)
for _ in range(4):
print("Feed response:", service.feed("alice"))
time.sleep(0.1)
if __name__ == "__main__":
main()
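The service above serves stale data but never repairs it. A reconciliation sweep, sketched here with a hypothetical heal_caches helper and a queue of affected users, re-fetches once the primary answers again:
from collections import deque
from typing import Deque

def heal_caches(service: ResilientFeedService, to_refresh: "Deque[str]") -> None:
    # Re-fetch each user that was served stale data; stop at the first failure
    # and leave the remainder queued for the next sweep.
    while to_refresh:
        user = to_refresh[0]
        try:
            service.cache[user] = service.primary.get(user)
        except Exception:
            break  # primary still down
        to_refresh.popleft()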
Improve the metrics collector from a simple in-memory store to concurrent rolling windows and a resilient pipeline with buffering and fallback.
Metrics Collector Stack ├─ Level 1: In-Memory Collector ├─ Level 2: Concurrent Rolling Windows └─ Level 3: Resilient Metrics Pipeline
Level 1 — Core Implementation
The metrics series starts with a minimal counter registry that tracks named metrics and exposes read-friendly snapshots. We keep the API small and safe for rapid increments, laying groundwork for richer aggregations. Sample interactions emphasise clarity of contract before concurrency or distribution is introduced.
from collections import defaultdict
from typing import Dict
class CounterService:
def __init__(self) -> None:
self.counters: Dict[str, int] = defaultdict(int)
def increment(self, name: str, value: int = 1) -> None:
self.counters[name] += value
def get(self, name: str) -> int:
return self.counters[name]
def main() -> None:
counters = CounterService()
counters.increment("requests")
counters.increment("requests", 2)
print("Requests:", counters.get("requests"))
if __name__ == "__main__":
main()
Level 2 — Concurrent Enhancements
Level 2 tracks metrics over sliding windows, appending timestamped events under a lock and pruning entries that fall outside the window. Memory grows with the write rate here; a bucketed variant that bounds it is sketched after the listing. Threaded tests show how to trust rolling aggregations when events interleave across CPUs.
from __future__ import annotations
import threading
import time
from collections import deque
from typing import Deque, Tuple
class RollingCounter:
def __init__(self, window_seconds: int) -> None:
self.window = window_seconds
self.events: Deque[Tuple[float, int]] = deque()
self.lock = threading.Lock()
def increment(self, value: int = 1) -> None:
now = time.time()
with self.lock:
self.events.append((now, value))
self._trim(now)
def total(self) -> int:
now = time.time()
with self.lock:
self._trim(now)
return sum(value for _, value in self.events)
def _trim(self, now: float) -> None:
while self.events and now - self.events[0][0] > self.window:
self.events.popleft()
def worker(counter: RollingCounter) -> None:
for _ in range(5):
counter.increment()
time.sleep(0.05)
def main() -> None:
counter = RollingCounter(1)
threads = [threading.Thread(target=worker, args=(counter,)) for _ in range(3)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("Rolling total:", counter.total())
if __name__ == "__main__":
main()
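The deque above stores one entry per event, so memory grows with write rate. A bucketed sketch trades one-second resolution for a hard memory bound (BucketedCounter is an illustrative name):
import threading
import time
from typing import Dict

class BucketedCounter:
    # At most window_seconds + 1 buckets exist at a time, regardless of rate.
    def __init__(self, window_seconds: int) -> None:
        self.window = window_seconds
        self.buckets: Dict[int, int] = {}
        self.lock = threading.Lock()

    def increment(self, value: int = 1) -> None:
        second = int(time.time())
        with self.lock:
            self.buckets[second] = self.buckets.get(second, 0) + value
            self._trim(second)

    def total(self) -> int:
        second = int(time.time())
        with self.lock:
            self._trim(second)
            return sum(self.buckets.values())

    def _trim(self, now_second: int) -> None:
        cutoff = now_second - self.window
        for stale in [ts for ts in self.buckets if ts <= cutoff]:
            del self.buckets[stale]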
Level 3 — Resilient Architecture
The advanced level models many edge collectors pushing to central storage, so we batch payloads, retry with backoff, and spill to disk if the network is unreachable. We annotate batches with metadata so reconciliations can deduplicate after recovery. Running the scenario demonstrates how resilient telemetry pipelines keep observability intact.
import json
import random
from typing import Dict, List
class CentralTransport:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
def ship(self, batch: List[Dict[str, int]]) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("transport failure")
print("Shipped batch:", batch)
class FallbackDisk:
def __init__(self, path: str) -> None:
self.path = path
def persist(self, batch: List[Dict[str, int]]) -> None:
with open(self.path, "a", encoding="utf-8") as handle:
handle.write(json.dumps(batch) + "\n")
class ResilientAggregator:
def __init__(self, transport: CentralTransport, fallback: FallbackDisk, batch_size: int = 3) -> None:
self.transport = transport
self.fallback = fallback
self.batch_size = batch_size
self.buffer: List[Dict[str, int]] = []
    def record(self, metrics: Dict[str, int]) -> None:
        self.buffer.append(metrics)
        if len(self.buffer) >= self.batch_size:
            self.flush()
    def flush(self) -> None:
        batch = self.buffer[: self.batch_size]
        if not batch:
            return  # nothing buffered
        try:
            self.transport.ship(batch)
        except Exception as exc:
            print("Ship failed:", exc, "persisting locally")
            self.fallback.persist(batch)
        finally:
            self.buffer = self.buffer[self.batch_size :]
def main() -> None:
random.seed(20)
aggregator = ResilientAggregator(CentralTransport(0.5), FallbackDisk("/tmp/metrics.log"))
for idx in range(7):
aggregator.record({"requests": idx})
aggregator.flush()  # drain the final partial batch
if __name__ == "__main__":
main()
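The batches above carry no identity, so a re-shipped disk spill could be double-counted. A sketch of the metadata envelope the intro describes (the annotate helper and field names are illustrative):
import itertools
import time
from typing import Dict, List

_batch_ids = itertools.count()

def annotate(batch: List[Dict[str, int]], collector: str) -> dict:
    # A monotonically increasing id per collector lets the receiver drop
    # duplicates when a spilled batch is shipped a second time.
    return {
        "collector": collector,
        "batch_id": next(_batch_ids),
        "sent_at": time.time(),
        "metrics": batch,
    }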
Build the document collaboration platform from basic storage to concurrent sessions and resilient synchronization.
Doc Collaboration Stack ├─ Level 1: Core Document Store ├─ Level 2: Concurrent Editing Sessions └─ Level 3: Resilient Collaboration Service
Level 1 — Core Implementation
The collaboration series begins with an operational log that applies inserts and deletes in order, emphasising deterministic replay. Tracking each edit's position in an append-only log makes concurrent editing tangible even before conflicts arise. Practitioners get comfortable with representing text updates as operations rather than raw buffers.
from dataclasses import dataclass
from typing import List
@dataclass
class Operation:
kind: str
index: int
payload: str = ""
class Document:
def __init__(self) -> None:
self.text = ""
self.log: List[Operation] = []
def insert(self, index: int, payload: str) -> None:
self.text = self.text[:index] + payload + self.text[index:]
self.log.append(Operation("insert", index, payload))
def delete(self, index: int) -> None:
self.text = self.text[:index] + self.text[index + 1 :]
self.log.append(Operation("delete", index))
def replay(self) -> str:
text = ""
for op in self.log:
if op.kind == "insert":
text = text[:op.index] + op.payload + text[op.index:]
elif op.kind == "delete":
text = text[:op.index] + text[op.index + 1 :]
return text
def main() -> None:
doc = Document()
doc.insert(0, "H")
doc.insert(1, "i")
doc.insert(2, "!")
doc.delete(2)
print("Text:", doc.text)
print("Replay:", doc.replay())
if __name__ == "__main__":
main()
Level 2 — Concurrent Enhancements
Level 2 introduces a character-wise CRDT so edits from different clients merge without central coordination. We explain how identifiers and tombstones guarantee convergence and highlight trade-offs in memory usage. The demo shows identical final documents regardless of edit ordering, grounding the theory in practice.
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Dict, Tuple
@dataclass
class CharacterVersion:
char: str
clock: Tuple[str, int]
tombstone: bool = False
class CRDTDocument:
def __init__(self) -> None:
self.characters: Dict[int, CharacterVersion] = {}
self.lock = threading.Lock()
def apply(self, position: int, char: str, clock: Tuple[str, int]) -> None:
with self.lock:
existing = self.characters.get(position)
if not existing or clock > existing.clock:
self.characters[position] = CharacterVersion(char, clock)
def delete(self, position: int, clock: Tuple[str, int]) -> None:
with self.lock:
existing = self.characters.get(position)
if not existing or clock >= existing.clock:
self.characters[position] = CharacterVersion("", clock, tombstone=True)
def materialize(self) -> str:
with self.lock:
return "".join(
version.char for pos, version in sorted(self.characters.items()) if not version.tombstone
)
def worker(doc: CRDTDocument, author: str, edit: Tuple[int, str]) -> None:
    pos, char = edit
    doc.apply(pos, char, (author, pos))
def main() -> None:
doc = CRDTDocument()
threads = [
threading.Thread(target=worker, args=(doc, "siteA", (0, "H"))),
threading.Thread(target=worker, args=(doc, "siteB", (1, "i"))),
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("Merged doc:", doc.materialize())
if __name__ == "__main__":
main()
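The threaded demo only inserts. A quick single-threaded check, under the same clock-comparison rules, shows a tombstone winning over an earlier insert at the same position:
def tombstone_demo() -> None:
    doc = CRDTDocument()
    doc.apply(0, "H", ("siteA", 1))
    doc.apply(1, "i", ("siteB", 1))
    doc.delete(1, ("siteB", 2))  # later clock at the same position wins
    print("After delete:", doc.materialize())  # -> "H"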
Level 3 — Resilient Architecture
The advanced level accepts network partitions, buffering operations locally and replaying them in order once connectivity returns. Persisting the outbound queue to disk, sketched after the listing, keeps power loss from destroying unsynced work. Learners observe how acknowledgements and retries maintain a consistent shared document despite offline stretches.
import random
import time
from dataclasses import dataclass
from queue import Queue, Empty
@dataclass
class EditOp:
op_id: str
position: int
payload: str
class RemoteStore:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
self.buffer = ""
def apply(self, op: EditOp) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("network error")
self.buffer = self.buffer[: op.position] + op.payload + self.buffer[op.position :]
print("Remote applied", op.op_id, "->", self.buffer)
class OfflineClient:
def __init__(self, store: RemoteStore) -> None:
self.store = store
self.queue: "Queue[EditOp]" = Queue()
def edit(self, op: EditOp) -> None:
print("Queued", op)
self.queue.put(op)
    def sync(self) -> None:
        # Apply queued edits strictly in FIFO order; stop at the first failure
        # so a later edit can never land before an earlier one.
        pending = []
        failed = False
        while True:
            try:
                op = self.queue.get_nowait()
            except Empty:
                break
            if failed:
                pending.append(op)
                self.queue.task_done()
                continue
            try:
                self.store.apply(op)
            except Exception as exc:
                print("Sync failed for", op.op_id, exc)
                pending.append(op)
                failed = True
            finally:
                self.queue.task_done()
        for op in pending:
            self.queue.put(op)
def main() -> None:
random.seed(21)
store = RemoteStore(0.5)
client = OfflineClient(store)
client.edit(EditOp("op1", 0, "Hi"))
client.edit(EditOp("op2", 2, "!"))
for _ in range(5):
client.sync()
time.sleep(0.1)
if __name__ == "__main__":
main()
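The queue above is in-memory, so the durability the intro promises needs a journal. A minimal sketch (DiskJournal is a hypothetical helper, append-only JSON lines):
import json
import os

class DiskJournal:
    # Append each queued op before attempting sync; rewrite the file with the
    # still-pending ops afterwards so acknowledged edits are compacted away.
    def __init__(self, path: str) -> None:
        self.path = path

    def append(self, op: EditOp) -> None:
        with open(self.path, "a", encoding="utf-8") as handle:
            handle.write(json.dumps(vars(op)) + "\n")

    def rewrite(self, ops: list) -> None:
        with open(self.path, "w", encoding="utf-8") as handle:
            for op in ops:
                handle.write(json.dumps(vars(op)) + "\n")

    def load(self) -> list:
        if not os.path.exists(self.path):
            return []
        with open(self.path, "r", encoding="utf-8") as handle:
            return [EditOp(**json.loads(line)) for line in handle]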
Progressively enhance the rule engine from core evaluation to concurrent processing and a resilient rules service.
Rule Engine Stack ├─ Level 1: Core Rule Evaluator ├─ Level 2: Concurrent Rule Processing └─ Level 3: Resilient Rules Service
Level 1 — Core Implementation
We launch the rule-engine series by modelling conditions and actions as first-class objects and executing them against sample payloads. Evaluation contexts expose facts and collect side effects, making it straightforward to test rules in isolation. The walkthrough explains how to debug rule ordering; a short-circuit variant is sketched after the listing.
from dataclasses import dataclass
from typing import Callable, Dict, List
@dataclass
class Rule:
name: str
condition: Callable[[Dict], bool]
action: Callable[[Dict], None]
class RuleEngine:
def __init__(self) -> None:
self.rules: List[Rule] = []
def add_rule(self, rule: Rule) -> None:
self.rules.append(rule)
def evaluate(self, context: Dict) -> None:
for rule in self.rules:
if rule.condition(context):
rule.action(context)
def main() -> None:
engine = RuleEngine()
engine.add_rule(Rule("high-value", lambda ctx: ctx["amount"] > 100, lambda ctx: print("Flag high value")))
engine.add_rule(Rule("vip", lambda ctx: ctx["customer"] == "vip", lambda ctx: print("Apply vip perks")))
engine.evaluate({"amount": 150, "customer": "vip"})
if __name__ == "__main__":
main()
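The engine above always evaluates every rule. A short-circuit variant, sketched as a standalone helper, makes rule ordering directly observable:
from typing import Dict

def evaluate_first_match(engine: RuleEngine, context: Dict) -> None:
    # Fire only the first rule whose condition matches, then stop; with this
    # policy, reordering rules visibly changes which action runs.
    for rule in engine.rules:
        if rule.condition(context):
            print("Short-circuit on:", rule.name)
            rule.action(context)
            return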
Level 2 — Concurrent Enhancements
Level 2 enables hot reloading by guarding the rule list with a lock so each evaluation runs against a consistent snapshot. We stage updates and swap them in atomically; a versioned-snapshot variant that also exposes version metadata is sketched after the listing. The exercise shows how to mix responsiveness with safety when configuration evolves live.
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Callable, Dict, List
@dataclass
class Rule:
name: str
condition: Callable[[Dict], bool]
action: Callable[[Dict], None]
class HotReloadEngine:
def __init__(self) -> None:
self.rules: List[Rule] = []
self.lock = threading.RLock()
def evaluate(self, context: Dict) -> None:
with self.lock:
for rule in self.rules:
if rule.condition(context):
rule.action(context)
def reload(self, rules: List[Rule]) -> None:
with self.lock:
self.rules = rules
def evaluator(engine: HotReloadEngine, context: Dict) -> None:
engine.evaluate(context)
def main() -> None:
engine = HotReloadEngine()
engine.reload([Rule("gt100", lambda ctx: ctx["amount"] > 100, lambda ctx: print("Large order"))])
threads = [
threading.Thread(target=evaluator, args=(engine, {"amount": 150})),
threading.Thread(target=evaluator, args=(engine, {"amount": 80})),
]
for thread in threads:
thread.start()
engine.reload([Rule("gt50", lambda ctx: ctx["amount"] > 50, lambda ctx: print("Order > 50"))])
for thread in threads:
thread.join()
engine.evaluate({"amount": 60})
if __name__ == "__main__":
main()
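The RLock above serialises evaluations against reloads. An alternative sketch swaps an immutable (version, rules) snapshot so readers never block and can report which version they ran (VersionedEngine is an illustrative name):
import threading
from typing import Dict, List, Tuple

class VersionedEngine:
    def __init__(self) -> None:
        self._snapshot: Tuple[int, tuple] = (0, ())
        self._reload_lock = threading.Lock()  # serialises writers only

    def reload(self, rules: List[Rule]) -> None:
        with self._reload_lock:
            version = self._snapshot[0] + 1
            self._snapshot = (version, tuple(rules))  # atomic reference swap

    def evaluate(self, context: Dict) -> int:
        version, rules = self._snapshot  # readers grab a consistent pair
        for rule in rules:
            if rule.condition(context):
                rule.action(context)
        return version  # callers can log which rule-set version executed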
Level 3 — Resilient Architecture
The advanced rule engine treats rule definitions as remote resources, layering retries, backoff, and local caching around fetches. We log degradation events and refresh successes to keep operations transparent. Learners see how to maintain deterministic behaviour even while rule sources flicker.
import random
import time
from dataclasses import dataclass
from typing import Callable, Dict, List
@dataclass
class Rule:
name: str
condition: Callable[[Dict], bool]
action: Callable[[Dict], None]
class RemoteRuleSource:
def __init__(self, failure_rate: float = 0.5) -> None:
self.failure_rate = failure_rate
def fetch(self) -> List[Rule]:
if random.random() < self.failure_rate:
raise RuntimeError("rule source down")
return [
Rule("gt200", lambda ctx: ctx["amount"] > 200, lambda ctx: print("Audit order")),
]
class ResilientRuleEngine:
def __init__(self, source: RemoteRuleSource) -> None:
self.source = source
self.cache: List[Rule] = []
    def refresh(self) -> None:
        # Retry with exponential backoff before giving up and keeping the
        # previously cached rules.
        for attempt in range(3):
            try:
                self.cache = self.source.fetch()
                print("Rule cache refreshed")
                return
            except Exception as exc:
                print("Rule refresh failed (attempt %d):" % (attempt + 1), exc)
                time.sleep(0.1 * (2 ** attempt))
def evaluate(self, context: Dict) -> None:
for rule in self.cache:
if rule.condition(context):
rule.action(context)
def main() -> None:
random.seed(22)
engine = ResilientRuleEngine(RemoteRuleSource(0.6))
for _ in range(4):
engine.refresh()
engine.evaluate({"amount": 250})
time.sleep(0.2)
if __name__ == "__main__":
main()
Grow the leaderboard functionality from single-threaded ranking to concurrent updates and resilient aggregation with caches.
Leaderboard Stack ├─ Level 1: Core Leaderboard ├─ Level 2: Concurrent Leaderboard Updates └─ Level 3: Resilient Leaderboard Service
Level 1 — Core Implementation
The leaderboard journey begins with a score map plus heap-based top-k queries, recomputing order on demand as updates arrive. We keep player profiles separate from ranking logic so lookups stay cheap and expressive. Sample commands demonstrate how updates shift positions instantly.
import heapq
from typing import Dict, List, Tuple
class Leaderboard:
def __init__(self) -> None:
self.scores: Dict[str, int] = {}
def update(self, player: str, score: int) -> None:
self.scores[player] = score
def top(self, k: int) -> List[Tuple[int, str]]:
return heapq.nlargest(k, [(score, player) for player, score in self.scores.items()])
def main() -> None:
lb = Leaderboard()
lb.update("alice", 100)
lb.update("bob", 90)
lb.update("carol", 120)
print("Top2:", lb.top(2))
if __name__ == "__main__":
main()
Level 2 — Concurrent Enhancements
Level 2 handles rapid writes by sharding state and protecting each shard with its own lock, improving throughput. We add score decay jobs to simulate games where ranking should drift over time, highlighting maintenance tasks. Threaded stress tests make it clear how to spot hotspots and tune shard counts.
from __future__ import annotations
import threading
from collections import defaultdict
from typing import Dict, List, Tuple
class ShardedLeaderboard:
def __init__(self, shards: int = 4) -> None:
self.shards = shards
self.segment_scores: Dict[int, Dict[str, int]] = defaultdict(dict)
self.locks: Dict[int, threading.Lock] = defaultdict(threading.Lock)
def _shard(self, player: str) -> int:
return hash(player) % self.shards
def update(self, player: str, delta: int) -> None:
shard = self._shard(player)
with self.locks[shard]:
self.segment_scores[shard][player] = self.segment_scores[shard].get(player, 0) + delta
def snapshot(self) -> Dict[str, int]:
combined: Dict[str, int] = {}
for shard, scores in self.segment_scores.items():
with self.locks[shard]:
for player, score in scores.items():
combined[player] = combined.get(player, 0) + score
return combined
def worker(lb: ShardedLeaderboard, player: str) -> None:
for _ in range(10):
lb.update(player, 1)
def main() -> None:
lb = ShardedLeaderboard()
threads = [threading.Thread(target=worker, args=(lb, f"player{i}")) for i in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("Snapshot:", lb.snapshot())
if __name__ == "__main__":
main()
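The intro mentions decay jobs the listing omits. One sketch of a periodic maintenance pass over the shards (the decay helper and its factor are illustrative):
def decay(lb: ShardedLeaderboard, factor: float = 0.9) -> None:
    # Multiplicatively shrink every score so inactive players drift downward;
    # run this from a scheduler (say, once per minute), not per request.
    for shard, scores in list(lb.segment_scores.items()):
        with lb.locks[shard]:
            for player in scores:
                scores[player] = int(scores[player] * factor)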
Level 3 — Resilient Architecture
The advanced level layers a distributed cache over a durable store so reads stay fast while writes remain authoritative. Cache misses fall back to the store, and successful reads repopulate the cache, all while instrumentation reports hit rates. Learners gain intuition for balancing latency and correctness in highly read-heavy workloads.
import heapq
import random
import threading
from collections import defaultdict
from typing import Dict, List, Tuple
class ShardedLeaderboard:
def __init__(self, shards: int = 4) -> None:
self.shards = shards
self.segment_scores: Dict[int, Dict[str, int]] = defaultdict(dict)
self.locks: Dict[int, threading.Lock] = defaultdict(threading.Lock)
def _shard(self, player: str) -> int:
return hash(player) % self.shards
def update(self, player: str, delta: int) -> None:
shard = self._shard(player)
with self.locks[shard]:
self.segment_scores[shard][player] = self.segment_scores[shard].get(player, 0) + delta
def snapshot(self) -> Dict[str, int]:
combined: Dict[str, int] = {}
for shard, scores in self.segment_scores.items():
with self.locks[shard]:
for player, score in scores.items():
combined[player] = combined.get(player, 0) + score
return combined
class CacheLayer:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
self.cache: Dict[str, List[Tuple[int, str]]] = {}
def get(self, key: str) -> List[Tuple[int, str]]:
if random.random() < self.failure_rate:
raise RuntimeError("cache down")
return self.cache.get(key, [])
def set(self, key: str, value: List[Tuple[int, str]]) -> None:
self.cache[key] = value
class ResilientLeaderboard:
def __init__(self, store: ShardedLeaderboard, cache: CacheLayer) -> None:
self.store = store
self.cache = cache
def update(self, player: str, score: int) -> None:
self.store.update(player, score)
def top(self, k: int) -> List[Tuple[int, str]]:
key = f"top:{k}"
try:
result = self.cache.get(key)
if result:
return result
except Exception as exc:
print("Cache miss due to", exc)
scores = self.store.snapshot()
result = heapq.nlargest(k, [(score, player) for player, score in scores.items()])
try:
self.cache.set(key, result)
except Exception as exc:
print("Cache set failed:", exc)
return result
def main() -> None:
random.seed(23)
store = ShardedLeaderboard()
cache = CacheLayer(0.5)
lb = ResilientLeaderboard(store, cache)
lb.update("alice", 120)
lb.update("bob", 110)
lb.update("carol", 140)
for _ in range(3):
print("Top:", lb.top(2))
if __name__ == "__main__":
main()
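To report the hit rates the intro mentions, a thin wrapper over CacheLayer suffices; InstrumentedCache is an illustrative name:
from typing import List, Tuple

class InstrumentedCache(CacheLayer):
    def __init__(self, failure_rate: float = 0.4) -> None:
        super().__init__(failure_rate)
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> List[Tuple[int, str]]:
        # An empty result counts as a miss; errors propagate to the caller's
        # fallback path and are not counted here.
        result = super().get(key)
        if result:
            self.hits += 1
        else:
            self.misses += 1
        return result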
Progressively build the matchmaking system from simple queues to concurrent matchmakers and resilient session allocation.
Matchmaking Stack ├─ Level 1: Core Queue ├─ Level 2: Concurrent Matchmaker └─ Level 3: Resilient Match Service
Level 1 — Core Implementation
The matchmaking track starts by placing players into skill-bucketed queues and popping them in FIFO order to create fair matches. We define match objects and callbacks so downstream services can react to creation events. The sample session clarifies how queue discipline keeps latency and fairness predictable.
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple
class Matchmaker:
def __init__(self) -> None:
self.buckets: Dict[int, Deque[str]] = defaultdict(deque)
def enqueue(self, player: str, skill: int) -> None:
bucket = skill // 100
self.buckets[bucket].append(player)
def match(self) -> List[Tuple[str, str]]:
matches = []
for queue in self.buckets.values():
while len(queue) >= 2:
matches.append((queue.popleft(), queue.popleft()))
return matches
def main() -> None:
mm = Matchmaker()
mm.enqueue("alice", 120)
mm.enqueue("bob", 130)
mm.enqueue("carol", 205)
mm.enqueue("dave", 215)
print("Matches:", mm.match())
if __name__ == "__main__":
main()
Level 2 — Concurrent Enhancements
Level 2 introduces a background worker that drains queues while multiple threads enqueue players, relying on per-bucket locks to stay consistent. Queue-depth metrics, sketched after the listing, show how load affects wait times. Running the concurrent simulation demonstrates how to keep shared queues healthy under pressure.
from __future__ import annotations
import threading
import time
from collections import defaultdict, deque
from typing import Deque, Dict
class ConcurrentMatchmaker:
def __init__(self) -> None:
self.buckets: Dict[int, Deque[str]] = defaultdict(deque)
self.locks: Dict[int, threading.Lock] = defaultdict(threading.Lock)
self.stop = False
threading.Thread(target=self._loop, daemon=True).start()
def enqueue(self, player: str, skill: int) -> None:
bucket = skill // 100
lock = self.locks[bucket]
with lock:
self.buckets[bucket].append(player)
def _loop(self) -> None:
while not self.stop:
for bucket, queue in list(self.buckets.items()):
lock = self.locks[bucket]
with lock:
while len(queue) >= 2:
p1 = queue.popleft()
p2 = queue.popleft()
print("Match:", p1, p2)
time.sleep(0.1)
def shutdown(self) -> None:
self.stop = True
def worker(mm: ConcurrentMatchmaker, player: str, skill: int) -> None:
mm.enqueue(player, skill)
def main() -> None:
mm = ConcurrentMatchmaker()
threads = [threading.Thread(target=worker, args=(mm, f"player{i}", 100 + i * 10)) for i in range(6)]
for t in threads:
t.start()
for t in threads:
t.join()
time.sleep(0.5)
mm.shutdown()
if __name__ == "__main__":
main()
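The matchmaker above logs matches but exposes no metrics. A snapshot helper for queue depth, sketched with illustrative names:
from typing import Dict

def queue_depths(mm: ConcurrentMatchmaker) -> Dict[int, int]:
    # Per-bucket queue lengths, each read under its bucket's lock, so the
    # numbers are consistent enough for a wait-time dashboard.
    depths: Dict[int, int] = {}
    for bucket, queue in list(mm.buckets.items()):
        with mm.locks[bucket]:
            depths[bucket] = len(queue)
    return depths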
Level 3 — Resilient Architecture
The advanced level guards the external session allocator with retries and backoff, parking matches that cannot get resources in a pending queue. Persisting that queue, sketched after the listing, lets matches survive restarts, and exhausted retries should raise operator alarms. Learners watch how graceful degradation keeps the player experience stable while dependencies recover.
import random
import time
from collections import deque
from typing import Deque, Tuple
class SessionAllocator:
def __init__(self, failure_rate: float = 0.4) -> None:
self.failure_rate = failure_rate
def allocate(self, players: Tuple[str, str]) -> None:
if random.random() < self.failure_rate:
raise RuntimeError("allocation failed")
print("Session allocated for", players)
class ResilientMatchmaker:
def __init__(self, allocator: SessionAllocator) -> None:
self.allocator = allocator
self.pending: Deque[Tuple[str, str]] = deque()
    def submit_match(self, players: Tuple[str, str]) -> None:
        self.pending.append(players)
        self.process()
    def process(self) -> None:
retries = []
while self.pending:
players = self.pending.popleft()
try:
self.allocator.allocate(players)
except Exception as exc:
print("Allocation failed:", exc)
retries.append(players)
for players in retries:
self.pending.append(players)
time.sleep(0.1)
def main() -> None:
random.seed(24)
matchmaker = ResilientMatchmaker(SessionAllocator(0.5))
matches = [("alice", "bob"), ("carol", "dave"), ("eve", "frank")]
for match in matches:
matchmaker.submit_match(match)
matchmaker.process()  # one more pass for matches that failed earlier
if __name__ == "__main__":
main()
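The pending deque above vanishes on restart. A minimal persistence sketch (persist_pending and restore_pending are hypothetical helpers writing a JSON file) shows the shape of the durability the intro describes:
import json

def persist_pending(matchmaker: ResilientMatchmaker, path: str) -> None:
    # Snapshot unallocated matches so a restart can reload them.
    with open(path, "w", encoding="utf-8") as handle:
        json.dump([list(players) for players in matchmaker.pending], handle)

def restore_pending(matchmaker: ResilientMatchmaker, path: str) -> None:
    with open(path, "r", encoding="utf-8") as handle:
        for players in json.load(handle):
            matchmaker.pending.append(tuple(players))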